worker topology framework

pull/1/head
Mike Barry 2021-04-11 16:10:28 -04:00
rodzic 066c88703a
commit c15126eecd
20 zmienionych plików z 323 dodań i 103 usunięć

Wyświetl plik

@ -76,7 +76,13 @@ public class Arguments {
}
public Stats getStats() {
// TODO
return null;
return new Stats.InMemory();
}
public int integer(String key, String description, int defaultValue) {
String value = getArg(key, Integer.toString(defaultValue));
int parsed = Integer.parseInt(value);
LOGGER.info(description + ": " + parsed);
return parsed;
}
}

Wyświetl plik

@ -1,18 +1,14 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.Worker.WorkerSink;
import java.util.function.Consumer;
public class FeatureRenderer {
public FeatureRenderer(MergeSortFeatureMap featureMap, Stats stats) {
public FeatureRenderer(Stats stats) {
}
public WorkerSink<RenderedFeature> newWriterQueue(String name) {
}
public void renderFeature(RenderableFeature renderable) {
public void renderFeature(RenderableFeature renderable, Consumer<RenderedFeature> consumer) {
}
}

Wyświetl plik

@ -0,0 +1,15 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.stats.Stats;
import org.locationtech.jts.geom.Envelope;
public record FlatMapConfig(
OpenMapTilesProfile profile,
Envelope envelope,
int threads,
Stats stats,
long logIntervalSeconds
) {
}

Wyświetl plik

@ -10,6 +10,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,7 +24,7 @@ public class OpenMapTilesMain {
stats.startTimer("import");
LOGGER.info("Arguments:");
OsmInputFile osmInputFile = new OsmInputFile(
arguments.inputFile("input", "OSM input file", "./data/sources/massachusetts-latest.osm.pbf"));
arguments.inputFile("input", "OSM input file", "./data/sources/north-america_us_massachusetts.pbf"));
File centerlines = arguments
.inputFile("centerline", "lake centerlines input", "./data/sources/lake_centerline.shp.zip");
File naturalEarth = arguments.inputFile("natural_earth", "natural earth input",
@ -31,7 +32,9 @@ public class OpenMapTilesMain {
File waterPolygons = arguments.inputFile("water_polygons", "water polygons input",
"./data/sources/water-polygons-split-3857.zip");
double[] bounds = arguments.bounds("bounds", "bounds", osmInputFile);
Envelope envelope = new Envelope(bounds[0], bounds[2], bounds[1], bounds[3]);
int threads = arguments.threads();
int logIntervalSeconds = arguments.integer("loginterval", "seconds between logs", 10);
Path tmpDir = arguments.file("tmpdir", "temp directory", "./data/tmp").toPath();
boolean fetchWikidata = arguments.get("fetch_wikidata", "fetch wikidata translations", false);
boolean useWikidata = arguments.get("use_wikidata", "use wikidata translations", true);
@ -45,38 +48,43 @@ public class OpenMapTilesMain {
if (fetchWikidata) {
LOGGER.info("- [wikidata] Fetch OpenStreetMap element name translations from wikidata");
}
LOGGER.info("- [lake_centerlines] Extract lake centerlines");
LOGGER.info("- [water_polygons] Process ocean polygons");
LOGGER.info("- [natural_earth] Process natural earth features");
LOGGER.info("- [osm_pass1] Pre-process OpenStreetMap input (store node locations then relation members)");
LOGGER.info("- [osm_pass2] Process OpenStreetMap nodes, ways, then relations");
LOGGER.info("- [sort] Sort rendered features by tile ID");
LOGGER.info("- [mbtiles] Encode each tile and write to " + output);
LOGGER.info(" [lake_centerlines] Extract lake centerlines");
LOGGER.info(" [water_polygons] Process ocean polygons");
LOGGER.info(" [natural_earth] Process natural earth features");
LOGGER.info(" [osm_pass1] Pre-process OpenStreetMap input (store node locations then relation members)");
LOGGER.info(" [osm_pass2] Process OpenStreetMap nodes, ways, then relations");
LOGGER.info(" [sort] Sort rendered features by tile ID");
LOGGER.info(" [mbtiles] Encode each tile and write to " + output);
var translations = Translations.defaultProvider(languages);
var profile = new OpenMapTilesProfile();
if (fetchWikidata) {
stats.time("wikidata",
() -> Wikidata.fetch(osmInputFile, wikidataNamesFile, threads, profile, stats));
}
if (useWikidata) {
translations.addTranslationProvider(Wikidata.load(wikidataNamesFile));
}
FileUtils.forceMkdir(tmpDir.toFile());
File nodeDb = tmpDir.resolve("node.db").toFile();
Path featureDb = tmpDir.resolve("feature.db");
MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, stats);
FeatureRenderer renderer = new FeatureRenderer(featureMap, stats);
FeatureRenderer renderer = new FeatureRenderer(stats);
FlatMapConfig config = new FlatMapConfig(profile, envelope, threads, stats, logIntervalSeconds);
if (fetchWikidata) {
stats.time("wikidata",
() -> Wikidata.fetch(osmInputFile, wikidataNamesFile, config));
}
if (useWikidata) {
translations.addTranslationProvider(Wikidata.load(wikidataNamesFile));
}
stats.time("lake_centerlines", () ->
new ShapefileReader("EPSG:3857", centerlines, stats).process(renderer, profile, threads));
new ShapefileReader("EPSG:3857", centerlines, stats)
.process("lake_centerlines", renderer, featureMap, config));
stats.time("water_polygons", () ->
new ShapefileReader(waterPolygons, stats).process(renderer, profile, threads));
new ShapefileReader(waterPolygons, stats)
.process("water_polygons", renderer, featureMap, config)
);
stats.time("natural_earth", () ->
new NaturalEarthReader(naturalEarth, tmpDir.resolve("natearth.sqlite").toFile(), stats)
.process(renderer, profile, threads));
.process("natural_earth", renderer, featureMap, config)
);
try (var osmReader = new OpenStreetMapReader(osmInputFile, nodeDb, stats)) {
stats.time("osm_pass1", () -> osmReader.pass1(profile, threads));

Wyświetl plik

@ -1,5 +1,8 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import com.onthegomap.flatmap.worker.Worker;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
@ -16,4 +19,37 @@ public class ProgressLoggers {
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) {
return this;
}
public ProgressLoggers addRateCounter(String name, AtomicLong featuresWritten) {
return this;
}
public ProgressLoggers addFileSize(LongSupplier getStorageSize) {
return this;
}
public ProgressLoggers addProcessStats() {
return this;
}
public ProgressLoggers addThreadPoolStats(String name, String prefix) {
return this;
}
public ProgressLoggers addThreadPoolStats(String name, Worker worker) {
return addThreadPoolStats(name, worker.getPrefix());
}
public ProgressLoggers addQueueStats(WorkQueue<?> queue) {
return this;
}
public ProgressLoggers addTopologyStats(Topology<?> topology) {
if (topology == null) {
return this;
}
return addTopologyStats(topology.previous())
.addQueueStats(topology.inputQueue())
.addThreadPoolStats(topology.name(), topology.worker());
}
}

Wyświetl plik

@ -5,6 +5,7 @@ import org.opengis.geometry.Geometry;
public class RenderableFeature {
public Geometry getGeometry() {
return null;
}
// layer
// attrs

Wyświetl plik

@ -1,8 +1,11 @@
package com.onthegomap.flatmap;
import java.util.List;
public class RenderableFeatures {
public Iterable<RenderableFeature> all() {
return List.of();
}
public void reset(SourceFeature sourceFeature) {

Wyświetl plik

@ -5,6 +5,7 @@ import org.locationtech.jts.geom.Geometry;
public class SourceFeature {
public Geometry getGeometry() {
return null;
}
// props
// lazy geometry

Wyświetl plik

@ -1,15 +1,12 @@
package com.onthegomap.flatmap;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.flatmap.profiles.Profile;
import com.onthegomap.flatmap.stats.Stats;
import java.io.File;
import java.util.Map;
public class Wikidata {
public static void fetch(OsmInputFile infile, File outfile, int threads, Profile profile,
Stats stats) {
public static void fetch(OsmInputFile infile, File outfile, FlatMapConfig config) {
// TODO
}

Wyświetl plik

@ -1,9 +1,11 @@
package com.onthegomap.flatmap.collections;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.stats.Stats;
import java.nio.file.Path;
import java.util.function.Consumer;
public class MergeSortFeatureMap {
public class MergeSortFeatureMap implements Consumer<RenderedFeature> {
public MergeSortFeatureMap(Path featureDb, Stats stats) {
@ -11,4 +13,13 @@ public class MergeSortFeatureMap {
public void sort() {
}
@Override
public void accept(RenderedFeature renderedFeature) {
}
public long getStorageSize() {
return 0;
}
}

Wyświetl plik

@ -3,7 +3,7 @@ package com.onthegomap.flatmap.profiles;
import com.graphhopper.reader.ReaderRelation;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.reader.OpenStreetMapReader.RelationInfo;
import java.util.List;
@ -24,7 +24,9 @@ public class OpenMapTilesProfile implements Profile {
}
@Override
public void processFeature(SourceFeature sourceFeature, RenderableFeature features) {
public void processFeature(SourceFeature sourceFeature,
RenderableFeatures features) {
}
}

Wyświetl plik

@ -2,15 +2,22 @@ package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.io.File;
public class NaturalEarthReader implements Reader {
public class NaturalEarthReader extends Reader {
public NaturalEarthReader(File input, File tmpFile, Stats stats) {
super(stats);
}
@Override
public SourceFeature getNext() {
public long getCount() {
return 0;
}
@Override
public SourceStep<SourceFeature> open() {
return null;
}
}

Wyświetl plik

@ -1,15 +1,17 @@
package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.FeatureRenderer;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.FlatMapConfig;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.Worker;
import com.onthegomap.flatmap.worker.Worker.WorkerSource;
import java.io.IOException;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.util.concurrent.atomic.AtomicLong;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
@ -18,68 +20,55 @@ import org.slf4j.LoggerFactory;
public abstract class Reader {
private final Stats stats;
private final Envelope envelope;
private Logger LOGGER = LoggerFactory.getLogger(getClass());
public Reader(Stats stats, Envelope envelope) {
public Reader(Stats stats) {
this.stats = stats;
this.envelope = envelope;
}
protected void log(String message) {
LOGGER.info("[" + getName() + "] " + message);
}
protected abstract String getName();
public final void process(FeatureRenderer renderer, Profile profile, int threads) {
threads = Math.max(threads, 1);
public final void process(String name, FeatureRenderer renderer, MergeSortFeatureMap writer, FlatMapConfig config) {
long featureCount = getCount();
int threads = config.threads();
Envelope env = config.envelope();
OpenMapTilesProfile profile = config.profile();
AtomicLong featuresRead = new AtomicLong(0);
log("Reading with " + threads + " threads");
try (
var source = open(getName() + "-reader");
var sink = renderer.newWriterQueue(getName() + "-writer")
) {
var worker = new Worker(getName() + "-processor", stats, threads, i -> {
SourceFeature sourceFeature;
AtomicLong featuresWritten = new AtomicLong(0);
LOGGER.info("[" + name + "] Reading with " + threads + " threads");
var topology = Topology.fromGenerator(name + "-read", stats, open())
.addBuffer(name + "-reader", 1000)
.<RenderedFeature>addWorker(name + "-process", threads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
var sourceQueue = source.queue();
while ((sourceFeature = sourceQueue.getNext()) != null) {
SourceFeature sourceFeature;
while ((sourceFeature = prev.get()) != null) {
featuresRead.incrementAndGet();
features.reset(sourceFeature);
if (sourceFeature.getGeometry().getEnvelopeInternal().intersects(envelope)) {
if (sourceFeature.getGeometry().getEnvelopeInternal().intersects(env)) {
profile.processFeature(sourceFeature, features);
for (RenderableFeature renderable : features.all()) {
renderer.renderFeature(renderable);
renderer.renderFeature(renderable, next);
}
}
}
})
.addBuffer(name + "-writer", 1000)
.sinkToConsumer("write", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});
// TODO:
// -where should this go ?
// -should the renderer hold a reusable feature writer / queue ?
var loggers = new ProgressLoggers(name)
.addRatePercentCounter("read", featureCount, featuresRead)
.addRateCounter("write", featuresWritten)
.addFileSize(writer::getStorageSize)
.addProcessStats()
.addTopologyStats(topology);
var loggers = new ProgressLoggers(getName())
.addRatePercentCounter("read", featureCount, featuresRead)
.addRateCounter("write", featuresWritten)
.addFileSize(featureMap::getStorageSize)
.addProcessStats()
.addThreadPoolStats("read", getName() + "-reader")
.addQueueStats(readerQueue)
.addThreadPoolStats("process", worker)
.addQueueStats(toWrite)
.addThreadPoolStats("write", writer);
worker.awaitAndLong(loggers);
} catch (IOException e) {
throw new RuntimeException(e);
}
topology.awaitAndLog(loggers, config.logIntervalSeconds());
}
protected abstract long getCount();
public abstract long getCount();
protected abstract WorkerSource<SourceFeature> open(String workerName);
public abstract SourceStep<SourceFeature> open();
}

Wyświetl plik

@ -1,8 +1,8 @@
package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.FeatureRenderer;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.io.File;
public class ShapefileReader extends Reader {
@ -12,8 +12,16 @@ public class ShapefileReader extends Reader {
}
public ShapefileReader(File input, Stats stats) {
this(null, input, stats);
}
public void process(FeatureRenderer renderer, Profile profile, int threads) {
@Override
public long getCount() {
return 0;
}
@Override
public SourceStep<SourceFeature> open() {
return null;
}
}

Wyświetl plik

@ -1,5 +1,9 @@
package com.onthegomap.flatmap.stats;
import com.graphhopper.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface Stats {
void time(String name, Runnable task);
@ -9,4 +13,32 @@ public interface Stats {
void startTimer(String name);
void stopTimer(String name);
class InMemory implements Stats {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemory.class);
@Override
public void time(String name, Runnable task) {
StopWatch timer = new StopWatch().start();
LOGGER.info("[" + name + "] Starting...");
task.run();
LOGGER.info("[" + name + "] Finished in " + timer.stop());
}
@Override
public void printSummary() {
}
@Override
public void startTimer(String name) {
}
@Override
public void stopTimer(String name) {
}
}
}

Wyświetl plik

@ -1,6 +0,0 @@
package com.onthegomap.flatmap.worker;
public interface Sink<T> {
void process(T item);
}

Wyświetl plik

@ -1,6 +0,0 @@
package com.onthegomap.flatmap.worker;
public interface Source<T> {
T getNext();
}

Wyświetl plik

@ -0,0 +1,110 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.stats.Stats;
import java.util.function.Consumer;
import java.util.function.Supplier;
public record Topology<T>(
String name,
com.onthegomap.flatmap.worker.Topology<?> previous,
WorkQueue<T> inputQueue,
Worker worker
) {
public static <T> Bufferable<?, T> fromGenerator(String name, Stats stats, SourceStep<T> producer, int threads) {
return (queueName, size, batchSize) -> {
var nextQueue = new WorkQueue<T>(queueName, size, batchSize, stats);
Worker worker = new Worker(name, stats, threads, () -> producer.run(nextQueue));
return new Builder<>(name, nextQueue, worker, stats);
};
}
public static <T> Bufferable<?, T> fromGenerator(String name, Stats stats, SourceStep<T> producer) {
return fromGenerator(name, stats, producer, 1);
}
public static <T> Builder<?, T> readFromQueue(Stats stats, WorkQueue<T> input) {
return new Builder<>(input, stats);
}
public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) {
if (previous != null) {
previous.awaitAndLog(loggers, logIntervalSeconds);
} else { // producer is responsible for closing the initial input queue
inputQueue.close();
}
worker.awaitAndLog(loggers, logIntervalSeconds);
}
public interface SourceStep<O> {
void run(Consumer<O> next);
}
public interface WorkerStep<I, O> {
void run(Supplier<I> prev, Consumer<O> next);
}
public interface SinkStep<I> {
void run(Supplier<I> prev);
}
public interface Bufferable<I, O> {
Builder<I, O> addBuffer(String name, int size, int batchSize);
default Builder<I, O> addBuffer(String name, int size) {
return addBuffer(name, size, 1);
}
}
public static record Builder<I, O>(
String name,
Topology.Builder<?, I> previous,
WorkQueue<I> inputQueue,
WorkQueue<O> outputQueue,
Worker worker, Stats stats
) {
public Builder(String name, WorkQueue<O> outputQueue, Worker worker, Stats stats) {
this(name, null, null, outputQueue, worker, stats);
}
public Builder(WorkQueue<O> outputQueue, Stats stats) {
this(null, outputQueue, null, stats);
}
public <O2> Bufferable<O, O2> addWorker(String name, int threads, WorkerStep<O, O2> step) {
Builder<I, O> curr = this;
return (queueName, size, batchSize) -> {
var nextOutputQueue = new WorkQueue<O2>(queueName, size, batchSize, stats);
var worker = new Worker(name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
return new Builder<>(name, curr, outputQueue, nextOutputQueue, worker, stats);
};
}
private Topology<I> build() {
var previousTopology = previous == null || previous.worker == null ? null : previous.build();
return new Topology<>(name, previousTopology, inputQueue, worker);
}
public Topology<O> sinkTo(String name, int threads, SinkStep<O> step) {
var previousTopology = previous.build();
var worker = new Worker(name, stats, threads, () -> step.run(outputQueue));
return new Topology<>(name, previousTopology, outputQueue, worker);
}
public Topology<O> sinkToConsumer(String name, int threads, Consumer<O> step) {
return sinkTo(name, threads, (prev) -> {
O item;
while ((item = prev.get()) != null) {
step.accept(item);
}
});
}
}
}

Wyświetl plik

@ -1,15 +1,18 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.stats.Stats;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
}
@Override
public void close() throws IOException {
public void close() {
}

Wyświetl plik

@ -1,11 +1,18 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.stats.Stats;
import java.util.function.Consumer;
public class Worker {
public Worker(String name, Stats stats, int threads, Consumer<Integer> task) {
public Worker(String name, Stats stats, int threads, Runnable task) {
}
public String getPrefix() {
return null;
}
public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) {
}
}