Mirror of https://github.com/onthegomap/planetiler

fixes from running on world

parent d89e2731ee
commit 80521c7ca7
@@ -90,6 +90,11 @@
       <artifactId>jackson-datatype-jdk8</artifactId>
       <version>${jackson.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-xml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.prometheus</groupId>
       <artifactId>simpleclient</artifactId>

@@ -14,6 +14,8 @@ public record CommonParams(
   boolean deferIndexCreation,
   boolean optimizeDb,
   boolean emitTilesInOrder,
+  int mbtilesFeatureMultiplier,
+  int mbtilesMinTilesPerBatch,
   boolean forceOverwrite,
   boolean gzipTempStorage,
   String longLongMap,

@@ -32,6 +34,8 @@ public record CommonParams(
     boolean deferIndexCreation,
     boolean optimizeDb,
     boolean emitTilesInOrder,
+    int mbtilesFeatureMultiplier,
+    int mbtilesMinTilesPerBatch,
     boolean forceOverwrite,
     boolean gzipTempStorage,
     String longLongMap

@@ -45,6 +49,8 @@ public record CommonParams(
       deferIndexCreation,
       optimizeDb,
       emitTilesInOrder,
+      mbtilesFeatureMultiplier,
+      mbtilesMinTilesPerBatch,
      forceOverwrite,
      gzipTempStorage,
      longLongMap,

@@ -87,7 +93,9 @@
       arguments.integer("maxzoom", "maximum zoom level (limit 14)", MAX_MAXZOOM),
       arguments.get("defer_mbtiles_index_creation", "add index to mbtiles file after finished writing", false),
       arguments.get("optimize_db", "optimize mbtiles after writing", false),
-      arguments.get("emit_tiles_in_order", "emit tiles in index order", false),
+      arguments.get("emit_tiles_in_order", "emit tiles in index order", true),
+      arguments.integer("mbtiles_feature_multiplier", "mbtiles feature multiplier", 100),
+      arguments.integer("mbtiles_min_tiles_per_batch", "min tiles per batch", 1),
       arguments.get("force", "force overwriting output file", false),
       arguments.get("gzip_temp", "gzip temporary feature storage (uses more CPU, but less disk space)", false),
       arguments.get("llmap", "type of long long map", "mapdb")

@@ -33,14 +33,14 @@ public interface Stats extends AutoCloseable {

   void counter(String name, Supplier<Number> supplier);

-  default Counter.Readable longCounter(String name) {
-    Counter.Readable counter = Counter.newMultiThreadCounter();
+  default Counter.MultiThreadCounter longCounter(String name) {
+    Counter.MultiThreadCounter counter = Counter.newMultiThreadCounter();
     counter(name, counter::get);
     return counter;
   }

-  default Counter nanoCounter(String name) {
-    Counter.Readable counter = Counter.newMultiThreadCounter();
+  default Counter.MultiThreadCounter nanoCounter(String name) {
+    Counter.MultiThreadCounter counter = Counter.newMultiThreadCounter();
     counter(name, () -> counter.get() / NANOSECONDS_PER_SECOND);
     return counter;
   }

@@ -82,13 +82,13 @@ public interface Stats extends AutoCloseable {
   }

   @Override
-  public Counter.Readable longCounter(String name) {
-    return Counter.newSingleThreadCounter();
+  public Counter.MultiThreadCounter longCounter(String name) {
+    return Counter.newMultiThreadCounter();
   }

   @Override
-  public Counter nanoCounter(String name) {
-    return Counter.noop();
+  public Counter.MultiThreadCounter nanoCounter(String name) {
+    return Counter.newMultiThreadCounter();
   }

   @Override
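
Every counter change in this commit follows one pattern: each thread increments a private counter cell obtained via counterForThread() (used in the WorkQueue changes below), and reads sum over all cells, so hot-path increments never contend on shared state. The project's Counter internals are not part of this diff; a minimal sketch of the idea using only JDK types:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    // Sketch of a multi-threaded counter: each thread owns a cell, reads sum
    // all cells. (java.util.concurrent.atomic.LongAdder stripes the same way.)
    class MultiThreadCounterSketch {
      private final Map<Thread, LongAdder> cells = new ConcurrentHashMap<>();

      // analogous to counterForThread() in this commit: a contention-free handle
      LongAdder counterForThread() {
        return cells.computeIfAbsent(Thread.currentThread(), t -> new LongAdder());
      }

      // point-in-time total across every thread that ever incremented
      long get() {
        return cells.values().stream().mapToLong(LongAdder::sum).sum();
      }
    }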

@@ -202,7 +202,6 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima
           renderer.accept(renderable);
         }
       }
-      nodeCache.reset();
     }

     // just in case a worker skipped over all relations

@@ -526,9 +525,6 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima
     }

     Coordinate getCoordinate(long id);
-
-    default void reset() {
-    }
   }

   private class NodeGeometryCache implements NodeLocationProvider {

@@ -558,7 +554,4 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima
       return seq;
     }
   }
-
-  public void reset() {
-  }
 }

@@ -31,6 +31,7 @@ import org.locationtech.jts.geom.CoordinateSequence;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.LinearRing;
 import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.geom.TopologyException;
 import org.locationtech.jts.geom.prep.PreparedPolygon;

 /**

@@ -103,6 +104,16 @@ public class OsmMultipolygon {
     OpenStreetMapReader.NodeLocationProvider nodeCache,
     long osmId,
     double minGap
+  ) throws GeometryException {
+    return build(rings, nodeCache, osmId, minGap, false);
+  }
+
+  public static Geometry build(
+    List<LongArrayList> rings,
+    OpenStreetMapReader.NodeLocationProvider nodeCache,
+    long osmId,
+    double minGap,
+    boolean fix
   ) throws GeometryException {
     try {
       if (rings.size() == 0) {

|
||||||
if (firstId == lastId || tryClose(segment, nodeCache, minGap)) {
|
if (firstId == lastId || tryClose(segment, nodeCache, minGap)) {
|
||||||
CoordinateSequence coordinates = nodeCache.getWayGeometry(segment);
|
CoordinateSequence coordinates = nodeCache.getWayGeometry(segment);
|
||||||
Polygon poly = GeoUtils.JTS_FACTORY.createPolygon(coordinates);
|
Polygon poly = GeoUtils.JTS_FACTORY.createPolygon(coordinates);
|
||||||
|
if (fix) {
|
||||||
|
poly = (Polygon) GeoUtils.fixPolygon(poly);
|
||||||
|
}
|
||||||
polygons.add(new Ring(poly));
|
polygons.add(new Ring(poly));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -133,6 +147,14 @@ public class OsmMultipolygon {
       }
     } catch (IllegalArgumentException e) {
       throw new GeometryException("osm_invalid_multipolygon", "error building multipolygon " + osmId + ": " + e);
+    } catch (TopologyException e) {
+      if (!fix) {
+        // retry but fix every polygon first
+        System.err.println("FIXING!");
+        return build(rings, nodeCache, osmId, minGap, true);
+      } else {
+        throw new GeometryException("osm_invalid_multipolygon", "error building multipolygon " + osmId + ": " + e);
+      }
     }
   }

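Note that the retry is attempted exactly once: on the first TopologyException the whole multipolygon is rebuilt with fix = true, and a second failure is reported as osm_invalid_multipolygon rather than retried again. GeoUtils.fixPolygon itself is not shown in this diff; the standard JTS remedy for rings that trigger a TopologyException is a zero-width buffer, which re-nodes self-intersections into valid topology. A hedged sketch of that common trick, assuming fixPolygon does something similar:

    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.geom.Polygon;

    // buffer(0) rebuilds an invalid ring as a valid polygon or multipolygon;
    // this is an assumption about fixPolygon's behavior, not taken from the diff.
    static Geometry fixIfInvalid(Polygon poly) {
      return poly.isValid() ? poly : poly.buffer(0);
    }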

@@ -83,7 +83,8 @@ public record Topology<T>(
   public <T> Bufferable<?, T> fromGenerator(String name, SourceStep<T> producer, int threads) {
     return (queueName, size, batchSize) -> {
       var nextQueue = new WorkQueue<T>(prefix + "_" + queueName, size, batchSize, stats);
-      Worker worker = new Worker(prefix + "_" + name, stats, threads, () -> producer.run(nextQueue));
+      Worker worker = new Worker(prefix + "_" + name, stats, threads,
+        () -> producer.run(nextQueue.threadLocalWriter()));
       return new Builder<>(prefix, name, nextQueue, worker, stats);
     };
   }

@@ -103,8 +104,9 @@ public record Topology<T>(

   public <T> Builder<?, T> readFromTiny(String name, Collection<T> items) {
     WorkQueue<T> queue = new WorkQueue<>(prefix + "_" + name, items.size(), 1, stats);
+    Consumer<T> writer = queue.threadLocalWriter();
     for (T item : items) {
-      queue.accept(item);
+      writer.accept(item);
     }
     return readFromQueue(queue);
   }

@@ -135,7 +137,8 @@ public record Topology<T>(
     Builder<I, O> curr = this;
     return (queueName, size, batchSize) -> {
       var nextOutputQueue = new WorkQueue<O2>(prefix + "_" + queueName, size, batchSize, stats);
-      var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
+      var worker = new Worker(prefix + "_" + name, stats, threads,
+        () -> step.run(outputQueue.threadLocalReader(), nextOutputQueue.threadLocalWriter()));
       return new Builder<>(prefix, name, curr, outputQueue, nextOutputQueue, worker, stats);
     };
   }

@@ -147,7 +150,7 @@ public record Topology<T>(

   public Topology<O> sinkTo(String name, int threads, SinkStep<O> step) {
     var previousTopology = build();
-    var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue));
+    var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue.threadLocalReader()));
     return new Topology<>(name, previousTopology, outputQueue, worker);
   }

@@ -13,18 +13,18 @@ import java.util.function.Supplier;

 public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {

-  private final ThreadLocal<Queue<T>> itemWriteBatchProvider = new ThreadLocal<>();
-  private final ThreadLocal<Queue<T>> itemReadBatchProvider = new ThreadLocal<>();
+  private final ThreadLocal<WriterForThread> writerProvider = ThreadLocal.withInitial(WriterForThread::new);
+  private final ThreadLocal<ReaderForThread> readerProvider = ThreadLocal.withInitial(ReaderForThread::new);
   private final BlockingQueue<Queue<T>> itemQueue;
   private final int batchSize;
   private final ConcurrentHashMap<Long, Queue<T>> queues = new ConcurrentHashMap<>();
   private final int pendingBatchesCapacity;
-  private final Counter enqueueCountStat;
-  private final Counter enqueueBlockTimeNanos;
-  private final Counter dequeueCountStat;
-  private final Counter dequeueBlockTimeNanos;
+  private final Counter.MultiThreadCounter enqueueCountStatAll;
+  private final Counter.MultiThreadCounter enqueueBlockTimeNanosAll;
+  private final Counter.MultiThreadCounter dequeueCountStatAll;
+  private final Counter.MultiThreadCounter dequeueBlockTimeNanosAll;
   private volatile boolean hasIncomingData = true;
-  private final Counter.Readable pendingCount = Counter.newMultiThreadCounter();
+  private final Counter.MultiThreadCounter pendingCountAll = Counter.newMultiThreadCounter();

   public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
     this.pendingBatchesCapacity = capacity / maxBatch;

@@ -36,10 +36,10 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
     stats.gauge(name + "_capacity", this::getCapacity);
     stats.gauge(name + "_size", this::getPending);

-    this.enqueueCountStat = stats.longCounter(name + "_enqueue_count");
-    this.enqueueBlockTimeNanos = stats.nanoCounter(name + "_enqueue_block_time_seconds");
-    this.dequeueCountStat = stats.longCounter(name + "_dequeue_count");
-    this.dequeueBlockTimeNanos = stats.nanoCounter(name + "_dequeue_block_time_seconds");
+    this.enqueueCountStatAll = stats.longCounter(name + "_enqueue_count");
+    this.enqueueBlockTimeNanosAll = stats.nanoCounter(name + "_enqueue_block_time_seconds");
+    this.dequeueCountStatAll = stats.longCounter(name + "_dequeue_count");
+    this.dequeueBlockTimeNanosAll = stats.nanoCounter(name + "_dequeue_block_time_seconds");
   }

   @Override

@@ -56,80 +56,113 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
     }
   }

+  public Consumer<T> threadLocalWriter() {
+    return writerProvider.get();
+  }
+
+  public Supplier<T> threadLocalReader() {
+    return readerProvider.get();
+  }
+
   @Override
   public void accept(T item) {
-    // past 4-8 concurrent writers, start getting lock contention adding to the blocking queue so add to the
-    // queue in less frequent, larger batches
-    Queue<T> writeBatch = itemWriteBatchProvider.get();
-    if (writeBatch == null) {
-      itemWriteBatchProvider.set(writeBatch = new ArrayDeque<>(batchSize));
-      queues.put(Thread.currentThread().getId(), writeBatch);
-    }
-
-    writeBatch.offer(item);
-    pendingCount.inc();
-
-    if (writeBatch.size() >= batchSize) {
-      flushWrites();
-    }
-    enqueueCountStat.inc();
-  }
-
-  private void flushWrites() {
-    Queue<T> writeBatch = itemWriteBatchProvider.get();
-    if (writeBatch != null && !writeBatch.isEmpty()) {
-      try {
-        itemWriteBatchProvider.set(null);
-        queues.remove(Thread.currentThread().getId());
-        // blocks if full
-        if (!itemQueue.offer(writeBatch)) {
-          long start = System.nanoTime();
-          itemQueue.put(writeBatch);
-          enqueueBlockTimeNanos.incBy(System.nanoTime() - start);
-        }
-      } catch (InterruptedException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
+    writerProvider.get().accept(item);
   }

   @Override
   public T get() {
-    Queue<T> itemBatch = itemReadBatchProvider.get();
-
-    if (itemBatch == null || itemBatch.isEmpty()) {
-      long start = System.nanoTime();
-      do {
-        if (!hasIncomingData && itemQueue.isEmpty()) {
-          break;
-        }
-
-        if ((itemBatch = itemQueue.poll()) == null) {
-          try {
-            itemBatch = itemQueue.poll(100, TimeUnit.MILLISECONDS);
-            if (itemBatch != null) {
-              break;
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            break; // signal EOF
-          }
-        }
-      } while (itemBatch == null);
-      itemReadBatchProvider.set(itemBatch);
-      dequeueBlockTimeNanos.incBy(System.nanoTime() - start);
-    }
-
-    T result = itemBatch == null ? null : itemBatch.poll();
-    if (result != null) {
-      pendingCount.incBy(-1);
-    }
-    dequeueCountStat.inc();
-    return result;
+    return readerProvider.get().get();
+  }
+
+  private class WriterForThread implements Consumer<T> {
+
+    Queue<T> writeBatch = null;
+    Counter pendingCount = pendingCountAll.counterForThread();
+    Counter enqueueCountStat = enqueueCountStatAll.counterForThread();
+    Counter enqueueBlockTimeNanos = enqueueBlockTimeNanosAll.counterForThread();
+
+    @Override
+    public void accept(T item) {
+      // past 4-8 concurrent writers, start getting lock contention adding to the blocking queue so add to the
+      // queue in less frequent, larger batches
+      if (writeBatch == null) {
+        writeBatch = new ArrayDeque<>(batchSize);
+        queues.put(Thread.currentThread().getId(), writeBatch);
+      }
+
+      writeBatch.offer(item);
+      pendingCount.inc();
+
+      if (writeBatch.size() >= batchSize) {
+        flushWrites();
+      }
+      enqueueCountStat.inc();
+    }
+
+    private void flushWrites() {
+      if (writeBatch != null && !writeBatch.isEmpty()) {
+        try {
+          Queue<T> oldWriteBatch = writeBatch;
+          writeBatch = null;
+          queues.remove(Thread.currentThread().getId());
+          // blocks if full
+          if (!itemQueue.offer(oldWriteBatch)) {
+            long start = System.nanoTime();
+            itemQueue.put(oldWriteBatch);
+            enqueueBlockTimeNanos.incBy(System.nanoTime() - start);
+          }
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  private class ReaderForThread implements Supplier<T> {
+
+    Queue<T> readBatch = null;
+    Counter dequeueBlockTimeNanos = dequeueBlockTimeNanosAll.counterForThread();
+    Counter pendingCount = pendingCountAll.counterForThread();
+    Counter dequeueCountStat = dequeueCountStatAll.counterForThread();
+
+    @Override
+    public T get() {
+      Queue<T> itemBatch = readBatch;
+
+      if (itemBatch == null || itemBatch.isEmpty()) {
+        long start = System.nanoTime();
+        do {
+          if (!hasIncomingData && itemQueue.isEmpty()) {
+            break;
+          }
+
+          if ((itemBatch = itemQueue.poll()) == null) {
+            try {
+              itemBatch = itemQueue.poll(100, TimeUnit.MILLISECONDS);
+              if (itemBatch != null) {
+                break;
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              break; // signal EOF
+            }
+          }
+        } while (itemBatch == null);
+        readBatch = itemBatch;
+        dequeueBlockTimeNanos.incBy(System.nanoTime() - start);
+      }
+
+      T result = itemBatch == null ? null : itemBatch.poll();
+      if (result != null) {
+        pendingCount.incBy(-1);
+      }
+      dequeueCountStat.inc();
+      return result;
+    }
   }

   public int getPending() {
-    return (int) pendingCount.get();
+    return (int) pendingCountAll.get();
   }

   public int getCapacity() {
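
The rewrite moves all batching state into per-thread WriterForThread/ReaderForThread instances, so once a worker fetches its handle, enqueueing touches shared state only when a full batch is pushed onto the BlockingQueue. A sketch of the intended calling pattern from inside a worker thread (producerLoop and the item values are illustrative, not from the diff):

    import java.util.function.Consumer;

    // Illustrative worker body: grab the per-thread handle once, then write in
    // a tight loop; the shared queue sees one offer per batchSize items.
    static void producerLoop(WorkQueue<String> queue) {
      Consumer<String> writer = queue.threadLocalWriter();
      for (int i = 0; i < 1_000_000; i++) {
        writer.accept("item-" + i);
      }
    }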

@@ -102,27 +102,26 @@ public class MbtilesWriter {
     int queueSize = 10_000;
     WorkQueue<TileBatch> writerQueue = new WorkQueue<>("mbtiles_writer_queue", queueSize, 1, stats);

-    Topology<TileBatch> encodeBranch, writeBranch;
-    if (true || config.emitTilesInOrder()) {
+    Topology<TileBatch> encodeBranch, writeBranch = null;
+    if (config.emitTilesInOrder()) {
       encodeBranch = topology
         .<TileBatch>fromGenerator("reader", next -> writer.readFeatures(batch -> {
           next.accept(batch);
           writerQueue.accept(batch); // also send immediately to writer
         }), 1)
         .addBuffer("reader_queue", queueSize)
-        .sinkTo("encoder", config.threads(), writer::tileEncoder);
+        .sinkTo("encoder", config.threads(), writer::tileEncoderSink);

       // the tile writer will wait on the result of each batch to ensure tiles are written in order
       writeBranch = topology.readFromQueue(writerQueue)
         .sinkTo("writer", 1, writer::tileWriter);
     } else {
-      // TODO
-      // encodeBranch = topology
-      // .fromGenerator("reader", writer::readFeatures, 1)
-      // .addBuffer("reader_queue", queueSize)
-      // .addWorker("encoder", config.threads(), (prev, next) -> {
-      // TOO
-      // })
+      encodeBranch = topology
+        .fromGenerator("reader", writer::readFeatures, 1)
+        .addBuffer("reader_queue", queueSize)
+        .addWorker("encoder", config.threads(), writer::tileEncoder)
+        .addBuffer("writer_queue", queueSize)
+        .sinkTo("writer", 1, writer::tileWriter);
     }

     var loggers = new ProgressLoggers("mbtiles")
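
The two branches now build genuinely different pipelines; sketched as comments, using the stage names from the code above:

    // emitTilesInOrder == true (the default after this commit):
    //   reader -> reader_queue -> encoder xN   (encoders complete each batch's future)
    //   reader -> writerQueue  -> writer x1    (writer awaits futures, preserving tile order)
    //
    // emitTilesInOrder == false:
    //   reader -> reader_queue -> encoder xN -> writer_queue -> writer x1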

@@ -148,7 +147,9 @@ public class MbtilesWriter {
     }));

     encodeBranch.awaitAndLog(loggers, config.logInterval());
-    writeBranch.awaitAndLog(loggers, config.logInterval());
+    if (writeBranch != null) {
+      writeBranch.awaitAndLog(loggers, config.logInterval());
+    }
     writer.printTileStats();
     timer.stop();
   }

@@ -178,14 +179,16 @@ public class MbtilesWriter {
     long featuresInThisBatch = 0;
     long tilesInThisBatch = 0;
     // 249 vs. 24,900
-    long MAX_FEATURES_PER_BATCH = BATCH_SIZE * 100;
+    long MAX_FEATURES_PER_BATCH = (long) BATCH_SIZE * config.mbtilesFeatureMultiplier();
+    long MIN_TILES_PER_BATCH = config.mbtilesMinTilesPerBatch();
     for (var feature : features) {
       int z = feature.coord().z();
       if (z > currentZoom) {
         LOGGER.info("[mbtiles] Starting z" + z);
         currentZoom = z;
       }
-      if (tilesInThisBatch > BATCH_SIZE || featuresInThisBatch > MAX_FEATURES_PER_BATCH) {
+      if (tilesInThisBatch > BATCH_SIZE ||
+        (tilesInThisBatch >= MIN_TILES_PER_BATCH && featuresInThisBatch > MAX_FEATURES_PER_BATCH)) {
         next.accept(batch);
         batch = new TileBatch();
         featuresInThisBatch = 0;
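
With the new defaults (mbtiles_feature_multiplier = 100, mbtiles_min_tiles_per_batch = 1) and the existing "// 249 vs. 24,900" comment suggesting BATCH_SIZE is 249, a batch now flushes after 249 tiles, or after 249 × 100 = 24,900 features as long as it holds at least one tile. A self-contained check of the predicate; the numeric values are inferred from that comment, not confirmed elsewhere in the diff:

    // Inferred: BATCH_SIZE = 249, so MAX_FEATURES_PER_BATCH = 249 * 100 = 24,900.
    static boolean shouldFlush(long tilesInThisBatch, long featuresInThisBatch) {
      final long BATCH_SIZE = 249; // inferred from the "249 vs. 24,900" comment
      final long MAX_FEATURES_PER_BATCH = BATCH_SIZE * 100;
      final long MIN_TILES_PER_BATCH = 1;
      return tilesInThisBatch > BATCH_SIZE ||
        (tilesInThisBatch >= MIN_TILES_PER_BATCH && featuresInThisBatch > MAX_FEATURES_PER_BATCH);
    }
    // shouldFlush(250, 0)     -> true  (tile cap reached)
    // shouldFlush(10, 25_000) -> true  (feature cap, batch non-empty)
    // shouldFlush(0, 25_000)  -> false (min-tiles guard: never emit an empty batch)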

@@ -200,7 +203,12 @@ public class MbtilesWriter {
     }
   }

-  void tileEncoder(Supplier<TileBatch> prev) throws IOException {
+  void tileEncoderSink(Supplier<TileBatch> prev) throws IOException {
+    tileEncoder(prev, batch -> {
+    });
+  }
+
+  void tileEncoder(Supplier<TileBatch> prev, Consumer<TileBatch> next) throws IOException {
     TileBatch batch;
     byte[] lastBytes = null, lastEncoded = null;

@@ -233,6 +241,7 @@ public class MbtilesWriter {
         result.add(new Mbtiles.TileEntry(tileFeatures.coord(), bytes));
       }
       batch.out.complete(result);
+      next.accept(batch);
     }
   }

@@ -1300,4 +1300,57 @@ public class FlatMapTest {
       return postprocessLayerFeatures.process(layer, zoom, items);
     }
   }

+  private static final <T> List<T> orEmpty(List<T> in) {
+    return in == null ? List.of() : in;
+  }
+
+  @Test
+  public void testBadRelation() throws Exception {
+    // this threw an exception in OsmMultipolygon.build
+    OsmXml osmInfo = TestUtils.readOsmXml("bad_spain_relation.xml");
+    List<ReaderElement> elements = new ArrayList<>();
+    for (var node : orEmpty(osmInfo.nodes())) {
+      elements.add(new ReaderNode(node.id(), node.lat(), node.lon()));
+    }
+    for (var way : orEmpty(osmInfo.ways())) {
+      ReaderWay readerWay = new ReaderWay(way.id());
+      elements.add(readerWay);
+      for (var tag : orEmpty(way.tags())) {
+        readerWay.setTag(tag.k(), tag.v());
+      }
+      for (var nodeRef : orEmpty(way.nodeRefs())) {
+        readerWay.getNodes().add(nodeRef.ref());
+      }
+    }
+    for (var relation : orEmpty(osmInfo.relation())) {
+      ReaderRelation readerRelation = new ReaderRelation(relation.id());
+      elements.add(readerRelation);
+      for (var tag : orEmpty(relation.tags())) {
+        readerRelation.setTag(tag.k(), tag.v());
+      }
+      for (var member : orEmpty(relation.members())) {
+        readerRelation.add(new ReaderRelation.Member(switch (member.type()) {
+          case "way" -> ReaderRelation.Member.WAY;
+          case "relation" -> ReaderRelation.Member.RELATION;
+          case "node" -> ReaderRelation.Member.NODE;
+          default -> throw new IllegalStateException("Unexpected value: " + member.type());
+        }, member.ref(), member.role()));
+      }
+    }
+
+    var results = runWithOsmElements(
+      Map.of("threads", "1"),
+      elements,
+      (in, features) -> {
+        if (in.hasTag("landuse", "forest")) {
+          features.polygon("layer")
+            .setZoomRange(12, 14)
+            .setBufferPixels(4);
+        }
+      }
+    );
+
+    assertEquals(11, results.tiles.size());
+  }
 }

@@ -9,12 +9,19 @@ import static org.junit.jupiter.api.Assertions.fail;

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
+import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
+import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.onthegomap.flatmap.geo.GeoUtils;
 import com.onthegomap.flatmap.geo.GeometryException;
 import com.onthegomap.flatmap.geo.TileCoord;
 import com.onthegomap.flatmap.write.Mbtiles;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;

@@ -489,4 +496,71 @@ public class TestUtils {
   public static LinearRing newLinearRing(double... coords) {
     return JTS_FACTORY.createLinearRing(coordinateSequence(coords));
   }
+
+  @JacksonXmlRootElement(localName = "node")
+  public static record Node(
+    long id, double lat, double lon
+  ) {}
+
+  @JacksonXmlRootElement(localName = "nd")
+  public static record NodeRef(
+    long ref
+  ) {}
+
+  public static record Tag(String k, String v) {}
+
+  public static record Way(
+    long id,
+    @JacksonXmlProperty(localName = "nd")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<NodeRef> nodeRefs,
+    @JacksonXmlProperty(localName = "tag")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<Tag> tags
+  ) {}
+
+  @JacksonXmlRootElement(localName = "member")
+  public static record RelationMember(
+    String type, long ref, String role
+  ) {}
+
+  @JacksonXmlRootElement(localName = "relation")
+  public static record Relation(
+    long id,
+    @JacksonXmlProperty(localName = "member")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<RelationMember> members,
+    @JacksonXmlProperty(localName = "tag")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<Tag> tags
+  ) {}
+
+  // @JsonIgnoreProperties(ignoreUnknown = true)
+  public static record OsmXml(
+    String version,
+    String generator,
+    String copyright,
+    String attribution,
+    String license,
+    @JacksonXmlProperty(localName = "node")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<Node> nodes,
+    @JacksonXmlProperty(localName = "way")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<Way> ways,
+    @JacksonXmlProperty(localName = "relation")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    List<Relation> relation
+  ) {}
+
+  private static final XmlMapper xmlMapper = new XmlMapper();
+
+  static {
+    xmlMapper.registerModule(new Jdk8Module());
+  }
+
+  public static OsmXml readOsmXml(String s) throws IOException {
+    Path path = Path.of("src", "test", "resources", s);
+    return xmlMapper.readValue(Files.newInputStream(path), OsmXml.class);
+  }
 }
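
A minimal use of these records with Jackson's XmlMapper; the inline document is a made-up fragment in standard OSM XML shape, not a file from the repository:

    // Parses a hand-written OSM-style XML snippet into the records above.
    static TestUtils.OsmXml parseSample() throws java.io.IOException {
      String xml = """
        <osm version="0.6" generator="test">
          <node id="1" lat="1.0" lon="2.0"/>
          <way id="10"><nd ref="1"/><tag k="landuse" v="forest"/></way>
          <relation id="100">
            <member type="way" ref="10" role="outer"/>
            <tag k="type" v="multipolygon"/>
          </relation>
        </osm>
        """;
      XmlMapper mapper = new XmlMapper();
      mapper.registerModule(new Jdk8Module());
      // expected: one node (1, 1.0, 2.0), one way, one relation with one member
      return mapper.readValue(xml, TestUtils.OsmXml.class);
    }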

@@ -401,7 +401,6 @@ public class OpenStreetMapReaderTest {
     var nodeCache = reader.newNodeGeometryCache();
     elements.stream().flatMap(ways).forEach(way -> {
       reader.processWayPass2(nodeCache, way);
-      nodeCache.reset();
     });

     var feature = reader.processRelationPass2(relation, nodeCache);

@@ -476,7 +475,6 @@ public class OpenStreetMapReaderTest {
     var nodeCache = reader.newNodeGeometryCache();
     elements.stream().flatMap(ways).forEach(way -> {
       reader.processWayPass2(nodeCache, way);
-      nodeCache.reset();
     });

     var feature = reader.processRelationPass2(relation, nodeCache);

@@ -545,7 +543,6 @@ public class OpenStreetMapReaderTest {
     var nodeCache = reader.newNodeGeometryCache();
     elements.stream().flatMap(ways).forEach(way -> {
       reader.processWayPass2(nodeCache, way);
-      nodeCache.reset();
     });

     var feature = reader.processRelationPass2(relation, nodeCache);

@@ -599,7 +596,6 @@ public class OpenStreetMapReaderTest {
     var nodeCache = reader.newNodeGeometryCache();
     elements.stream().flatMap(ways).forEach(way -> {
       reader.processWayPass2(nodeCache, way);
-      nodeCache.reset();
    });

     var feature = reader.processRelationPass2(relation, nodeCache);

@@ -633,7 +629,6 @@ public class OpenStreetMapReaderTest {
     var nodeCache = reader.newNodeGeometryCache();
     elements.stream().flatMap(ways).forEach(way -> {
       reader.processWayPass2(nodeCache, way);
-      nodeCache.reset();
     });

     var feature = reader.processRelationPass2(relation, nodeCache);

(The remainder of this diff is too large to display.)