From 99c0f8ae9c52de0b2f0729a4d8674d9966750f7b Mon Sep 17 00:00:00 2001 From: Michael Barry Date: Tue, 24 May 2022 18:46:52 -0400 Subject: [PATCH] Add `--feature-read-threads` option to read features in parallel when writing tiles (#225) --- .../BenchmarkExternalMergeSort.java | 48 ++-- .../collection/ExternalMergeSort.java | 40 +--- .../planetiler/collection/FeatureGroup.java | 82 ++++--- .../planetiler/collection/FeatureSort.java | 57 ++++- .../planetiler/collection/HasLongSortKey.java | 12 + .../planetiler/collection/IterableOnce.java | 28 +-- .../planetiler/collection/LongMerger.java | 211 ++++++++++++++++++ .../collection/SortableFeature.java | 2 +- .../collection/SupplierIterator.java | 41 ++++ .../planetiler/config/PlanetilerConfig.java | 3 + .../planetiler/mbtiles/MbtilesWriter.java | 48 +++- .../worker/IntConsumerThatThrows.java | 21 ++ .../planetiler/worker/RunnableThatThrows.java | 5 +- .../onthegomap/planetiler/worker/Worker.java | 17 +- .../planetiler/PlanetilerTests.java | 28 ++- .../collection/FeatureGroupTest.java | 44 ++++ .../collection/IterableOnceTest.java | 1 + .../planetiler/collection/LongMergerTest.java | 183 +++++++++++++++ .../planetiler/worker/WorkQueueTest.java | 11 +- .../planetiler/worker/WorkerTest.java | 6 +- 20 files changed, 735 insertions(+), 153 deletions(-) create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/collection/HasLongSortKey.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SupplierIterator.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/worker/IntConsumerThatThrows.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java diff --git a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java index b2cfb8e3..98ec6bc0 100644 --- a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java +++ b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java @@ -41,30 +41,32 @@ public class BenchmarkExternalMergeSort { FileUtils.delete(path); FileUtils.deleteOnExit(path); var config = PlanetilerConfig.defaults(); - for (int i = 0; i < 3; i++) { - try { - List results = new ArrayList<>(); - for (int chunks : List.of(1, 10, 100, 1_000, 10_000)) { - results.add(run(path, 1, number, chunks, true, true, true, config)); + try { + List results = new ArrayList<>(); + for (int chunks : List.of(100, 200, 500)) { + for (int readThreads : List.of(1, 2, 3, 4)) { + for (boolean mmap : List.of(false, true)) { + results.add(run(path, 1, readThreads, number, chunks, mmap, true, true, config)); + } } - for (var result : results) { - System.err.println(result); - } - } finally { - FileUtils.delete(path); } + for (var result : results) { + System.err.println(result.chunks + "\t" + result.readThreads + "\t" + result.mmap + "\t" + result.read); + } + } finally { + FileUtils.delete(path); } } private record Results( String write, String read, String sort, int chunks, - int writeWorkers, int readWorkers, + int writeWorkers, int readThreads, long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort, boolean madvise ) {} - private static Results run(Path tmpDir, int writeWorkers, long items, int numChunks, + private static Results run(Path tmpDir, int writeWorkers, int readThreads, long items, int numChunks, boolean mmap, boolean parallelSort, boolean madvise, PlanetilerConfig config) { long chunkSizeLimit = items * ITEM_MEMORY_BYTES / numChunks; if (chunkSizeLimit > Integer.MAX_VALUE) { @@ -72,7 +74,6 @@ public class BenchmarkExternalMergeSort { } boolean gzip = false; int sortWorkers = Runtime.getRuntime().availableProcessors(); - int readWorkers = 1; FileUtils.delete(tmpDir); var sorter = new ExternalMergeSort(tmpDir, sortWorkers, (int) chunkSizeLimit, gzip, mmap, parallelSort, madvise, config, @@ -87,7 +88,7 @@ public class BenchmarkExternalMergeSort { sortTimer.stop(); var readTimer = Timer.start(); - doReads(readWorkers, items, sorter); + doReads(readThreads, items, sorter); readTimer.stop(); return new Results( @@ -96,7 +97,7 @@ public class BenchmarkExternalMergeSort { FORMAT.duration(sortTimer.elapsed().wall()), sorter.chunks(), writeWorkers, - readWorkers, + readThreads, items, (int) chunkSizeLimit, gzip, @@ -106,11 +107,12 @@ public class BenchmarkExternalMergeSort { ); } - private static void doReads(int readWorkers, long items, ExternalMergeSort sorter) { + private static void doReads(int threads, long items, ExternalMergeSort sorter) { var counters = Counter.newMultiThreadCounter(); - var reader = new Worker("read", Stats.inMemory(), readWorkers, () -> { + Iterable q = threads > 1 ? sorter.parallelIterator(Stats.inMemory(), threads) : sorter; + var reader = new Worker("read", Stats.inMemory(), 1, () -> { var counter = counters.counterForThread(); - for (var ignored : sorter) { + for (var ignored : q) { counter.inc(); } }); @@ -119,8 +121,14 @@ public class BenchmarkExternalMergeSort { .addFileSize(sorter) .newLine() .addProcessStats() - .newLine() - .addThreadPoolStats("reader", reader); + .newLine(); + if (q instanceof FeatureSort.ParallelIterator pi) { + loggers + .addThreadPoolStats("read", pi.reader()) + .addThreadPoolStats("merge", reader); + } else { + loggers.addThreadPoolStats("read", reader); + } reader.awaitAndLog(loggers, Duration.ofSeconds(1)); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java index 2378fe87..a00e6e15 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java @@ -235,8 +235,11 @@ class ExternalMergeSort implements FeatureSort { } @Override - public Iterator iterator() { + public Iterator iterator(int shard, int shards) { assert sorted; + if (shard < 0 || shard >= shards) { + throw new IllegalArgumentException("Bad shard params: shard=%d shards=%d".formatted(shard, shards)); + } if (chunks.isEmpty()) { return Collections.emptyIterator(); @@ -244,36 +247,14 @@ class ExternalMergeSort implements FeatureSort { // k-way merge to interleave all the sorted chunks List iterators = new ArrayList<>(); - for (Chunk chunk : chunks) { + for (int i = shard; i < chunks.size(); i += shards) { + var chunk = chunks.get(i); if (chunk.itemCount > 0) { iterators.add(chunk.newReader()); } } - LongMinHeap heap = LongMinHeap.newArrayHeap(iterators.size()); - for (int i = 0; i < iterators.size(); i++) { - heap.push(i, iterators.get(i).nextKey()); - } - return new Iterator<>() { - @Override - public boolean hasNext() { - return !heap.isEmpty(); - } - - @Override - public SortableFeature next() { - int i = heap.peekId(); - Reader iterator = iterators.get(i); - assert iterator != null; - SortableFeature next = iterator.next(); - if (iterator.hasNext()) { - heap.updateHead(iterator.nextKey()); - } else { - heap.poll(); - } - return next; - } - }; + return LongMerger.mergeIterators(iterators); } public int chunks() { @@ -299,8 +280,6 @@ class ExternalMergeSort implements FeatureSort { @Override void close(); - - long nextKey(); } /** Compresses bytes with minimal impact on write performance. Equivalent to {@code gzip -1} */ @@ -412,11 +391,6 @@ class ExternalMergeSort implements FeatureSort { return current; } - @Override - public final long nextKey() { - return next.key(); - } - abstract SortableFeature readNextFeature(); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java index 1cda4e96..3b1dc373 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java @@ -14,6 +14,7 @@ import com.onthegomap.planetiler.util.CommonStringEncoder; import com.onthegomap.planetiler.util.DiskBacked; import com.onthegomap.planetiler.util.Hashing; import com.onthegomap.planetiler.util.LayerStats; +import com.onthegomap.planetiler.worker.Worker; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; @@ -58,6 +59,7 @@ public final class FeatureGroup implements Iterable, private final CommonStringEncoder commonStrings; private final Stats stats; private final LayerStats layerStats = new LayerStats(); + private volatile boolean prepared = false; FeatureGroup(FeatureSort sorter, Profile profile, CommonStringEncoder commonStrings, Stats stats) { this.sorter = sorter; @@ -122,6 +124,20 @@ public final class FeatureGroup implements Iterable, } } + static GeometryType decodeGeomType(byte geomTypeAndScale) { + return GeometryType.valueOf((byte) (geomTypeAndScale & 0b111)); + } + + static int decodeScale(byte geomTypeAndScale) { + return (geomTypeAndScale & 0xff) >>> 3; + } + + static byte encodeGeomTypeAndScale(VectorTile.VectorGeometry geometry) { + assert geometry.geomType().asByte() >= 0 && geometry.geomType().asByte() <= 8; + assert geometry.scale() >= 0 && geometry.scale() < (1 << 5); + return (byte) ((geometry.geomType().asByte() & 0xff) | (geometry.scale() << 3)); + } + /** * Returns statistics about each layer written through {@link #newRenderedFeatureEncoder()} including min/max zoom, * features on elements in that layer, and their types. @@ -134,8 +150,6 @@ public final class FeatureGroup implements Iterable, return sorter.numFeaturesWritten(); } - public interface RenderedFeatureEncoder extends Function, Closeable {} - /** Returns a function for a single thread to use to serialize rendered features. */ public RenderedFeatureEncoder newRenderedFeatureEncoder() { return new RenderedFeatureEncoder() { @@ -233,32 +247,32 @@ public final class FeatureGroup implements Iterable, return packer.toByteArray(); } - static GeometryType decodeGeomType(byte geomTypeAndScale) { - return GeometryType.valueOf((byte) (geomTypeAndScale & 0b111)); - } - - static int decodeScale(byte geomTypeAndScale) { - return (geomTypeAndScale & 0xff) >>> 3; - } - - static byte encodeGeomTypeAndScale(VectorTile.VectorGeometry geometry) { - assert geometry.geomType().asByte() >= 0 && geometry.geomType().asByte() <= 8; - assert geometry.scale() >= 0 && geometry.scale() < (1 << 5); - return (byte) ((geometry.geomType().asByte() & 0xff) | (geometry.scale() << 3)); - } - /** Returns a new feature writer that can be used for a single thread. */ public CloseableConusmer writerForThread() { return sorter.writerForThread(); } - private volatile boolean prepared = false; - - /** Iterates through features grouped by tile ID. */ @Override public Iterator iterator() { prepare(); - Iterator entries = sorter.iterator(); + return groupIntoTiles(sorter.iterator()); + } + + /** + * Reads temp features using {@code threads} parallel threads and merges into a sorted list. + * + * @param threads The number of parallel read threads to spawn + * @return a {@link Reader} with a handle to the new read threads that were spawned, and in {@link Iterable} that can + * be used to iterate over the results. + */ + public Reader parallelIterator(int threads) { + prepare(); + var parIter = sorter.parallelIterator(stats, threads); + return new Reader(parIter.reader(), () -> groupIntoTiles(parIter.iterator())); + } + + private Iterator groupIntoTiles(Iterator entries) { + // entries are sorted by tile ID, so group consecutive entries in same tile into tiles if (!entries.hasNext()) { return Collections.emptyIterator(); } @@ -315,6 +329,10 @@ public final class FeatureGroup implements Iterable, } } + public interface RenderedFeatureEncoder extends Function, Closeable {} + + public record Reader(Worker readWorker, Iterable result) {} + /** Features contained in a single tile. */ public class TileFeatures { @@ -328,6 +346,18 @@ public final class FeatureGroup implements Iterable, this.tileCoord = TileCoord.decode(tileCoord); } + private static void unscale(List features) { + for (int i = 0; i < features.size(); i++) { + var feature = features.get(i); + if (feature != null) { + VectorTile.VectorGeometry geometry = feature.geometry(); + if (geometry.scale() != 0) { + features.set(i, feature.copyWithNewGeometry(geometry.unscale())); + } + } + } + } + /** Returns the number of features read including features discarded from being over the limit in a group. */ public long getNumFeaturesProcessed() { return numFeaturesProcessed.get(); @@ -449,18 +479,6 @@ public final class FeatureGroup implements Iterable, return encoder; } - private static void unscale(List features) { - for (int i = 0; i < features.size(); i++) { - var feature = features.get(i); - if (feature != null) { - VectorTile.VectorGeometry geometry = feature.geometry(); - if (geometry.scale() != 0) { - features.set(i, feature.copyWithNewGeometry(geometry.unscale())); - } - } - } - } - private void postProcessAndAddLayerFeatures(VectorTile encoder, String layer, List features) { try { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java index 99b10cc0..b9fbeb38 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java @@ -1,13 +1,17 @@ package com.onthegomap.planetiler.collection; +import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.util.CloseableConusmer; import com.onthegomap.planetiler.util.DiskBacked; import com.onthegomap.planetiler.util.MemoryEstimator; +import com.onthegomap.planetiler.worker.WeightedHandoffQueue; +import com.onthegomap.planetiler.worker.Worker; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.stream.IntStream; import javax.annotation.concurrent.NotThreadSafe; /** @@ -55,11 +59,21 @@ interface FeatureSort extends Iterable, DiskBacked, MemoryEstim return 0; } - @Override public Iterator iterator() { return list.iterator(); } + + @Override + public Iterator iterator(int shard, int shards) { + if (shard < 0 || shard >= shards) { + throw new IllegalArgumentException("Bad shard params: shard=%d shards=%d".formatted(shard, shards)); + } + return IntStream.range(0, list.size()) + .filter(d -> d % shards == shard) + .mapToObj(list::get) + .iterator(); + } }; } @@ -81,4 +95,45 @@ interface FeatureSort extends Iterable, DiskBacked, MemoryEstim * threads. */ CloseableConusmer writerForThread(); + + @Override + default Iterator iterator() { + return iterator(0, 1); + } + + /** + * Returns an iterator over a subset of the features. + * + * @param shard The index of this iterator + * @param shards The total number of iterators that will be used + * @return An iterator over a subset of features that when combined with all other iterators will iterate over the + * full set. + */ + Iterator iterator(int shard, int shards); + + /** + * Reads temp features using {@code threads} parallel threads and merges into a sorted list. + * + * @param stats Stat tracker + * @param threads The number of parallel read threads to spawn + * @return a {@link ParallelIterator} with a handle to the new read threads that were spawned, and in {@link Iterable} + * that can be used to iterate over the results. + */ + default ParallelIterator parallelIterator(Stats stats, int threads) { + List> queues = IntStream.range(0, threads) + .mapToObj(i -> new WeightedHandoffQueue(500, 10_000)) + .toList(); + Worker reader = new Worker("read", stats, threads, shard -> { + try (var next = queues.get(shard)) { + Iterator entries = iterator(shard, threads); + while (entries.hasNext()) { + next.accept(entries.next(), 1); + } + } + }); + return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues)); + } + + record ParallelIterator(Worker reader, @Override Iterator iterator) + implements Iterable {} } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/HasLongSortKey.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/HasLongSortKey.java new file mode 100644 index 00000000..8482d435 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/HasLongSortKey.java @@ -0,0 +1,12 @@ +package com.onthegomap.planetiler.collection; + +/** + * An item with a {@code long key} that can be used for sorting/grouping. + * + * These items can be sorted or grouped by {@link FeatureSort}/{@link FeatureGroup} implementations. Sorted lists can + * also be merged using {@link LongMerger}. + */ +public interface HasLongSortKey { + /** Value to sort/group items by. */ + long key(); +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/IterableOnce.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/IterableOnce.java index d6822d04..b236f594 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/IterableOnce.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/IterableOnce.java @@ -1,7 +1,6 @@ package com.onthegomap.planetiler.collection; import java.util.Iterator; -import java.util.NoSuchElementException; import java.util.function.Supplier; /** @@ -14,31 +13,6 @@ public interface IterableOnce extends Iterable, Supplier { @Override default Iterator iterator() { - return new Iterator<>() { - T next = null; - boolean stale = true; - - private void advance() { - if (stale) { - next = get(); - stale = false; - } - } - - @Override - public boolean hasNext() { - advance(); - return next != null; - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - stale = true; - return next; - } - }; + return new SupplierIterator<>(this); } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java new file mode 100644 index 00000000..16bcfa9d --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java @@ -0,0 +1,211 @@ +package com.onthegomap.planetiler.collection; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +/** + * A utility for merging sorted lists of items with a {@code long} key to sort by. + */ +public class LongMerger { + // Has a general-purpose KWayMerge implementation using a min heap and specialized (faster) + // TwoWayMerge/ThreeWayMerge implementations when a small number of lists are being merged. + + private LongMerger() {} + + /** Merges sorted items from {@link Supplier Suppliers} that return {@code null} when there are no items left. */ + public static Iterator mergeSuppliers(List> suppliers) { + return mergeIterators(suppliers.stream().map(SupplierIterator::new).toList()); + } + + /** Merges sorted iterators into a combined iterator over all the items. */ + public static Iterator mergeIterators(List> iterators) { + return switch (iterators.size()) { + case 0 -> Collections.emptyIterator(); + case 1 -> iterators.get(0); + case 2 -> new TwoWayMerge<>(iterators.get(0), iterators.get(1)); + case 3 -> new ThreeWayMerge<>(iterators.get(0), iterators.get(1), iterators.get(2)); + default -> new KWayMerge<>(iterators); + }; + } + + private static class TwoWayMerge implements Iterator { + T a, b; + long ak = Long.MAX_VALUE, bk = Long.MAX_VALUE; + final Iterator inputA, inputB; + + TwoWayMerge(Iterator inputA, Iterator inputB) { + this.inputA = inputA; + this.inputB = inputB; + if (inputA.hasNext()) { + a = inputA.next(); + ak = a.key(); + } + if (inputB.hasNext()) { + b = inputB.next(); + bk = b.key(); + } + } + + @Override + public boolean hasNext() { + return a != null || b != null; + } + + @Override + public T next() { + T result; + if (ak < bk) { + result = a; + if (inputA.hasNext()) { + a = inputA.next(); + ak = a.key(); + } else { + a = null; + ak = Long.MAX_VALUE; + } + } else if (bk == Long.MAX_VALUE) { + throw new NoSuchElementException(); + } else { + result = b; + if (inputB.hasNext()) { + b = inputB.next(); + bk = b.key(); + } else { + b = null; + bk = Long.MAX_VALUE; + } + } + return result; + } + } + + private static class ThreeWayMerge implements Iterator { + T a, b, c; + long ak = Long.MAX_VALUE, bk = Long.MAX_VALUE, ck = Long.MAX_VALUE; + final Iterator inputA, inputB, inputC; + + ThreeWayMerge(Iterator inputA, Iterator inputB, Iterator inputC) { + this.inputA = inputA; + this.inputB = inputB; + this.inputC = inputC; + if (inputA.hasNext()) { + a = inputA.next(); + ak = a.key(); + } + if (inputB.hasNext()) { + b = inputB.next(); + bk = b.key(); + } + if (inputC.hasNext()) { + c = inputC.next(); + ck = c.key(); + } + } + + @Override + public boolean hasNext() { + return a != null || b != null || c != null; + } + + @Override + public T next() { + T result; + // use at most 2 comparisons to get the next item + if (ak < bk) { + if (ak < ck) { + // ACB / ABC + result = a; + if (inputA.hasNext()) { + a = inputA.next(); + ak = a.key(); + } else { + a = null; + ak = Long.MAX_VALUE; + } + } else { + // CBA + result = c; + if (inputC.hasNext()) { + c = inputC.next(); + ck = c.key(); + } else { + c = null; + ck = Long.MAX_VALUE; + } + } + } else if (ck < bk) { + // CAB + result = c; + if (inputC.hasNext()) { + c = inputC.next(); + ck = c.key(); + } else { + c = null; + ck = Long.MAX_VALUE; + } + } else if (bk == Long.MAX_VALUE) { + throw new NoSuchElementException(); + } else { + // BAC / BCA + result = b; + if (inputB.hasNext()) { + b = inputB.next(); + bk = b.key(); + } else { + b = null; + bk = Long.MAX_VALUE; + } + } + return result; + } + } + + private static class KWayMerge implements Iterator { + private final T[] items; + private final Iterator[] iterators; + private final LongMinHeap heap; + + @SuppressWarnings("unchecked") + KWayMerge(List> inputIterators) { + this.iterators = new Iterator[inputIterators.size()]; + this.items = (T[]) new HasLongSortKey[inputIterators.size()]; + this.heap = LongMinHeap.newArrayHeap(inputIterators.size()); + int outIdx = 0; + for (Iterator iter : inputIterators) { + if (iter.hasNext()) { + var item = iter.next(); + items[outIdx] = item; + iterators[outIdx] = iter; + heap.push(outIdx++, item.key()); + } + } + } + + @Override + public boolean hasNext() { + return !heap.isEmpty(); + } + + @Override + public T next() { + if (heap.isEmpty()) { + throw new NoSuchElementException(); + } + int id = heap.peekId(); + T result = items[id]; + Iterator iterator = iterators[id]; + if (iterator.hasNext()) { + T next = iterator.next(); + items[id] = next; + heap.updateHead(next.key()); + } else { + items[id] = null; + heap.poll(); + } + return result; + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SortableFeature.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SortableFeature.java index 2abe06ec..e5df6c48 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SortableFeature.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SortableFeature.java @@ -2,7 +2,7 @@ package com.onthegomap.planetiler.collection; import java.util.Arrays; -public record SortableFeature(long key, byte[] value) implements Comparable { +public record SortableFeature(@Override long key, byte[] value) implements Comparable, HasLongSortKey { @Override public int compareTo(SortableFeature o) { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SupplierIterator.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SupplierIterator.java new file mode 100644 index 00000000..fa2426d5 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/SupplierIterator.java @@ -0,0 +1,41 @@ +package com.onthegomap.planetiler.collection; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +/** + * Adapts a {@link Supplier} that returns {@code null} when no items are left to an {@link Iterator} where + * {@link #hasNext()} returns {@code false} when there are no items left. + */ +public class SupplierIterator implements Iterator { + private final Supplier supplier; + T next = null; + boolean stale = true; + + public SupplierIterator(Supplier supplier) { + this.supplier = supplier; + } + + private void advance() { + if (stale) { + next = supplier.get(); + stale = false; + } + } + + @Override + public boolean hasNext() { + advance(); + return next != null; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + stale = true; + return next; + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java index dbf45b0d..86286e9f 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java @@ -14,6 +14,7 @@ public record PlanetilerConfig( int threads, int featureWriteThreads, int featureProcessThreads, + int featureReadThreads, Duration logInterval, int minzoom, int maxzoom, @@ -92,6 +93,8 @@ public record PlanetilerConfig( threads, featureWriteThreads, featureProcessThreads, + arguments.getInteger("feature_read_threads", "number of threads to use when reading features at tile write time", + threads < 32 ? 1 : 2), arguments.getDuration("loginterval", "time between logs", "10s"), arguments.getInteger("minzoom", "minimum zoom level", MIN_MINZOOM), arguments.getInteger("maxzoom", "maximum zoom level (limit 14)", MAX_MAXZOOM), diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/MbtilesWriter.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/MbtilesWriter.java index 39e746bf..bf17e1de 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/MbtilesWriter.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/MbtilesWriter.java @@ -18,6 +18,7 @@ import com.onthegomap.planetiler.util.FileUtils; import com.onthegomap.planetiler.util.Format; import com.onthegomap.planetiler.util.LayerStats; import com.onthegomap.planetiler.worker.WorkQueue; +import com.onthegomap.planetiler.worker.Worker; import com.onthegomap.planetiler.worker.WorkerPipeline; import java.io.IOException; import java.nio.file.Path; @@ -58,13 +59,13 @@ public class MbtilesWriter { private final Counter.Readable[] tilesByZoom; private final Counter.Readable[] totalTileSizesByZoom; private final LongAccumulator[] maxTileSizesByZoom; - private final FeatureGroup features; + private final Iterable inputTiles; private final AtomicReference lastTileWritten = new AtomicReference<>(); private final MbtilesMetadata mbtilesMetadata; - private MbtilesWriter(FeatureGroup features, Mbtiles db, PlanetilerConfig config, MbtilesMetadata mbtilesMetadata, - Stats stats, LayerStats layerStats) { - this.features = features; + private MbtilesWriter(Iterable inputTiles, Mbtiles db, PlanetilerConfig config, + MbtilesMetadata mbtilesMetadata, Stats stats, LayerStats layerStats) { + this.inputTiles = inputTiles; this.db = db; this.config = config; this.mbtilesMetadata = mbtilesMetadata; @@ -102,7 +103,27 @@ public class MbtilesWriter { public static void writeOutput(FeatureGroup features, Mbtiles output, DiskBacked fileSize, MbtilesMetadata mbtilesMetadata, PlanetilerConfig config, Stats stats) { var timer = stats.startStage("mbtiles"); - MbtilesWriter writer = new MbtilesWriter(features, output, config, mbtilesMetadata, stats, + + int readThreads = config.featureReadThreads(); + int threads = config.threads(); + int processThreads = threads < 10 ? threads : threads - readThreads; + + // when using more than 1 read thread: (N read threads) -> (1 merge thread) -> ... + // when using 1 read thread we just have: (1 read & merge thread) -> ... + Worker readWorker = null; + Iterable inputTiles; + String secondStageName; + if (readThreads == 1) { + secondStageName = "read"; + inputTiles = features; + } else { + secondStageName = "merge"; + var reader = features.parallelIterator(readThreads); + inputTiles = reader.result(); + readWorker = reader.readWorker(); + } + + MbtilesWriter writer = new MbtilesWriter(inputTiles, output, config, mbtilesMetadata, stats, features.layerStats()); var pipeline = WorkerPipeline.start("mbtiles", stats); @@ -124,7 +145,7 @@ public class MbtilesWriter { */ WorkQueue writerQueue = new WorkQueue<>("mbtiles_writer_queue", queueSize, 1, stats); encodeBranch = pipeline - .fromGenerator("read", next -> { + .fromGenerator(secondStageName, next -> { var writerEnqueuer = writerQueue.threadLocalWriter(); writer.readFeaturesAndBatch(batch -> { next.accept(batch); @@ -134,7 +155,7 @@ public class MbtilesWriter { // use only 1 thread since readFeaturesAndBatch needs to be single-threaded }, 1) .addBuffer("reader_queue", queueSize) - .sinkTo("encode", config.threads(), writer::tileEncoderSink); + .sinkTo("encode", processThreads, writer::tileEncoderSink); // the tile writer will wait on the result of each batch to ensure tiles are written in order writeBranch = pipeline.readFromQueue(writerQueue) @@ -147,9 +168,9 @@ public class MbtilesWriter { */ encodeBranch = pipeline // use only 1 thread since readFeaturesAndBatch needs to be single-threaded - .fromGenerator("read", writer::readFeaturesAndBatch, 1) + .fromGenerator(secondStageName, writer::readFeaturesAndBatch, 1) .addBuffer("reader_queue", queueSize) - .addWorker("encoder", config.threads(), writer::tileEncoder) + .addWorker("encoder", processThreads, writer::tileEncoder) .addBuffer("writer_queue", queueSize) // use only 1 thread since tileWriter needs to be single-threaded .sinkTo("write", 1, writer::tileWriter); @@ -162,8 +183,11 @@ public class MbtilesWriter { .addFileSize(fileSize) .newLine() .addProcessStats() - .newLine() - .addPipelineStats(encodeBranch) + .newLine(); + if (readWorker != null) { + loggers.addThreadPoolStats("read", readWorker); + } + loggers.addPipelineStats(encodeBranch) .addPipelineStats(writeBranch) .newLine() .add(writer::getLastTileLogDetails); @@ -197,7 +221,7 @@ public class MbtilesWriter { TileBatch batch = new TileBatch(); long featuresInThisBatch = 0; long tilesInThisBatch = 0; - for (var feature : features) { + for (var feature : inputTiles) { int z = feature.tileCoord().z(); if (z != currentZoom) { LOGGER.trace("Starting z{}", z); diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/IntConsumerThatThrows.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/IntConsumerThatThrows.java new file mode 100644 index 00000000..49347f2a --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/IntConsumerThatThrows.java @@ -0,0 +1,21 @@ +package com.onthegomap.planetiler.worker; + +import static com.onthegomap.planetiler.util.Exceptions.throwFatalException; + +/** + * A function that takes an integer can throw checked exceptions. + */ +@FunctionalInterface +public interface IntConsumerThatThrows { + + @SuppressWarnings("java:S112") + void accept(int value) throws Exception; + + default void runAndWrapException(int value) { + try { + accept(value); + } catch (Exception e) { + throwFatalException(e); + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java index d1b9b01b..d2c0ea73 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/RunnableThatThrows.java @@ -1,18 +1,21 @@ package com.onthegomap.planetiler.worker; +import static com.onthegomap.planetiler.util.Exceptions.throwFatalException; + /** * A function that can throw checked exceptions. */ @FunctionalInterface public interface RunnableThatThrows { + @SuppressWarnings("java:S112") void run() throws Exception; default void runAndWrapException() { try { run(); } catch (Exception e) { - throw new RuntimeException(e); + throwFatalException(e); } } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/Worker.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/Worker.java index daeb91b4..aec95fe5 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/Worker.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/Worker.java @@ -38,19 +38,34 @@ public class Worker { */ @SuppressWarnings("java:S1181") public Worker(String prefix, Stats stats, int threads, RunnableThatThrows task) { + this(prefix, stats, threads, i -> task.run()); + } + + /** + * Constructs a new reader and immediately starts {@code threads} thread all running {@code task}. + * + * @param prefix string ID to add to logs and stats + * @param stats stats collector for this thread pool + * @param threads number of parallel threads to run {@code task} in + * @param task the work to do in each thread, called with the ID of this thread, from {@code 0} to + * {@code threads - 1}. + */ + @SuppressWarnings("java:S1181") + public Worker(String prefix, Stats stats, int threads, IntConsumerThatThrows task) { this.prefix = prefix; stats.gauge(prefix + "_threads", threads); var es = Executors.newFixedThreadPool(threads, new NamedThreadFactory(prefix)); String parentStage = LogUtil.getStage(); List> results = new ArrayList<>(); for (int i = 0; i < threads; i++) { + final int threadId = i; results.add(CompletableFuture.runAsync(() -> { LogUtil.setStage(parentStage, prefix); String id = Thread.currentThread().getName(); LOGGER.trace("Starting worker"); try { long start = System.nanoTime(); - task.run(); + task.accept(threadId); stats.timers().finishedWorker(prefix, Duration.ofNanos(System.nanoTime() - start)); } catch (Throwable e) { System.err.println("Worker " + id + " died"); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java index b9b2c599..60e82ea1 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java @@ -3,7 +3,6 @@ package com.onthegomap.planetiler; import static com.onthegomap.planetiler.TestUtils.*; import static org.junit.jupiter.api.Assertions.*; -import com.onthegomap.planetiler.TestUtils.OsmXml; import com.onthegomap.planetiler.collection.FeatureGroup; import com.onthegomap.planetiler.collection.LongLongMap; import com.onthegomap.planetiler.collection.LongLongMultimap; @@ -83,6 +82,8 @@ class PlanetilerTests { ); private final Stats stats = Stats.inMemory(); + @TempDir + Path tempDir; private static T with(T elem, Consumer fn) { fn.accept(elem); @@ -1574,19 +1575,20 @@ class PlanetilerTests { assertEquals(11, results.tiles.size()); } - @Test - void testPlanetilerRunner(@TempDir Path tempDir) throws Exception { + @ParameterizedTest + @ValueSource(strings = { + "", + "--write-threads=2 --process-threads=2 --feature-read-threads=2 --threads=4", + "--emit-tiles-in-order=false", + "--free-osm-after-read", + }) + void testPlanetilerRunner(String args) throws Exception { Path originalOsm = TestUtils.pathToResource("monaco-latest.osm.pbf"); Path mbtiles = tempDir.resolve("output.mbtiles"); Path tempOsm = tempDir.resolve("monaco-temp.osm.pbf"); Files.copy(originalOsm, tempOsm); Planetiler.create(Arguments.fromArgs( - "--tmpdir", tempDir.toString(), - "--free-osm-after-read", - // ensure we exercise the multi-threaded code - "--write-threads=2", - "--process-threads=2", - "--threads=4" + ("--tmpdir" + tempDir + " " + args).split("\\s+") )) .setProfile(new Profile.NullProfile() { @Override @@ -1603,7 +1605,9 @@ class PlanetilerTests { .run(); // make sure it got deleted after write - assertFalse(Files.exists(tempOsm)); + if (args.contains("free-osm-after-read")) { + assertFalse(Files.exists(tempOsm)); + } try (Mbtiles db = Mbtiles.newReadOnlyDatabase(mbtiles)) { int features = 0; @@ -1631,7 +1635,7 @@ class PlanetilerTests { } @Test - void testPlanetilerMemoryCheck(@TempDir Path tempDir) { + void testPlanetilerMemoryCheck() { assertThrows(Exception.class, () -> runWithProfile(tempDir, new Profile.NullProfile() { @Override public long estimateIntermediateDiskBytes(long osmSize) { @@ -1656,7 +1660,7 @@ class PlanetilerTests { } @Test - void testPlanetilerMemoryCheckForce(@TempDir Path tempDir) throws Exception { + void testPlanetilerMemoryCheckForce() throws Exception { runWithProfile(tempDir, new Profile.NullProfile() { @Override public long estimateIntermediateDiskBytes(long osmSize) { diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java index d302a6e5..a3f49da4 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java @@ -93,6 +93,20 @@ class FeatureGroupTest { return map; } + + private Map>> getFeaturesParallel() { + Map>> map = new TreeMap<>(); + var reader = features.parallelIterator(2); + for (FeatureGroup.TileFeatures tile : reader.result()) { + for (var feature : VectorTile.decode(tile.getVectorTileEncoder().encode())) { + map.computeIfAbsent(tile.tileCoord().encoded(), (i) -> new TreeMap<>()) + .computeIfAbsent(feature.layer(), l -> new ArrayList<>()) + .add(new Feature(feature.attrs(), decodeSilently(feature.geometry()))); + } + } + return map; + } + private record Feature(Map attrs, Geometry geom) {} @Test @@ -125,6 +139,36 @@ class FeatureGroupTest { )))), getFeatures()); } + @Test + void testShardedRead() { + put(3, "layer3", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)); + put(3, "layer4", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)); + put(2, "layer", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)); + put(1, "layer", Map.of("a", 1, "b", 2L), newPoint(1, 2)); + put(1, "layer2", Map.of("c", 3d, "d", true), newPoint(3, 4)); + sorter.sort(); + assertEquals(new TreeMap<>(Map.of( + 1, new TreeMap<>(Map.of( + "layer", List.of( + new Feature(Map.of("a", 1L, "b", 2L), newPoint(1, 2)) + ), + "layer2", List.of( + new Feature(Map.of("c", 3d, "d", true), newPoint(3, 4)) + ) + )), 2, new TreeMap<>(Map.of( + "layer", List.of( + new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)) + ) + )), 3, new TreeMap<>(Map.of( + "layer3", List.of( + new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)) + ), + "layer4", List.of( + new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6)) + ) + )))), getFeaturesParallel()); + } + @Test void testPutPointsWithSortKey() { putWithSortKey( diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/IterableOnceTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/IterableOnceTest.java index f48bc12b..81c32338 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/IterableOnceTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/IterableOnceTest.java @@ -15,6 +15,7 @@ import java.util.Set; import java.util.stream.Stream; import org.junit.jupiter.api.Test; +// also tests SupplierIterator class IterableOnceTest { @Test diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java new file mode 100644 index 00000000..e0a8f896 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java @@ -0,0 +1,183 @@ +package com.onthegomap.planetiler.collection; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Supplier; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class LongMergerTest { + record Item(long key) implements HasLongSortKey {} + record ItemList(List items) {} + + private static ItemList list(long... items) { + return new ItemList(LongStream.of(items).mapToObj(Item::new).toList()); + } + + private static List merge(ItemList... lists) { + List list = new ArrayList<>(); + var iter = LongMerger.mergeIterators(Stream.of(lists) + .map(d -> d.items.iterator()) + .toList()); + iter.forEachRemaining(item -> list.add(item.key)); + assertThrows(NoSuchElementException.class, iter::next); + return list; + } + + @Test + void testMergeEmpty() { + assertEquals(List.of(), merge()); + } + + @Test + void testMergeSupplier() { + List list = new ArrayList<>(); + var iter = LongMerger.mergeSuppliers(Stream.of(new ItemList[]{list(1, 2)}) + .map(d -> d.items.iterator()) + .>map(d -> () -> { + try { + return d.next(); + } catch (NoSuchElementException e) { + return null; + } + }) + .toList()); + iter.forEachRemaining(item -> list.add(item.key)); + assertThrows(NoSuchElementException.class, iter::next); + assertEquals(List.of(1L, 2L), list); + } + + @Test + void testMerge1() { + assertEquals(List.of(), merge(list())); + assertEquals(List.of(1L), merge(list(1))); + assertEquals(List.of(1L, 2L), merge(list(1, 2))); + } + + @ParameterizedTest + @CsvSource(value = { + ",,", + "1,,1", + "1,1,1 1", + "1 2,,1 2", + "1 2,2 3,1 2 2 3", + "1,2,1 2", + "1 2,3,1 2 3", + "1 3,2,1 2 3", + }, nullValues = {"null"}) + void testMerge2(String a, String b, String output) { + var listA = list(parse(a)); + var listB = list(parse(b)); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listA, listB) + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listA) + ); + } + + @ParameterizedTest + @CsvSource(value = { + ",,,", + "1,,,1", + "1,1,1,1 1 1", + "1 2,,,1 2", + "1 2,2 3,,1 2 2 3", + "1,2,,1 2", + "1,2,3,1 2 3", + "1 2,3,4,1 2 3 4", + "1 3,2,4,1 2 3 4", + }, nullValues = {""}) + void testMerge3(String a, String b, String c, String output) { + var listA = list(parse(a)); + var listB = list(parse(b)); + var listC = list(parse(c)); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listA, listB, listC), + "ABC" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listA, listC, listB), + "ACB" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listA, listC), + "BAC" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listC, listA), + "BCA" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listC, listA, listB), + "CAB" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listC, listB, listA), + "CBA" + ); + } + + @ParameterizedTest + @CsvSource(value = { + ",,,,", + "1,,,,1", + "1,1,1,1,1 1 1 1", + "1 2,,,,1 2", + "1 2,3,,,1 2 3", + "1 3,2,,,1 2 3", + "1 3,2 4,,,1 2 3 4", + "1 5,2 4,,,1 2 4 5", + "1 2,2 3,,,1 2 2 3", + }, nullValues = {""}) + void testMerge4(String a, String b, String c, String d, String output) { + var listA = list(parse(a)); + var listB = list(parse(b)); + var listC = list(parse(c)); + var listD = list(parse(d)); + + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listA, listB, listC, listD), + "ABCD" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listA, listC, listD), + "BACD" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listC, listA, listD), + "BCAD" + ); + assertEquals( + LongStream.of(parse(output)).boxed().toList(), + merge(listB, listC, listD, listA), + "BCDA" + ); + } + + private static long[] parse(String in) { + return in == null ? new long[0] : Stream.of(in.split("\\s+")) + .map(String::strip) + .filter(d -> !d.isBlank()) + .mapToLong(Long::parseLong) + .toArray(); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkQueueTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkQueueTest.java index aa352a87..06358091 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkQueueTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkQueueTest.java @@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import com.onthegomap.planetiler.stats.Stats; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -62,11 +61,7 @@ class WorkQueueTest { @Timeout(10) void testTwoWriters() { WorkQueue q = newQueue(2); - AtomicInteger ni = new AtomicInteger(0); - new Worker("worker", stats, 2, () -> { - int i = ni.getAndIncrement(); - q.accept(i); - }).await(); + new Worker("worker", stats, 2, q::accept).await(); q.close(); assertEquals(2, q.getPending()); Set found = new TreeSet<>(); @@ -82,9 +77,7 @@ class WorkQueueTest { @Timeout(10) void testTwoWritersManyElements() { WorkQueue q = newQueue(2); - AtomicInteger ni = new AtomicInteger(0); - new Worker("worker", stats, 2, () -> { - int i = ni.getAndIncrement(); + new Worker("worker", stats, 2, i -> { q.accept(i * 3); q.accept(i * 3 + 1); q.accept(i * 3 + 2); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkerTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkerTest.java index 7106bb11..3a590c9b 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkerTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/worker/WorkerTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.onthegomap.planetiler.ExpectedException; import com.onthegomap.planetiler.stats.Stats; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -13,9 +12,8 @@ class WorkerTest { @Test @Timeout(10) void testExceptionHandled() { - AtomicInteger counter = new AtomicInteger(0); - var worker = new Worker("prefix", Stats.inMemory(), 4, () -> { - if (counter.incrementAndGet() == 1) { + var worker = new Worker("prefix", Stats.inMemory(), 4, workerNum -> { + if (workerNum == 1) { throw new ExpectedException(); } else { Thread.sleep(5000);