diff --git a/pom.xml b/pom.xml
index 85215898..4ac6dc1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -195,5 +195,12 @@
+
+    <testResources>
+      <testResource>
+        <directory>src/test/resources</directory>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
diff --git a/src/main/java/com/onthegomap/flatmap/OsmInputFile.java b/src/main/java/com/onthegomap/flatmap/OsmInputFile.java
index a827418b..773680d1 100644
--- a/src/main/java/com/onthegomap/flatmap/OsmInputFile.java
+++ b/src/main/java/com/onthegomap/flatmap/OsmInputFile.java
@@ -2,13 +2,18 @@ package com.onthegomap.flatmap;
import com.google.protobuf.ByteString;
import com.graphhopper.reader.ReaderElement;
-import com.onthegomap.flatmap.monitoring.Stats;
-import com.onthegomap.flatmap.worker.Topology;
-import com.onthegomap.flatmap.worker.WorkQueue;
+import com.graphhopper.reader.osm.pbf.PbfDecoder;
+import com.graphhopper.reader.osm.pbf.PbfStreamSplitter;
+import com.graphhopper.reader.osm.pbf.Sink;
+import com.onthegomap.flatmap.worker.Topology.SourceStep;
+import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.openstreetmap.osmosis.osmbinary.Fileformat.Blob;
@@ -64,18 +69,31 @@ public class OsmInputFile {
}
}
- public WorkQueue<ReaderElement> newReaderQueue(String name, int threads, int size, int batchSize, Stats stats) {
- return null;
+ public void readTo(Consumer<ReaderElement> next, int threads) throws IOException {
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ try (var stream = new BufferedInputStream(new FileInputStream(file), 50000)) {
+ PbfStreamSplitter streamSplitter = new PbfStreamSplitter(new DataInputStream(stream));
+ var sink = new ReaderElementSink(next);
+ PbfDecoder pbfDecoder = new PbfDecoder(streamSplitter, executorService, threads + 1, sink);
+ pbfDecoder.run();
+ } finally {
+ executorService.shutdownNow();
+ }
}
- public Topology.Builder<WorkQueue<ReaderElement>, ReaderElement> newTopology(
- String prefix,
- int readerThreads,
- int size,
- int batchSize,
- Stats stats
- ) {
- return Topology.start(prefix, stats)
- .readFromQueue(newReaderQueue(prefix + "_reader_queue", readerThreads, size, batchSize, stats));
+ public SourceStep<ReaderElement> read(int threads) {
+ return next -> readTo(next, threads);
+ }
+
+ private static record ReaderElementSink(Consumer<ReaderElement> queue) implements Sink {
+
+ @Override
+ public void process(ReaderElement readerElement) {
+ queue.accept(readerElement);
+ }
+
+ @Override
+ public void complete() {
+ }
}
}
diff --git a/src/main/java/com/onthegomap/flatmap/Wikidata.java b/src/main/java/com/onthegomap/flatmap/Wikidata.java
index 1cbae368..8e630179 100644
--- a/src/main/java/com/onthegomap/flatmap/Wikidata.java
+++ b/src/main/java/com/onthegomap/flatmap/Wikidata.java
@@ -15,6 +15,7 @@ import com.graphhopper.util.StopWatch;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
+import com.onthegomap.flatmap.worker.Topology;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -135,7 +136,9 @@ public class Wikidata {
Wikidata fetcher = new Wikidata(writer, Client.wrap(client), 5_000);
fetcher.loadExisting(oldMappings);
- var topology = infile.newTopology("wikidata", readerThreads, 50_000, 10_000, stats)
+ var topology = Topology.start("wikidata", stats)
+ .fromGenerator("pbf", infile.read(readerThreads))
+ .addBuffer("reader_queue", 50_000, 10_000)
.addWorker("filter", processThreads, fetcher::filter)
.addBuffer("fetch_queue", 50_000)
.sinkTo("fetch", 1, prev -> {
@@ -257,7 +260,7 @@ public class Wikidata {
try {
StopWatch timer = new StopWatch().start();
LongObjectMap