From 09de796fe520fd9b5319e84fbe33c8e11f4331b3 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Mon, 12 Apr 2021 07:14:05 -0400 Subject: [PATCH] refactor topology naming --- .../com/onthegomap/flatmap/MbtilesWriter.java | 11 +- .../flatmap/reader/OpenStreetMapReader.java | 118 +++++++++--------- .../com/onthegomap/flatmap/reader/Reader.java | 11 +- .../onthegomap/flatmap/worker/Topology.java | 66 +++++----- .../onthegomap/flatmap/worker/WorkQueue.java | 2 +- .../com/onthegomap/flatmap/worker/Worker.java | 2 +- 6 files changed, 111 insertions(+), 99 deletions(-) diff --git a/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java b/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java index f60fe374..964cfb54 100644 --- a/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java +++ b/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java @@ -40,11 +40,12 @@ public class MbtilesWriter { output.delete(); MbtilesWriter writer = new MbtilesWriter(config.stats()); - var topology = Topology.readFromIterator("mbtiles_reader", stats, features.getAll()) - .addBuffer("mbtiles_reader_queue", 50_000, 1_000) - .addWorker("mbtiles_encoder", config.threads(), writer::tileEncoder) - .addBuffer("mbtiles_writer_queue", 50_000, 1_000) - .sinkTo("mbtiles_writer", 1, writer::tileWriter); + var topology = Topology.start("mbtiles", stats) + .readFromIterator("reader", features.getAll()) + .addBuffer("reader_queue", 50_000, 1_000) + .addWorker("encoder", config.threads(), writer::tileEncoder) + .addBuffer("writer_queue", 50_000, 1_000) + .sinkTo("writer", 1, writer::tileWriter); var loggers = new ProgressLoggers("mbtiles") .addRatePercentCounter("features", featureCount, writer.featuresProcessed) diff --git a/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java b/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java index 9c0fbf0e..d897d5c0 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java @@ -60,37 +60,38 @@ public class OpenStreetMapReader implements Closeable { public void pass1(FlatMapConfig config) { Profile profile = config.profile(); - var topology = Topology.readFromQueue(stats, - osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats) - ).sinkToConsumer("osm_pass1_processor", 1, (readerElement) -> { - if (readerElement instanceof ReaderNode node) { - TOTAL_NODES.incrementAndGet(); - nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat())); - } else if (readerElement instanceof ReaderWay) { - TOTAL_WAYS.incrementAndGet(); - } else if (readerElement instanceof ReaderRelation rel) { - TOTAL_RELATIONS.incrementAndGet(); - List infos = profile.preprocessOsmRelation(rel); - if (infos != null) { - for (RelationInfo info : infos) { - relationInfo.put(rel.getId(), info); - relationInfoSizes.addAndGet(info.sizeBytes()); + var topology = Topology.start("osm_pass1", stats) + .readFromQueue( + osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats) + ).sinkToConsumer("process", 1, (readerElement) -> { + if (readerElement instanceof ReaderNode node) { + TOTAL_NODES.incrementAndGet(); + nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat())); + } else if (readerElement instanceof ReaderWay) { + TOTAL_WAYS.incrementAndGet(); + } else if (readerElement instanceof ReaderRelation rel) { + TOTAL_RELATIONS.incrementAndGet(); + List infos = profile.preprocessOsmRelation(rel); + if (infos != null) { + for (RelationInfo info : infos) { + relationInfo.put(rel.getId(), info); + relationInfoSizes.addAndGet(info.sizeBytes()); + for (ReaderRelation.Member member : rel.getMembers()) { + if (member.getType() == ReaderRelation.Member.WAY) { + wayToRelations.put(member.getRef(), rel.getId()); + } + } + } + } + if (rel.hasTag("type", "multipolygon")) { for (ReaderRelation.Member member : rel.getMembers()) { if (member.getType() == ReaderRelation.Member.WAY) { - wayToRelations.put(member.getRef(), rel.getId()); + waysInMultipolygon.add(member.getRef()); } } } } - if (rel.hasTag("type", "multipolygon")) { - for (ReaderRelation.Member member : rel.getMembers()) { - if (member.getType() == ReaderRelation.Member.WAY) { - waysInMultipolygon.add(member.getRef()); - } - } - } - } - }); + }); var loggers = new ProgressLoggers("osm_pass1") .addRateCounter("nodes", TOTAL_NODES) @@ -114,43 +115,44 @@ public class OpenStreetMapReader implements Closeable { AtomicLong featuresWritten = new AtomicLong(0); CountDownLatch waysDone = new CountDownLatch(processThreads); - var topology = Topology.readFromQueue(stats, - osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats) - ).addWorker("osm_pass2_processor", processThreads, (prev, next) -> { - RenderableFeatures features = new RenderableFeatures(); - ReaderElement readerElement; - while ((readerElement = prev.get()) != null) { - SourceFeature feature = null; - if (readerElement instanceof ReaderNode node) { - nodesProcessed.incrementAndGet(); - feature = new NodeSourceFeature(node); - } else if (readerElement instanceof ReaderWay way) { - waysProcessed.incrementAndGet(); - feature = new WaySourceFeature(way); - } else if (readerElement instanceof ReaderRelation rel) { - // ensure all ways finished processing before we start relations - if (waysDone.getCount() > 0) { - waysDone.countDown(); - waysDone.await(); + var topology = Topology.start("osm_pass2", stats) + .readFromQueue( + osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats) + ).addWorker("process", processThreads, (prev, next) -> { + RenderableFeatures features = new RenderableFeatures(); + ReaderElement readerElement; + while ((readerElement = prev.get()) != null) { + SourceFeature feature = null; + if (readerElement instanceof ReaderNode node) { + nodesProcessed.incrementAndGet(); + feature = new NodeSourceFeature(node); + } else if (readerElement instanceof ReaderWay way) { + waysProcessed.incrementAndGet(); + feature = new WaySourceFeature(way); + } else if (readerElement instanceof ReaderRelation rel) { + // ensure all ways finished processing before we start relations + if (waysDone.getCount() > 0) { + waysDone.countDown(); + waysDone.await(); + } + relsProcessed.incrementAndGet(); + if (rel.hasTag("type", "multipolygon")) { + feature = new MultipolygonSourceFeature(rel); + } } - relsProcessed.incrementAndGet(); - if (rel.hasTag("type", "multipolygon")) { - feature = new MultipolygonSourceFeature(rel); + if (feature != null) { + features.reset(feature); + profile.processFeature(feature, features); + for (RenderableFeature renderable : features.all()) { + renderer.renderFeature(renderable, next); + } } } - if (feature != null) { - features.reset(feature); - profile.processFeature(feature, features); - for (RenderableFeature renderable : features.all()) { - renderer.renderFeature(renderable, next); - } - } - } - // just in case a worker skipped over all relations - waysDone.countDown(); - }).addBuffer("osm_pass2_feature_queue", 50_000, 1_000) - .sinkToConsumer("osm_pass2_writer", 1, (item) -> { + // just in case a worker skipped over all relations + waysDone.countDown(); + }).addBuffer("feature_queue", 50_000, 1_000) + .sinkToConsumer("write", 1, (item) -> { featuresWritten.incrementAndGet(); writer.accept(item); }); diff --git a/src/main/java/com/onthegomap/flatmap/reader/Reader.java b/src/main/java/com/onthegomap/flatmap/reader/Reader.java index 5c82f244..9e582e9d 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/Reader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/Reader.java @@ -34,9 +34,10 @@ public abstract class Reader { AtomicLong featuresRead = new AtomicLong(0); AtomicLong featuresWritten = new AtomicLong(0); - var topology = Topology.fromGenerator(name + "_read", stats, open()) - .addBuffer(name + "_reader", 1000) - .addWorker(name + "_process", threads, (prev, next) -> { + var topology = Topology.start(name, stats) + .fromGenerator("read", open()) + .addBuffer("read_queue", 1000) + .addWorker("process", threads, (prev, next) -> { RenderableFeatures features = new RenderableFeatures(); SourceFeature sourceFeature; while ((sourceFeature = prev.get()) != null) { @@ -50,8 +51,8 @@ public abstract class Reader { } } }) - .addBuffer(name + "_writer", 1000) - .sinkToConsumer(name + "_write", 1, (item) -> { + .addBuffer("write_queue", 1000) + .sinkToConsumer("write", 1, (item) -> { featuresWritten.incrementAndGet(); writer.accept(item); }); diff --git a/src/main/java/com/onthegomap/flatmap/worker/Topology.java b/src/main/java/com/onthegomap/flatmap/worker/Topology.java index ca742dc5..0956d2ec 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Topology.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Topology.java @@ -13,28 +13,8 @@ public record Topology( Worker worker ) { - public static Bufferable fromGenerator(String name, Stats stats, SourceStep producer, int threads) { - return (queueName, size, batchSize) -> { - var nextQueue = new WorkQueue(queueName, size, batchSize, stats); - Worker worker = new Worker(name, stats, threads, () -> producer.run(nextQueue)); - return new Builder<>(name, nextQueue, worker, stats); - }; - } - - public static Bufferable fromGenerator(String name, Stats stats, SourceStep producer) { - return fromGenerator(name, stats, producer, 1); - } - - public static Bufferable readFromIterator(String name, Stats stats, Iterator iter) { - return fromGenerator(name, stats, next -> { - while (iter.hasNext()) { - next.accept(iter.next()); - } - }, 1); - } - - public static Builder readFromQueue(Stats stats, WorkQueue input) { - return new Builder<>(input, stats); + public static Empty start(String prefix, Stats stats) { + return new Empty(prefix, stats); } public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) { @@ -71,7 +51,35 @@ public record Topology( } } + public static record Empty(String prefix, Stats stats) { + + public Bufferable fromGenerator(String name, SourceStep producer, int threads) { + return (queueName, size, batchSize) -> { + var nextQueue = new WorkQueue(prefix, queueName, size, batchSize, stats); + Worker worker = new Worker(prefix, name, stats, threads, () -> producer.run(nextQueue)); + return new Builder<>(prefix, name, nextQueue, worker, stats); + }; + } + + public Bufferable fromGenerator(String name, SourceStep producer) { + return fromGenerator(name, producer, 1); + } + + public Bufferable readFromIterator(String name, Iterator iter) { + return fromGenerator(name, next -> { + while (iter.hasNext()) { + next.accept(iter.next()); + } + }, 1); + } + + public Builder readFromQueue(WorkQueue input) { + return new Builder<>(input, stats); + } + } + public static record Builder( + String prefix, String name, Topology.Builder previous, WorkQueue inputQueue, @@ -79,20 +87,20 @@ public record Topology( Worker worker, Stats stats ) { - public Builder(String name, WorkQueue outputQueue, Worker worker, Stats stats) { - this(name, null, null, outputQueue, worker, stats); + public Builder(String prefix, String name, WorkQueue outputQueue, Worker worker, Stats stats) { + this(prefix, name, null, null, outputQueue, worker, stats); } public Builder(WorkQueue outputQueue, Stats stats) { - this(null, outputQueue, null, stats); + this(null, null, outputQueue, null, stats); } public Bufferable addWorker(String name, int threads, WorkerStep step) { Builder curr = this; return (queueName, size, batchSize) -> { - var nextOutputQueue = new WorkQueue(queueName, size, batchSize, stats); - var worker = new Worker(name, stats, threads, () -> step.run(outputQueue, nextOutputQueue)); - return new Builder<>(name, curr, outputQueue, nextOutputQueue, worker, stats); + var nextOutputQueue = new WorkQueue(prefix, queueName, size, batchSize, stats); + var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue, nextOutputQueue)); + return new Builder<>(prefix, name, curr, outputQueue, nextOutputQueue, worker, stats); }; } @@ -103,7 +111,7 @@ public record Topology( public Topology sinkTo(String name, int threads, SinkStep step) { var previousTopology = previous.build(); - var worker = new Worker(name, stats, threads, () -> step.run(outputQueue)); + var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue)); return new Topology<>(name, previousTopology, outputQueue, worker); } diff --git a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java index ee787ae6..7f99a397 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java +++ b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java @@ -7,7 +7,7 @@ import java.util.function.Supplier; public class WorkQueue implements Closeable, Supplier, Consumer { - public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { + public WorkQueue(String prefix, String name, int capacity, int maxBatch, Stats stats) { } diff --git a/src/main/java/com/onthegomap/flatmap/worker/Worker.java b/src/main/java/com/onthegomap/flatmap/worker/Worker.java index 2cdf861c..63f40afa 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Worker.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Worker.java @@ -5,7 +5,7 @@ import com.onthegomap.flatmap.stats.Stats; public class Worker { - public Worker(String name, Stats stats, int threads, RunnableThatThrows task) { + public Worker(String prefix, String name, Stats stats, int threads, RunnableThatThrows task) { }