Add `--feature-read-threads` option to read features in parallel when writing tiles (#225)

pull/235/head
Michael Barry 2022-05-24 18:46:52 -04:00 zatwierdzone przez GitHub
rodzic f5206b3a73
commit 99c0f8ae9c
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
20 zmienionych plików z 735 dodań i 153 usunięć

Wyświetl plik

@ -41,30 +41,32 @@ public class BenchmarkExternalMergeSort {
FileUtils.delete(path);
FileUtils.deleteOnExit(path);
var config = PlanetilerConfig.defaults();
for (int i = 0; i < 3; i++) {
try {
List<Results> results = new ArrayList<>();
for (int chunks : List.of(1, 10, 100, 1_000, 10_000)) {
results.add(run(path, 1, number, chunks, true, true, true, config));
try {
List<Results> results = new ArrayList<>();
for (int chunks : List.of(100, 200, 500)) {
for (int readThreads : List.of(1, 2, 3, 4)) {
for (boolean mmap : List.of(false, true)) {
results.add(run(path, 1, readThreads, number, chunks, mmap, true, true, config));
}
}
for (var result : results) {
System.err.println(result);
}
} finally {
FileUtils.delete(path);
}
for (var result : results) {
System.err.println(result.chunks + "\t" + result.readThreads + "\t" + result.mmap + "\t" + result.read);
}
} finally {
FileUtils.delete(path);
}
}
private record Results(
String write, String read, String sort,
int chunks,
int writeWorkers, int readWorkers,
int writeWorkers, int readThreads,
long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort,
boolean madvise
) {}
private static Results run(Path tmpDir, int writeWorkers, long items, int numChunks,
private static Results run(Path tmpDir, int writeWorkers, int readThreads, long items, int numChunks,
boolean mmap, boolean parallelSort, boolean madvise, PlanetilerConfig config) {
long chunkSizeLimit = items * ITEM_MEMORY_BYTES / numChunks;
if (chunkSizeLimit > Integer.MAX_VALUE) {
@ -72,7 +74,6 @@ public class BenchmarkExternalMergeSort {
}
boolean gzip = false;
int sortWorkers = Runtime.getRuntime().availableProcessors();
int readWorkers = 1;
FileUtils.delete(tmpDir);
var sorter =
new ExternalMergeSort(tmpDir, sortWorkers, (int) chunkSizeLimit, gzip, mmap, parallelSort, madvise, config,
@ -87,7 +88,7 @@ public class BenchmarkExternalMergeSort {
sortTimer.stop();
var readTimer = Timer.start();
doReads(readWorkers, items, sorter);
doReads(readThreads, items, sorter);
readTimer.stop();
return new Results(
@ -96,7 +97,7 @@ public class BenchmarkExternalMergeSort {
FORMAT.duration(sortTimer.elapsed().wall()),
sorter.chunks(),
writeWorkers,
readWorkers,
readThreads,
items,
(int) chunkSizeLimit,
gzip,
@ -106,11 +107,12 @@ public class BenchmarkExternalMergeSort {
);
}
private static void doReads(int readWorkers, long items, ExternalMergeSort sorter) {
private static void doReads(int threads, long items, ExternalMergeSort sorter) {
var counters = Counter.newMultiThreadCounter();
var reader = new Worker("read", Stats.inMemory(), readWorkers, () -> {
Iterable<SortableFeature> q = threads > 1 ? sorter.parallelIterator(Stats.inMemory(), threads) : sorter;
var reader = new Worker("read", Stats.inMemory(), 1, () -> {
var counter = counters.counterForThread();
for (var ignored : sorter) {
for (var ignored : q) {
counter.inc();
}
});
@ -119,8 +121,14 @@ public class BenchmarkExternalMergeSort {
.addFileSize(sorter)
.newLine()
.addProcessStats()
.newLine()
.addThreadPoolStats("reader", reader);
.newLine();
if (q instanceof FeatureSort.ParallelIterator pi) {
loggers
.addThreadPoolStats("read", pi.reader())
.addThreadPoolStats("merge", reader);
} else {
loggers.addThreadPoolStats("read", reader);
}
reader.awaitAndLog(loggers, Duration.ofSeconds(1));
}

Wyświetl plik

@ -235,8 +235,11 @@ class ExternalMergeSort implements FeatureSort {
}
@Override
public Iterator<SortableFeature> iterator() {
public Iterator<SortableFeature> iterator(int shard, int shards) {
assert sorted;
if (shard < 0 || shard >= shards) {
throw new IllegalArgumentException("Bad shard params: shard=%d shards=%d".formatted(shard, shards));
}
if (chunks.isEmpty()) {
return Collections.emptyIterator();
@ -244,36 +247,14 @@ class ExternalMergeSort implements FeatureSort {
// k-way merge to interleave all the sorted chunks
List<Reader> iterators = new ArrayList<>();
for (Chunk chunk : chunks) {
for (int i = shard; i < chunks.size(); i += shards) {
var chunk = chunks.get(i);
if (chunk.itemCount > 0) {
iterators.add(chunk.newReader());
}
}
LongMinHeap heap = LongMinHeap.newArrayHeap(iterators.size());
for (int i = 0; i < iterators.size(); i++) {
heap.push(i, iterators.get(i).nextKey());
}
return new Iterator<>() {
@Override
public boolean hasNext() {
return !heap.isEmpty();
}
@Override
public SortableFeature next() {
int i = heap.peekId();
Reader iterator = iterators.get(i);
assert iterator != null;
SortableFeature next = iterator.next();
if (iterator.hasNext()) {
heap.updateHead(iterator.nextKey());
} else {
heap.poll();
}
return next;
}
};
return LongMerger.mergeIterators(iterators);
}
public int chunks() {
@ -299,8 +280,6 @@ class ExternalMergeSort implements FeatureSort {
@Override
void close();
long nextKey();
}
/** Compresses bytes with minimal impact on write performance. Equivalent to {@code gzip -1} */
@ -412,11 +391,6 @@ class ExternalMergeSort implements FeatureSort {
return current;
}
@Override
public final long nextKey() {
return next.key();
}
abstract SortableFeature readNextFeature();
}

Wyświetl plik

@ -14,6 +14,7 @@ import com.onthegomap.planetiler.util.CommonStringEncoder;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.Hashing;
import com.onthegomap.planetiler.util.LayerStats;
import com.onthegomap.planetiler.worker.Worker;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
@ -58,6 +59,7 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
private final CommonStringEncoder commonStrings;
private final Stats stats;
private final LayerStats layerStats = new LayerStats();
private volatile boolean prepared = false;
FeatureGroup(FeatureSort sorter, Profile profile, CommonStringEncoder commonStrings, Stats stats) {
this.sorter = sorter;
@ -122,6 +124,20 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
}
}
static GeometryType decodeGeomType(byte geomTypeAndScale) {
return GeometryType.valueOf((byte) (geomTypeAndScale & 0b111));
}
static int decodeScale(byte geomTypeAndScale) {
return (geomTypeAndScale & 0xff) >>> 3;
}
static byte encodeGeomTypeAndScale(VectorTile.VectorGeometry geometry) {
assert geometry.geomType().asByte() >= 0 && geometry.geomType().asByte() <= 8;
assert geometry.scale() >= 0 && geometry.scale() < (1 << 5);
return (byte) ((geometry.geomType().asByte() & 0xff) | (geometry.scale() << 3));
}
/**
* Returns statistics about each layer written through {@link #newRenderedFeatureEncoder()} including min/max zoom,
* features on elements in that layer, and their types.
@ -134,8 +150,6 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
return sorter.numFeaturesWritten();
}
public interface RenderedFeatureEncoder extends Function<RenderedFeature, SortableFeature>, Closeable {}
/** Returns a function for a single thread to use to serialize rendered features. */
public RenderedFeatureEncoder newRenderedFeatureEncoder() {
return new RenderedFeatureEncoder() {
@ -233,32 +247,32 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
return packer.toByteArray();
}
static GeometryType decodeGeomType(byte geomTypeAndScale) {
return GeometryType.valueOf((byte) (geomTypeAndScale & 0b111));
}
static int decodeScale(byte geomTypeAndScale) {
return (geomTypeAndScale & 0xff) >>> 3;
}
static byte encodeGeomTypeAndScale(VectorTile.VectorGeometry geometry) {
assert geometry.geomType().asByte() >= 0 && geometry.geomType().asByte() <= 8;
assert geometry.scale() >= 0 && geometry.scale() < (1 << 5);
return (byte) ((geometry.geomType().asByte() & 0xff) | (geometry.scale() << 3));
}
/** Returns a new feature writer that can be used for a single thread. */
public CloseableConusmer<SortableFeature> writerForThread() {
return sorter.writerForThread();
}
private volatile boolean prepared = false;
/** Iterates through features grouped by tile ID. */
@Override
public Iterator<TileFeatures> iterator() {
prepare();
Iterator<SortableFeature> entries = sorter.iterator();
return groupIntoTiles(sorter.iterator());
}
/**
* Reads temp features using {@code threads} parallel threads and merges into a sorted list.
* <p>
* Calls {@code prepare()} first, then delegates the parallel read to the underlying sorter and groups the merged
* features into tiles. Every call to {@code iterator()} on the returned {@link Iterable} wraps the same underlying
* merged iterator, so the result should only be iterated once.
*
* @param threads The number of parallel read threads to spawn
* @return a {@link Reader} with a handle to the new read threads that were spawned, and an {@link Iterable} that can
* be used to iterate over the results.
*/
public Reader parallelIterator(int threads) {
prepare();
var parIter = sorter.parallelIterator(stats, threads);
return new Reader(parIter.reader(), () -> groupIntoTiles(parIter.iterator()));
}
private Iterator<TileFeatures> groupIntoTiles(Iterator<SortableFeature> entries) {
// entries are sorted by tile ID, so group consecutive entries in same tile into tiles
if (!entries.hasNext()) {
return Collections.emptyIterator();
}
@ -315,6 +329,10 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
}
}
public interface RenderedFeatureEncoder extends Function<RenderedFeature, SortableFeature>, Closeable {}
public record Reader(Worker readWorker, Iterable<TileFeatures> result) {}
/** Features contained in a single tile. */
public class TileFeatures {
@ -328,6 +346,18 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
this.tileCoord = TileCoord.decode(tileCoord);
}
private static void unscale(List<VectorTile.Feature> features) {
for (int i = 0; i < features.size(); i++) {
var feature = features.get(i);
if (feature != null) {
VectorTile.VectorGeometry geometry = feature.geometry();
if (geometry.scale() != 0) {
features.set(i, feature.copyWithNewGeometry(geometry.unscale()));
}
}
}
}
/** Returns the number of features read including features discarded from being over the limit in a group. */
public long getNumFeaturesProcessed() {
return numFeaturesProcessed.get();
@ -449,18 +479,6 @@ public final class FeatureGroup implements Iterable<FeatureGroup.TileFeatures>,
return encoder;
}
private static void unscale(List<VectorTile.Feature> features) {
for (int i = 0; i < features.size(); i++) {
var feature = features.get(i);
if (feature != null) {
VectorTile.VectorGeometry geometry = feature.geometry();
if (geometry.scale() != 0) {
features.set(i, feature.copyWithNewGeometry(geometry.unscale()));
}
}
}
}
private void postProcessAndAddLayerFeatures(VectorTile encoder, String layer,
List<VectorTile.Feature> features) {
try {

Wyświetl plik

@ -1,13 +1,17 @@
package com.onthegomap.planetiler.collection;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.CloseableConusmer;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.worker.WeightedHandoffQueue;
import com.onthegomap.planetiler.worker.Worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.concurrent.NotThreadSafe;
/**
@ -55,11 +59,21 @@ interface FeatureSort extends Iterable<SortableFeature>, DiskBacked, MemoryEstim
return 0;
}
@Override
public Iterator<SortableFeature> iterator() {
return list.iterator();
}
@Override
public Iterator<SortableFeature> iterator(int shard, int shards) {
if (shard < 0 || shard >= shards) {
throw new IllegalArgumentException("Bad shard params: shard=%d shards=%d".formatted(shard, shards));
}
return IntStream.range(0, list.size())
.filter(d -> d % shards == shard)
.mapToObj(list::get)
.iterator();
}
};
}
@ -81,4 +95,45 @@ interface FeatureSort extends Iterable<SortableFeature>, DiskBacked, MemoryEstim
* threads.
*/
CloseableConusmer<SortableFeature> writerForThread();
@Override
default Iterator<SortableFeature> iterator() {
return iterator(0, 1);
}
/**
* Returns an iterator over a subset of the features.
*
* @param shard The index of this iterator
* @param shards The total number of iterators that will be used
* @return An iterator over a subset of features that when combined with all other iterators will iterate over the
* full set.
*/
Iterator<SortableFeature> iterator(int shard, int shards);
/**
* Reads temp features using {@code threads} parallel threads and merges into a sorted list.
* <p>
* One {@link WeightedHandoffQueue} is created per thread. A {@link Worker} named {@code "read"} is then started with
* {@code threads} threads; the thread that receives ID {@code shard} drains {@code iterator(shard, threads)} into its
* own queue and closes that queue when it is done. The returned iterator combines all of the queues with
* {@link LongMerger#mergeSuppliers(List)}.
*
* @param stats Stat tracker
* @param threads The number of parallel read threads to spawn
* @return a {@link ParallelIterator} with a handle to the new read threads that were spawned, and an {@link Iterable}
* that can be used to iterate over the results.
*/
default ParallelIterator parallelIterator(Stats stats, int threads) {
// NOTE(review): 500 and 10_000 look like per-queue batching/capacity limits - confirm against WeightedHandoffQueue
List<WeightedHandoffQueue<SortableFeature>> queues = IntStream.range(0, threads)
.mapToObj(i -> new WeightedHandoffQueue<SortableFeature>(500, 10_000))
.toList();
Worker reader = new Worker("read", stats, threads, shard -> {
// try-with-resources closes this shard's queue when its entries run out or the read fails;
// presumably that is what tells the merging side that this input is exhausted - TODO confirm
try (var next = queues.get(shard)) {
Iterator<SortableFeature> entries = iterator(shard, threads);
while (entries.hasNext()) {
// NOTE(review): the second argument is presumably the weight of the item - confirm
next.accept(entries.next(), 1);
}
}
});
return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues));
}
/**
* Result of a parallel read: the {@code reader} worker that was spawned to do the reading, and the single merged
* {@code iterator} over its output.
* <p>
* {@link #iterator()} is the record accessor, so it returns the same iterator instance on every call; this
* {@link Iterable} can therefore only be traversed once.
*/
record ParallelIterator(Worker reader, @Override Iterator<SortableFeature> iterator)
implements Iterable<SortableFeature> {}
}

Wyświetl plik

@ -0,0 +1,12 @@
package com.onthegomap.planetiler.collection;
/**
* An item with a {@code long key} that can be used for sorting/grouping.
* <p>
* These items can be sorted or grouped by {@link FeatureSort}/{@link FeatureGroup} implementations. Sorted lists can
* also be merged using {@link LongMerger}.
*/
public interface HasLongSortKey {
/**
* Value to sort/group items by.
*
* @return the key of this item
*/
long key();
}

Wyświetl plik

@ -1,7 +1,6 @@
package com.onthegomap.planetiler.collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
/**
@ -14,31 +13,6 @@ public interface IterableOnce<T> extends Iterable<T>, Supplier<T> {
@Override
default Iterator<T> iterator() {
return new Iterator<>() {
T next = null;
boolean stale = true;
private void advance() {
if (stale) {
next = get();
stale = false;
}
}
@Override
public boolean hasNext() {
advance();
return next != null;
}
@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
stale = true;
return next;
}
};
return new SupplierIterator<>(this);
}
}

Wyświetl plik

@ -0,0 +1,211 @@
package com.onthegomap.planetiler.collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
/**
 * A utility for merging sorted lists of items with a {@code long} key to sort by.
 * <p>
 * Every input must already yield its items in ascending {@link HasLongSortKey#key()} order and must not yield
 * {@code null} items. The merged iterator then yields every item from every input in ascending key order. No
 * ordering is guaranteed between items from different inputs that share the same key.
 */
public class LongMerger {
  // Has a general-purpose KWayMerge implementation using a min heap and specialized (faster)
  // TwoWayMerge/ThreeWayMerge implementations when a small number of lists are being merged.

  private LongMerger() {}

  /**
   * Merges sorted items from {@link Supplier Suppliers} that return {@code null} when there are no items left.
   *
   * @param suppliers the sorted inputs, each adapted to an iterator with {@link SupplierIterator}
   * @param <T>       the type of item being merged
   * @return an iterator over the items from all suppliers, ordered by key
   */
  public static <T extends HasLongSortKey> Iterator<T> mergeSuppliers(List<? extends Supplier<T>> suppliers) {
    return mergeIterators(suppliers.stream().map(SupplierIterator::new).toList());
  }

  /**
   * Merges sorted iterators into a combined iterator over all the items.
   *
   * @param iterators the sorted inputs; when there is exactly one, that same iterator instance is returned
   * @param <T>       the type of item being merged
   * @return an iterator over the items from all inputs, ordered by key
   */
  public static <T extends HasLongSortKey> Iterator<T> mergeIterators(List<? extends Iterator<T>> iterators) {
    return switch (iterators.size()) {
      case 0 -> Collections.emptyIterator();
      case 1 -> iterators.get(0);
      case 2 -> new TwoWayMerge<>(iterators.get(0), iterators.get(1));
      case 3 -> new ThreeWayMerge<>(iterators.get(0), iterators.get(1), iterators.get(2));
      default -> new KWayMerge<>(iterators);
    };
  }

  /** Merges two sorted inputs using a single key comparison per item in the common case. */
  private static class TwoWayMerge<T extends HasLongSortKey> implements Iterator<T> {
    private final Iterator<T> inputA;
    private final Iterator<T> inputB;
    // head item of each input, or null once that input is exhausted
    private T a;
    private T b;
    // key of each head item; Long.MAX_VALUE while the head is null so an exhausted input never wins "<"
    private long ak = Long.MAX_VALUE;
    private long bk = Long.MAX_VALUE;

    TwoWayMerge(Iterator<T> inputA, Iterator<T> inputB) {
      this.inputA = inputA;
      this.inputB = inputB;
      // prime both heads; the (null) previous heads are discarded
      takeA();
      takeB();
    }

    @Override
    public boolean hasNext() {
      return a != null || b != null;
    }

    @Override
    public T next() {
      if (ak < bk) {
        // ak < Long.MAX_VALUE here, so a is guaranteed to be present
        return takeA();
      } else if (b != null) {
        return takeB();
      } else if (a != null) {
        // b is exhausted and the key of a really is Long.MAX_VALUE, so it must not be mistaken for "no item left"
        return takeA();
      } else {
        throw new NoSuchElementException();
      }
    }

    /** Returns the current head of input A and advances to its next item. */
    private T takeA() {
      T result = a;
      if (inputA.hasNext()) {
        a = inputA.next();
        ak = a.key();
      } else {
        a = null;
        ak = Long.MAX_VALUE;
      }
      return result;
    }

    /** Returns the current head of input B and advances to its next item. */
    private T takeB() {
      T result = b;
      if (inputB.hasNext()) {
        b = inputB.next();
        bk = b.key();
      } else {
        b = null;
        bk = Long.MAX_VALUE;
      }
      return result;
    }
  }

  /** Merges three sorted inputs using at most two key comparisons per item in the common case. */
  private static class ThreeWayMerge<T extends HasLongSortKey> implements Iterator<T> {
    private final Iterator<T> inputA;
    private final Iterator<T> inputB;
    private final Iterator<T> inputC;
    // head item of each input, or null once that input is exhausted
    private T a;
    private T b;
    private T c;
    // key of each head item; Long.MAX_VALUE while the head is null so an exhausted input never wins "<"
    private long ak = Long.MAX_VALUE;
    private long bk = Long.MAX_VALUE;
    private long ck = Long.MAX_VALUE;

    ThreeWayMerge(Iterator<T> inputA, Iterator<T> inputB, Iterator<T> inputC) {
      this.inputA = inputA;
      this.inputB = inputB;
      this.inputC = inputC;
      // prime all three heads; the (null) previous heads are discarded
      takeA();
      takeB();
      takeC();
    }

    @Override
    public boolean hasNext() {
      return a != null || b != null || c != null;
    }

    @Override
    public T next() {
      // use at most 2 comparisons to get the next item
      if (ak < bk) {
        // ACB / ABC when a is strictly smallest, otherwise CBA; either winner has a key < Long.MAX_VALUE so it exists
        return ak < ck ? takeA() : takeC();
      } else if (ck < bk) {
        // CAB
        return takeC();
      } else if (b != null) {
        // BAC / BCA
        return takeB();
      } else if (c != null) {
        // b is exhausted and every remaining head has key Long.MAX_VALUE: drain c, then a
        return takeC();
      } else if (a != null) {
        return takeA();
      } else {
        throw new NoSuchElementException();
      }
    }

    /** Returns the current head of input A and advances to its next item. */
    private T takeA() {
      T result = a;
      if (inputA.hasNext()) {
        a = inputA.next();
        ak = a.key();
      } else {
        a = null;
        ak = Long.MAX_VALUE;
      }
      return result;
    }

    /** Returns the current head of input B and advances to its next item. */
    private T takeB() {
      T result = b;
      if (inputB.hasNext()) {
        b = inputB.next();
        bk = b.key();
      } else {
        b = null;
        bk = Long.MAX_VALUE;
      }
      return result;
    }

    /** Returns the current head of input C and advances to its next item. */
    private T takeC() {
      T result = c;
      if (inputC.hasNext()) {
        c = inputC.next();
        ck = c.key();
      } else {
        c = null;
        ck = Long.MAX_VALUE;
      }
      return result;
    }
  }

  /** Merges any number of sorted inputs by keeping the head of each non-empty input in a {@link LongMinHeap}. */
  private static class KWayMerge<T extends HasLongSortKey> implements Iterator<T> {
    // items[i] is the pending head of iterators[i]; only the first "non-empty input count" slots are used
    private final T[] items;
    private final Iterator<T>[] iterators;
    // heap of slot indexes ordered by the key of the pending head in that slot
    private final LongMinHeap heap;

    // generic array creation: T erases to HasLongSortKey, and both arrays are private and only written with
    // T / Iterator<T> values below, so the unchecked conversions are safe
    @SuppressWarnings("unchecked")
    KWayMerge(List<? extends Iterator<T>> inputIterators) {
      this.iterators = new Iterator[inputIterators.size()];
      this.items = (T[]) new HasLongSortKey[inputIterators.size()];
      this.heap = LongMinHeap.newArrayHeap(inputIterators.size());
      int outIdx = 0;
      // inputs that are empty up front are dropped, so slots are packed densely from index 0
      for (Iterator<T> iter : inputIterators) {
        if (iter.hasNext()) {
          var item = iter.next();
          items[outIdx] = item;
          iterators[outIdx] = iter;
          heap.push(outIdx++, item.key());
        }
      }
    }

    @Override
    public boolean hasNext() {
      return !heap.isEmpty();
    }

    @Override
    public T next() {
      if (heap.isEmpty()) {
        throw new NoSuchElementException();
      }
      int id = heap.peekId();
      T result = items[id];
      Iterator<T> iterator = iterators[id];
      if (iterator.hasNext()) {
        // replace the head in place with the next item from the same input
        T next = iterator.next();
        items[id] = next;
        heap.updateHead(next.key());
      } else {
        // this input is exhausted: release the reference and remove its slot from the heap
        items[id] = null;
        heap.poll();
      }
      return result;
    }
  }
}

Wyświetl plik

@ -2,7 +2,7 @@ package com.onthegomap.planetiler.collection;
import java.util.Arrays;
public record SortableFeature(long key, byte[] value) implements Comparable<SortableFeature> {
public record SortableFeature(@Override long key, byte[] value) implements Comparable<SortableFeature>, HasLongSortKey {
@Override
public int compareTo(SortableFeature o) {

Wyświetl plik

@ -0,0 +1,41 @@
package com.onthegomap.planetiler.collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
/**
 * Bridges a {@link Supplier} that signals exhaustion by returning {@code null} to the {@link Iterator} protocol, so
 * that {@link #hasNext()} reports {@code false} once the supplier has run dry.
 * <p>
 * The supplier is polled lazily: at most one item is fetched ahead, and only when {@link #hasNext()} or
 * {@link #next()} needs it.
 */
public class SupplierIterator<T> implements Iterator<T> {
  private final Supplier<T> supplier;
  /** Most recently fetched item; {@code null} before the first fetch and after the supplier is exhausted. */
  T next = null;
  /** {@code true} when {@link #next} has already been handed out (or never fetched) and must be refreshed. */
  boolean stale = true;

  public SupplierIterator(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public boolean hasNext() {
    // pull a fresh item only if the buffered one has been consumed
    if (stale) {
      next = supplier.get();
      stale = false;
    }
    return next != null;
  }

  @Override
  public T next() {
    if (hasNext()) {
      // mark the buffered item as consumed so the following call fetches a new one
      stale = true;
      return next;
    }
    throw new NoSuchElementException();
  }
}

Wyświetl plik

@ -14,6 +14,7 @@ public record PlanetilerConfig(
int threads,
int featureWriteThreads,
int featureProcessThreads,
int featureReadThreads,
Duration logInterval,
int minzoom,
int maxzoom,
@ -92,6 +93,8 @@ public record PlanetilerConfig(
threads,
featureWriteThreads,
featureProcessThreads,
arguments.getInteger("feature_read_threads", "number of threads to use when reading features at tile write time",
threads < 32 ? 1 : 2),
arguments.getDuration("loginterval", "time between logs", "10s"),
arguments.getInteger("minzoom", "minimum zoom level", MIN_MINZOOM),
arguments.getInteger("maxzoom", "maximum zoom level (limit 14)", MAX_MAXZOOM),

Wyświetl plik

@ -18,6 +18,7 @@ import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.LayerStats;
import com.onthegomap.planetiler.worker.WorkQueue;
import com.onthegomap.planetiler.worker.Worker;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.IOException;
import java.nio.file.Path;
@ -58,13 +59,13 @@ public class MbtilesWriter {
private final Counter.Readable[] tilesByZoom;
private final Counter.Readable[] totalTileSizesByZoom;
private final LongAccumulator[] maxTileSizesByZoom;
private final FeatureGroup features;
private final Iterable<FeatureGroup.TileFeatures> inputTiles;
private final AtomicReference<TileCoord> lastTileWritten = new AtomicReference<>();
private final MbtilesMetadata mbtilesMetadata;
private MbtilesWriter(FeatureGroup features, Mbtiles db, PlanetilerConfig config, MbtilesMetadata mbtilesMetadata,
Stats stats, LayerStats layerStats) {
this.features = features;
private MbtilesWriter(Iterable<FeatureGroup.TileFeatures> inputTiles, Mbtiles db, PlanetilerConfig config,
MbtilesMetadata mbtilesMetadata, Stats stats, LayerStats layerStats) {
this.inputTiles = inputTiles;
this.db = db;
this.config = config;
this.mbtilesMetadata = mbtilesMetadata;
@ -102,7 +103,27 @@ public class MbtilesWriter {
public static void writeOutput(FeatureGroup features, Mbtiles output, DiskBacked fileSize,
MbtilesMetadata mbtilesMetadata, PlanetilerConfig config, Stats stats) {
var timer = stats.startStage("mbtiles");
MbtilesWriter writer = new MbtilesWriter(features, output, config, mbtilesMetadata, stats,
int readThreads = config.featureReadThreads();
int threads = config.threads();
int processThreads = threads < 10 ? threads : threads - readThreads;
// when using more than 1 read thread: (N read threads) -> (1 merge thread) -> ...
// when using 1 read thread we just have: (1 read & merge thread) -> ...
Worker readWorker = null;
Iterable<FeatureGroup.TileFeatures> inputTiles;
String secondStageName;
if (readThreads == 1) {
secondStageName = "read";
inputTiles = features;
} else {
secondStageName = "merge";
var reader = features.parallelIterator(readThreads);
inputTiles = reader.result();
readWorker = reader.readWorker();
}
MbtilesWriter writer = new MbtilesWriter(inputTiles, output, config, mbtilesMetadata, stats,
features.layerStats());
var pipeline = WorkerPipeline.start("mbtiles", stats);
@ -124,7 +145,7 @@ public class MbtilesWriter {
*/
WorkQueue<TileBatch> writerQueue = new WorkQueue<>("mbtiles_writer_queue", queueSize, 1, stats);
encodeBranch = pipeline
.<TileBatch>fromGenerator("read", next -> {
.<TileBatch>fromGenerator(secondStageName, next -> {
var writerEnqueuer = writerQueue.threadLocalWriter();
writer.readFeaturesAndBatch(batch -> {
next.accept(batch);
@ -134,7 +155,7 @@ public class MbtilesWriter {
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
}, 1)
.addBuffer("reader_queue", queueSize)
.sinkTo("encode", config.threads(), writer::tileEncoderSink);
.sinkTo("encode", processThreads, writer::tileEncoderSink);
// the tile writer will wait on the result of each batch to ensure tiles are written in order
writeBranch = pipeline.readFromQueue(writerQueue)
@ -147,9 +168,9 @@ public class MbtilesWriter {
*/
encodeBranch = pipeline
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
.fromGenerator("read", writer::readFeaturesAndBatch, 1)
.fromGenerator(secondStageName, writer::readFeaturesAndBatch, 1)
.addBuffer("reader_queue", queueSize)
.addWorker("encoder", config.threads(), writer::tileEncoder)
.addWorker("encoder", processThreads, writer::tileEncoder)
.addBuffer("writer_queue", queueSize)
// use only 1 thread since tileWriter needs to be single-threaded
.sinkTo("write", 1, writer::tileWriter);
@ -162,8 +183,11 @@ public class MbtilesWriter {
.addFileSize(fileSize)
.newLine()
.addProcessStats()
.newLine()
.addPipelineStats(encodeBranch)
.newLine();
if (readWorker != null) {
loggers.addThreadPoolStats("read", readWorker);
}
loggers.addPipelineStats(encodeBranch)
.addPipelineStats(writeBranch)
.newLine()
.add(writer::getLastTileLogDetails);
@ -197,7 +221,7 @@ public class MbtilesWriter {
TileBatch batch = new TileBatch();
long featuresInThisBatch = 0;
long tilesInThisBatch = 0;
for (var feature : features) {
for (var feature : inputTiles) {
int z = feature.tileCoord().z();
if (z != currentZoom) {
LOGGER.trace("Starting z{}", z);

Wyświetl plik

@ -0,0 +1,21 @@
package com.onthegomap.planetiler.worker;
import static com.onthegomap.planetiler.util.Exceptions.throwFatalException;
/**
* A function that takes an integer and can throw checked exceptions.
*/
@FunctionalInterface
public interface IntConsumerThatThrows {
/**
* Performs the operation on {@code value}.
*
* @param value the input argument
* @throws Exception if the operation fails
*/
// java:S112 is presumably the Sonar rule against declaring a generic Exception; it is intentional here
@SuppressWarnings("java:S112")
void accept(int value) throws Exception;
/**
* Calls {@link #accept(int)} and hands any {@link Exception} it throws to {@code throwFatalException} so that
* callers do not need to handle checked exceptions.
*
* @param value the input argument passed through to {@link #accept(int)}
*/
default void runAndWrapException(int value) {
try {
accept(value);
} catch (Exception e) {
throwFatalException(e);
}
}
}

Wyświetl plik

@ -1,18 +1,21 @@
package com.onthegomap.planetiler.worker;
import static com.onthegomap.planetiler.util.Exceptions.throwFatalException;
/**
* A function that can throw checked exceptions.
*/
@FunctionalInterface
public interface RunnableThatThrows {
@SuppressWarnings("java:S112")
void run() throws Exception;
default void runAndWrapException() {
try {
run();
} catch (Exception e) {
throw new RuntimeException(e);
throwFatalException(e);
}
}
}

Wyświetl plik

@ -38,19 +38,34 @@ public class Worker {
*/
@SuppressWarnings("java:S1181")
public Worker(String prefix, Stats stats, int threads, RunnableThatThrows task) {
this(prefix, stats, threads, i -> task.run());
}
/**
* Constructs a new reader and immediately starts {@code threads} thread all running {@code task}.
*
* @param prefix string ID to add to logs and stats
* @param stats stats collector for this thread pool
* @param threads number of parallel threads to run {@code task} in
* @param task the work to do in each thread, called with the ID of this thread, from {@code 0} to
* {@code threads - 1}.
*/
@SuppressWarnings("java:S1181")
public Worker(String prefix, Stats stats, int threads, IntConsumerThatThrows task) {
this.prefix = prefix;
stats.gauge(prefix + "_threads", threads);
var es = Executors.newFixedThreadPool(threads, new NamedThreadFactory(prefix));
String parentStage = LogUtil.getStage();
List<CompletableFuture<?>> results = new ArrayList<>();
for (int i = 0; i < threads; i++) {
final int threadId = i;
results.add(CompletableFuture.runAsync(() -> {
LogUtil.setStage(parentStage, prefix);
String id = Thread.currentThread().getName();
LOGGER.trace("Starting worker");
try {
long start = System.nanoTime();
task.run();
task.accept(threadId);
stats.timers().finishedWorker(prefix, Duration.ofNanos(System.nanoTime() - start));
} catch (Throwable e) {
System.err.println("Worker " + id + " died");

Wyświetl plik

@ -3,7 +3,6 @@ package com.onthegomap.planetiler;
import static com.onthegomap.planetiler.TestUtils.*;
import static org.junit.jupiter.api.Assertions.*;
import com.onthegomap.planetiler.TestUtils.OsmXml;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
@ -83,6 +82,8 @@ class PlanetilerTests {
);
private final Stats stats = Stats.inMemory();
@TempDir
Path tempDir;
private static <T extends OsmElement> T with(T elem, Consumer<T> fn) {
fn.accept(elem);
@ -1574,19 +1575,20 @@ class PlanetilerTests {
assertEquals(11, results.tiles.size());
}
@Test
void testPlanetilerRunner(@TempDir Path tempDir) throws Exception {
@ParameterizedTest
@ValueSource(strings = {
"",
"--write-threads=2 --process-threads=2 --feature-read-threads=2 --threads=4",
"--emit-tiles-in-order=false",
"--free-osm-after-read",
})
void testPlanetilerRunner(String args) throws Exception {
Path originalOsm = TestUtils.pathToResource("monaco-latest.osm.pbf");
Path mbtiles = tempDir.resolve("output.mbtiles");
Path tempOsm = tempDir.resolve("monaco-temp.osm.pbf");
Files.copy(originalOsm, tempOsm);
Planetiler.create(Arguments.fromArgs(
"--tmpdir", tempDir.toString(),
"--free-osm-after-read",
// ensure we exercise the multi-threaded code
"--write-threads=2",
"--process-threads=2",
"--threads=4"
("--tmpdir" + tempDir + " " + args).split("\\s+")
))
.setProfile(new Profile.NullProfile() {
@Override
@ -1603,7 +1605,9 @@ class PlanetilerTests {
.run();
// make sure it got deleted after write
assertFalse(Files.exists(tempOsm));
if (args.contains("free-osm-after-read")) {
assertFalse(Files.exists(tempOsm));
}
try (Mbtiles db = Mbtiles.newReadOnlyDatabase(mbtiles)) {
int features = 0;
@ -1631,7 +1635,7 @@ class PlanetilerTests {
}
@Test
void testPlanetilerMemoryCheck(@TempDir Path tempDir) {
void testPlanetilerMemoryCheck() {
assertThrows(Exception.class, () -> runWithProfile(tempDir, new Profile.NullProfile() {
@Override
public long estimateIntermediateDiskBytes(long osmSize) {
@ -1656,7 +1660,7 @@ class PlanetilerTests {
}
@Test
void testPlanetilerMemoryCheckForce(@TempDir Path tempDir) throws Exception {
void testPlanetilerMemoryCheckForce() throws Exception {
runWithProfile(tempDir, new Profile.NullProfile() {
@Override
public long estimateIntermediateDiskBytes(long osmSize) {

Wyświetl plik

@ -93,6 +93,20 @@ class FeatureGroupTest {
return map;
}
private Map<Integer, Map<String, List<Feature>>> getFeaturesParallel() {
Map<Integer, Map<String, List<Feature>>> map = new TreeMap<>();
var reader = features.parallelIterator(2);
for (FeatureGroup.TileFeatures tile : reader.result()) {
for (var feature : VectorTile.decode(tile.getVectorTileEncoder().encode())) {
map.computeIfAbsent(tile.tileCoord().encoded(), (i) -> new TreeMap<>())
.computeIfAbsent(feature.layer(), l -> new ArrayList<>())
.add(new Feature(feature.attrs(), decodeSilently(feature.geometry())));
}
}
return map;
}
private record Feature(Map<String, Object> attrs, Geometry geom) {}
@Test
@ -125,6 +139,36 @@ class FeatureGroupTest {
)))), getFeatures());
}
@Test
void testShardedRead() {
put(3, "layer3", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(3, "layer4", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(2, "layer", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(1, "layer", Map.of("a", 1, "b", 2L), newPoint(1, 2));
put(1, "layer2", Map.of("c", 3d, "d", true), newPoint(3, 4));
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
new Feature(Map.of("a", 1L, "b", 2L), newPoint(1, 2))
),
"layer2", List.of(
new Feature(Map.of("c", 3d, "d", true), newPoint(3, 4))
)
)), 2, new TreeMap<>(Map.of(
"layer", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
)
)), 3, new TreeMap<>(Map.of(
"layer3", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
),
"layer4", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
)
)))), getFeaturesParallel());
}
@Test
void testPutPointsWithSortKey() {
putWithSortKey(

Wyświetl plik

@ -15,6 +15,7 @@ import java.util.Set;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
// also tests SupplierIterator
class IterableOnceTest {
@Test

Wyświetl plik

@ -0,0 +1,183 @@
package com.onthegomap.planetiler.collection;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
/**
 * Tests for {@link LongMerger#mergeIterators} and {@link LongMerger#mergeSuppliers} with zero to four inputs.
 * <p>
 * In the parameterized tests each CSV row lists the inputs followed by the expected merged output; the values
 * inside a cell are separated by whitespace and an empty cell stands for an empty input.
 */
class LongMergerTest {

  /** Minimal element carrying only the {@code long} key that is merged on. */
  record Item(long key) implements HasLongSortKey {}

  /** One input to a merge: an ordered list of {@link Item items}. */
  record ItemList(List<Item> items) {}

  /** Wraps each of the given keys in an {@link Item}, keeping the order in which they were passed. */
  private static ItemList list(long... items) {
    List<Item> wrapped = LongStream.of(items).mapToObj(Item::new).toList();
    return new ItemList(wrapped);
  }

  /**
   * Merges the iterators of {@code lists} with {@link LongMerger#mergeIterators} and collects the resulting keys.
   * Also asserts that the merged iterator throws {@link NoSuchElementException} once it has been drained.
   */
  private static List<Long> merge(ItemList... lists) {
    var sources = Stream.of(lists)
      .map(input -> input.items().iterator())
      .toList();
    var merged = LongMerger.mergeIterators(sources);
    List<Long> keys = new ArrayList<>();
    merged.forEachRemaining(item -> keys.add(item.key()));
    assertThrows(NoSuchElementException.class, merged::next);
    return keys;
  }

  @Test
  void testMergeEmpty() {
    assertEquals(List.of(), merge());
  }

  /** Exercises the supplier-based entry point, where a supplier signals exhaustion by returning {@code null}. */
  @Test
  void testMergeSupplier() {
    var suppliers = Stream.of(new ItemList[]{list(1, 2)})
      .map(input -> input.items().iterator())
      .<Supplier<Item>>map(source -> () -> {
        try {
          return source.next();
        } catch (NoSuchElementException e) {
          // exhausted input: report the end to the merger with null
          return null;
        }
      })
      .toList();
    var merged = LongMerger.mergeSuppliers(suppliers);
    List<Long> keys = new ArrayList<>();
    merged.forEachRemaining(item -> keys.add(item.key()));
    assertThrows(NoSuchElementException.class, merged::next);
    assertEquals(List.of(1L, 2L), keys);
  }

  @Test
  void testMerge1() {
    assertEquals(List.of(), merge(list()));
    assertEquals(List.of(1L), merge(list(1)));
    assertEquals(List.of(1L, 2L), merge(list(1, 2)));
  }

  /** Merges two inputs in both argument orders and expects the same output each time. */
  @ParameterizedTest
  @CsvSource(value = {
    ",,",
    "1,,1",
    "1,1,1 1",
    "1 2,,1 2",
    "1 2,2 3,1 2 2 3",
    "1,2,1 2",
    "1 2,3,1 2 3",
    "1 3,2,1 2 3",
  }, nullValues = {"null"})
  void testMerge2(String a, String b, String output) {
    var listA = list(parse(a));
    var listB = list(parse(b));
    List<Long> expected = LongStream.of(parse(output)).boxed().toList();

    assertEquals(expected, merge(listA, listB));
    assertEquals(expected, merge(listB, listA));
  }

  /** Merges three inputs in all six argument orders and expects the same output each time. */
  @ParameterizedTest
  @CsvSource(value = {
    ",,,",
    "1,,,1",
    "1,1,1,1 1 1",
    "1 2,,,1 2",
    "1 2,2 3,,1 2 2 3",
    "1,2,,1 2",
    "1,2,3,1 2 3",
    "1 2,3,4,1 2 3 4",
    "1 3,2,4,1 2 3 4",
  }, nullValues = {""})
  void testMerge3(String a, String b, String c, String output) {
    var listA = list(parse(a));
    var listB = list(parse(b));
    var listC = list(parse(c));
    List<Long> expected = LongStream.of(parse(output)).boxed().toList();

    assertEquals(expected, merge(listA, listB, listC), "ABC");
    assertEquals(expected, merge(listA, listC, listB), "ACB");
    assertEquals(expected, merge(listB, listA, listC), "BAC");
    assertEquals(expected, merge(listB, listC, listA), "BCA");
    assertEquals(expected, merge(listC, listA, listB), "CAB");
    assertEquals(expected, merge(listC, listB, listA), "CBA");
  }

  /** Merges four inputs with the first input moved through each of the four argument positions. */
  @ParameterizedTest
  @CsvSource(value = {
    ",,,,",
    "1,,,,1",
    "1,1,1,1,1 1 1 1",
    "1 2,,,,1 2",
    "1 2,3,,,1 2 3",
    "1 3,2,,,1 2 3",
    "1 3,2 4,,,1 2 3 4",
    "1 5,2 4,,,1 2 4 5",
    "1 2,2 3,,,1 2 2 3",
  }, nullValues = {""})
  void testMerge4(String a, String b, String c, String d, String output) {
    var listA = list(parse(a));
    var listB = list(parse(b));
    var listC = list(parse(c));
    var listD = list(parse(d));
    List<Long> expected = LongStream.of(parse(output)).boxed().toList();

    assertEquals(expected, merge(listA, listB, listC, listD), "ABCD");
    assertEquals(expected, merge(listB, listA, listC, listD), "BACD");
    assertEquals(expected, merge(listB, listC, listA, listD), "BCAD");
    assertEquals(expected, merge(listB, listC, listD, listA), "BCDA");
  }

  /**
   * Parses a whitespace-separated list of longs.
   *
   * @param in the cell text, or {@code null} for an empty cell
   * @return the parsed values in order; an empty array when {@code in} is {@code null} or holds no tokens
   */
  private static long[] parse(String in) {
    if (in == null) {
      return new long[0];
    }
    return Stream.of(in.split("\\s+"))
      .map(String::strip)
      .filter(token -> !token.isBlank())
      .mapToLong(Long::parseLong)
      .toArray();
  }
}

Wyświetl plik

@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import com.onthegomap.planetiler.stats.Stats;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -62,11 +61,7 @@ class WorkQueueTest {
@Timeout(10)
void testTwoWriters() {
WorkQueue<Integer> q = newQueue(2);
AtomicInteger ni = new AtomicInteger(0);
new Worker("worker", stats, 2, () -> {
int i = ni.getAndIncrement();
q.accept(i);
}).await();
new Worker("worker", stats, 2, q::accept).await();
q.close();
assertEquals(2, q.getPending());
Set<Integer> found = new TreeSet<>();
@ -82,9 +77,7 @@ class WorkQueueTest {
@Timeout(10)
void testTwoWritersManyElements() {
WorkQueue<Integer> q = newQueue(2);
AtomicInteger ni = new AtomicInteger(0);
new Worker("worker", stats, 2, () -> {
int i = ni.getAndIncrement();
new Worker("worker", stats, 2, i -> {
q.accept(i * 3);
q.accept(i * 3 + 1);
q.accept(i * 3 + 2);

Wyświetl plik

@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import com.onthegomap.planetiler.ExpectedException;
import com.onthegomap.planetiler.stats.Stats;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -13,9 +12,8 @@ class WorkerTest {
@Test
@Timeout(10)
void testExceptionHandled() {
AtomicInteger counter = new AtomicInteger(0);
var worker = new Worker("prefix", Stats.inMemory(), 4, () -> {
if (counter.incrementAndGet() == 1) {
var worker = new Worker("prefix", Stats.inMemory(), 4, workerNum -> {
if (workerNum == 1) {
throw new ExpectedException();
} else {
Thread.sleep(5000);