From 5602bf512382ecf532f7a5d92cd5eb848e26395a Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Sun, 6 Jun 2021 08:00:04 -0400 Subject: [PATCH] rest of stats --- .../onthegomap/flatmap/FeatureCollector.java | 17 ++- .../com/onthegomap/flatmap/SourceFeature.java | 6 +- .../onthegomap/flatmap/VectorTileEncoder.java | 2 +- .../flatmap/collections/FeatureGroup.java | 1 + .../com/onthegomap/flatmap/geo/GeoUtils.java | 9 +- .../flatmap/geo/GeometryException.java | 13 ++- .../flatmap/monitoring/Counter.java | 109 ++++++++++++++++++ .../flatmap/monitoring/PrometheusStats.java | 60 ++++++++-- .../onthegomap/flatmap/monitoring/Stats.java | 72 +++++------- .../flatmap/read/OpenStreetMapReader.java | 50 +++++--- .../flatmap/read/OsmMultipolygon.java | 8 +- .../com/onthegomap/flatmap/read/Reader.java | 5 +- .../render/CoordinateSequenceExtractor.java | 2 +- .../flatmap/render/FeatureRenderer.java | 31 +++-- .../onthegomap/flatmap/worker/WorkQueue.java | 22 ++-- .../flatmap/write/MbtilesWriter.java | 58 ++++++---- .../flatmap/FeatureCollectorTest.java | 3 +- .../flatmap/render/FeatureRendererTest.java | 10 +- 18 files changed, 338 insertions(+), 140 deletions(-) create mode 100644 src/main/java/com/onthegomap/flatmap/monitoring/Counter.java diff --git a/src/main/java/com/onthegomap/flatmap/FeatureCollector.java b/src/main/java/com/onthegomap/flatmap/FeatureCollector.java index 19fd8f43..acf16c1d 100644 --- a/src/main/java/com/onthegomap/flatmap/FeatureCollector.java +++ b/src/main/java/com/onthegomap/flatmap/FeatureCollector.java @@ -3,6 +3,7 @@ package com.onthegomap.flatmap; import com.onthegomap.flatmap.collections.CacheByZoom; import com.onthegomap.flatmap.geo.GeoUtils; import com.onthegomap.flatmap.geo.GeometryException; +import com.onthegomap.flatmap.monitoring.Stats; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -20,10 +21,12 @@ public class FeatureCollector implements Iterable { private final SourceFeature source; private final List output = new ArrayList<>(); private final CommonParams config; + private final Stats stats; - private FeatureCollector(SourceFeature source, CommonParams config) { + private FeatureCollector(SourceFeature source, CommonParams config, Stats stats) { this.source = source; this.config = config; + this.stats = stats; } @Override @@ -40,10 +43,11 @@ public class FeatureCollector implements Iterable { public Feature point(String layer) { try { if (!source.isPoint()) { - throw new GeometryException("not a point"); + throw new GeometryException("feature_not_point", "not a point"); } return geometry(layer, source.worldGeometry()); } catch (GeometryException e) { + stats.dataError("feature_point_" + e.stat()); LOGGER.warn("Error getting point geometry for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } @@ -53,6 +57,7 @@ public class FeatureCollector implements Iterable { try { return geometry(layer, source.centroid()); } catch (GeometryException e) { + stats.dataError("feature_centroid_" + e.stat()); LOGGER.warn("Error getting centroid for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } @@ -62,6 +67,7 @@ public class FeatureCollector implements Iterable { try { return geometry(layer, source.line()); } catch (GeometryException e) { + stats.dataError("feature_line_" + e.stat()); LOGGER.warn("Error constructing line for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } @@ -71,6 +77,7 @@ public class FeatureCollector implements Iterable { try { return geometry(layer, source.polygon()); } catch (GeometryException e) { + stats.dataError("feature_polygon_" + e.stat()); LOGGER.warn("Error constructing polygon for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } @@ -80,6 +87,7 @@ public class FeatureCollector implements Iterable { try { return geometry(layer, source.validatedPolygon()); } catch (GeometryException e) { + stats.dataError("feature_validated_polygon_" + e.stat()); LOGGER.warn("Error constructing validated polygon for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } @@ -89,15 +97,16 @@ public class FeatureCollector implements Iterable { try { return geometry(layer, source.pointOnSurface()); } catch (GeometryException e) { + stats.dataError("feature_point_on_surface_" + e.stat()); LOGGER.warn("Error constructing point on surface for " + source + ": " + e.getMessage()); return new Feature(layer, EMPTY_GEOM); } } - public static record Factory(CommonParams config) { + public static record Factory(CommonParams config, Stats stats) { public FeatureCollector get(SourceFeature source) { - return new FeatureCollector(source, config); + return new FeatureCollector(source, config, stats); } } diff --git a/src/main/java/com/onthegomap/flatmap/SourceFeature.java b/src/main/java/com/onthegomap/flatmap/SourceFeature.java index c471415f..de740d6f 100644 --- a/src/main/java/com/onthegomap/flatmap/SourceFeature.java +++ b/src/main/java/com/onthegomap/flatmap/SourceFeature.java @@ -63,7 +63,7 @@ public abstract class SourceFeature { public final Geometry line() throws GeometryException { if (!canBeLine()) { - throw new GeometryException("cannot be line"); + throw new GeometryException("feature_not_line", "cannot be line"); } if (linearGeometry == null) { linearGeometry = computeLine(); @@ -80,7 +80,7 @@ public abstract class SourceFeature { public final Geometry polygon() throws GeometryException { if (!canBePolygon()) { - throw new GeometryException("cannot be polygon"); + throw new GeometryException("feature_not_polygon", "cannot be polygon"); } return polygonGeometry != null ? polygonGeometry : (polygonGeometry = computePolygon()); } @@ -97,7 +97,7 @@ public abstract class SourceFeature { public final Geometry validatedPolygon() throws GeometryException { if (!canBePolygon()) { - throw new GeometryException("cannot be polygon"); + throw new GeometryException("feature_not_polygon", "cannot be polygon"); } return validPolygon != null ? validPolygon : (validPolygon = computeValidPolygon()); } diff --git a/src/main/java/com/onthegomap/flatmap/VectorTileEncoder.java b/src/main/java/com/onthegomap/flatmap/VectorTileEncoder.java index 52f251ff..a76c2307 100644 --- a/src/main/java/com/onthegomap/flatmap/VectorTileEncoder.java +++ b/src/main/java/com/onthegomap/flatmap/VectorTileEncoder.java @@ -219,7 +219,7 @@ public class VectorTileEncoder { return geometry; } catch (IllegalArgumentException e) { - throw new GeometryException("Unable to decode geometry", e); + throw new GeometryException("decode_vector_tile", "Unable to decode geometry", e); } } diff --git a/src/main/java/com/onthegomap/flatmap/collections/FeatureGroup.java b/src/main/java/com/onthegomap/flatmap/collections/FeatureGroup.java index 5e18b8f6..36be96a0 100644 --- a/src/main/java/com/onthegomap/flatmap/collections/FeatureGroup.java +++ b/src/main/java/com/onthegomap/flatmap/collections/FeatureGroup.java @@ -387,6 +387,7 @@ public final class FeatureGroup implements Consumer, Iterable try { items = profile.postProcessLayerFeatures(currentLayer, tile.z(), items); } catch (GeometryException e) { + stats.dataError("postprocess_layer_" + e.stat()); LOGGER.warn("error postprocessing features for " + currentLayer + " layer on " + tile + ": " + e.getMessage()); } encoder.addLayerFeatures(currentLayer, items); diff --git a/src/main/java/com/onthegomap/flatmap/geo/GeoUtils.java b/src/main/java/com/onthegomap/flatmap/geo/GeoUtils.java index 29cd7b5a..1759ce68 100644 --- a/src/main/java/com/onthegomap/flatmap/geo/GeoUtils.java +++ b/src/main/java/com/onthegomap/flatmap/geo/GeoUtils.java @@ -179,7 +179,7 @@ public class GeoUtils { try { return geom.buffer(0); } catch (TopologyException e) { - throw new GeometryException("robustness error fixing polygon: " + e); + throw new GeometryException("fix_polygon_topology_error", "robustness error fixing polygon: " + e); } } @@ -214,7 +214,7 @@ public class GeoUtils { try { return GeometryPrecisionReducer.reduce(geom, tilePrecision); } catch (IllegalArgumentException e3) { - throw new GeometryException("Error reducing precision"); + throw new GeometryException("snap_third_time_failed", "Error reducing precision"); } } } @@ -247,7 +247,7 @@ public class GeoUtils { List lineStrings = new ArrayList<>(); getLineStrings(world, lineStrings); if (lineStrings.size() == 0) { - throw new GeometryException("No line strings"); + throw new GeometryException("polygon_to_linestring_empty", "No line strings"); } else if (lineStrings.size() == 1) { return lineStrings.get(0); } else { @@ -270,7 +270,8 @@ public class GeoUtils { getLineStrings(gc.getGeometryN(i), output); } } else { - throw new GeometryException("unrecognized geometry type: " + input.getGeometryType()); + throw new GeometryException("get_line_strings_bad_type", + "unrecognized geometry type: " + input.getGeometryType()); } } diff --git a/src/main/java/com/onthegomap/flatmap/geo/GeometryException.java b/src/main/java/com/onthegomap/flatmap/geo/GeometryException.java index ca3620e2..515de7e4 100644 --- a/src/main/java/com/onthegomap/flatmap/geo/GeometryException.java +++ b/src/main/java/com/onthegomap/flatmap/geo/GeometryException.java @@ -2,16 +2,19 @@ package com.onthegomap.flatmap.geo; public class GeometryException extends Exception { - public GeometryException(Throwable cause) { - super(cause); - } + private final String stat; - public GeometryException(String message, Throwable cause) { + public GeometryException(String stat, String message, Throwable cause) { super(message, cause); + this.stat = stat; } - public GeometryException(String message) { + public GeometryException(String stat, String message) { super(message); + this.stat = stat; } + public String stat() { + return stat; + } } diff --git a/src/main/java/com/onthegomap/flatmap/monitoring/Counter.java b/src/main/java/com/onthegomap/flatmap/monitoring/Counter.java new file mode 100644 index 00000000..a3745495 --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/monitoring/Counter.java @@ -0,0 +1,109 @@ +package com.onthegomap.flatmap.monitoring; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; + +public interface Counter { + + void inc(); + + void incBy(long value); + + interface Readable extends Counter, LongSupplier { + + long get(); + + @Override + default long getAsLong() { + return get(); + } + } + + static Readable newSingleThreadCounter() { + return new SingleThreadCounter(); + } + + static MultiThreadCounter newMultiThreadCounter() { + return new MultiThreadCounter(); + } + + static Counter.Readable noop() { + return NoopCoounter.instance; + } + + class SingleThreadCounter implements Readable { + + private SingleThreadCounter() { + } + + private final AtomicLong counter = new AtomicLong(0); + + @Override + public void inc() { + counter.incrementAndGet(); + } + + @Override + public void incBy(long value) { + counter.addAndGet(value); + } + + @Override + public long get() { + return counter.get(); + } + } + + class MultiThreadCounter implements Readable { + + private MultiThreadCounter() { + } + + private final List all = Collections.synchronizedList(new ArrayList<>()); + private final ThreadLocal thread = ThreadLocal.withInitial(() -> { + SingleThreadCounter counter = new SingleThreadCounter(); + all.add(counter); + return counter; + }); + + @Override + public void inc() { + thread.get().inc(); + } + + @Override + public void incBy(long value) { + thread.get().incBy(value); + } + + public Counter counterForThread() { + return thread.get(); + } + + @Override + public long get() { + return all.stream().mapToLong(SingleThreadCounter::get).sum(); + } + } + + class NoopCoounter implements Counter.Readable { + + private static final NoopCoounter instance = new NoopCoounter(); + + @Override + public void inc() { + } + + @Override + public void incBy(long value) { + } + + @Override + public long get() { + return 0; + } + } +} diff --git a/src/main/java/com/onthegomap/flatmap/monitoring/PrometheusStats.java b/src/main/java/com/onthegomap/flatmap/monitoring/PrometheusStats.java index 1da35469..0c6c052e 100644 --- a/src/main/java/com/onthegomap/flatmap/monitoring/PrometheusStats.java +++ b/src/main/java/com/onthegomap/flatmap/monitoring/PrometheusStats.java @@ -6,6 +6,7 @@ import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.CounterMetricFamily; import io.prometheus.client.GaugeMetricFamily; +import io.prometheus.client.Histogram; import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory; import io.prometheus.client.exporter.PushGateway; import io.prometheus.client.hotspot.DefaultExports; @@ -36,6 +37,7 @@ public class PrometheusStats implements Stats { private final CollectorRegistry registry = new CollectorRegistry(); private final Timers timers = new Timers(); + private static final String NAMESPACE = "flatmap"; private static final String BASE = "flatmap_"; private final PushGateway pg; private final ScheduledExecutorService executor; @@ -102,19 +104,45 @@ public class PrometheusStats implements Stats { }.register(registry); } - @Override - public void emittedFeature(int z, String layer, int coveringTiles) { - - } + private final io.prometheus.client.Counter processedElements = io.prometheus.client.Counter + .build(BASE + "renderer_elements_processed", "Number of source elements processed") + .labelNames("type", "layer") + .register(registry); @Override - public void encodedTile(int zoom, int length) { - + public void processedElement(String elemType, String layer) { + processedElements.labels(elemType, layer).inc(); } + private final io.prometheus.client.Counter dataErrors = io.prometheus.client.Counter + .build(BASE + "bad_input_data", "Number of data inconsistencies encountered in source data") + .labelNames("type") + .register(registry); + + @Override + public void dataError(String stat) { + dataErrors.labels(stat).inc(); + } + + private final io.prometheus.client.Counter emittedFeatures = io.prometheus.client.Counter + .build(BASE + "renderer_features_emitted", "Features enqueued for writing to feature DB") + .labelNames("zoom", "layer") + .register(registry); + + @Override + public void emittedFeatures(int z, String layer, int number) { + emittedFeatures.labels(Integer.toString(z), layer).inc(number); + } + + private final Histogram tilesWrittenBytes = Histogram + .build(BASE + "mbtiles_tile_written_bytes", "Written tile sizes by zoom level") + .buckets(1_000, 10_000, 100_000, 500_000) + .labelNames("zoom") + .register(registry); + @Override public void wroteTile(int zoom, int bytes) { - + tilesWrittenBytes.labels(Integer.toString(zoom)).observe(bytes); } @Override @@ -137,7 +165,23 @@ public class PrometheusStats implements Stats { new Collector() { @Override public List collect() { - return List.of(new GaugeMetricFamily(BASE + sanitizeMetricName(name), "", supplier.get().doubleValue())); + return List.of(new CounterMetricFamily(BASE + sanitizeMetricName(name), "", supplier.get().doubleValue())); + } + }.register(registry); + } + + @Override + public void counter(String name, String label, Supplier> values) { + new Collector() { + @Override + public List collect() { + List result = new ArrayList<>(); + CounterMetricFamily family = new CounterMetricFamily(BASE + sanitizeMetricName(name), "", List.of(label)); + result.add(family); + for (var entry : values.get().entrySet()) { + family.addMetric(List.of(entry.getKey()), entry.getValue().get()); + } + return result; } }.register(registry); } diff --git a/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java b/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java index 722fef73..733eb795 100644 --- a/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java +++ b/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java @@ -4,7 +4,7 @@ import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND; import com.onthegomap.flatmap.MemoryEstimator; import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Map; import java.util.function.Supplier; public interface Stats extends AutoCloseable { @@ -23,9 +23,7 @@ public interface Stats extends AutoCloseable { void gauge(String name, Supplier value); - void emittedFeature(int z, String layer, int coveringTiles); - - void encodedTile(int zoom, int length); + void emittedFeatures(int z, String layer, int coveringTiles); void wroteTile(int zoom, int bytes); @@ -37,51 +35,26 @@ public interface Stats extends AutoCloseable { void counter(String name, Supplier supplier); - default StatCounter longCounter(String name) { - StatCounter.AtomicCounter counter = new StatCounter.AtomicCounter(); + default Counter.Readable longCounter(String name) { + Counter.Readable counter = Counter.newMultiThreadCounter(); counter(name, counter::get); return counter; } - default StatCounter nanoCounter(String name) { - StatCounter.AtomicCounter counter = new StatCounter.AtomicCounter(); + default Counter nanoCounter(String name) { + Counter.Readable counter = Counter.newMultiThreadCounter(); counter(name, () -> counter.get() / NANOSECONDS_PER_SECOND); return counter; } - interface StatCounter { + void counter(String name, String label, Supplier> values); - void inc(long v); + void processedElement(String elemType, String layer); - default void inc() { - inc(1); - } - - class NoopCounter implements StatCounter { - - @Override - public void inc(long v) { - } - } - - class AtomicCounter implements StatCounter { - - private final AtomicLong counter = new AtomicLong(0); - - @Override - public void inc(long v) { - counter.addAndGet(v); - } - - public long get() { - return counter.get(); - } - } - } + void dataError(String stat); class InMemory implements Stats { - private static final StatCounter NOOP_COUNTER = new StatCounter.NoopCounter(); private final Timers timers = new Timers(); @Override @@ -94,11 +67,6 @@ public interface Stats extends AutoCloseable { return timers.startTimer(name); } - @Override - public void encodedTile(int zoom, int length) { - - } - @Override public void wroteTile(int zoom, int bytes) { } @@ -121,13 +89,25 @@ public interface Stats extends AutoCloseable { } @Override - public StatCounter longCounter(String name) { - return NOOP_COUNTER; + public Counter.Readable longCounter(String name) { + return Counter.noop(); } @Override - public StatCounter nanoCounter(String name) { - return NOOP_COUNTER; + public Counter nanoCounter(String name) { + return Counter.noop(); + } + + @Override + public void counter(String name, String label, Supplier> values) { + } + + @Override + public void processedElement(String elemType, String layer) { + } + + @Override + public void dataError(String stat) { } @Override @@ -135,7 +115,7 @@ public interface Stats extends AutoCloseable { } @Override - public void emittedFeature(int z, String layer, int coveringTiles) { + public void emittedFeatures(int z, String layer, int coveringTiles) { } @Override diff --git a/src/main/java/com/onthegomap/flatmap/read/OpenStreetMapReader.java b/src/main/java/com/onthegomap/flatmap/read/OpenStreetMapReader.java index cc63ddf9..54542c4b 100644 --- a/src/main/java/com/onthegomap/flatmap/read/OpenStreetMapReader.java +++ b/src/main/java/com/onthegomap/flatmap/read/OpenStreetMapReader.java @@ -21,6 +21,7 @@ import com.onthegomap.flatmap.collections.LongLongMap; import com.onthegomap.flatmap.collections.LongLongMultimap; import com.onthegomap.flatmap.geo.GeoUtils; import com.onthegomap.flatmap.geo.GeometryException; +import com.onthegomap.flatmap.monitoring.Counter; import com.onthegomap.flatmap.monitoring.ProgressLoggers; import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.render.FeatureRenderer; @@ -29,6 +30,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.locationtech.jts.geom.Coordinate; @@ -48,9 +50,9 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima private final OsmSource osmInputFile; private final Stats stats; private final LongLongMap nodeDb; - private final AtomicLong PASS1_NODES = new AtomicLong(0); - private final AtomicLong PASS1_WAYS = new AtomicLong(0); - private final AtomicLong PASS1_RELATIONS = new AtomicLong(0); + private final Counter.Readable PASS1_NODES = Counter.newSingleThreadCounter(); + private final Counter.Readable PASS1_WAYS = Counter.newSingleThreadCounter(); + private final Counter.Readable PASS1_RELATIONS = Counter.newSingleThreadCounter(); private final Profile profile; private final String name; @@ -80,6 +82,11 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima this.stats = stats; this.profile = profile; stats.monitorInMemoryObject("osm_relations", this); + stats.counter("osm_pass1_elements_processed", "type", () -> Map.of( + "nodes", PASS1_NODES, + "ways", PASS1_WAYS, + "relations", PASS1_RELATIONS + )); } public void pass1(CommonParams config) { @@ -102,12 +109,12 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima void processPass1(ReaderElement readerElement) { if (readerElement instanceof ReaderNode node) { - PASS1_NODES.incrementAndGet(); + PASS1_NODES.inc(); nodeDb.put(node.getId(), GeoUtils.encodeFlatLocation(node.getLon(), node.getLat())); } else if (readerElement instanceof ReaderWay) { - PASS1_WAYS.incrementAndGet(); + PASS1_WAYS.inc(); } else if (readerElement instanceof ReaderRelation rel) { - PASS1_RELATIONS.incrementAndGet(); + PASS1_RELATIONS.inc(); List infos = profile.preprocessOsmRelation(rel); if (infos != null) { for (RelationInfo info : infos) { @@ -133,30 +140,41 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima public void pass2(FeatureGroup writer, CommonParams config) { int readerThreads = Math.max(config.threads() / 4, 1); int processThreads = config.threads() - 1; - AtomicLong nodesProcessed = new AtomicLong(0); - AtomicLong waysProcessed = new AtomicLong(0); - AtomicLong relsProcessed = new AtomicLong(0); + Counter.MultiThreadCounter nodesProcessed = Counter.newMultiThreadCounter(); + Counter.MultiThreadCounter waysProcessed = Counter.newMultiThreadCounter(); + Counter.MultiThreadCounter relsProcessed = Counter.newMultiThreadCounter(); + stats.counter("osm_pass2_elements_processed", "type", () -> Map.of( + "nodes", nodesProcessed, + "ways", waysProcessed, + "relations", relsProcessed + )); + CountDownLatch waysDone = new CountDownLatch(processThreads); var topology = Topology.start("osm_pass2", stats) .fromGenerator("pbf", osmInputFile.read("pbfpass2", readerThreads)) .addBuffer("reader_queue", 50_000, 1_000) .addWorker("process", processThreads, (prev, next) -> { + Counter nodes = nodesProcessed.counterForThread(); + Counter ways = waysProcessed.counterForThread(); + Counter rels = relsProcessed.counterForThread(); + ReaderElement readerElement; - var featureCollectors = new FeatureCollector.Factory(config); + var featureCollectors = new FeatureCollector.Factory(config, stats); NodeLocationProvider nodeCache = newNodeGeometryCache(); var encoder = writer.newRenderedFeatureEncoder(); FeatureRenderer renderer = new FeatureRenderer( config, - rendered -> next.accept(encoder.apply(rendered)) + rendered -> next.accept(encoder.apply(rendered)), + stats ); while ((readerElement = prev.get()) != null) { SourceFeature feature = null; if (readerElement instanceof ReaderNode node) { - nodesProcessed.incrementAndGet(); + nodes.inc(); feature = processNodePass2(node); } else if (readerElement instanceof ReaderWay way) { - waysProcessed.incrementAndGet(); + ways.inc(); feature = processWayPass2(nodeCache, way); } else if (readerElement instanceof ReaderRelation rel) { // ensure all ways finished processing before we start relations @@ -164,7 +182,7 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima waysDone.countDown(); waysDone.await(); } - relsProcessed.incrementAndGet(); + rels.inc(); feature = processRelationPass2(rel, nodeCache); } if (feature != null) { @@ -357,7 +375,7 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima CoordinateSequence coords = nodeCache.getWayGeometry(nodeIds); return GeoUtils.JTS_FACTORY.createLineString(coords); } catch (IllegalArgumentException e) { - throw new GeometryException("Error building line for way " + osmId + ": " + e); + throw new GeometryException("osm_invalid_line", "Error building line for way " + osmId + ": " + e); } } @@ -367,7 +385,7 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima CoordinateSequence coords = nodeCache.getWayGeometry(nodeIds); return GeoUtils.JTS_FACTORY.createPolygon(coords); } catch (IllegalArgumentException e) { - throw new GeometryException("Error building polygon for way " + osmId + ": " + e); + throw new GeometryException("osm_invalid_polygon", "Error building polygon for way " + osmId + ": " + e); } } diff --git a/src/main/java/com/onthegomap/flatmap/read/OsmMultipolygon.java b/src/main/java/com/onthegomap/flatmap/read/OsmMultipolygon.java index 00899fd3..ab7404eb 100644 --- a/src/main/java/com/onthegomap/flatmap/read/OsmMultipolygon.java +++ b/src/main/java/com/onthegomap/flatmap/read/OsmMultipolygon.java @@ -75,7 +75,8 @@ class OsmMultipolygon { ) throws GeometryException { try { if (rings.size() == 0) { - throw new IllegalArgumentException("no rings to process"); + throw new GeometryException("osm_invalid_multipolygon_empty", + "error building multipolygon " + osmId + ": no rings to process"); } List idSegments = connectPolygonSegments(rings); List polygons = new ArrayList<>(idSegments.size()); @@ -91,7 +92,8 @@ class OsmMultipolygon { polygons.sort(BY_AREA_DESCENDING); Set shells = groupParentChildShells(polygons); if (shells.size() == 0) { - throw new IllegalArgumentException("multipolygon not closed"); + throw new GeometryException("osm_invalid_multipolygon_not_closed", + "error building multipolygon " + osmId + ": multipolygon not closed"); } else if (shells.size() == 1) { return shells.iterator().next().toPolygon(); } else { @@ -99,7 +101,7 @@ class OsmMultipolygon { return GeoUtils.JTS_FACTORY.createMultiPolygon(finished); } } catch (IllegalArgumentException e) { - throw new GeometryException("error building multipolygon " + osmId + ": " + e); + throw new GeometryException("osm_invalid_multipolygon", "error building multipolygon " + osmId + ": " + e); } } diff --git a/src/main/java/com/onthegomap/flatmap/read/Reader.java b/src/main/java/com/onthegomap/flatmap/read/Reader.java index b2feb7ae..d784d811 100644 --- a/src/main/java/com/onthegomap/flatmap/read/Reader.java +++ b/src/main/java/com/onthegomap/flatmap/read/Reader.java @@ -38,11 +38,12 @@ public abstract class Reader implements Closeable { .addBuffer("read_queue", 1000) .addWorker("process", threads, (prev, next) -> { SourceFeature sourceFeature; - var featureCollectors = new FeatureCollector.Factory(config); + var featureCollectors = new FeatureCollector.Factory(config, stats); var encoder = writer.newRenderedFeatureEncoder(); FeatureRenderer renderer = new FeatureRenderer( config, - rendered -> next.accept(encoder.apply(rendered)) + rendered -> next.accept(encoder.apply(rendered)), + stats ); while ((sourceFeature = prev.get()) != null) { featuresRead.incrementAndGet(); diff --git a/src/main/java/com/onthegomap/flatmap/render/CoordinateSequenceExtractor.java b/src/main/java/com/onthegomap/flatmap/render/CoordinateSequenceExtractor.java index 039e22fd..3e8d9ca1 100644 --- a/src/main/java/com/onthegomap/flatmap/render/CoordinateSequenceExtractor.java +++ b/src/main/java/com/onthegomap/flatmap/render/CoordinateSequenceExtractor.java @@ -107,7 +107,7 @@ class CoordinateSequenceExtractor { } return GeoUtils.JTS_FACTORY.createPolygon(first, rest); } catch (IllegalArgumentException e) { - throw new GeometryException("Could not build polygon", e); + throw new GeometryException("reassemble_polygon_failed", "Could not build polygon", e); } } diff --git a/src/main/java/com/onthegomap/flatmap/render/FeatureRenderer.java b/src/main/java/com/onthegomap/flatmap/render/FeatureRenderer.java index 405dd14a..b2c8dd43 100644 --- a/src/main/java/com/onthegomap/flatmap/render/FeatureRenderer.java +++ b/src/main/java/com/onthegomap/flatmap/render/FeatureRenderer.java @@ -9,6 +9,7 @@ import com.onthegomap.flatmap.VectorTileEncoder; import com.onthegomap.flatmap.geo.GeoUtils; import com.onthegomap.flatmap.geo.GeometryException; import com.onthegomap.flatmap.geo.TileCoord; +import com.onthegomap.flatmap.monitoring.Stats; import java.util.List; import java.util.Map; import java.util.Optional; @@ -46,10 +47,12 @@ public class FeatureRenderer implements Consumer { }, 2, 0)))); private final CommonParams config; private final Consumer consumer; + private final Stats stats; - public FeatureRenderer(CommonParams config, Consumer consumer) { + public FeatureRenderer(CommonParams config, Consumer consumer, Stats stats) { this.config = config; this.consumer = consumer; + this.stats = stats; } @Override @@ -96,15 +99,18 @@ public class FeatureRenderer implements Consumer { ); } + int emitted = 0; for (var entry : tiled.getTileData()) { TileCoord tile = entry.getKey(); List> result = entry.getValue(); Geometry geom = CoordinateSequenceExtractor.reassemblePoints(result); - // TODO stats - // TODO writeTileFeatures emitFeature(feature, id, attrs, tile, geom, groupInfo); + emitted++; } + stats.emittedFeatures(zoom, feature.getLayer(), emitted); } + + stats.processedElement("point", feature.getLayer()); } private void emitFeature(FeatureCollector.Feature feature, long id, Map attrs, TileCoord tile, @@ -158,12 +164,15 @@ public class FeatureRenderer implements Consumer { double buffer = feature.getBufferPixelsAtZoom(z) / 256; TileExtents.ForZoom extents = config.extents().getForZoom(z); TiledGeometry sliced = TiledGeometry.sliceIntoTiles(groups, buffer, area, z, extents); - writeTileFeatures(id, feature, sliced); + writeTileFeatures(z, id, feature, sliced); } + + stats.processedElement(area ? "polygon" : "line", feature.getLayer()); } - private void writeTileFeatures(long id, FeatureCollector.Feature feature, TiledGeometry sliced) { + private void writeTileFeatures(int zoom, long id, FeatureCollector.Feature feature, TiledGeometry sliced) { Map attrs = feature.getAttrsAtZoom(sliced.zoomLevel()); + int emitted = 0; for (var entry : sliced.getTileData()) { TileCoord tile = entry.getKey(); try { @@ -181,19 +190,22 @@ public class FeatureRenderer implements Consumer { if (!geom.isEmpty()) { emitFeature(feature, id, attrs, tile, geom, null); + emitted++; } } catch (GeometryException e) { + stats.dataError("write_tile_features_" + e.stat()); LOGGER.warn(e.getMessage() + ": " + tile + " " + feature); } } if (feature.area()) { - emitFilledTiles(id, feature, sliced); + emitted += emitFilledTiles(id, feature, sliced); } - // TODO log stats + + stats.emittedFeatures(zoom, feature.getLayer(), emitted); } - private void emitFilledTiles(long id, FeatureCollector.Feature feature, TiledGeometry sliced) { + private int emitFilledTiles(long id, FeatureCollector.Feature feature, TiledGeometry sliced) { /* * Optimization: large input polygons that generate many filled interior tiles (ie. the ocean), the encoder avoids * re-encoding if groupInfo and vector tile feature are == to previous values. @@ -206,6 +218,7 @@ public class FeatureRenderer implements Consumer { feature.getAttrsAtZoom(sliced.zoomLevel()) ); + int emitted = 0; for (TileCoord tile : sliced.getFilledTiles()) { consumer.accept(new RenderedFeature( tile, @@ -213,6 +226,8 @@ public class FeatureRenderer implements Consumer { feature.getZorder(), groupInfo )); + emitted++; } + return emitted; } } diff --git a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java index 5228b12c..11b6b674 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java +++ b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java @@ -1,5 +1,6 @@ package com.onthegomap.flatmap.worker; +import com.onthegomap.flatmap.monitoring.Counter; import com.onthegomap.flatmap.monitoring.Stats; import java.util.ArrayDeque; import java.util.Queue; @@ -7,7 +8,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -19,12 +19,12 @@ public class WorkQueue implements AutoCloseable, Supplier, Consumer { private final int batchSize; private final ConcurrentHashMap> queues = new ConcurrentHashMap<>(); private final int pendingBatchesCapacity; - private final Stats.StatCounter enqueueCountStat; - private final Stats.StatCounter enqueueBlockTimeNanos; - private final Stats.StatCounter dequeueCountStat; - private final Stats.StatCounter dequeueBlockTimeNanos; + private final Counter enqueueCountStat; + private final Counter enqueueBlockTimeNanos; + private final Counter dequeueCountStat; + private final Counter dequeueBlockTimeNanos; private volatile boolean hasIncomingData = true; - private final AtomicInteger pendingCount = new AtomicInteger(0); + private final Counter.Readable pendingCount = Counter.newMultiThreadCounter(); public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { this.pendingBatchesCapacity = capacity / maxBatch; @@ -67,7 +67,7 @@ public class WorkQueue implements AutoCloseable, Supplier, Consumer { } writeBatch.offer(item); - pendingCount.incrementAndGet(); + pendingCount.inc(); if (writeBatch.size() >= batchSize) { flushWrites(); @@ -85,7 +85,7 @@ public class WorkQueue implements AutoCloseable, Supplier, Consumer { if (!itemQueue.offer(writeBatch)) { long start = System.nanoTime(); itemQueue.put(writeBatch); - enqueueBlockTimeNanos.inc(System.nanoTime() - start); + enqueueBlockTimeNanos.incBy(System.nanoTime() - start); } } catch (InterruptedException ex) { throw new RuntimeException(ex); @@ -117,19 +117,19 @@ public class WorkQueue implements AutoCloseable, Supplier, Consumer { } } while (itemBatch == null); itemReadBatchProvider.set(itemBatch); - dequeueBlockTimeNanos.inc(System.nanoTime() - start); + dequeueBlockTimeNanos.incBy(System.nanoTime() - start); } T result = itemBatch == null ? null : itemBatch.poll(); if (result != null) { - pendingCount.decrementAndGet(); + pendingCount.incBy(-1); } dequeueCountStat.inc(); return result; } public int getPending() { - return pendingCount.get(); + return (int) pendingCount.get(); } public int getCapacity() { diff --git a/src/main/java/com/onthegomap/flatmap/write/MbtilesWriter.java b/src/main/java/com/onthegomap/flatmap/write/MbtilesWriter.java index 4b0cd0d2..ab5205c5 100644 --- a/src/main/java/com/onthegomap/flatmap/write/MbtilesWriter.java +++ b/src/main/java/com/onthegomap/flatmap/write/MbtilesWriter.java @@ -7,18 +7,21 @@ import com.onthegomap.flatmap.LayerStats; import com.onthegomap.flatmap.Profile; import com.onthegomap.flatmap.VectorTileEncoder; import com.onthegomap.flatmap.collections.FeatureGroup; +import com.onthegomap.flatmap.monitoring.Counter; import com.onthegomap.flatmap.monitoring.ProgressLoggers; import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.LongAccumulator; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.IntStream; +import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,18 +30,17 @@ public class MbtilesWriter { private static final Logger LOGGER = LoggerFactory.getLogger(MbtilesWriter.class); - private final AtomicLong featuresProcessed = new AtomicLong(0); - private final AtomicLong memoizedTiles = new AtomicLong(0); - private final AtomicLong tilesEmitted = new AtomicLong(0); + private final Counter.Readable featuresProcessed; + private final Counter memoizedTiles; private final Mbtiles db; private final CommonParams config; private final Profile profile; private final Stats stats; private final LayerStats layerStats; - private final AtomicLong[] tilesByZoom; - private final AtomicLong[] totalTileSizesByZoom; - private final AtomicInteger[] maxTileSizesByZoom; + private final Counter.Readable[] tilesByZoom; + private final Counter.Readable[] totalTileSizesByZoom; + private final LongAccumulator[] maxTileSizesByZoom; MbtilesWriter(Mbtiles db, CommonParams config, Profile profile, Stats stats, LayerStats layerStats) { this.db = db; @@ -46,11 +48,19 @@ public class MbtilesWriter { this.profile = profile; this.stats = stats; this.layerStats = layerStats; - tilesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(AtomicLong::new).toArray(AtomicLong[]::new); - totalTileSizesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(AtomicLong::new) - .toArray(AtomicLong[]::new); - maxTileSizesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(AtomicInteger::new) - .toArray(AtomicInteger[]::new); + tilesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(i -> Counter.newSingleThreadCounter()) + .toArray(Counter.Readable[]::new); + totalTileSizesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(i -> Counter.newMultiThreadCounter()) + .toArray(Counter.Readable[]::new); + maxTileSizesByZoom = IntStream.rangeClosed(0, config.maxzoom()).mapToObj(i -> new LongAccumulator(Long::max, 0)) + .toArray(LongAccumulator[]::new); + memoizedTiles = stats.longCounter("mbtiles_memoized_tiles"); + featuresProcessed = stats.longCounter("mbtiles_features_processed"); + Map countsByZoom = new LinkedHashMap<>(); + for (int zoom = config.minzoom(); zoom <= config.maxzoom(); zoom++) { + countsByZoom.put(Integer.toString(zoom), tilesByZoom[zoom]); + } + stats.counter("mbtiles_tiles_written", "zoom", () -> countsByZoom); } @@ -76,7 +86,7 @@ public class MbtilesWriter { var loggers = new ProgressLoggers("mbtiles") .addRatePercentCounter("features", features.numFeatures(), writer.featuresProcessed) - .addRateCounter("tiles", writer.tilesEmitted) + .addRateCounter("tiles", writer::tilesEmitted) .addFileSize(fileSize) .add(" features ").addFileSize(features::getStorageSize) .addProcessStats() @@ -89,12 +99,12 @@ public class MbtilesWriter { FeatureGroup.TileFeatures tileFeatures, last = null; byte[] lastBytes = null, lastEncoded = null; while ((tileFeatures = prev.get()) != null) { - featuresProcessed.addAndGet(tileFeatures.getNumFeatures()); + featuresProcessed.incBy(tileFeatures.getNumFeatures()); byte[] bytes, encoded; if (tileFeatures.hasSameContents(last)) { bytes = lastBytes; encoded = lastEncoded; - memoizedTiles.incrementAndGet(); + memoizedTiles.inc(); } else { VectorTileEncoder en = tileFeatures.getTile(); encoded = en.encode(); @@ -108,10 +118,8 @@ public class MbtilesWriter { } int zoom = tileFeatures.coord().z(); int encodedLength = encoded.length; - tilesByZoom[zoom].incrementAndGet(); - totalTileSizesByZoom[zoom].addAndGet(encodedLength); - maxTileSizesByZoom[zoom].accumulateAndGet(encodedLength, Integer::max); - stats.encodedTile(tileFeatures.coord().z(), encodedLength); + totalTileSizesByZoom[zoom].incBy(encodedLength); + maxTileSizesByZoom[zoom].accumulate(encodedLength); next.accept(new Mbtiles.TileEntry(tileFeatures.coord(), bytes)); } } @@ -141,7 +149,7 @@ public class MbtilesWriter { while ((tile = tiles.get()) != null) { batchedWriter.write(tile.tile(), tile.bytes()); stats.wroteTile(tile.tile().z(), tile.bytes().length); - tilesEmitted.incrementAndGet(); + tilesByZoom[tile.tile().z()].inc(); } } @@ -173,8 +181,12 @@ public class MbtilesWriter { LOGGER.debug("all" + " avg:" + Format.formatStorage(sumSize / Math.max(sumCount, 1), false) + " max:" + Format.formatStorage(maxMax, false)); - LOGGER.debug(" # features: " + Format.formatInteger(featuresProcessed)); - LOGGER.debug(" # tiles: " + Format.formatInteger(tilesEmitted)); + LOGGER.debug(" # features: " + Format.formatInteger(featuresProcessed.get())); + LOGGER.debug(" # tiles: " + Format.formatInteger(this.tilesEmitted())); + } + + private long tilesEmitted() { + return Stream.of(tilesByZoom).mapToLong(Counter.Readable::get).sum(); } private static byte[] gzipCompress(byte[] uncompressedData) throws IOException { diff --git a/src/test/java/com/onthegomap/flatmap/FeatureCollectorTest.java b/src/test/java/com/onthegomap/flatmap/FeatureCollectorTest.java index eab1f976..58c1f406 100644 --- a/src/test/java/com/onthegomap/flatmap/FeatureCollectorTest.java +++ b/src/test/java/com/onthegomap/flatmap/FeatureCollectorTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.onthegomap.flatmap.geo.GeoUtils; import com.onthegomap.flatmap.geo.GeometryException; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.read.ReaderFeature; import java.util.Arrays; import java.util.List; @@ -19,7 +20,7 @@ import org.junit.jupiter.params.provider.ValueSource; public class FeatureCollectorTest { private CommonParams config = CommonParams.defaults(); - private FeatureCollector.Factory factory = new FeatureCollector.Factory(config); + private FeatureCollector.Factory factory = new FeatureCollector.Factory(config, new Stats.InMemory()); private static void assertFeatures(int zoom, List> expected, FeatureCollector actual) { List actualList = StreamSupport.stream(actual.spliterator(), false).toList(); diff --git a/src/test/java/com/onthegomap/flatmap/render/FeatureRendererTest.java b/src/test/java/com/onthegomap/flatmap/render/FeatureRendererTest.java index 357ea813..dc2fbaf6 100644 --- a/src/test/java/com/onthegomap/flatmap/render/FeatureRendererTest.java +++ b/src/test/java/com/onthegomap/flatmap/render/FeatureRendererTest.java @@ -12,6 +12,7 @@ import com.onthegomap.flatmap.FeatureCollector; import com.onthegomap.flatmap.TestUtils; import com.onthegomap.flatmap.geo.GeoUtils; import com.onthegomap.flatmap.geo.TileCoord; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.read.ReaderFeature; import java.util.ArrayList; import java.util.Collection; @@ -39,16 +40,17 @@ import org.locationtech.jts.precision.GeometryPrecisionReducer; public class FeatureRendererTest { private CommonParams config = CommonParams.defaults(); + private final Stats stats = new Stats.InMemory(); private FeatureCollector collector(Geometry worldGeom) { var latLonGeom = GeoUtils.worldToLatLonCoords(worldGeom); - return new FeatureCollector.Factory(config).get(new ReaderFeature(latLonGeom, 0, null, null)); + return new FeatureCollector.Factory(config, stats).get(new ReaderFeature(latLonGeom, 0, null, null)); } private Map> renderGeometry(FeatureCollector.Feature feature) { Map> result = new TreeMap<>(); new FeatureRenderer(config, rendered -> result.computeIfAbsent(rendered.tile(), tile -> new HashSet<>()) - .add(decodeSilently(rendered.vectorTileFeature().geometry()))).accept(feature); + .add(decodeSilently(rendered.vectorTileFeature().geometry())), new Stats.InMemory()).accept(feature); result.values().forEach(gs -> gs.forEach(TestUtils::validateGeometry)); return result; } @@ -56,7 +58,7 @@ public class FeatureRendererTest { private Map> renderFeatures(FeatureCollector.Feature feature) { Map> result = new TreeMap<>(); new FeatureRenderer(config, rendered -> result.computeIfAbsent(rendered.tile(), tile -> new HashSet<>()) - .add(rendered)).accept(feature); + .add(rendered), new Stats.InMemory()).accept(feature); result.values() .forEach(gs -> gs.forEach(f -> TestUtils.validateGeometry(decodeSilently(f.vectorTileFeature().geometry())))); return result; @@ -806,7 +808,7 @@ public class FeatureRendererTest { .setZoomRange(maxZoom, maxZoom) .setBufferPixels(0); AtomicLong num = new AtomicLong(0); - new FeatureRenderer(config, rendered1 -> num.incrementAndGet()) + new FeatureRenderer(config, rendered1 -> num.incrementAndGet(), new Stats.InMemory()) .accept(feature); assertEquals(num.get(), Math.pow(4, maxZoom)); }