From c15126eecdaa1c25a7c6a68cfdc334100f0b7a34 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Sun, 11 Apr 2021 16:10:28 -0400 Subject: [PATCH] worker topology framework --- .../com/onthegomap/flatmap/Arguments.java | 10 +- .../onthegomap/flatmap/FeatureRenderer.java | 10 +- .../com/onthegomap/flatmap/FlatMapConfig.java | 15 +++ .../onthegomap/flatmap/OpenMapTilesMain.java | 48 ++++---- .../onthegomap/flatmap/ProgressLoggers.java | 36 ++++++ .../onthegomap/flatmap/RenderableFeature.java | 1 + .../flatmap/RenderableFeatures.java | 3 + .../com/onthegomap/flatmap/SourceFeature.java | 1 + .../java/com/onthegomap/flatmap/Wikidata.java | 5 +- .../collections/MergeSortFeatureMap.java | 13 ++- .../flatmap/profiles/OpenMapTilesProfile.java | 6 +- .../flatmap/reader/NaturalEarthReader.java | 11 +- .../com/onthegomap/flatmap/reader/Reader.java | 81 ++++++------- .../flatmap/reader/ShapefileReader.java | 14 ++- .../com/onthegomap/flatmap/stats/Stats.java | 32 +++++ .../com/onthegomap/flatmap/worker/Sink.java | 6 - .../com/onthegomap/flatmap/worker/Source.java | 6 - .../onthegomap/flatmap/worker/Topology.java | 110 ++++++++++++++++++ .../onthegomap/flatmap/worker/WorkQueue.java | 7 +- .../com/onthegomap/flatmap/worker/Worker.java | 11 +- 20 files changed, 323 insertions(+), 103 deletions(-) create mode 100644 src/main/java/com/onthegomap/flatmap/FlatMapConfig.java delete mode 100644 src/main/java/com/onthegomap/flatmap/worker/Sink.java delete mode 100644 src/main/java/com/onthegomap/flatmap/worker/Source.java create mode 100644 src/main/java/com/onthegomap/flatmap/worker/Topology.java diff --git a/src/main/java/com/onthegomap/flatmap/Arguments.java b/src/main/java/com/onthegomap/flatmap/Arguments.java index 90e40ba7..bde87305 100644 --- a/src/main/java/com/onthegomap/flatmap/Arguments.java +++ b/src/main/java/com/onthegomap/flatmap/Arguments.java @@ -76,7 +76,13 @@ public class Arguments { } public Stats getStats() { - // TODO - return null; + return new Stats.InMemory(); + } + + public int integer(String key, String description, int defaultValue) { + String value = getArg(key, Integer.toString(defaultValue)); + int parsed = Integer.parseInt(value); + LOGGER.info(description + ": " + parsed); + return parsed; } } diff --git a/src/main/java/com/onthegomap/flatmap/FeatureRenderer.java b/src/main/java/com/onthegomap/flatmap/FeatureRenderer.java index 85c28b11..471eb6cd 100644 --- a/src/main/java/com/onthegomap/flatmap/FeatureRenderer.java +++ b/src/main/java/com/onthegomap/flatmap/FeatureRenderer.java @@ -1,18 +1,14 @@ package com.onthegomap.flatmap; -import com.onthegomap.flatmap.collections.MergeSortFeatureMap; import com.onthegomap.flatmap.stats.Stats; -import com.onthegomap.flatmap.worker.Worker.WorkerSink; +import java.util.function.Consumer; public class FeatureRenderer { - public FeatureRenderer(MergeSortFeatureMap featureMap, Stats stats) { + public FeatureRenderer(Stats stats) { } - public WorkerSink newWriterQueue(String name) { - } - - public void renderFeature(RenderableFeature renderable) { + public void renderFeature(RenderableFeature renderable, Consumer consumer) { } } diff --git a/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java b/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java new file mode 100644 index 00000000..7c6663c5 --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java @@ -0,0 +1,15 @@ +package com.onthegomap.flatmap; + +import com.onthegomap.flatmap.profiles.OpenMapTilesProfile; +import com.onthegomap.flatmap.stats.Stats; +import org.locationtech.jts.geom.Envelope; + +public record FlatMapConfig( + OpenMapTilesProfile profile, + Envelope envelope, + int threads, + Stats stats, + long logIntervalSeconds +) { + +} diff --git a/src/main/java/com/onthegomap/flatmap/OpenMapTilesMain.java b/src/main/java/com/onthegomap/flatmap/OpenMapTilesMain.java index 3c46ed9c..2deded58 100644 --- a/src/main/java/com/onthegomap/flatmap/OpenMapTilesMain.java +++ b/src/main/java/com/onthegomap/flatmap/OpenMapTilesMain.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; import org.apache.commons.io.FileUtils; +import org.locationtech.jts.geom.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +24,7 @@ public class OpenMapTilesMain { stats.startTimer("import"); LOGGER.info("Arguments:"); OsmInputFile osmInputFile = new OsmInputFile( - arguments.inputFile("input", "OSM input file", "./data/sources/massachusetts-latest.osm.pbf")); + arguments.inputFile("input", "OSM input file", "./data/sources/north-america_us_massachusetts.pbf")); File centerlines = arguments .inputFile("centerline", "lake centerlines input", "./data/sources/lake_centerline.shp.zip"); File naturalEarth = arguments.inputFile("natural_earth", "natural earth input", @@ -31,7 +32,9 @@ public class OpenMapTilesMain { File waterPolygons = arguments.inputFile("water_polygons", "water polygons input", "./data/sources/water-polygons-split-3857.zip"); double[] bounds = arguments.bounds("bounds", "bounds", osmInputFile); + Envelope envelope = new Envelope(bounds[0], bounds[2], bounds[1], bounds[3]); int threads = arguments.threads(); + int logIntervalSeconds = arguments.integer("loginterval", "seconds between logs", 10); Path tmpDir = arguments.file("tmpdir", "temp directory", "./data/tmp").toPath(); boolean fetchWikidata = arguments.get("fetch_wikidata", "fetch wikidata translations", false); boolean useWikidata = arguments.get("use_wikidata", "use wikidata translations", true); @@ -45,38 +48,43 @@ public class OpenMapTilesMain { if (fetchWikidata) { LOGGER.info("- [wikidata] Fetch OpenStreetMap element name translations from wikidata"); } - LOGGER.info("- [lake_centerlines] Extract lake centerlines"); - LOGGER.info("- [water_polygons] Process ocean polygons"); - LOGGER.info("- [natural_earth] Process natural earth features"); - LOGGER.info("- [osm_pass1] Pre-process OpenStreetMap input (store node locations then relation members)"); - LOGGER.info("- [osm_pass2] Process OpenStreetMap nodes, ways, then relations"); - LOGGER.info("- [sort] Sort rendered features by tile ID"); - LOGGER.info("- [mbtiles] Encode each tile and write to " + output); + LOGGER.info(" [lake_centerlines] Extract lake centerlines"); + LOGGER.info(" [water_polygons] Process ocean polygons"); + LOGGER.info(" [natural_earth] Process natural earth features"); + LOGGER.info(" [osm_pass1] Pre-process OpenStreetMap input (store node locations then relation members)"); + LOGGER.info(" [osm_pass2] Process OpenStreetMap nodes, ways, then relations"); + LOGGER.info(" [sort] Sort rendered features by tile ID"); + LOGGER.info(" [mbtiles] Encode each tile and write to " + output); var translations = Translations.defaultProvider(languages); var profile = new OpenMapTilesProfile(); - if (fetchWikidata) { - stats.time("wikidata", - () -> Wikidata.fetch(osmInputFile, wikidataNamesFile, threads, profile, stats)); - } - if (useWikidata) { - translations.addTranslationProvider(Wikidata.load(wikidataNamesFile)); - } - FileUtils.forceMkdir(tmpDir.toFile()); File nodeDb = tmpDir.resolve("node.db").toFile(); Path featureDb = tmpDir.resolve("feature.db"); MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, stats); - FeatureRenderer renderer = new FeatureRenderer(featureMap, stats); + FeatureRenderer renderer = new FeatureRenderer(stats); + FlatMapConfig config = new FlatMapConfig(profile, envelope, threads, stats, logIntervalSeconds); + + if (fetchWikidata) { + stats.time("wikidata", + () -> Wikidata.fetch(osmInputFile, wikidataNamesFile, config)); + } + if (useWikidata) { + translations.addTranslationProvider(Wikidata.load(wikidataNamesFile)); + } stats.time("lake_centerlines", () -> - new ShapefileReader("EPSG:3857", centerlines, stats).process(renderer, profile, threads)); + new ShapefileReader("EPSG:3857", centerlines, stats) + .process("lake_centerlines", renderer, featureMap, config)); stats.time("water_polygons", () -> - new ShapefileReader(waterPolygons, stats).process(renderer, profile, threads)); + new ShapefileReader(waterPolygons, stats) + .process("water_polygons", renderer, featureMap, config) + ); stats.time("natural_earth", () -> new NaturalEarthReader(naturalEarth, tmpDir.resolve("natearth.sqlite").toFile(), stats) - .process(renderer, profile, threads)); + .process("natural_earth", renderer, featureMap, config) + ); try (var osmReader = new OpenStreetMapReader(osmInputFile, nodeDb, stats)) { stats.time("osm_pass1", () -> osmReader.pass1(profile, threads)); diff --git a/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java b/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java index a1ad8415..3c7e4ce1 100644 --- a/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java +++ b/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java @@ -1,5 +1,8 @@ package com.onthegomap.flatmap; +import com.onthegomap.flatmap.worker.Topology; +import com.onthegomap.flatmap.worker.WorkQueue; +import com.onthegomap.flatmap.worker.Worker; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; @@ -16,4 +19,37 @@ public class ProgressLoggers { public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) { return this; } + + public ProgressLoggers addRateCounter(String name, AtomicLong featuresWritten) { + return this; + } + + public ProgressLoggers addFileSize(LongSupplier getStorageSize) { + return this; + } + + public ProgressLoggers addProcessStats() { + return this; + } + + public ProgressLoggers addThreadPoolStats(String name, String prefix) { + return this; + } + + public ProgressLoggers addThreadPoolStats(String name, Worker worker) { + return addThreadPoolStats(name, worker.getPrefix()); + } + + public ProgressLoggers addQueueStats(WorkQueue queue) { + return this; + } + + public ProgressLoggers addTopologyStats(Topology topology) { + if (topology == null) { + return this; + } + return addTopologyStats(topology.previous()) + .addQueueStats(topology.inputQueue()) + .addThreadPoolStats(topology.name(), topology.worker()); + } } diff --git a/src/main/java/com/onthegomap/flatmap/RenderableFeature.java b/src/main/java/com/onthegomap/flatmap/RenderableFeature.java index dffb50e1..28bd6100 100644 --- a/src/main/java/com/onthegomap/flatmap/RenderableFeature.java +++ b/src/main/java/com/onthegomap/flatmap/RenderableFeature.java @@ -5,6 +5,7 @@ import org.opengis.geometry.Geometry; public class RenderableFeature { public Geometry getGeometry() { + return null; } // layer // attrs diff --git a/src/main/java/com/onthegomap/flatmap/RenderableFeatures.java b/src/main/java/com/onthegomap/flatmap/RenderableFeatures.java index dc0d6681..262489e6 100644 --- a/src/main/java/com/onthegomap/flatmap/RenderableFeatures.java +++ b/src/main/java/com/onthegomap/flatmap/RenderableFeatures.java @@ -1,8 +1,11 @@ package com.onthegomap.flatmap; +import java.util.List; + public class RenderableFeatures { public Iterable all() { + return List.of(); } public void reset(SourceFeature sourceFeature) { diff --git a/src/main/java/com/onthegomap/flatmap/SourceFeature.java b/src/main/java/com/onthegomap/flatmap/SourceFeature.java index 5af9ab8c..d6f868a1 100644 --- a/src/main/java/com/onthegomap/flatmap/SourceFeature.java +++ b/src/main/java/com/onthegomap/flatmap/SourceFeature.java @@ -5,6 +5,7 @@ import org.locationtech.jts.geom.Geometry; public class SourceFeature { public Geometry getGeometry() { + return null; } // props // lazy geometry diff --git a/src/main/java/com/onthegomap/flatmap/Wikidata.java b/src/main/java/com/onthegomap/flatmap/Wikidata.java index 3a4a820a..289fc788 100644 --- a/src/main/java/com/onthegomap/flatmap/Wikidata.java +++ b/src/main/java/com/onthegomap/flatmap/Wikidata.java @@ -1,15 +1,12 @@ package com.onthegomap.flatmap; import com.graphhopper.reader.ReaderElement; -import com.onthegomap.flatmap.profiles.Profile; -import com.onthegomap.flatmap.stats.Stats; import java.io.File; import java.util.Map; public class Wikidata { - public static void fetch(OsmInputFile infile, File outfile, int threads, Profile profile, - Stats stats) { + public static void fetch(OsmInputFile infile, File outfile, FlatMapConfig config) { // TODO } diff --git a/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java b/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java index 720e8dc2..32367352 100644 --- a/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java +++ b/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java @@ -1,9 +1,11 @@ package com.onthegomap.flatmap.collections; +import com.onthegomap.flatmap.RenderedFeature; import com.onthegomap.flatmap.stats.Stats; import java.nio.file.Path; +import java.util.function.Consumer; -public class MergeSortFeatureMap { +public class MergeSortFeatureMap implements Consumer { public MergeSortFeatureMap(Path featureDb, Stats stats) { @@ -11,4 +13,13 @@ public class MergeSortFeatureMap { public void sort() { } + + @Override + public void accept(RenderedFeature renderedFeature) { + + } + + public long getStorageSize() { + return 0; + } } diff --git a/src/main/java/com/onthegomap/flatmap/profiles/OpenMapTilesProfile.java b/src/main/java/com/onthegomap/flatmap/profiles/OpenMapTilesProfile.java index a3a0b3ca..a632eb5e 100644 --- a/src/main/java/com/onthegomap/flatmap/profiles/OpenMapTilesProfile.java +++ b/src/main/java/com/onthegomap/flatmap/profiles/OpenMapTilesProfile.java @@ -3,7 +3,7 @@ package com.onthegomap.flatmap.profiles; import com.graphhopper.reader.ReaderRelation; import com.onthegomap.flatmap.Profile; -import com.onthegomap.flatmap.RenderableFeature; +import com.onthegomap.flatmap.RenderableFeatures; import com.onthegomap.flatmap.SourceFeature; import com.onthegomap.flatmap.reader.OpenStreetMapReader.RelationInfo; import java.util.List; @@ -24,7 +24,9 @@ public class OpenMapTilesProfile implements Profile { } @Override - public void processFeature(SourceFeature sourceFeature, RenderableFeature features) { + public void processFeature(SourceFeature sourceFeature, + RenderableFeatures features) { } + } diff --git a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java index b1d5ac12..50c376d2 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java @@ -2,15 +2,22 @@ package com.onthegomap.flatmap.reader; import com.onthegomap.flatmap.SourceFeature; import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.io.File; -public class NaturalEarthReader implements Reader { +public class NaturalEarthReader extends Reader { public NaturalEarthReader(File input, File tmpFile, Stats stats) { + super(stats); } @Override - public SourceFeature getNext() { + public long getCount() { + return 0; + } + + @Override + public SourceStep open() { return null; } } diff --git a/src/main/java/com/onthegomap/flatmap/reader/Reader.java b/src/main/java/com/onthegomap/flatmap/reader/Reader.java index d93e0ec7..1e53cebc 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/Reader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/Reader.java @@ -1,15 +1,17 @@ package com.onthegomap.flatmap.reader; import com.onthegomap.flatmap.FeatureRenderer; -import com.onthegomap.flatmap.Profile; +import com.onthegomap.flatmap.FlatMapConfig; import com.onthegomap.flatmap.ProgressLoggers; import com.onthegomap.flatmap.RenderableFeature; import com.onthegomap.flatmap.RenderableFeatures; +import com.onthegomap.flatmap.RenderedFeature; import com.onthegomap.flatmap.SourceFeature; +import com.onthegomap.flatmap.collections.MergeSortFeatureMap; +import com.onthegomap.flatmap.profiles.OpenMapTilesProfile; import com.onthegomap.flatmap.stats.Stats; -import com.onthegomap.flatmap.worker.Worker; -import com.onthegomap.flatmap.worker.Worker.WorkerSource; -import java.io.IOException; +import com.onthegomap.flatmap.worker.Topology; +import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.util.concurrent.atomic.AtomicLong; import org.locationtech.jts.geom.Envelope; import org.slf4j.Logger; @@ -18,68 +20,55 @@ import org.slf4j.LoggerFactory; public abstract class Reader { private final Stats stats; - private final Envelope envelope; private Logger LOGGER = LoggerFactory.getLogger(getClass()); - public Reader(Stats stats, Envelope envelope) { + public Reader(Stats stats) { this.stats = stats; - this.envelope = envelope; } - protected void log(String message) { - LOGGER.info("[" + getName() + "] " + message); - } - - protected abstract String getName(); - - public final void process(FeatureRenderer renderer, Profile profile, int threads) { - threads = Math.max(threads, 1); + public final void process(String name, FeatureRenderer renderer, MergeSortFeatureMap writer, FlatMapConfig config) { long featureCount = getCount(); + int threads = config.threads(); + Envelope env = config.envelope(); + OpenMapTilesProfile profile = config.profile(); AtomicLong featuresRead = new AtomicLong(0); - log("Reading with " + threads + " threads"); - try ( - var source = open(getName() + "-reader"); - var sink = renderer.newWriterQueue(getName() + "-writer") - ) { - var worker = new Worker(getName() + "-processor", stats, threads, i -> { - SourceFeature sourceFeature; + AtomicLong featuresWritten = new AtomicLong(0); + LOGGER.info("[" + name + "] Reading with " + threads + " threads"); + + var topology = Topology.fromGenerator(name + "-read", stats, open()) + .addBuffer(name + "-reader", 1000) + .addWorker(name + "-process", threads, (prev, next) -> { RenderableFeatures features = new RenderableFeatures(); - var sourceQueue = source.queue(); - while ((sourceFeature = sourceQueue.getNext()) != null) { + SourceFeature sourceFeature; + while ((sourceFeature = prev.get()) != null) { featuresRead.incrementAndGet(); features.reset(sourceFeature); - if (sourceFeature.getGeometry().getEnvelopeInternal().intersects(envelope)) { + if (sourceFeature.getGeometry().getEnvelopeInternal().intersects(env)) { profile.processFeature(sourceFeature, features); for (RenderableFeature renderable : features.all()) { - renderer.renderFeature(renderable); + renderer.renderFeature(renderable, next); } } } + }) + .addBuffer(name + "-writer", 1000) + .sinkToConsumer("write", 1, (item) -> { + featuresWritten.incrementAndGet(); + writer.accept(item); }); -// TODO: -// -where should this go ? -// -should the renderer hold a reusable feature writer / queue ? + var loggers = new ProgressLoggers(name) + .addRatePercentCounter("read", featureCount, featuresRead) + .addRateCounter("write", featuresWritten) + .addFileSize(writer::getStorageSize) + .addProcessStats() + .addTopologyStats(topology); - var loggers = new ProgressLoggers(getName()) - .addRatePercentCounter("read", featureCount, featuresRead) - .addRateCounter("write", featuresWritten) - .addFileSize(featureMap::getStorageSize) - .addProcessStats() - .addThreadPoolStats("read", getName() + "-reader") - .addQueueStats(readerQueue) - .addThreadPoolStats("process", worker) - .addQueueStats(toWrite) - .addThreadPoolStats("write", writer); - - worker.awaitAndLong(loggers); - } catch (IOException e) { - throw new RuntimeException(e); - } + topology.awaitAndLog(loggers, config.logIntervalSeconds()); } - protected abstract long getCount(); + public abstract long getCount(); - protected abstract WorkerSource open(String workerName); + public abstract SourceStep open(); } diff --git a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java index 2b4732bd..2931f3d8 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java @@ -1,8 +1,8 @@ package com.onthegomap.flatmap.reader; -import com.onthegomap.flatmap.FeatureRenderer; -import com.onthegomap.flatmap.Profile; +import com.onthegomap.flatmap.SourceFeature; import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.io.File; public class ShapefileReader extends Reader { @@ -12,8 +12,16 @@ public class ShapefileReader extends Reader { } public ShapefileReader(File input, Stats stats) { + this(null, input, stats); } - public void process(FeatureRenderer renderer, Profile profile, int threads) { + @Override + public long getCount() { + return 0; + } + + @Override + public SourceStep open() { + return null; } } diff --git a/src/main/java/com/onthegomap/flatmap/stats/Stats.java b/src/main/java/com/onthegomap/flatmap/stats/Stats.java index c5f1103d..cad678fd 100644 --- a/src/main/java/com/onthegomap/flatmap/stats/Stats.java +++ b/src/main/java/com/onthegomap/flatmap/stats/Stats.java @@ -1,5 +1,9 @@ package com.onthegomap.flatmap.stats; +import com.graphhopper.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public interface Stats { void time(String name, Runnable task); @@ -9,4 +13,32 @@ public interface Stats { void startTimer(String name); void stopTimer(String name); + + class InMemory implements Stats { + + private static final Logger LOGGER = LoggerFactory.getLogger(InMemory.class); + + @Override + public void time(String name, Runnable task) { + StopWatch timer = new StopWatch().start(); + LOGGER.info("[" + name + "] Starting..."); + task.run(); + LOGGER.info("[" + name + "] Finished in " + timer.stop()); + } + + @Override + public void printSummary() { + + } + + @Override + public void startTimer(String name) { + + } + + @Override + public void stopTimer(String name) { + + } + } } diff --git a/src/main/java/com/onthegomap/flatmap/worker/Sink.java b/src/main/java/com/onthegomap/flatmap/worker/Sink.java deleted file mode 100644 index 3a5f3680..00000000 --- a/src/main/java/com/onthegomap/flatmap/worker/Sink.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.onthegomap.flatmap.worker; - -public interface Sink { - - void process(T item); -} diff --git a/src/main/java/com/onthegomap/flatmap/worker/Source.java b/src/main/java/com/onthegomap/flatmap/worker/Source.java deleted file mode 100644 index 1c99d5f6..00000000 --- a/src/main/java/com/onthegomap/flatmap/worker/Source.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.onthegomap.flatmap.worker; - -public interface Source { - - T getNext(); -} diff --git a/src/main/java/com/onthegomap/flatmap/worker/Topology.java b/src/main/java/com/onthegomap/flatmap/worker/Topology.java new file mode 100644 index 00000000..72063ce1 --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/worker/Topology.java @@ -0,0 +1,110 @@ +package com.onthegomap.flatmap.worker; + +import com.onthegomap.flatmap.ProgressLoggers; +import com.onthegomap.flatmap.stats.Stats; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public record Topology( + String name, + com.onthegomap.flatmap.worker.Topology previous, + WorkQueue inputQueue, + Worker worker +) { + + public static Bufferable fromGenerator(String name, Stats stats, SourceStep producer, int threads) { + return (queueName, size, batchSize) -> { + var nextQueue = new WorkQueue(queueName, size, batchSize, stats); + Worker worker = new Worker(name, stats, threads, () -> producer.run(nextQueue)); + return new Builder<>(name, nextQueue, worker, stats); + }; + } + + public static Bufferable fromGenerator(String name, Stats stats, SourceStep producer) { + return fromGenerator(name, stats, producer, 1); + } + + public static Builder readFromQueue(Stats stats, WorkQueue input) { + return new Builder<>(input, stats); + } + + public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) { + if (previous != null) { + previous.awaitAndLog(loggers, logIntervalSeconds); + } else { // producer is responsible for closing the initial input queue + inputQueue.close(); + } + worker.awaitAndLog(loggers, logIntervalSeconds); + } + + public interface SourceStep { + + void run(Consumer next); + } + + public interface WorkerStep { + + void run(Supplier prev, Consumer next); + } + + public interface SinkStep { + + void run(Supplier prev); + } + + public interface Bufferable { + + Builder addBuffer(String name, int size, int batchSize); + + default Builder addBuffer(String name, int size) { + + return addBuffer(name, size, 1); + } + } + + public static record Builder( + String name, + Topology.Builder previous, + WorkQueue inputQueue, + WorkQueue outputQueue, + Worker worker, Stats stats + ) { + + public Builder(String name, WorkQueue outputQueue, Worker worker, Stats stats) { + this(name, null, null, outputQueue, worker, stats); + } + + public Builder(WorkQueue outputQueue, Stats stats) { + this(null, outputQueue, null, stats); + } + + public Bufferable addWorker(String name, int threads, WorkerStep step) { + Builder curr = this; + return (queueName, size, batchSize) -> { + var nextOutputQueue = new WorkQueue(queueName, size, batchSize, stats); + var worker = new Worker(name, stats, threads, () -> step.run(outputQueue, nextOutputQueue)); + return new Builder<>(name, curr, outputQueue, nextOutputQueue, worker, stats); + }; + } + + private Topology build() { + var previousTopology = previous == null || previous.worker == null ? null : previous.build(); + return new Topology<>(name, previousTopology, inputQueue, worker); + } + + public Topology sinkTo(String name, int threads, SinkStep step) { + var previousTopology = previous.build(); + var worker = new Worker(name, stats, threads, () -> step.run(outputQueue)); + return new Topology<>(name, previousTopology, outputQueue, worker); + } + + public Topology sinkToConsumer(String name, int threads, Consumer step) { + return sinkTo(name, threads, (prev) -> { + O item; + while ((item = prev.get()) != null) { + step.accept(item); + } + }); + } + } +} diff --git a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java index a28a5f6d..ee787ae6 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java +++ b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java @@ -1,15 +1,18 @@ package com.onthegomap.flatmap.worker; +import com.onthegomap.flatmap.stats.Stats; import java.io.Closeable; -import java.io.IOException; import java.util.function.Consumer; import java.util.function.Supplier; public class WorkQueue implements Closeable, Supplier, Consumer { + public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { + + } @Override - public void close() throws IOException { + public void close() { } diff --git a/src/main/java/com/onthegomap/flatmap/worker/Worker.java b/src/main/java/com/onthegomap/flatmap/worker/Worker.java index a9248460..27a3206a 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Worker.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Worker.java @@ -1,11 +1,18 @@ package com.onthegomap.flatmap.worker; +import com.onthegomap.flatmap.ProgressLoggers; import com.onthegomap.flatmap.stats.Stats; -import java.util.function.Consumer; public class Worker { - public Worker(String name, Stats stats, int threads, Consumer task) { + public Worker(String name, Stats stats, int threads, Runnable task) { } + + public String getPrefix() { + return null; + } + + public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) { + } }