refactor topology naming

pull/1/head
Mike Barry 2021-04-12 07:14:05 -04:00
rodzic 7f6ccadc21
commit 09de796fe5
6 zmienionych plików z 111 dodań i 99 usunięć

Wyświetl plik

@@ -40,11 +40,12 @@ public class MbtilesWriter {
output.delete();
MbtilesWriter writer = new MbtilesWriter(config.stats());
var topology = Topology.readFromIterator("mbtiles_reader", stats, features.getAll())
.addBuffer("mbtiles_reader_queue", 50_000, 1_000)
.addWorker("mbtiles_encoder", config.threads(), writer::tileEncoder)
.addBuffer("mbtiles_writer_queue", 50_000, 1_000)
.sinkTo("mbtiles_writer", 1, writer::tileWriter);
var topology = Topology.start("mbtiles", stats)
.readFromIterator("reader", features.getAll())
.addBuffer("reader_queue", 50_000, 1_000)
.addWorker("encoder", config.threads(), writer::tileEncoder)
.addBuffer("writer_queue", 50_000, 1_000)
.sinkTo("writer", 1, writer::tileWriter);
var loggers = new ProgressLoggers("mbtiles")
.addRatePercentCounter("features", featureCount, writer.featuresProcessed)

Wyświetl plik

@@ -60,37 +60,38 @@ public class OpenStreetMapReader implements Closeable {
public void pass1(FlatMapConfig config) {
Profile profile = config.profile();
var topology = Topology.readFromQueue(stats,
osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats)
).sinkToConsumer("osm_pass1_processor", 1, (readerElement) -> {
if (readerElement instanceof ReaderNode node) {
TOTAL_NODES.incrementAndGet();
nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat()));
} else if (readerElement instanceof ReaderWay) {
TOTAL_WAYS.incrementAndGet();
} else if (readerElement instanceof ReaderRelation rel) {
TOTAL_RELATIONS.incrementAndGet();
List<RelationInfo> infos = profile.preprocessOsmRelation(rel);
if (infos != null) {
for (RelationInfo info : infos) {
relationInfo.put(rel.getId(), info);
relationInfoSizes.addAndGet(info.sizeBytes());
var topology = Topology.start("osm_pass1", stats)
.readFromQueue(
osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats)
).sinkToConsumer("process", 1, (readerElement) -> {
if (readerElement instanceof ReaderNode node) {
TOTAL_NODES.incrementAndGet();
nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat()));
} else if (readerElement instanceof ReaderWay) {
TOTAL_WAYS.incrementAndGet();
} else if (readerElement instanceof ReaderRelation rel) {
TOTAL_RELATIONS.incrementAndGet();
List<RelationInfo> infos = profile.preprocessOsmRelation(rel);
if (infos != null) {
for (RelationInfo info : infos) {
relationInfo.put(rel.getId(), info);
relationInfoSizes.addAndGet(info.sizeBytes());
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
wayToRelations.put(member.getRef(), rel.getId());
}
}
}
}
if (rel.hasTag("type", "multipolygon")) {
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
wayToRelations.put(member.getRef(), rel.getId());
waysInMultipolygon.add(member.getRef());
}
}
}
}
if (rel.hasTag("type", "multipolygon")) {
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
waysInMultipolygon.add(member.getRef());
}
}
}
}
});
});
var loggers = new ProgressLoggers("osm_pass1")
.addRateCounter("nodes", TOTAL_NODES)
@@ -114,43 +115,44 @@ public class OpenStreetMapReader implements Closeable {
AtomicLong featuresWritten = new AtomicLong(0);
CountDownLatch waysDone = new CountDownLatch(processThreads);
var topology = Topology.readFromQueue(stats,
osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats)
).<RenderedFeature>addWorker("osm_pass2_processor", processThreads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
ReaderElement readerElement;
while ((readerElement = prev.get()) != null) {
SourceFeature feature = null;
if (readerElement instanceof ReaderNode node) {
nodesProcessed.incrementAndGet();
feature = new NodeSourceFeature(node);
} else if (readerElement instanceof ReaderWay way) {
waysProcessed.incrementAndGet();
feature = new WaySourceFeature(way);
} else if (readerElement instanceof ReaderRelation rel) {
// ensure all ways finished processing before we start relations
if (waysDone.getCount() > 0) {
waysDone.countDown();
waysDone.await();
var topology = Topology.start("osm_pass2", stats)
.readFromQueue(
osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats)
).<RenderedFeature>addWorker("process", processThreads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
ReaderElement readerElement;
while ((readerElement = prev.get()) != null) {
SourceFeature feature = null;
if (readerElement instanceof ReaderNode node) {
nodesProcessed.incrementAndGet();
feature = new NodeSourceFeature(node);
} else if (readerElement instanceof ReaderWay way) {
waysProcessed.incrementAndGet();
feature = new WaySourceFeature(way);
} else if (readerElement instanceof ReaderRelation rel) {
// ensure all ways finished processing before we start relations
if (waysDone.getCount() > 0) {
waysDone.countDown();
waysDone.await();
}
relsProcessed.incrementAndGet();
if (rel.hasTag("type", "multipolygon")) {
feature = new MultipolygonSourceFeature(rel);
}
}
relsProcessed.incrementAndGet();
if (rel.hasTag("type", "multipolygon")) {
feature = new MultipolygonSourceFeature(rel);
if (feature != null) {
features.reset(feature);
profile.processFeature(feature, features);
for (RenderableFeature renderable : features.all()) {
renderer.renderFeature(renderable, next);
}
}
}
if (feature != null) {
features.reset(feature);
profile.processFeature(feature, features);
for (RenderableFeature renderable : features.all()) {
renderer.renderFeature(renderable, next);
}
}
}
// just in case a worker skipped over all relations
waysDone.countDown();
}).addBuffer("osm_pass2_feature_queue", 50_000, 1_000)
.sinkToConsumer("osm_pass2_writer", 1, (item) -> {
// just in case a worker skipped over all relations
waysDone.countDown();
}).addBuffer("feature_queue", 50_000, 1_000)
.sinkToConsumer("write", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});

Wyświetl plik

@@ -34,9 +34,10 @@ public abstract class Reader {
AtomicLong featuresRead = new AtomicLong(0);
AtomicLong featuresWritten = new AtomicLong(0);
var topology = Topology.fromGenerator(name + "_read", stats, open())
.addBuffer(name + "_reader", 1000)
.<RenderedFeature>addWorker(name + "_process", threads, (prev, next) -> {
var topology = Topology.start(name, stats)
.fromGenerator("read", open())
.addBuffer("read_queue", 1000)
.<RenderedFeature>addWorker("process", threads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
SourceFeature sourceFeature;
while ((sourceFeature = prev.get()) != null) {
@@ -50,8 +51,8 @@ public abstract class Reader {
}
}
})
.addBuffer(name + "_writer", 1000)
.sinkToConsumer(name + "_write", 1, (item) -> {
.addBuffer("write_queue", 1000)
.sinkToConsumer("write", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});

Wyświetl plik

@@ -13,28 +13,8 @@ public record Topology<T>(
Worker worker
) {
public static <T> Bufferable<?, T> fromGenerator(String name, Stats stats, SourceStep<T> producer, int threads) {
return (queueName, size, batchSize) -> {
var nextQueue = new WorkQueue<T>(queueName, size, batchSize, stats);
Worker worker = new Worker(name, stats, threads, () -> producer.run(nextQueue));
return new Builder<>(name, nextQueue, worker, stats);
};
}
public static <T> Bufferable<?, T> fromGenerator(String name, Stats stats, SourceStep<T> producer) {
return fromGenerator(name, stats, producer, 1);
}
public static <T> Bufferable<?, T> readFromIterator(String name, Stats stats, Iterator<T> iter) {
return fromGenerator(name, stats, next -> {
while (iter.hasNext()) {
next.accept(iter.next());
}
}, 1);
}
public static <T> Builder<?, T> readFromQueue(Stats stats, WorkQueue<T> input) {
return new Builder<>(input, stats);
public static Empty start(String prefix, Stats stats) {
return new Empty(prefix, stats);
}
public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) {
@@ -71,7 +51,35 @@ public record Topology<T>(
}
}
public static record Empty(String prefix, Stats stats) {
public <T> Bufferable<?, T> fromGenerator(String name, SourceStep<T> producer, int threads) {
return (queueName, size, batchSize) -> {
var nextQueue = new WorkQueue<T>(prefix, queueName, size, batchSize, stats);
Worker worker = new Worker(prefix, name, stats, threads, () -> producer.run(nextQueue));
return new Builder<>(prefix, name, nextQueue, worker, stats);
};
}
public <T> Bufferable<?, T> fromGenerator(String name, SourceStep<T> producer) {
return fromGenerator(name, producer, 1);
}
public <T> Bufferable<?, T> readFromIterator(String name, Iterator<T> iter) {
return fromGenerator(name, next -> {
while (iter.hasNext()) {
next.accept(iter.next());
}
}, 1);
}
public <T> Builder<?, T> readFromQueue(WorkQueue<T> input) {
return new Builder<>(input, stats);
}
}
public static record Builder<I, O>(
String prefix,
String name,
Topology.Builder<?, I> previous,
WorkQueue<I> inputQueue,
@@ -79,20 +87,20 @@ public record Topology<T>(
Worker worker, Stats stats
) {
public Builder(String name, WorkQueue<O> outputQueue, Worker worker, Stats stats) {
this(name, null, null, outputQueue, worker, stats);
public Builder(String prefix, String name, WorkQueue<O> outputQueue, Worker worker, Stats stats) {
this(prefix, name, null, null, outputQueue, worker, stats);
}
public Builder(WorkQueue<O> outputQueue, Stats stats) {
this(null, outputQueue, null, stats);
this(null, null, outputQueue, null, stats);
}
public <O2> Bufferable<O, O2> addWorker(String name, int threads, WorkerStep<O, O2> step) {
Builder<I, O> curr = this;
return (queueName, size, batchSize) -> {
var nextOutputQueue = new WorkQueue<O2>(queueName, size, batchSize, stats);
var worker = new Worker(name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
return new Builder<>(name, curr, outputQueue, nextOutputQueue, worker, stats);
var nextOutputQueue = new WorkQueue<O2>(prefix, queueName, size, batchSize, stats);
var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
return new Builder<>(prefix, name, curr, outputQueue, nextOutputQueue, worker, stats);
};
}
@@ -103,7 +111,7 @@ public record Topology<T>(
public Topology<O> sinkTo(String name, int threads, SinkStep<O> step) {
var previousTopology = previous.build();
var worker = new Worker(name, stats, threads, () -> step.run(outputQueue));
var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue));
return new Topology<>(name, previousTopology, outputQueue, worker);
}

Wyświetl plik

@@ -7,7 +7,7 @@ import java.util.function.Supplier;
public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
public WorkQueue(String prefix, String name, int capacity, int maxBatch, Stats stats) {
}

Wyświetl plik

@@ -5,7 +5,7 @@ import com.onthegomap.flatmap.stats.Stats;
public class Worker {
public Worker(String name, Stats stats, int threads, RunnableThatThrows task) {
public Worker(String prefix, String name, Stats stats, int threads, RunnableThatThrows task) {
}