From 726e6d01071b2656a3d01ece4c33ed6fc5184d11 Mon Sep 17 00:00:00 2001 From: Michael Barry Date: Thu, 5 May 2022 22:02:18 -0400 Subject: [PATCH] Parallel temp feature writes using `--write-threads` argument (#213) --- .github/workflows/performance.yml | 4 +- .../BenchmarkExternalMergeSort.java | 32 ++-- .../collection/ExternalMergeSort.java | 165 +++++++++++++----- .../planetiler/collection/FeatureGroup.java | 11 +- .../planetiler/collection/FeatureSort.java | 15 +- .../planetiler/config/PlanetilerConfig.java | 14 +- .../planetiler/reader/SimpleReader.java | 23 ++- .../planetiler/reader/osm/OsmReader.java | 21 ++- .../onthegomap/planetiler/util/BinPack.java | 46 +++++ .../planetiler/util/CloseableConusmer.java | 12 ++ .../planetiler/worker/WorkQueue.java | 2 +- .../planetiler/PlanetilerTests.java | 6 +- .../collection/FeatureGroupTest.java | 5 +- .../collection/FeatureSortTest.java | 61 +++++-- .../planetiler/util/BinBackTest.java | 56 ++++++ sonar-project.properties | 4 +- 16 files changed, 381 insertions(+), 96 deletions(-) create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/util/BinPack.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/util/CloseableConusmer.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/util/BinBackTest.java diff --git a/.github/workflows/performance.yml b/.github/workflows/performance.yml index dcc4a10e..38dc2a12 100644 --- a/.github/workflows/performance.yml +++ b/.github/workflows/performance.yml @@ -72,14 +72,14 @@ jobs: - name: 'Run branch' run: | - rm -f data/out.mbtiles + rm -rf data/out.mbtiles data/tmp cp branch/planetiler-dist/target/*with-deps.jar run.jar java -Xms${{ env.RAM }} -Xmx${{ env.RAM }} -jar run.jar --area="${{ env.AREA }}" "${{ env.BOUNDS_ARG }}" --mbtiles=data/out.mbtiles 2>&1 | tee log ls -alh run.jar | tee -a log cat log | strip-ansi > build-info/branchlogs.txt - name: 'Run base' run: | - rm -f data/out.mbtiles + rm -rf data/out.mbtiles data/tmp cp base/planetiler-dist/target/*with-deps.jar run.jar java -Xms${{ env.RAM }} -Xmx${{ env.RAM }} -jar run.jar --area="${{ env.AREA }}" "${{ env.BOUNDS_ARG }}" --mbtiles=data/out.mbtiles 2>&1 | tee log ls -alh run.jar | tee -a log diff --git a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java index 0af15033..11798069 100644 --- a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java +++ b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/BenchmarkExternalMergeSort.java @@ -39,9 +39,10 @@ public class BenchmarkExternalMergeSort { var config = PlanetilerConfig.defaults(); try { List results = new ArrayList<>(); - for (int limit : List.of(500_000_000, 2_000_000_000)) { - results.add(run(path, number, limit, false, true, true, config)); - results.add(run(path, number, limit, true, true, true, config)); + int limit = 2_000_000_000; + for (int writers : List.of(1, 2, 4)) { + results.add(run(path, writers, number, limit, false, true, true, config)); + results.add(run(path, writers, number, limit, true, true, true, config)); } for (var result : results) { System.err.println(result); @@ -51,17 +52,18 @@ public class BenchmarkExternalMergeSort { } } - private record Results( String write, String read, String sort, - int chunks, long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort, + int chunks, + int writeWorkers, int readWorkers, + long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort, boolean madvise ) {} - private static Results run(Path tmpDir, long items, int chunkSizeLimit, boolean mmap, boolean parallelSort, + private static Results run(Path tmpDir, int writeWorkers, long items, int chunkSizeLimit, boolean mmap, + boolean parallelSort, boolean madvise, PlanetilerConfig config) { boolean gzip = false; - int writeWorkers = 1; int sortWorkers = Runtime.getRuntime().availableProcessors(); int readWorkers = 1; FileUtils.delete(tmpDir); @@ -86,6 +88,8 @@ public class BenchmarkExternalMergeSort { FORMAT.numeric(items * NANOSECONDS_PER_SECOND / readTimer.elapsed().wall().toNanos()) + "/s", FORMAT.duration(sortTimer.elapsed().wall()), sorter.chunks(), + writeWorkers, + readWorkers, items, chunkSizeLimit, gzip, @@ -116,12 +120,14 @@ public class BenchmarkExternalMergeSort { private static void doWrites(int writeWorkers, long items, ExternalMergeSort sorter) { var counters = Counter.newMultiThreadCounter(); var writer = new Worker("write", Stats.inMemory(), writeWorkers, () -> { - var counter = counters.counterForThread(); - var random = ThreadLocalRandom.current(); - long toWrite = items / writeWorkers; - for (long i = 0; i < toWrite; i++) { - sorter.add(new SortableFeature(random.nextLong(), TEST_DATA)); - counter.inc(); + try (var writerForThread = sorter.writerForThread()) { + var counter = counters.counterForThread(); + var random = ThreadLocalRandom.current(); + long toWrite = items / writeWorkers; + for (long i = 0; i < toWrite; i++) { + writerForThread.accept(new SortableFeature(random.nextLong(), TEST_DATA)); + counter.inc(); + } } }); ProgressLoggers loggers = ProgressLoggers.create() diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java index 0674dd80..6210ebd3 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java @@ -7,7 +7,9 @@ import com.onthegomap.planetiler.stats.ProcessInfo; import com.onthegomap.planetiler.stats.ProgressLoggers; import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.stats.Timer; +import com.onthegomap.planetiler.util.BinPack; import com.onthegomap.planetiler.util.ByteBufferUtil; +import com.onthegomap.planetiler.util.CloseableConusmer; import com.onthegomap.planetiler.util.FileUtils; import com.onthegomap.planetiler.worker.WorkerPipeline; import java.io.BufferedInputStream; @@ -26,14 +28,17 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.PriorityQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.zip.Deflater; @@ -63,7 +68,8 @@ class ExternalMergeSort implements FeatureSort { private final int chunkSizeLimit; private final int workers; private final AtomicLong features = new AtomicLong(0); - private final List chunks = new ArrayList<>(); + private final List chunks = new CopyOnWriteArrayList<>(); + private final AtomicInteger chunkNum = new AtomicInteger(0); private final boolean gzip; private final PlanetilerConfig config; private final int readerLimit; @@ -72,7 +78,6 @@ class ExternalMergeSort implements FeatureSort { private final boolean parallelSort; private final boolean madvise; private final AtomicBoolean madviseFailed = new AtomicBoolean(false); - private Chunk currentChunk; private volatile boolean sorted = false; ExternalMergeSort(Path tempDir, PlanetilerConfig config, Stats stats) { @@ -118,7 +123,6 @@ class ExternalMergeSort implements FeatureSort { try { FileUtils.deleteDirectory(dir); Files.createDirectories(dir); - newChunk(); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -134,17 +138,8 @@ class ExternalMergeSort implements FeatureSort { } @Override - public void add(SortableFeature item) { - try { - assert !sorted; - features.incrementAndGet(); - currentChunk.add(item); - if (currentChunk.bytesInMemory > chunkSizeLimit) { - newChunk(); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public CloseableConusmer writerForThread() { + return new ThreadLocalWriter(); } @Override @@ -160,9 +155,9 @@ class ExternalMergeSort implements FeatureSort { @Override public void sort() { assert !sorted; - if (currentChunk != null) { + for (var chunk : chunks) { try { - currentChunk.close(); + chunk.close(); } catch (IOException e) { // ok } @@ -175,12 +170,32 @@ class ExternalMergeSort implements FeatureSort { AtomicLong sorting = new AtomicLong(0); AtomicLong doneCounter = new AtomicLong(0); + // we may end up with many small chunks because each thread-local writer starts a new one + // so group together smaller chunks that can be sorted together in-memory to minimize the + // number of chunks that the reader needs to deal with + List> groups = BinPack.pack( + chunks, + chunkSizeLimit, + chunk -> chunk.bytesInMemory + ); + + LOGGER.info("Grouped {} chunks into {}", chunks.size(), groups.size()); + var pipeline = WorkerPipeline.start("sort", stats) - .readFromTiny("item_queue", chunks) - .sinkToConsumer("worker", workers, chunk -> { + .readFromTiny("item_queue", groups) + .sinkToConsumer("worker", workers, group -> { try { readSemaphore.acquire(); - var toSort = time(reading, chunk::readAll); + var chunk = group.get(0); + var others = group.stream().skip(1).toList(); + var toSort = time(reading, () -> { + // merge all chunks into first one, and remove the others + var result = chunk.readAllAndMergeIn(others); + for (var other : others) { + other.remove(); + } + return result; + }); readSemaphore.release(); time(sorting, toSort::sort); @@ -223,6 +238,10 @@ class ExternalMergeSort implements FeatureSort { public Iterator iterator() { assert sorted; + if (chunks.isEmpty()) { + return Collections.emptyIterator(); + } + // k-way merge to interleave all the sorted chunks PriorityQueue> queue = new PriorityQueue<>(chunks.size()); for (Chunk chunk : chunks) { @@ -250,15 +269,6 @@ class ExternalMergeSort implements FeatureSort { }; } - private void newChunk() throws IOException { - Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1)); - chunkPath.toFile().deleteOnExit(); - if (currentChunk != null) { - currentChunk.close(); - } - chunks.add(currentChunk = new Chunk(chunkPath)); - } - public int chunks() { return chunks.size(); } @@ -400,6 +410,50 @@ class ExternalMergeSort implements FeatureSort { abstract SortableFeature readNextFeature(); } + /** Writer that a single thread can use to write features independent of writers used in other threads. */ + @NotThreadSafe + private class ThreadLocalWriter implements CloseableConusmer { + private Chunk currentChunk; + + private ThreadLocalWriter() { + try { + newChunk(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void accept(SortableFeature item) { + assert !sorted; + try { + features.incrementAndGet(); + currentChunk.add(item); + if (currentChunk.bytesInMemory > chunkSizeLimit) { + newChunk(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void newChunk() throws IOException { + Path chunkPath = dir.resolve("chunk" + chunkNum.incrementAndGet()); + FileUtils.deleteOnExit(chunkPath); + if (currentChunk != null) { + currentChunk.close(); + } + chunks.add(currentChunk = new Chunk(chunkPath)); + } + + @Override + public void close() throws IOException { + if (currentChunk != null) { + currentChunk.close(); + } + } + } + /** Write features to the chunk file through a memory-mapped file. */ private class WriterMmap implements Writer { private final FileChannel channel; @@ -467,19 +521,34 @@ class ExternalMergeSort implements FeatureSort { itemCount++; } - private SortableChunk readAll() { - try (var iterator = newReader()) { - SortableFeature[] featuresToSort = new SortableFeature[itemCount]; - int i = 0; - while (iterator.hasNext()) { - featuresToSort[i] = iterator.next(); - i++; + private SortableChunk readAllAndMergeIn(Collection others) { + // first, grow this chunk + int newItems = itemCount; + int newBytes = bytesInMemory; + for (var other : others) { + if (Integer.MAX_VALUE - newItems < other.itemCount) { + throw new IllegalStateException("Too many items in merged chunk: " + itemCount + "+" + + others.stream().map(c -> c.itemCount).toList()); } - if (i != itemCount) { - throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i); + if (Integer.MAX_VALUE - newBytes < other.bytesInMemory) { + throw new IllegalStateException("Too big merged chunk: " + bytesInMemory + "+" + + others.stream().map(c -> c.bytesInMemory).toList()); } - return new SortableChunk(featuresToSort); + newItems += other.itemCount; + newBytes += other.bytesInMemory; } + // then read items from all chunks into memory + SortableChunk result = new SortableChunk(newItems); + result.readAll(this); + itemCount = newItems; + bytesInMemory = newBytes; + for (var other : others) { + result.readAll(other); + } + if (result.i != itemCount) { + throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + result.i); + } + return result; } private Writer newWriter(Path path) { @@ -495,15 +564,21 @@ class ExternalMergeSort implements FeatureSort { writer.close(); } + public void remove() { + chunks.remove(this); + FileUtils.delete(path); + } + /** * A container for all features in a chunk read into memory for sorting. */ private class SortableChunk { private SortableFeature[] featuresToSort; + private int i = 0; - private SortableChunk(SortableFeature[] featuresToSort) { - this.featuresToSort = featuresToSort; + private SortableChunk(int itemCount) { + this.featuresToSort = new SortableFeature[itemCount]; } public SortableChunk sort() { @@ -526,6 +601,14 @@ class ExternalMergeSort implements FeatureSort { throw new UncheckedIOException(e); } } + + private void readAll(Chunk chunk) { + try (var iterator = chunk.newReader()) { + while (iterator.hasNext()) { + featuresToSort[i++] = iterator.next(); + } + } + } } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java index e8e0fce1..74bd9def 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java @@ -9,6 +9,7 @@ import com.onthegomap.planetiler.geo.GeometryType; import com.onthegomap.planetiler.geo.TileCoord; import com.onthegomap.planetiler.render.RenderedFeature; import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.CloseableConusmer; import com.onthegomap.planetiler.util.CommonStringEncoder; import com.onthegomap.planetiler.util.DiskBacked; import com.onthegomap.planetiler.util.LayerStats; @@ -44,8 +45,7 @@ import org.slf4j.LoggerFactory; * supported (see {@link CommonStringEncoder}) */ @NotThreadSafe -public final class FeatureGroup implements Consumer, Iterable, - DiskBacked { +public final class FeatureGroup implements Iterable, DiskBacked { public static final int SORT_KEY_BITS = 23; public static final int SORT_KEY_MAX = (1 << (SORT_KEY_BITS - 1)) - 1; @@ -246,10 +246,9 @@ public final class FeatureGroup implements Consumer, Iterable writerForThread() { + return sorter.writerForThread(); } private volatile boolean prepared = false; diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java index 4a6166d0..99b10cc0 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java @@ -1,8 +1,10 @@ package com.onthegomap.planetiler.collection; +import com.onthegomap.planetiler.util.CloseableConusmer; import com.onthegomap.planetiler.util.DiskBacked; import com.onthegomap.planetiler.util.MemoryEstimator; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -26,7 +28,7 @@ interface FeatureSort extends Iterable, DiskBacked, MemoryEstim /** Returns a feature sorter that sorts all features in memory. Suitable for toy examples (unit tests). */ static FeatureSort newInMemory() { - List list = new ArrayList<>(); + List list = Collections.synchronizedList(new ArrayList<>()); return new FeatureSort() { @Override public void sort() { @@ -39,8 +41,8 @@ interface FeatureSort extends Iterable, DiskBacked, MemoryEstim } @Override - public void add(SortableFeature newEntry) { - list.add(newEntry); + public CloseableConusmer writerForThread() { + return list::add; } @Override @@ -74,6 +76,9 @@ interface FeatureSort extends Iterable, DiskBacked, MemoryEstim return list; } - void add(SortableFeature newEntry); - + /** + * Returns a new writer that can be used to write features from a single thread independent of writers used from other + * threads. + */ + CloseableConusmer writerForThread(); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java index 867b314d..67bf2878 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java @@ -12,6 +12,8 @@ public record PlanetilerConfig( Arguments arguments, Bounds bounds, int threads, + int featureWriteThreads, + int featureProcessThreads, Duration logInterval, int minzoom, int maxzoom, @@ -75,10 +77,20 @@ public record PlanetilerConfig( "default storage type for temporary data, one of " + Stream.of(Storage.values()).map( Storage::id).toList(), fallbackTempStorage); + int threads = arguments.threads(); + int featureWriteThreads = + arguments.getInteger("write_threads", "number of threads to use when writing temp features", + // defaults: <48 cpus=1 writer, 48-80=2 writers, 80-112=3 writers, 112-144=4 writers, ... + Math.max(1, (threads - 16) / 32 + 1)); + int featureProcessThreads = + arguments.getInteger("process_threads", "number of threads to use when processing input features", + Math.max(threads < 4 ? threads : (threads - featureWriteThreads), 1)); return new PlanetilerConfig( arguments, new Bounds(arguments.bounds("bounds", "bounds")), - arguments.threads(), + threads, + featureWriteThreads, + featureProcessThreads, arguments.getDuration("loginterval", "time between logs", "10s"), arguments.getInteger("minzoom", "minimum zoom level", MIN_MINZOOM), arguments.getInteger("maxzoom", "maximum zoom level (limit 14)", MAX_MAXZOOM), diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/SimpleReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/SimpleReader.java index 1915a985..b1cf867d 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/SimpleReader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/SimpleReader.java @@ -11,6 +11,7 @@ import com.onthegomap.planetiler.stats.ProgressLoggers; import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.worker.WorkerPipeline; import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.locationtech.jts.geom.Envelope; @@ -48,7 +49,8 @@ public abstract class SimpleReader implements Closeable { public final void process(FeatureGroup writer, PlanetilerConfig config) { var timer = stats.startStage(sourceName); long featureCount = getCount(); - int threads = config.threads(); + int writeThreads = config.featureWriteThreads(); + int processThreads = config.featureProcessThreads(); Envelope latLonBounds = config.bounds().latLon(); AtomicLong featuresRead = new AtomicLong(0); AtomicLong featuresWritten = new AtomicLong(0); @@ -56,7 +58,7 @@ public abstract class SimpleReader implements Closeable { var pipeline = WorkerPipeline.start(sourceName, stats) .fromGenerator("read", read()) .addBuffer("read_queue", 1000) - .addWorker("process", threads, (prev, next) -> { + .addWorker("process", processThreads, (prev, next) -> { var featureCollectors = new FeatureCollector.Factory(config, stats); try (FeatureRenderer renderer = newFeatureRenderer(writer, config, next)) { for (SourceFeature sourceFeature : prev) { @@ -78,9 +80,13 @@ public abstract class SimpleReader implements Closeable { // output large batches since each input may map to many tiny output features (i.e. slicing ocean tiles) // which turns enqueueing into the bottleneck .addBuffer("write_queue", 50_000, 1_000) - .sinkToConsumer("write", 1, item -> { - featuresWritten.incrementAndGet(); - writer.accept(item); + .sinkTo("write", writeThreads, prev -> { + try (var threadLocalWriter = writer.writerForThread()) { + for (var item : prev) { + featuresWritten.incrementAndGet(); + threadLocalWriter.accept(item); + } + } }); var loggers = ProgressLoggers.create() @@ -95,8 +101,13 @@ public abstract class SimpleReader implements Closeable { pipeline.awaitAndLog(loggers, config.logInterval()); // hook for profile to do any post-processing after this source is read - try (var featureRenderer = newFeatureRenderer(writer, config, writer)) { + try ( + var threadLocalWriter = writer.writerForThread(); + var featureRenderer = newFeatureRenderer(writer, config, threadLocalWriter) + ) { profile.finish(sourceName, new FeatureCollector.Factory(config, stats), featureRenderer); + } catch (IOException e) { + LOGGER.warn("Error closing writer", e); } timer.stop(); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/osm/OsmReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/osm/OsmReader.java index b95e5f36..2ea7e6f8 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/osm/OsmReader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/osm/OsmReader.java @@ -304,8 +304,8 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate { */ public void pass2(FeatureGroup writer, PlanetilerConfig config) { var timer = stats.startStage("osm_pass2"); - int threads = config.threads(); - int processThreads = Math.max(threads < 4 ? threads : (threads - 1), 1); + int writeThreads = config.featureWriteThreads(); + int processThreads = config.featureProcessThreads(); Counter.MultiThreadCounter blocksProcessed = Counter.newMultiThreadCounter(); // track relation count separately because they get enqueued onto the distributor near the end Counter.MultiThreadCounter relationsProcessed = Counter.newMultiThreadCounter(); @@ -323,7 +323,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate { var pipeline = WorkerPipeline.start("osm_pass2", stats) .fromGenerator("read", osmBlockSource::forEachBlock) - .addBuffer("pbf_blocks", Math.max(10, threads / 2)) + .addBuffer("pbf_blocks", Math.max(10, processThreads / 2)) .addWorker("process", processThreads, (prev, next) -> { // avoid contention trying to get the thread-local counters by getting them once when thread starts Counter blocks = blocksProcessed.counterForThread(); @@ -369,7 +369,13 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate { } }).addBuffer("feature_queue", 50_000, 1_000) // FeatureGroup writes need to be single-threaded - .sinkToConsumer("write", 1, writer); + .sinkTo("write", writeThreads, prev -> { + try (var writerForThread = writer.writerForThread()) { + for (var item : prev) { + writerForThread.accept(item); + } + } + }); var logger = ProgressLoggers.create() .addRatePercentCounter("nodes", pass1Phaser.nodes(), pass2Phaser::nodes, true) @@ -393,7 +399,10 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate { timer.stop(); - try (var renderer = createFeatureRenderer(writer, config, writer)) { + try ( + var writerForThread = writer.writerForThread(); + var renderer = createFeatureRenderer(writer, config, writerForThread) + ) { profile.finish(name, new FeatureCollector.Factory(config, stats), renderer); } catch (Exception e) { LOGGER.error("Error calling profile.finish", e); @@ -740,7 +749,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate { @Override protected Geometry computeWorldGeometry() throws GeometryException { - return canBePolygon() ? polygon() : line(); + return super.canBePolygon() ? polygon() : line(); } @Override diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/BinPack.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/BinPack.java new file mode 100644 index 00000000..3229307e --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/BinPack.java @@ -0,0 +1,46 @@ +package com.onthegomap.planetiler.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.function.ToLongFunction; + +/** + * Implements a best-effort 1-D bin packing using the + * best-fit decreasing + * algorithm. + */ +public class BinPack { + private BinPack() {} + + /** + * Returns {@code items} grouped into an approximately minimum number of bins under {@code maxBinSize} according to + * {@code getSize} function. + */ + public static List> pack(Collection items, long maxBinSize, ToLongFunction getSize) { + class Bin { + long size = 0; + final List items = new ArrayList<>(); + } + var descendingItems = items.stream().sorted(Comparator.comparingLong(getSize).reversed()).toList(); + List bins = new ArrayList<>(); + for (var item : descendingItems) { + long size = getSize.applyAsLong(item); + var bestBin = bins.stream() + .filter(b -> maxBinSize - b.size >= size) + // Instead of using the first bin that this element fits in, use the "fullest" bin. + // This makes the algorithm "best-fit decreasing" instead of "first-fit decreasing" + .max(Comparator.comparingLong(bin -> bin.size)); + Bin bin; + if (bestBin.isPresent()) { + bin = bestBin.get(); + } else { + bins.add(bin = new Bin()); + } + bin.items.add(item); + bin.size += size; + } + return bins.stream().map(bin -> bin.items).toList(); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/CloseableConusmer.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/CloseableConusmer.java new file mode 100644 index 00000000..a33c6d8d --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/CloseableConusmer.java @@ -0,0 +1,12 @@ +package com.onthegomap.planetiler.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.function.Consumer; + +@FunctionalInterface +public interface CloseableConusmer extends Consumer, Closeable { + + @Override + default void close() throws IOException {} +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/WorkQueue.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/WorkQueue.java index 7b9d9520..d961509f 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/WorkQueue.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/worker/WorkQueue.java @@ -56,7 +56,7 @@ public class WorkQueue implements AutoCloseable, IterableOnce, Consumer * @param stats stats to monitor this with */ public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { - this.pendingBatchesCapacity = capacity / maxBatch; + this.pendingBatchesCapacity = Math.max(1, capacity / maxBatch); this.batchSize = maxBatch; itemQueue = new ArrayBlockingQueue<>(pendingBatchesCapacity); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java index d37dd77b..2205295a 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java @@ -1579,7 +1579,11 @@ class PlanetilerTests { Files.copy(originalOsm, tempOsm); Planetiler.create(Arguments.fromArgs( "--tmpdir", tempDir.toString(), - "--free-osm-after-read" + "--free-osm-after-read", + // ensure we exercise the multi-threaded code + "--write-threads=2", + "--process-threads=2", + "--threads=4" )) .setProfile(new Profile.NullProfile() { @Override diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java index b379791b..1bed8479 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureGroupTest.java @@ -13,6 +13,7 @@ import com.onthegomap.planetiler.geo.GeometryType; import com.onthegomap.planetiler.geo.TileCoord; import com.onthegomap.planetiler.render.RenderedFeature; import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.CloseableConusmer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,6 +31,7 @@ class FeatureGroupTest { private final FeatureSort sorter = FeatureSort.newInMemory(); private FeatureGroup features = new FeatureGroup(sorter, new Profile.NullProfile(), Stats.inMemory()); + private CloseableConusmer featureWriter = features.writerForThread(); @Test void testEmpty() { @@ -65,7 +67,7 @@ class FeatureGroupTest { sortKey, hasGroup ? Optional.of(new RenderedFeature.Group(group, limit)) : Optional.empty() ); - features.accept(features.newRenderedFeatureEncoder().apply(feature)); + featureWriter.accept(features.newRenderedFeatureEncoder().apply(feature)); } private Map>> getFeatures() { @@ -212,6 +214,7 @@ class FeatureGroupTest { return items; } }, Stats.inMemory()); + featureWriter = features.writerForThread(); putWithGroup( 1, "layer", Map.of("id", 3), newPoint(5, 6), 2, 1, 2 ); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureSortTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureSortTest.java index b19ef17c..2e803efe 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureSortTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/FeatureSortTest.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -21,10 +22,11 @@ class FeatureSortTest { @TempDir Path tmpDir; - private static SortableFeature newEntry(int i) { + private SortableFeature newEntry(int i) { return new SortableFeature(Long.MIN_VALUE + i, new byte[]{(byte) i, (byte) (1 + i)}); } + private FeatureSort newSorter(int workers, int chunkSizeLimit, boolean gzip, boolean mmap) { return new ExternalMergeSort(tmpDir, workers, chunkSizeLimit, gzip, mmap, true, true, config, Stats.inMemory()); @@ -32,7 +34,7 @@ class FeatureSortTest { @Test void testEmpty() { - FeatureSort sorter = newSorter(1, 100, false, false); + var sorter = newSorter(1, 100, false, false); sorter.sort(); assertEquals(List.of(), sorter.toList()); } @@ -40,7 +42,8 @@ class FeatureSortTest { @Test void testSingle() { FeatureSort sorter = newSorter(1, 100, false, false); - sorter.add(newEntry(1)); + var writer = sorter.writerForThread(); + writer.accept(newEntry(1)); sorter.sort(); assertEquals(List.of(newEntry(1)), sorter.toList()); } @@ -48,8 +51,9 @@ class FeatureSortTest { @Test void testTwoItemsOneChunk() { FeatureSort sorter = newSorter(1, 100, false, false); - sorter.add(newEntry(2)); - sorter.add(newEntry(1)); + var writer = sorter.writerForThread(); + writer.accept(newEntry(2)); + writer.accept(newEntry(1)); sorter.sort(); assertEquals(List.of(newEntry(1), newEntry(2)), sorter.toList()); } @@ -57,8 +61,9 @@ class FeatureSortTest { @Test void testTwoItemsTwoChunks() { FeatureSort sorter = newSorter(1, 0, false, false); - sorter.add(newEntry(2)); - sorter.add(newEntry(1)); + var writer = sorter.writerForThread(); + writer.accept(newEntry(2)); + writer.accept(newEntry(1)); sorter.sort(); assertEquals(List.of(newEntry(1), newEntry(2)), sorter.toList()); } @@ -66,14 +71,45 @@ class FeatureSortTest { @Test void testTwoWorkers() { FeatureSort sorter = newSorter(2, 0, false, false); - sorter.add(newEntry(4)); - sorter.add(newEntry(3)); - sorter.add(newEntry(2)); - sorter.add(newEntry(1)); + var writer = sorter.writerForThread(); + writer.accept(newEntry(4)); + writer.accept(newEntry(3)); + writer.accept(newEntry(2)); + writer.accept(newEntry(1)); sorter.sort(); assertEquals(List.of(newEntry(1), newEntry(2), newEntry(3), newEntry(4)), sorter.toList()); } + @Test + void testTwoWriters() { + FeatureSort sorter = newSorter(2, 0, false, false); + var writer1 = sorter.writerForThread(); + var writer2 = sorter.writerForThread(); + writer1.accept(newEntry(4)); + writer1.accept(newEntry(3)); + writer2.accept(newEntry(2)); + writer2.accept(newEntry(1)); + sorter.sort(); + assertEquals(List.of(newEntry(1), newEntry(2), newEntry(3), newEntry(4)), sorter.toList()); + } + + @Test + void testMultipleWritersThatGetCombined() { + FeatureSort sorter = newSorter(2, 2_000_000, false, false); + var writer1 = sorter.writerForThread(); + var writer2 = sorter.writerForThread(); + var writer3 = sorter.writerForThread(); + writer1.accept(newEntry(4)); + writer1.accept(newEntry(3)); + writer2.accept(newEntry(2)); + writer2.accept(newEntry(1)); + writer3.accept(newEntry(5)); + writer3.accept(newEntry(6)); + sorter.sort(); + assertEquals(Stream.of(1, 2, 3, 4, 5, 6).map(this::newEntry).toList(), + sorter.toList()); + } + @ParameterizedTest @CsvSource({ "false,false", @@ -90,7 +126,8 @@ class FeatureSortTest { } Collections.shuffle(shuffled, new Random(0)); FeatureSort sorter = newSorter(2, 20_000, gzip, mmap); - shuffled.forEach(sorter::add); + var writer = sorter.writerForThread(); + shuffled.forEach(writer); sorter.sort(); assertEquals(sorted, sorter.toList()); } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/BinBackTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/BinBackTest.java new file mode 100644 index 00000000..b0cda685 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/BinBackTest.java @@ -0,0 +1,56 @@ +package com.onthegomap.planetiler.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class BinBackTest { + + @ParameterizedTest + @CsvSource(value = { + "3;[];[]", + "2;[1];[[1]]", + "2;[2];[[2]]", + "2;[3];[[3]]", + "3;[1,2,3];[[3], [2, 1]]", + "5;[1,2,3];[[3,2],[1]]", + "6;[1,2,3];[[3,2,1]]", + "2;[1,2,3];[[3],[2],[1]]", + "1;[1,2,3];[[3],[2],[1]]", + }, delimiter = ';') + void test(int limit, String inputString, String expectedString) { + List input = parseList(inputString); + List> expected = parseListList(expectedString); + // make sure we parsed correctly + assertEqualsIgnoringWhitespace(inputString, input, "failed to parse input"); + assertEqualsIgnoringWhitespace(expectedString, expected, "failed to parse expected"); + + assertEquals(expected, BinPack.pack(input, limit, i -> i)); + } + + private static List parseList(String string) { + return Stream.of(string.replaceAll("[\\[\\]]", "").split(",")) + .map(String::strip) + .filter(s -> !s.isBlank()) + .map(Long::parseLong) + .toList(); + } + + private static List> parseListList(String string) { + return Stream.of(string.replaceAll("((^\\[)|(]$))", "").split("]\\s*,\\s*\\[")) + .map(BinBackTest::parseList) + .filter(l -> !l.isEmpty()) + .toList(); + } + + private static void assertEqualsIgnoringWhitespace(Object expected, Object actual, String message) { + assertEquals( + expected.toString().replaceAll("\\s", ""), + actual.toString().replaceAll("\\s", ""), + message + ); + } +} diff --git a/sonar-project.properties b/sonar-project.properties index 400cc957..ea8eda77 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -1,4 +1,4 @@ -sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699,js3776,js1121,js107 +sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699,js3776,js1121,js107,js1192 # subjective sonar.issue.ignore.multicriteria.js1659.ruleKey=java:S1659 sonar.issue.ignore.multicriteria.js1659.resourceKey=**/*.java @@ -14,6 +14,8 @@ sonar.issue.ignore.multicriteria.js1121.ruleKey=java:S1121 sonar.issue.ignore.multicriteria.js1121.resourceKey=**/*.java sonar.issue.ignore.multicriteria.js107.ruleKey=java:S107 sonar.issue.ignore.multicriteria.js107.resourceKey=**/*.java +sonar.issue.ignore.multicriteria.js1192.ruleKey=java:S1192 +sonar.issue.ignore.multicriteria.js1192.resourceKey=**/*.java # layer constructors need same signatures sonar.issue.ignore.multicriteria.js1172.ruleKey=java:S1172