got osm reader working

pull/1/head
Mike Barry 2021-04-16 07:13:05 -04:00
rodzic b80a576348
commit f26ecce4f2
9 zmienionych plików z 132 dodań i 33 usunięć

Wyświetl plik

@ -195,5 +195,12 @@
</plugin>
</plugins>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</project>

Wyświetl plik

@ -2,13 +2,18 @@ package com.onthegomap.flatmap;
import com.google.protobuf.ByteString;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import com.graphhopper.reader.osm.pbf.PbfDecoder;
import com.graphhopper.reader.osm.pbf.PbfStreamSplitter;
import com.graphhopper.reader.osm.pbf.Sink;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.openstreetmap.osmosis.osmbinary.Fileformat.Blob;
@ -64,18 +69,31 @@ public class OsmInputFile {
}
}
public WorkQueue<ReaderElement> newReaderQueue(String name, int threads, int size, int batchSize, Stats stats) {
return null;
/**
 * Reads the entire PBF file, handing each decoded {@link ReaderElement} to {@code next}.
 * Blocks until the whole file has been consumed.
 *
 * @param next    consumer invoked for every node/way/relation parsed from the file
 * @param threads number of worker threads used for parallel blob decoding
 * @throws IOException if the file cannot be opened or read
 */
public void readTo(Consumer<ReaderElement> next, int threads) throws IOException {
  ExecutorService pool = Executors.newFixedThreadPool(threads);
  try (var input = new BufferedInputStream(new FileInputStream(file), 50000)) {
    var splitter = new PbfStreamSplitter(new DataInputStream(input));
    var decoder = new PbfDecoder(splitter, pool, threads + 1, new ReaderElementSink(next));
    decoder.run();
  } finally {
    // always stop the decode workers, even if parsing failed mid-stream
    pool.shutdownNow();
  }
}
public Topology.Builder<?, ReaderElement> newTopology(
String prefix,
int readerThreads,
int size,
int batchSize,
Stats stats
) {
return Topology.start(prefix, stats)
.readFromQueue(newReaderQueue(prefix + "_reader_queue", readerThreads, size, batchSize, stats));
/**
 * Adapts this file into a {@link SourceStep} that streams every element to its
 * consumer when the topology runs.
 *
 * @param threads number of threads to use while decoding
 * @return a topology source step backed by {@link #readTo(Consumer, int)}
 */
public SourceStep<ReaderElement> read(int threads) {
  return (Consumer<ReaderElement> next) -> {
    readTo(next, threads);
  };
}
/** Bridges graphhopper's {@link Sink} callback API to a plain {@link Consumer}. */
private static record ReaderElementSink(Consumer<ReaderElement> queue) implements Sink {
@Override
public void process(ReaderElement readerElement) {
// forward each parsed element straight to the downstream consumer
queue.accept(readerElement);
}
@Override
public void complete() {
// nothing to flush; end-of-stream handling is done by the caller (see readTo)
}
}
}

Wyświetl plik

@ -15,6 +15,7 @@ import com.graphhopper.util.StopWatch;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.worker.Topology;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@ -135,7 +136,9 @@ public class Wikidata {
Wikidata fetcher = new Wikidata(writer, Client.wrap(client), 5_000);
fetcher.loadExisting(oldMappings);
var topology = infile.newTopology("wikidata", readerThreads, 50_000, 10_000, stats)
var topology = Topology.start("wikidata", stats)
.fromGenerator("pbf", infile.read(readerThreads))
.addBuffer("reader_queue", 50_000, 10_000)
.addWorker("filter", processThreads, fetcher::filter)
.addBuffer("fetch_queue", 50_000)
.sinkTo("fetch", 1, prev -> {
@ -257,7 +260,7 @@ public class Wikidata {
try {
StopWatch timer = new StopWatch().start();
LongObjectMap<Map<String, String>> results = queryWikidata(qidsToFetch);
LOGGER.info("Fetched batch " + batches.incrementAndGet() + " " + qidsToFetch.size() + " " + timer.stop());
LOGGER.info("Fetched batch " + batches.incrementAndGet() + " (" + qidsToFetch.size() + " qids) " + timer.stop());
writeTranslations(results);
} catch (IOException | InterruptedException | URISyntaxException e) {
throw new RuntimeException(e);
@ -351,7 +354,7 @@ public class Wikidata {
public Map<String, String> getNameTranslations(ReaderElement elem) {
long wikidataId = parseQid(elem.getTag("wikidata"));
if (wikidataId > 0) {
return data.get(wikidataId);
return get(wikidataId);
}
return null;
}

Wyświetl plik

@ -1,6 +1,8 @@
package com.onthegomap.flatmap.monitoring;
import com.graphhopper.util.StopWatch;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,13 +23,13 @@ public interface Stats {
class InMemory implements Stats {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemory.class);
private Map<String, StopWatch> timers = new TreeMap<>();
@Override
public void time(String name, Runnable task) {
StopWatch timer = new StopWatch().start();
LOGGER.info("[" + name + "] Starting...");
startTimer(name);
task.run();
LOGGER.info("[" + name + "] Finished in " + timer.stop());
stopTimer(name);
}
@Override
@ -37,12 +39,13 @@ public interface Stats {
@Override
public void startTimer(String name) {
  // Start the clock first, then record it and announce the phase.
  StopWatch watch = new StopWatch().start();
  timers.put(name, watch);
  LOGGER.info("[" + name + "] Starting...");
}
@Override
public void stopTimer(String name) {
  // Guard against stopping a timer that was never started: the original
  // timers.get(name).stop() would throw a NullPointerException here.
  StopWatch timer = timers.get(name);
  if (timer == null) {
    LOGGER.warn("[" + name + "] Timer was never started");
  } else {
    LOGGER.info("[" + name + "] Finished in " + timer.stop());
  }
}
@Override

Wyświetl plik

@ -61,9 +61,9 @@ public class OpenStreetMapReader implements Closeable {
public void pass1(FlatMapConfig config) {
Profile profile = config.profile();
var topology = Topology.start("osm_pass1", stats)
.readFromQueue(
osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats)
).sinkToConsumer("process", 1, (readerElement) -> {
.fromGenerator("pbf", osmInputFile.read(config.threads() - 1))
.addBuffer("reader_queue", 50_000, 10_000)
.sinkToConsumer("process", 1, (readerElement) -> {
if (readerElement instanceof ReaderNode node) {
TOTAL_NODES.incrementAndGet();
nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat()));
@ -116,9 +116,9 @@ public class OpenStreetMapReader implements Closeable {
CountDownLatch waysDone = new CountDownLatch(processThreads);
var topology = Topology.start("osm_pass2", stats)
.readFromQueue(
osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats)
).<RenderedFeature>addWorker("process", processThreads, (prev, next) -> {
.fromGenerator("pbf", osmInputFile.read(readerThreads))
.addBuffer("reader_queue", 50_000, 1_000)
.<RenderedFeature>addWorker("process", processThreads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
ReaderElement readerElement;
while ((readerElement = prev.get()) != null) {

Wyświetl plik

@ -30,6 +30,18 @@ public record Topology<T>(
}
}
// Blocks until every step of this topology has finished, source through sink.
// Order matters: the upstream step must finish producing first, then its
// output queue is closed so this step's workers see end-of-input, and only
// then are this step's workers awaited.
public void await() {
if (previous != null) {
// wait for the upstream step to finish producing
previous.await();
}
if (inputQueue != null) {
// signal end-of-input to this step's workers
inputQueue.close();
}
if (worker != null) {
// wait for this step's workers to drain the queue and exit
worker.await();
}
}
public interface SourceStep<O> {
void run(Consumer<O> next) throws Exception;

Wyświetl plik

@ -2,7 +2,10 @@ package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.monitoring.Stats;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -22,6 +25,7 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
private final int pendingBatchesCapacity;
private volatile boolean hasIncomingData = true;
private final AtomicInteger pendingCount = new AtomicInteger(0);
private final List<Closeable> closeables = new ArrayList<>();
public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
this.pendingBatchesCapacity = capacity / maxBatch;
@ -31,16 +35,19 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
@Override
public void close() {
for (Queue<T> q : queues.values()) {
try {
try {
for (Queue<T> q : queues.values()) {
if (!q.isEmpty()) {
itemQueue.put(q);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
hasIncomingData = false;
for (Closeable closeable : closeables) {
closeable.close();
}
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
hasIncomingData = false;
}
@Override
@ -116,5 +123,9 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
/** Returns the total number of elements this queue can hold across all pending batches. */
public int getCapacity() {
  return batchSize * pendingBatchesCapacity;
}
/**
 * Registers an extra resource to be closed together with this queue.
 *
 * @param toClose resource closed (in registration order) when {@code close()} runs
 */
public void alsoClose(Closeable toClose) {
  this.closeables.add(toClose);
}
}

Wyświetl plik

@ -0,0 +1,45 @@
package com.onthegomap.flatmap;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.flatmap.monitoring.Stats.InMemory;
import com.onthegomap.flatmap.worker.Topology;
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/** End-to-end tests for {@code OsmInputFile} against a small real-world fixture. */
public class OsmInputFileTest {

  // final: the fixture is shared by both tests and never reassigned
  private final OsmInputFile file = new OsmInputFile(new File("src/test/resources/andorra-latest.osm.pbf"));

  @Test
  public void testGetBounds() {
    assertArrayEquals(new double[]{1.412368, 42.4276, 1.787481, 42.65717}, file.getBounds());
  }

  @Test
  @Timeout(30)
  public void testReadAndorraTwice() {
    // Read the same file twice to verify the reader is reusable: the second
    // pass would hang or miscount if the first leaked threads or state.
    for (int i = 1; i <= 2; i++) {
      AtomicInteger nodes = new AtomicInteger(0);
      AtomicInteger ways = new AtomicInteger(0);
      AtomicInteger rels = new AtomicInteger(0);
      Topology.start("test", new InMemory())
        .fromGenerator("pbf", file.read(2))
        .addBuffer("reader_queue", 1_000, 100)
        .sinkToConsumer("counter", 1, elem -> {
          switch (elem.getType()) {
            case ReaderElement.NODE -> nodes.incrementAndGet();
            case ReaderElement.WAY -> ways.incrementAndGet();
            case ReaderElement.RELATION -> rels.incrementAndGet();
          }
        }).await();
      // expected element counts for the checked-in Andorra extract
      assertEquals(246_028, nodes.get(), "nodes pass " + i);
      assertEquals(12_677, ways.get(), "ways pass " + i);
      assertEquals(287, rels.get(), "rels pass " + i);
    }
  }
}

Plik binarny nie jest wyświetlany.