option to gzip intermediate temp storage fles

pull/1/head
Mike Barry 2021-07-17 06:51:13 -04:00
rodzic 10bc2186f0
commit 5660ee8f3f
5 zmienionych plików z 55 dodań i 25 usunięć

Wyświetl plik

@ -14,6 +14,7 @@ public record CommonParams(
boolean deferIndexCreation,
boolean optimizeDb,
boolean forceOverwrite,
boolean gzipTempStorage,
// computed
Envelope worldBounds,
@ -28,7 +29,8 @@ public record CommonParams(
int maxzoom,
boolean deferIndexCreation,
boolean optimizeDb,
boolean forceOverwrite
boolean forceOverwrite,
boolean gzipTempStorage
) {
this(
latLonBounds,
@ -39,6 +41,7 @@ public record CommonParams(
deferIndexCreation,
optimizeDb,
forceOverwrite,
gzipTempStorage,
// computed
GeoUtils.toWorldBounds(latLonBounds),
@ -78,7 +81,8 @@ public record CommonParams(
arguments.integer("maxzoom", "maximum zoom level (limit 14)", MAX_MAXZOOM),
arguments.get("defer_mbtiles_index_creation", "add index to mbtiles file after finished writing", false),
arguments.get("optimize_db", "optimize mbtiles after writing", false),
arguments.get("force", "force overwriting output file", false)
arguments.get("force", "force overwriting output file", false),
arguments.get("gzip_temp", "gzip temporary feature storage (uses more CPU, but less disk space)", false)
);
}
}

Wyświetl plik

@ -167,7 +167,8 @@ public class FlatMapRunner {
Path nodeDbPath = tmpDir.resolve("node.db");
nodeLocations = LongLongMap.newFileBackedSortedTable(nodeDbPath);
Path featureDbPath = tmpDir.resolve("feature.db");
featureDb = FeatureSort.newExternalMergeSort(tmpDir.resolve("feature.db"), config.threads(), stats);
featureDb = FeatureSort
.newExternalMergeSort(tmpDir.resolve("feature.db"), config.threads(), config.gzipTempStorage(), stats);
featureMap = new FeatureGroup(featureDb, profile, stats);
stats.monitorFile("nodes", nodeDbPath);
stats.monitorFile("features", featureDbPath);

Wyświetl plik

@ -11,6 +11,8 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@ -23,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,10 +44,11 @@ class ExternalMergeSort implements FeatureSort {
private final AtomicLong features = new AtomicLong(0);
private final List<Chunk> chunks = new ArrayList<>();
private final boolean gzip;
private Chunk current;
private volatile boolean sorted = false;
ExternalMergeSort(Path tempDir, int threads, Stats stats) {
ExternalMergeSort(Path tempDir, int threads, boolean gzip, Stats stats) {
this(
tempDir,
threads,
@ -51,14 +56,16 @@ class ExternalMergeSort implements FeatureSort {
MAX_CHUNK_SIZE,
(ProcessInfo.getMaxMemoryBytes() / 2) / threads
),
gzip,
stats
);
}
ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, boolean gzip, Stats stats) {
this.dir = dir;
this.stats = stats;
this.chunkSizeLimit = chunkSizeLimit;
this.gzip = gzip;
long memory = ProcessInfo.getMaxMemoryBytes();
if (chunkSizeLimit > memory / 2) {
throw new IllegalStateException(
@ -75,6 +82,22 @@ class ExternalMergeSort implements FeatureSort {
}
}
private DataInputStream newInputStream(Path path) throws IOException {
InputStream inputStream = new BufferedInputStream(Files.newInputStream(path), 50_000);
if (gzip) {
inputStream = new GZIPInputStream(inputStream);
}
return new DataInputStream(inputStream);
}
private DataOutputStream newOutputStream(Path path) throws IOException {
OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(path), 50_000);
if (gzip) {
outputStream = new GZIPOutputStream(outputStream);
}
return new DataOutputStream(outputStream);
}
@Override
public void add(Entry item) {
try {
@ -194,7 +217,7 @@ class ExternalMergeSort implements FeatureSort {
chunks.add(current = new Chunk(chunkPath));
}
private static class Chunk implements Closeable {
private class Chunk implements Closeable {
private final Path path;
private final DataOutputStream outputStream;
@ -203,7 +226,7 @@ class ExternalMergeSort implements FeatureSort {
private Chunk(Path path) throws IOException {
this.path = path;
this.outputStream = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(path), 50_000));
this.outputStream = newOutputStream(path);
}
public PeekableScanner newReader() {
@ -240,8 +263,7 @@ class ExternalMergeSort implements FeatureSort {
}
public SortableChunk flush() {
try (DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(path), 50_000))) {
try (DataOutputStream out = newOutputStream(path)) {
for (Entry feature : featuresToSort) {
write(out, feature);
}
@ -280,7 +302,7 @@ class ExternalMergeSort implements FeatureSort {
}
}
private static class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private final int count;
private int read = 0;
@ -290,7 +312,7 @@ class ExternalMergeSort implements FeatureSort {
PeekableScanner(Path path, int count) {
this.count = count;
try {
input = new DataInputStream(new BufferedInputStream(Files.newInputStream(path), 50_000));
input = newInputStream(path);
next = readNextFeature();
} catch (IOException e) {
throw new IllegalStateException(e);

Wyświetl plik

@ -11,12 +11,12 @@ import org.jetbrains.annotations.NotNull;
public interface FeatureSort extends Iterable<FeatureSort.Entry> {
static FeatureSort newExternalMergeSort(Path tempDir, int threads, Stats stats) {
return new ExternalMergeSort(tempDir, threads, stats);
static FeatureSort newExternalMergeSort(Path tempDir, int threads, boolean gzip, Stats stats) {
return new ExternalMergeSort(tempDir, threads, gzip, stats);
}
static FeatureSort newExternalMergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
return new ExternalMergeSort(dir, workers, chunkSizeLimit, stats);
static FeatureSort newExternalMergeSort(Path dir, int workers, int chunkSizeLimit, boolean gzip, Stats stats) {
return new ExternalMergeSort(dir, workers, chunkSizeLimit, gzip, stats);
}
static FeatureSort newInMemory() {

Wyświetl plik

@ -10,6 +10,8 @@ import java.util.List;
import java.util.Random;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
public class FeatureSortTest {
@ -20,20 +22,20 @@ public class FeatureSortTest {
return new FeatureSort.Entry(Long.MIN_VALUE + i, new byte[]{(byte) i, (byte) (1 + i)});
}
private FeatureSort newSorter(int workers, int chunkSizeLimit) {
return FeatureSort.newExternalMergeSort(tmpDir, workers, chunkSizeLimit, new Stats.InMemory());
private FeatureSort newSorter(int workers, int chunkSizeLimit, boolean gzip) {
return FeatureSort.newExternalMergeSort(tmpDir, workers, chunkSizeLimit, gzip, new Stats.InMemory());
}
@Test
public void testEmpty() {
FeatureSort sorter = newSorter(1, 100);
FeatureSort sorter = newSorter(1, 100, false);
sorter.sort();
assertEquals(List.of(), sorter.toList());
}
@Test
public void testSingle() {
FeatureSort sorter = newSorter(1, 100);
FeatureSort sorter = newSorter(1, 100, false);
sorter.add(newEntry(1));
sorter.sort();
assertEquals(List.of(newEntry(1)), sorter.toList());
@ -41,7 +43,7 @@ public class FeatureSortTest {
@Test
public void testTwoItemsOneChunk() {
FeatureSort sorter = newSorter(1, 100);
FeatureSort sorter = newSorter(1, 100, false);
sorter.add(newEntry(2));
sorter.add(newEntry(1));
sorter.sort();
@ -50,7 +52,7 @@ public class FeatureSortTest {
@Test
public void testTwoItemsTwoChunks() {
FeatureSort sorter = newSorter(1, 0);
FeatureSort sorter = newSorter(1, 0, false);
sorter.add(newEntry(2));
sorter.add(newEntry(1));
sorter.sort();
@ -59,7 +61,7 @@ public class FeatureSortTest {
@Test
public void testTwoWorkers() {
FeatureSort sorter = newSorter(2, 0);
FeatureSort sorter = newSorter(2, 0, false);
sorter.add(newEntry(4));
sorter.add(newEntry(3));
sorter.add(newEntry(2));
@ -68,8 +70,9 @@ public class FeatureSortTest {
assertEquals(List.of(newEntry(1), newEntry(2), newEntry(3), newEntry(4)), sorter.toList());
}
@Test
public void testManyItems() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testManyItems(boolean gzip) {
List<FeatureSort.Entry> sorted = new ArrayList<>();
List<FeatureSort.Entry> shuffled = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
@ -77,7 +80,7 @@ public class FeatureSortTest {
sorted.add(newEntry(i));
}
Collections.shuffle(shuffled, new Random(0));
FeatureSort sorter = newSorter(2, 20_000);
FeatureSort sorter = newSorter(2, 20_000, gzip);
shuffled.forEach(sorter::add);
sorter.sort();
assertEquals(sorted, sorter.toList());