more scaffolding

pull/1/head
Mike Barry 2021-04-12 06:05:32 -04:00
rodzic c15126eecd
commit 1273c6bd9b
13 zmienionych plików z 312 dodań i 31 usunięć

Wyświetl plik

@ -73,4 +73,14 @@ public class GeoUtils {
return copy;
}
};
static final double QUANTIZED_WORLD_SIZE = Math.pow(2, 31);
public static long encodeFlatLocation(double lon, double lat) {
double worldX = getWorldX(lon);
double worldY = getWorldY(lat);
long x = (long) (worldX * QUANTIZED_WORLD_SIZE);
long y = (long) (worldY * QUANTIZED_WORLD_SIZE);
return (x << 32) | y;
}
}

Wyświetl plik

@ -5,7 +5,7 @@ import java.io.File;
public class MbtilesWriter {
public static void writeOutput(MergeSortFeatureMap features, File output, int threads) {
public static void writeOutput(MergeSortFeatureMap features, File output, FlatMapConfig threads) {
}
}

Wyświetl plik

@ -1,5 +1,6 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.collections.LongLongMap;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.reader.NaturalEarthReader;
@ -62,13 +63,13 @@ public class OpenMapTilesMain {
FileUtils.forceMkdir(tmpDir.toFile());
File nodeDb = tmpDir.resolve("node.db").toFile();
Path featureDb = tmpDir.resolve("feature.db");
LongLongMap nodeLocations = new LongLongMap.MapdbSortedTable(nodeDb);
MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, stats);
FeatureRenderer renderer = new FeatureRenderer(stats);
FlatMapConfig config = new FlatMapConfig(profile, envelope, threads, stats, logIntervalSeconds);
if (fetchWikidata) {
stats.time("wikidata",
() -> Wikidata.fetch(osmInputFile, wikidataNamesFile, config));
stats.time("wikidata", () -> Wikidata.fetch(osmInputFile, wikidataNamesFile, config));
}
if (useWikidata) {
translations.addTranslationProvider(Wikidata.load(wikidataNamesFile));
@ -86,9 +87,9 @@ public class OpenMapTilesMain {
.process("natural_earth", renderer, featureMap, config)
);
try (var osmReader = new OpenStreetMapReader(osmInputFile, nodeDb, stats)) {
stats.time("osm_pass1", () -> osmReader.pass1(profile, threads));
stats.time("osm_pass2", () -> osmReader.pass2(renderer, profile, threads));
try (var osmReader = new OpenStreetMapReader(osmInputFile, nodeLocations, stats)) {
stats.time("osm_pass1", () -> osmReader.pass1(config));
stats.time("osm_pass2", () -> osmReader.pass2(renderer, featureMap, config));
}
LOGGER.info("Deleting node.db to make room for mbtiles");
@ -96,7 +97,7 @@ public class OpenMapTilesMain {
nodeDb.delete();
stats.time("sort", featureMap::sort);
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureMap, output, threads));
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureMap, output, config));
stats.stopTimer("import");

Wyświetl plik

@ -1,6 +1,9 @@
package com.onthegomap.flatmap;
import com.google.protobuf.ByteString;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.WorkQueue;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
@ -59,4 +62,8 @@ public class OsmInputFile {
throw new RuntimeException(e);
}
}
public WorkQueue<ReaderElement> newReaderQueue(String name, int threads, int size, int batchSize, Stats stats) {
return null;
}
}

Wyświetl plik

@ -11,4 +11,5 @@ public interface Profile {
void processFeature(SourceFeature sourceFeature, RenderableFeatures features);
void release();
}

Wyświetl plik

@ -3,6 +3,7 @@ package com.onthegomap.flatmap;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import com.onthegomap.flatmap.worker.Worker;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
@ -28,10 +29,18 @@ public class ProgressLoggers {
return this;
}
public ProgressLoggers addFileSize(File filePath) {
return this;
}
public ProgressLoggers addProcessStats() {
return this;
}
public ProgressLoggers addInMemoryObject(String name, LongSupplier size) {
return this;
}
public ProgressLoggers addThreadPoolStats(String name, String prefix) {
return this;
}

Wyświetl plik

@ -2,11 +2,9 @@ package com.onthegomap.flatmap;
import org.locationtech.jts.geom.Geometry;
public class SourceFeature {
public interface SourceFeature {
public Geometry getGeometry() {
return null;
}
Geometry getGeometry();
// props
// lazy geometry
// lazy centroid

Wyświetl plik

@ -1,5 +1,41 @@
package com.onthegomap.flatmap.collections;
public interface LongLongMap {
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
public interface LongLongMap extends Closeable {
void put(long key, long value);
long get(long key);
File filePath();
class MapdbSortedTable implements LongLongMap {
public MapdbSortedTable(File nodeDb) {
}
@Override
public void put(long key, long value) {
}
@Override
public long get(long key) {
return 0;
}
@Override
public File filePath() {
return null;
}
@Override
public void close() throws IOException {
}
}
}

Wyświetl plik

@ -2,4 +2,21 @@ package com.onthegomap.flatmap.collections;
public interface LongLongMultimap {
void put(long key, long value);
class FewUnorderedBinarySearchMultimap implements LongLongMultimap {
@Override
public void put(long key, long value) {
}
}
class ManyOrderedBinarySearchMultimap implements LongLongMultimap {
@Override
public void put(long key, long value) {
}
}
}

Wyświetl plik

@ -1,30 +1,228 @@
package com.onthegomap.flatmap.reader;
import com.carrotsearch.hppc.LongHashSet;
import com.graphhopper.coll.GHLongHashSet;
import com.graphhopper.coll.GHLongObjectHashMap;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.flatmap.FeatureRenderer;
import com.onthegomap.flatmap.FlatMapConfig;
import com.onthegomap.flatmap.GeoUtils;
import com.onthegomap.flatmap.OsmInputFile;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.LongLongMap;
import com.onthegomap.flatmap.collections.LongLongMultimap;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.locationtech.jts.geom.Geometry;
public class OpenStreetMapReader implements Closeable {
public OpenStreetMapReader(OsmInputFile osmInputFile, File nodeDb, Stats stats) {
private final OsmInputFile osmInputFile;
private final Stats stats;
private final LongLongMap nodeDb;
private final AtomicLong TOTAL_NODES = new AtomicLong(0);
private final AtomicLong TOTAL_WAYS = new AtomicLong(0);
private final AtomicLong TOTAL_RELATIONS = new AtomicLong(0);
// need a few large objects to process ways in relations, should be small enough to keep in memory
// for routes (750k rels 40m ways) and boundaries (650k rels, 8m ways)
// need to store route info to use later when processing ways
// <~500mb
private GHLongObjectHashMap<RelationInfo> relationInfo = new GHLongObjectHashMap<>();
private final AtomicLong relationInfoSizes = new AtomicLong(0);
// ~800mb, ~1.6GB when sorting
private LongLongMultimap wayToRelations = new LongLongMultimap.FewUnorderedBinarySearchMultimap();
// for multipolygons need to store way info (20m ways, 800m nodes) to use when processing relations (4.5m)
// ~300mb
private LongHashSet waysInMultipolygon = new GHLongHashSet();
// ~7GB
private LongLongMultimap multipolygonWayGeometries = new LongLongMultimap.ManyOrderedBinarySearchMultimap();
public OpenStreetMapReader(OsmInputFile osmInputFile, LongLongMap nodeDb, Stats stats) {
this.osmInputFile = osmInputFile;
this.nodeDb = nodeDb;
this.stats = stats;
}
public void pass1(OpenMapTilesProfile profile, int threads) {
public void pass1(FlatMapConfig config) {
Profile profile = config.profile();
var topology = Topology.readFromQueue(stats,
osmInputFile.newReaderQueue("osm_pass1_reader_queue", config.threads() - 1, 50_000, 10_000, stats)
).sinkToConsumer("osm_pass1_processor", 1, (readerElement) -> {
if (readerElement instanceof ReaderNode node) {
TOTAL_NODES.incrementAndGet();
nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat()));
} else if (readerElement instanceof ReaderWay) {
TOTAL_WAYS.incrementAndGet();
} else if (readerElement instanceof ReaderRelation rel) {
TOTAL_RELATIONS.incrementAndGet();
List<RelationInfo> infos = profile.preprocessOsmRelation(rel);
if (infos != null) {
for (RelationInfo info : infos) {
relationInfo.put(rel.getId(), info);
relationInfoSizes.addAndGet(info.sizeBytes());
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
wayToRelations.put(member.getRef(), rel.getId());
}
}
}
}
if (rel.hasTag("type", "multipolygon")) {
for (ReaderRelation.Member member : rel.getMembers()) {
if (member.getType() == ReaderRelation.Member.WAY) {
waysInMultipolygon.add(member.getRef());
}
}
}
}
});
var loggers = new ProgressLoggers("osm_pass1")
.addRateCounter("nodes", TOTAL_NODES)
.addFileSize(nodeDb.filePath())
.addRateCounter("ways", TOTAL_WAYS)
.addRateCounter("rels", TOTAL_RELATIONS)
.addProcessStats()
.addInMemoryObject("hppc", this::getBigObjectSizeBytes)
.addThreadPoolStats("pbf", "PBF")
.addThreadPoolStats("parse", "pool-")
.addTopologyStats(topology);
topology.awaitAndLog(loggers, config.logIntervalSeconds());
}
public void pass2(FeatureRenderer renderer, OpenMapTilesProfile profile, int threads) {
public void pass2(FeatureRenderer renderer, MergeSortFeatureMap writer, int readerThreads, int processThreads,
FlatMapConfig config) {
Profile profile = config.profile();
AtomicLong nodesProcessed = new AtomicLong(0);
AtomicLong waysProcessed = new AtomicLong(0);
AtomicLong relsProcessed = new AtomicLong(0);
AtomicLong featuresWritten = new AtomicLong(0);
CountDownLatch waysDone = new CountDownLatch(processThreads);
var topology = Topology.readFromQueue(stats,
osmInputFile.newReaderQueue("osm_pass2_reader_queue", readerThreads, 50_000, 1_000, stats)
).<RenderedFeature>addWorker("osm_pass2_processor", processThreads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
ReaderElement readerElement;
while ((readerElement = prev.get()) != null) {
SourceFeature feature = null;
if (readerElement instanceof ReaderNode node) {
nodesProcessed.incrementAndGet();
feature = new NodeSourceFeature(node);
} else if (readerElement instanceof ReaderWay way) {
waysProcessed.incrementAndGet();
feature = new WaySourceFeature(way);
} else if (readerElement instanceof ReaderRelation rel) {
// ensure all ways finished processing before we start relations
if (waysDone.getCount() > 0) {
waysDone.countDown();
waysDone.await();
}
relsProcessed.incrementAndGet();
if (rel.hasTag("type", "multipolygon")) {
feature = new MultipolygonSourceFeature(rel);
}
}
if (feature != null) {
features.reset(feature);
profile.processFeature(feature, features);
for (RenderableFeature renderable : features.all()) {
renderer.renderFeature(renderable, next);
}
}
}
// just in case a worker skipped over all relations
waysDone.countDown();
}).addBuffer("osm_pass2_feature_queue", 50_000, 1_000)
.sinkToConsumer("osm_pass2_writer", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});
var logger = new ProgressLoggers("osm_pass2")
.addRatePercentCounter("nodes", TOTAL_NODES.get(), nodesProcessed)
.addFileSize(nodeDb.filePath())
.addRatePercentCounter("ways", TOTAL_WAYS.get(), waysProcessed)
.addRatePercentCounter("rels", TOTAL_RELATIONS.get(), relsProcessed)
.addRateCounter("features", featuresWritten)
.addFileSize(writer::getStorageSize)
.addProcessStats()
.addInMemoryObject("hppc", this::getBigObjectSizeBytes)
.addThreadPoolStats("pbf", "PBF")
.addThreadPoolStats("parse", "pool-")
.addTopologyStats(topology);
topology.awaitAndLog(logger, config.logIntervalSeconds());
}
private long getBigObjectSizeBytes() {
return 0;
}
@Override
public void close() throws IOException {
public void close() {
multipolygonWayGeometries = null;
wayToRelations = null;
waysInMultipolygon = null;
relationInfo = null;
}
public static class RelationInfo {
public long sizeBytes() {
return 0;
}
}
private static class NodeSourceFeature implements SourceFeature {
public NodeSourceFeature(ReaderNode node) {
super();
}
@Override
public Geometry getGeometry() {
return null;
}
}
private static class WaySourceFeature implements SourceFeature {
public WaySourceFeature(ReaderWay way) {
super();
}
@Override
public Geometry getGeometry() {
return null;
}
}
private static class MultipolygonSourceFeature implements SourceFeature {
public MultipolygonSourceFeature(ReaderRelation relation) {
super();
}
@Override
public Geometry getGeometry() {
return null;
}
}
}

Wyświetl plik

@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory;
public abstract class Reader {
private final Stats stats;
private Logger LOGGER = LoggerFactory.getLogger(getClass());
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
public Reader(Stats stats) {
this.stats = stats;
@ -33,11 +33,10 @@ public abstract class Reader {
OpenMapTilesProfile profile = config.profile();
AtomicLong featuresRead = new AtomicLong(0);
AtomicLong featuresWritten = new AtomicLong(0);
LOGGER.info("[" + name + "] Reading with " + threads + " threads");
var topology = Topology.fromGenerator(name + "-read", stats, open())
.addBuffer(name + "-reader", 1000)
.<RenderedFeature>addWorker(name + "-process", threads, (prev, next) -> {
var topology = Topology.fromGenerator(name + "_read", stats, open())
.addBuffer(name + "_reader", 1000)
.<RenderedFeature>addWorker(name + "_process", threads, (prev, next) -> {
RenderableFeatures features = new RenderableFeatures();
SourceFeature sourceFeature;
while ((sourceFeature = prev.get()) != null) {
@ -51,8 +50,8 @@ public abstract class Reader {
}
}
})
.addBuffer(name + "-writer", 1000)
.sinkToConsumer("write", 1, (item) -> {
.addBuffer(name + "_writer", 1000)
.sinkToConsumer(name + "_write", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});

Wyświetl plik

@ -39,17 +39,17 @@ public record Topology<T>(
public interface SourceStep<O> {
void run(Consumer<O> next);
void run(Consumer<O> next) throws Exception;
}
public interface WorkerStep<I, O> {
void run(Supplier<I> prev, Consumer<O> next);
void run(Supplier<I> prev, Consumer<O> next) throws Exception;
}
public interface SinkStep<I> {
void run(Supplier<I> prev);
void run(Supplier<I> prev) throws Exception;
}
public interface Bufferable<I, O> {

Wyświetl plik

@ -5,7 +5,7 @@ import com.onthegomap.flatmap.stats.Stats;
public class Worker {
public Worker(String name, Stats stats, int threads, Runnable task) {
public Worker(String name, Stats stats, int threads, RunnableThatThrows task) {
}
@ -15,4 +15,9 @@ public class Worker {
public void awaitAndLog(ProgressLoggers loggers, long logIntervalSeconds) {
}
public interface RunnableThatThrows {
void run() throws Exception;
}
}