initial prometheus stats

pull/1/head
Mike Barry 2021-06-05 08:02:51 -04:00
rodzic 9a0119a188
commit 11a65b3c1c
18 zmienionych plików z 4317 dodań i 41 usunięć

3849
grafana.json 100644

Plik diff jest za duży Load Diff

21
pom.xml
Wyświetl plik

@ -18,6 +18,7 @@
<junit.version>5.7.1</junit.version>
<jackson.version>2.12.3</jackson.version>
<log4j.version>2.14.1</log4j.version>
<prometheus.version>0.11.0</prometheus.version>
</properties>
<scm>
@ -111,6 +112,26 @@
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_common</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.geo.GeoUtils;
import com.onthegomap.flatmap.monitoring.PrometheusStats;
import com.onthegomap.flatmap.monitoring.Stats;
import java.nio.file.Files;
import java.nio.file.Path;
@ -118,7 +119,16 @@ public class Arguments {
}
public Stats getStats() {
return new Stats.InMemory();
String prometheus = getArg("pushgateway");
if (prometheus != null && !prometheus.isBlank()) {
LOGGER.info("Using prometheus push gateway stats");
String job = get("pushgateway.job", "prometheus pushgateway job ID", "flatmap");
Duration interval = duration("pushgateway.interval", "how often to send stats to prometheus push gateway", "15s");
return new PrometheusStats(prometheus, job, interval);
} else {
LOGGER.info("Using in-memory stats");
return new Stats.InMemory();
}
}
public int integer(String key, String description, int defaultValue) {

Wyświetl plik

@ -9,30 +9,30 @@ import com.carrotsearch.hppc.LongObjectHashMap;
public class MemoryEstimator {
public static long size(HasEstimate object) {
return object.estimateMemoryUsageBytes();
return object == null ? 0 : object.estimateMemoryUsageBytes();
}
public static long size(LongHashSet object) {
return 24L + 8L * object.keys.length;
return object == null ? 0 : 24L + 8L * object.keys.length;
}
public static long size(LongLongHashMap object) {
return 24L + 8L * object.keys.length +
24L + 8L * object.values.length;
return object == null ? 0 : (24L + 8L * object.keys.length +
24L + 8L * object.values.length);
}
public static <T> long sizeWithoutValues(LongObjectHashMap<T> object) {
return 24L + 8L * object.keys.length +
24L + 8L * object.values.length;
return object == null ? 0 : (24L + 8L * object.keys.length +
24L + 8L * object.values.length);
}
public static long size(LongIntHashMap object) {
return 24L + 8L * object.keys.length +
24L + 4L * object.values.length;
return object == null ? 0 : (24L + 8L * object.keys.length +
24L + 4L * object.values.length);
}
public static long size(LongArrayList object) {
return 24L + 8L * object.buffer.length;
return object == null ? 0 : (24L + 8L * object.buffer.length);
}
public interface HasEstimate {

Wyświetl plik

@ -9,7 +9,6 @@ import com.onthegomap.flatmap.read.OpenStreetMapReader;
import com.onthegomap.flatmap.read.OsmInputFile;
import com.onthegomap.flatmap.read.ShapefileReader;
import com.onthegomap.flatmap.write.MbtilesWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
@ -20,7 +19,7 @@ public class OpenMapTilesMain {
private static final Logger LOGGER = LoggerFactory.getLogger(OpenMapTilesMain.class);
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
Arguments arguments = Arguments.fromJvmProperties();
var stats = arguments.getStats();
var overallTimer = stats.startTimer("openmaptiles");
@ -39,18 +38,18 @@ public class OpenMapTilesMain {
boolean useWikidata = arguments.get("use_wikidata", "use wikidata translations", true);
Path wikidataNamesFile = arguments.file("wikidata_cache", "wikidata cache file",
Path.of("data", "sources", "wikidata_names.json"));
Path output = arguments.file("output", "mbtiles output file", Path.of("data", "massachusetts.mbtiles"));
Path mbtilesOutputPath = arguments.file("output", "mbtiles output file", Path.of("data", "massachusetts.mbtiles"));
List<String> languages = arguments.get("name_languages", "languages to use",
"en,ru,ar,zh,ja,ko,fr,de,fi,pl,es,be,br,he".split(","));
CommonParams config = CommonParams.from(arguments, osmInputFile);
if (config.forceOverwrite()) {
FileUtils.deleteFile(output);
} else if (Files.exists(output)) {
throw new IllegalArgumentException(output + " already exists, use force to overwrite.");
FileUtils.deleteFile(mbtilesOutputPath);
} else if (Files.exists(mbtilesOutputPath)) {
throw new IllegalArgumentException(mbtilesOutputPath + " already exists, use force to overwrite.");
}
LOGGER.info("Building OpenMapTiles profile into " + output + " in these phases:");
LOGGER.info("Building OpenMapTiles profile into " + mbtilesOutputPath + " in these phases:");
if (fetchWikidata) {
LOGGER.info(" [wikidata] Fetch OpenStreetMap element name translations from wikidata");
}
@ -60,16 +59,20 @@ public class OpenMapTilesMain {
LOGGER.info(" [osm_pass1] Pre-process OpenStreetMap input (store node locations then relation members)");
LOGGER.info(" [osm_pass2] Process OpenStreetMap nodes, ways, then relations");
LOGGER.info(" [sort] Sort rendered features by tile ID");
LOGGER.info(" [mbtiles] Encode each tile and write to " + output);
LOGGER.info(" [mbtiles] Encode each tile and write to " + mbtilesOutputPath);
var translations = Translations.defaultProvider(languages);
var profile = new OpenMapTilesProfile();
Files.createDirectories(tmpDir);
Path nodeDb = tmpDir.resolve("node.db");
LongLongMap nodeLocations = LongLongMap.newFileBackedSortedTable(nodeDb);
Path nodeDbPath = tmpDir.resolve("node.db");
LongLongMap nodeLocations = LongLongMap.newFileBackedSortedTable(nodeDbPath);
Path featureDbPath = tmpDir.resolve("feature.db");
FeatureSort featureDb = FeatureSort.newExternalMergeSort(tmpDir.resolve("feature.db"), config.threads(), stats);
FeatureGroup featureMap = new FeatureGroup(featureDb, profile, stats);
stats.monitorFile("nodes", nodeDbPath);
stats.monitorFile("features", featureDbPath);
stats.monitorFile("mbtiles", mbtilesOutputPath);
if (fetchWikidata) {
stats.time("wikidata", () -> Wikidata.fetch(osmInputFile, wikidataNamesFile, config, profile, stats));
@ -99,16 +102,18 @@ public class OpenMapTilesMain {
LOGGER.info("Deleting node.db to make room for mbtiles");
profile.release();
Files.delete(nodeDb);
Files.delete(nodeDbPath);
stats.time("sort", featureDb::sort);
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureMap, output, profile, config, stats));
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureMap, mbtilesOutputPath, profile, config, stats));
overallTimer.stop();
LOGGER.info("FINISHED!");
stats.printSummary();
stats.close();
}
}

Wyświetl plik

@ -132,7 +132,7 @@ public class Wikidata {
fetcher.loadExisting(oldMappings);
var topology = Topology.start("wikidata", stats)
.fromGenerator("pbf", infile.read(readerThreads))
.fromGenerator("pbf", infile.read("pbfwikidata", readerThreads))
.addBuffer("reader_queue", 50_000, 10_000)
.addWorker("filter", processThreads, fetcher::filter)
.addBuffer("fetch_queue", 50_000)

Wyświetl plik

@ -57,9 +57,9 @@ public class ProcessInfo {
}
public static record ThreadState(String name, long cpuTimeNanos, long id) {
public static record ThreadState(String name, long cpuTimeNanos, long userTimeNanos, long id) {
public static final ThreadState DEFAULT = new ThreadState("", 0, -1);
public static final ThreadState DEFAULT = new ThreadState("", 0, 0, -1);
}
@ -77,8 +77,12 @@ public class ProcessInfo {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo thread : threadMXBean.dumpAllThreads(false, false)) {
threadState.put(thread.getThreadId(),
new ThreadState(thread.getThreadName(), threadMXBean.getThreadCpuTime(thread.getThreadId()),
thread.getThreadId()));
new ThreadState(
thread.getThreadName(),
threadMXBean.getThreadCpuTime(thread.getThreadId()),
threadMXBean.getThreadUserTime(thread.getThreadId()),
thread.getThreadId()
));
}
return threadState;
}

Wyświetl plik

@ -0,0 +1,265 @@
package com.onthegomap.flatmap.monitoring;
import com.onthegomap.flatmap.FileUtils;
import com.onthegomap.flatmap.MemoryEstimator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.CounterMetricFamily;
import io.prometheus.client.GaugeMetricFamily;
import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
import io.prometheus.client.exporter.PushGateway;
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.URL;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link Stats} implementation that registers collectors in a private {@link CollectorRegistry} and pushes
 * the registry to a prometheus push gateway on a fixed schedule from a daemon thread.
 * <p>
 * Gauges, counters, monitored files and monitored heap objects are all evaluated lazily: the supplied values
 * are only read when the registry is collected for a push.
 */
public class PrometheusStats implements Stats {

  private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusStats.class);
  /** Prefix applied to every metric name emitted by this class (the JVM/thread exports excepted). */
  private static final String BASE = "flatmap_";

  private final CollectorRegistry registry = new CollectorRegistry();
  private final Timers timers = new Timers();
  private final PushGateway pg;
  private final ScheduledExecutorService executor;
  private final String job;
  private final Map<String, Path> filesToMonitor = Collections.synchronizedMap(new LinkedHashMap<>());
  private final Map<String, MemoryEstimator.HasEstimate> heapObjectsToMonitor = Collections
    .synchronizedMap(new LinkedHashMap<>());

  /**
   * Registers the built-in collectors, performs an initial {@code pushAdd} and schedules recurring pushes.
   *
   * @param destination URL of the push gateway; if it carries {@code user:password} user info, HTTP basic
   *                    authentication is configured with those credentials
   * @param job         job name to push metrics under
   * @param interval    time between pushes; values below 5 seconds are raised to 5 seconds
   * @throws RuntimeException wrapping the {@link IOException} if the URL is malformed or the initial push fails
   */
  public PrometheusStats(String destination, String job, Duration interval) {
    this.job = job;
    try {
      DefaultExports.register(registry);
      new ThreadDetailsExports().register(registry);
      new InProgressTasks().register(registry);
      new FileSizeCollector().register(registry);
      new HeapObjectSizeCollector().register(registry);

      URL url = new URL(destination);
      pg = new PushGateway(url);
      if (url.getUserInfo() != null) {
        // limit the split to 2 parts so that a password containing ':' is kept intact
        String[] parts = url.getUserInfo().split(":", 2);
        if (parts.length == 2) {
          pg.setConnectionFactory(new BasicAuthHttpConnectionFactory(parts[0], parts[1]));
        }
      }
      pg.pushAdd(registry, job);

      executor = Executors.newScheduledThreadPool(1, r -> {
        Thread thread = new Thread(r);
        // daemon so that the pusher never keeps the JVM alive on its own
        thread.setDaemon(true);
        thread.setName("prometheus-pusher");
        return thread;
      });
      executor.scheduleAtFixedRate(this::push, 0, Math.max(interval.getSeconds(), 5), TimeUnit.SECONDS);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Collects the registry and pushes it to the gateway, logging (never propagating) any failure.
   */
  private void push() {
    try {
      pg.push(registry, job);
    } catch (Exception e) {
      // Catch everything, not just IOException: a task scheduled with scheduleAtFixedRate that throws is
      // never run again, so one misbehaving collector would otherwise silently stop all future pushes.
      LOGGER.error("Error pushing stats to prometheus", e);
    }
  }

  /**
   * Returns a point-in-time copy of a map created by {@link Collections#synchronizedMap(Map)}.
   * <p>
   * Iterating a synchronized map requires holding the map's own lock; copying under that lock lets the
   * collectors iterate safely while other threads keep registering entries.
   */
  private static <V> Map<String, V> snapshot(Map<String, V> map) {
    synchronized (map) {
      return new LinkedHashMap<>(map);
    }
  }

  @Override
  public void time(String name, Runnable task) {
    timers.time(name, task);
  }

  @Override
  public Timers.Finishable startTimer(String name) {
    return timers.startTimer(name);
  }

  /**
   * Registers a gauge named {@code flatmap_<sanitized name>} whose value is read from {@code value} on every
   * collection.
   */
  @Override
  public void gauge(String name, Supplier<Number> value) {
    new Collector() {
      @Override
      public List<MetricFamilySamples> collect() {
        return List.of(new GaugeMetricFamily(BASE + sanitizeMetricName(name), "", value.get().doubleValue()));
      }
    }.register(registry);
  }

  /** Not recorded by this implementation. */
  @Override
  public void emittedFeature(int z, String layer, int coveringTiles) {
  }

  /** Not recorded by this implementation. */
  @Override
  public void encodedTile(int zoom, int length) {
  }

  /** Not recorded by this implementation. */
  @Override
  public void wroteTile(int zoom, int bytes) {
  }

  @Override
  public Timers timers() {
    return timers;
  }

  /** Starts reporting size and disk-space metrics for {@code path} under the label {@code name}. */
  @Override
  public void monitorFile(String name, Path path) {
    filesToMonitor.put(name, path);
  }

  /** Starts reporting the estimated memory usage of {@code heapObject} under the label {@code name}. */
  @Override
  public void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate heapObject) {
    heapObjectsToMonitor.put(name, heapObject);
  }

  /**
   * Registers a metric named {@code flatmap_<sanitized name>} whose value is read from {@code supplier} on
   * every collection.
   */
  @Override
  public void counter(String name, Supplier<Number> supplier) {
    new Collector() {
      @Override
      public List<MetricFamilySamples> collect() {
        // NOTE(review): this is exported with the gauge type even though it is registered as a counter.
        // Switching to CounterMetricFamily may change the exposed sample names - confirm against the
        // dashboards that consume these metrics before changing it.
        return List.of(new GaugeMetricFamily(BASE + sanitizeMetricName(name), "", supplier.get().doubleValue()));
      }
    }.register(registry);
  }

  /** Stops the scheduled pusher and pushes one last time from the calling thread. */
  @Override
  public void close() throws Exception {
    executor.shutdown();
    push();
  }

  private static CounterMetricFamily counterMetric(String name, double value) {
    return new CounterMetricFamily(BASE + name, BASE + name + " value", value);
  }

  private static GaugeMetricFamily gaugeMetric(String name, double value) {
    return new GaugeMetricFamily(BASE + name, BASE + name + " value", value);
  }

  /** Exposes, for every timer, whether it is still running plus its elapsed wall and CPU time in seconds. */
  private class InProgressTasks extends Collector {

    @Override
    public List<MetricFamilySamples> collect() {
      List<MetricFamilySamples> result = new ArrayList<>();
      for (var entry : timers.all().entrySet()) {
        // sanitize like every other collector in this class so an unusual timer name can't yield an
        // invalid metric name
        String name = sanitizeMetricName(entry.getKey());
        Timer timer = entry.getValue();
        result.add(gaugeMetric(name + "_running", timer.running() ? 1 : 0));
        ProcessTime time = timer.elapsed();
        result.add(gaugeMetric(name + "_elapsed_time_seconds", time.wall().toNanos() / NANOSECONDS_PER_SECOND));
        result.add(gaugeMetric(name + "_cpu_time_seconds",
          time.cpu().orElse(Duration.ZERO).toNanos() / NANOSECONDS_PER_SECOND));
      }
      return result;
    }
  }

  /**
   * Exposes the size of every monitored file and, when the file exists, the total, unallocated and usable
   * space of the file store it lives on.
   */
  private class FileSizeCollector extends Collector {

    /** Set once the first file-store failure has been logged so the warning is not repeated on every push. */
    private boolean logged = false;

    @Override
    public List<MetricFamilySamples> collect() {
      List<Collector.MetricFamilySamples> results = new ArrayList<>();
      for (var file : snapshot(filesToMonitor).entrySet()) {
        String name = sanitizeMetricName(file.getKey());
        Path path = file.getValue();
        results.add(new GaugeMetricFamily(BASE + "file_" + name + "_size_bytes", "Size of " + name + " in bytes",
          FileUtils.size(path)));
        if (Files.exists(path)) {
          try {
            FileStore fileStore = Files.getFileStore(path);
            results
              .add(new GaugeMetricFamily(BASE + "file_" + name + "_total_space_bytes", "Total space available on disk",
                fileStore.getTotalSpace()));
            results.add(
              new GaugeMetricFamily(BASE + "file_" + name + "_unallocated_space_bytes", "Unallocated space on disk",
                fileStore.getUnallocatedSpace()));
            results
              .add(new GaugeMetricFamily(BASE + "file_" + name + "_usable_space_bytes", "Usable space on disk",
                fileStore.getUsableSpace()));
          } catch (IOException e) {
            // let the user know once
            if (!logged) {
              LOGGER.warn("unable to get usable space on device", e);
              logged = true;
            }
          }
        }
      }
      return results;
    }
  }

  /** Exposes the estimated memory usage in bytes of every monitored heap object. */
  private class HeapObjectSizeCollector extends Collector {

    @Override
    public List<MetricFamilySamples> collect() {
      List<Collector.MetricFamilySamples> results = new ArrayList<>();
      for (var entry : snapshot(heapObjectsToMonitor).entrySet()) {
        String name = sanitizeMetricName(entry.getKey());
        MemoryEstimator.HasEstimate heapObject = entry.getValue();
        // MemoryEstimator.size reports 0 for a null object instead of throwing
        results
          .add(new GaugeMetricFamily(BASE + "heap_object_" + name + "_size_bytes", "Bytes of memory used by " + name,
            MemoryEstimator.size(heapObject)));
      }
      return results;
    }
  }

  /**
   * Exposes the available processor count, the system load average and per-thread CPU/user time labelled by
   * thread name and id.
   */
  private static class ThreadDetailsExports extends Collector {

    private final OperatingSystemMXBean osBean;
    /**
     * Every thread state seen so far keyed by thread id; entries are only ever added or overwritten, so a
     * thread that is no longer returned by {@code ProcessInfo.getThreadStats()} keeps its last known values.
     */
    private final Map<Long, ProcessInfo.ThreadState> threads = Collections.synchronizedMap(new TreeMap<>());

    public ThreadDetailsExports() {
      this.osBean = ManagementFactory.getOperatingSystemMXBean();
    }

    @Override
    public List<MetricFamilySamples> collect() {
      List<MetricFamilySamples> mfs = new ArrayList<>(List.of(
        new GaugeMetricFamily("jvm_available_processors", "Result of Runtime.getRuntime().availableProcessors()",
          Runtime.getRuntime().availableProcessors()),
        new GaugeMetricFamily("jvm_system_load_avg", "Result of OperatingSystemMXBean.getSystemLoadAverage()",
          osBean.getSystemLoadAverage())
      ));

      CounterMetricFamily threadCpuTimes = new CounterMetricFamily("jvm_thread_cpu_time_seconds",
        "CPU time used by each thread", List.of("name", "id"));
      mfs.add(threadCpuTimes);
      CounterMetricFamily threadUserTimes = new CounterMetricFamily("jvm_thread_user_time_seconds",
        "User time used by each thread", List.of("name", "id"));
      mfs.add(threadUserTimes);

      // Update and copy under the map's lock: the scheduled pusher and close() can collect concurrently, and
      // a synchronized map must not be iterated without holding its lock.
      List<ProcessInfo.ThreadState> states;
      synchronized (threads) {
        threads.putAll(ProcessInfo.getThreadStats());
        states = new ArrayList<>(threads.values());
      }
      for (ProcessInfo.ThreadState thread : states) {
        var labels = List.of(thread.name(), Long.toString(thread.id()));
        threadUserTimes.addMetric(labels, thread.userTimeNanos() / NANOSECONDS_PER_SECOND);
        threadCpuTimes.addMetric(labels, thread.cpuTimeNanos() / NANOSECONDS_PER_SECOND);
      }

      return mfs;
    }
  }
}

Wyświetl plik

@ -1,6 +1,13 @@
package com.onthegomap.flatmap.monitoring;
public interface Stats {
import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;
import com.onthegomap.flatmap.MemoryEstimator;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public interface Stats extends AutoCloseable {
void time(String name, Runnable task);
@ -10,7 +17,11 @@ public interface Stats {
Timers.Finishable startTimer(String name);
void gauge(String name, int value);
default void gauge(String name, Number value) {
gauge(name, () -> value);
}
void gauge(String name, Supplier<Number> value);
void emittedFeature(int z, String layer, int coveringTiles);
@ -20,8 +31,57 @@ public interface Stats {
Timers timers();
void monitorFile(String features, Path featureDbPath);
void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate heapObject);
void counter(String name, Supplier<Number> supplier);
default StatCounter longCounter(String name) {
StatCounter.AtomicCounter counter = new StatCounter.AtomicCounter();
counter(name, counter::get);
return counter;
}
default StatCounter nanoCounter(String name) {
StatCounter.AtomicCounter counter = new StatCounter.AtomicCounter();
counter(name, () -> counter.get() / NANOSECONDS_PER_SECOND);
return counter;
}
interface StatCounter {
void inc(long v);
default void inc() {
inc(1);
}
class NoopCounter implements StatCounter {
@Override
public void inc(long v) {
}
}
class AtomicCounter implements StatCounter {
private final AtomicLong counter = new AtomicLong(0);
@Override
public void inc(long v) {
counter.addAndGet(v);
}
public long get() {
return counter.get();
}
}
}
class InMemory implements Stats {
private static final StatCounter NOOP_COUNTER = new StatCounter.NoopCounter();
private final Timers timers = new Timers();
@Override
@ -49,12 +109,38 @@ public interface Stats {
}
@Override
public void gauge(String name, int value) {
public void monitorFile(String features, Path featureDbPath) {
}
@Override
public void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate heapObject) {
}
@Override
public void counter(String name, Supplier<Number> supplier) {
}
@Override
public StatCounter longCounter(String name) {
return NOOP_COUNTER;
}
@Override
public StatCounter nanoCounter(String name) {
return NOOP_COUNTER;
}
@Override
public void gauge(String name, Supplier<Number> value) {
}
@Override
public void emittedFeature(int z, String layer, int coveringTiles) {
}
@Override
public void close() throws Exception {
}
}
}

Wyświetl plik

@ -14,6 +14,10 @@ public class Timer {
return this;
}
public boolean running() {
return end == null;
}
public ProcessTime elapsed() {
return (end == null ? ProcessTime.now() : end).minus(start);
}

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.flatmap.monitoring;
import com.onthegomap.flatmap.Format;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
@ -9,7 +10,7 @@ import org.slf4j.LoggerFactory;
public class Timers {
private static final Logger LOGGER = LoggerFactory.getLogger(Stats.InMemory.class);
private final Map<String, Timer> timers = new LinkedHashMap<>();
private final Map<String, Timer> timers = Collections.synchronizedMap(new LinkedHashMap<>());
public void time(String name, Runnable task) {
Finishable timer = startTimer(name);
@ -32,6 +33,10 @@ public class Timers {
return () -> LOGGER.info("[" + name + "] Finished in " + timers.get(name).stop());
}
public Map<String, Timer> all() {
return new LinkedHashMap<>(timers);
}
public interface Finishable {
void stop();

Wyświetl plik

@ -79,11 +79,12 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima
this.nodeDb = nodeDb;
this.stats = stats;
this.profile = profile;
stats.monitorInMemoryObject("osm_relations", this);
}
public void pass1(CommonParams config) {
var topology = Topology.start("osm_pass1", stats)
.fromGenerator("pbf", osmInputFile.read(config.threads() - 1))
.fromGenerator("pbf", osmInputFile.read("pbfpass1", config.threads() - 1))
.addBuffer("reader_queue", 50_000, 10_000)
.sinkToConsumer("process", 1, this::processPass1);
@ -138,7 +139,7 @@ public class OpenStreetMapReader implements Closeable, MemoryEstimator.HasEstima
CountDownLatch waysDone = new CountDownLatch(processThreads);
var topology = Topology.start("osm_pass2", stats)
.fromGenerator("pbf", osmInputFile.read(readerThreads))
.fromGenerator("pbf", osmInputFile.read("pbfpass2", readerThreads))
.addBuffer("reader_queue", 50_000, 1_000)
.<FeatureSort.Entry>addWorker("process", processThreads, (prev, next) -> {
ReaderElement readerElement;

Wyświetl plik

@ -14,6 +14,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@ -71,8 +72,13 @@ public class OsmInputFile implements BoundsProvider, OsmSource {
}
}
public void readTo(Consumer<ReaderElement> next, int threads) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
public void readTo(Consumer<ReaderElement> next, String poolName, int threads) throws IOException {
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService executorService = Executors.newFixedThreadPool(threads, (runnable) -> {
Thread thread = threadFactory.newThread(runnable);
thread.setName(poolName + "-" + thread.getName());
return thread;
});
try (var stream = new BufferedInputStream(Files.newInputStream(path), 50_000)) {
PbfStreamSplitter streamSplitter = new PbfStreamSplitter(new DataInputStream(stream));
var sink = new ReaderElementSink(next);
@ -84,8 +90,8 @@ public class OsmInputFile implements BoundsProvider, OsmSource {
}
@Override
public Topology.SourceStep<ReaderElement> read(int threads) {
return next -> readTo(next, threads);
public Topology.SourceStep<ReaderElement> read(String poolName, int threads) {
return next -> readTo(next, poolName, threads);
}
private static record ReaderElementSink(Consumer<ReaderElement> queue) implements Sink {

Wyświetl plik

@ -5,5 +5,5 @@ import com.onthegomap.flatmap.worker.Topology;
public interface OsmSource {
Topology.SourceStep<ReaderElement> read(int threads);
Topology.SourceStep<ReaderElement> read(String poolName, int threads);
}

Wyświetl plik

@ -19,6 +19,10 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
private final int batchSize;
private final ConcurrentHashMap<Long, Queue<T>> queues = new ConcurrentHashMap<>();
private final int pendingBatchesCapacity;
private final Stats.StatCounter enqueueCountStat;
private final Stats.StatCounter enqueueBlockTimeNanos;
private final Stats.StatCounter dequeueCountStat;
private final Stats.StatCounter dequeueBlockTimeNanos;
private volatile boolean hasIncomingData = true;
private final AtomicInteger pendingCount = new AtomicInteger(0);
@ -26,6 +30,16 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
this.pendingBatchesCapacity = capacity / maxBatch;
this.batchSize = maxBatch;
itemQueue = new ArrayBlockingQueue<>(pendingBatchesCapacity);
stats.gauge(name + "_blocking_queue_capacity", () -> pendingBatchesCapacity);
stats.gauge(name + "_blocking_queue_size", itemQueue::size);
stats.gauge(name + "_capacity", this::getCapacity);
stats.gauge(name + "_size", this::getPending);
this.enqueueCountStat = stats.longCounter(name + "_enqueue_count");
this.enqueueBlockTimeNanos = stats.nanoCounter(name + "_enqueue_block_time_seconds");
this.dequeueCountStat = stats.longCounter(name + "_dequeue_count");
this.dequeueBlockTimeNanos = stats.nanoCounter(name + "_dequeue_block_time_seconds");
}
@Override
@ -58,6 +72,7 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
if (writeBatch.size() >= batchSize) {
flushWrites();
}
enqueueCountStat.inc();
}
private void flushWrites() {
@ -68,7 +83,9 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
queues.remove(Thread.currentThread().getId());
// blocks if full
if (!itemQueue.offer(writeBatch)) {
long start = System.nanoTime();
itemQueue.put(writeBatch);
enqueueBlockTimeNanos.inc(System.nanoTime() - start);
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
@ -81,6 +98,7 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
Queue<T> itemBatch = itemReadBatchProvider.get();
if (itemBatch == null || itemBatch.isEmpty()) {
long start = System.nanoTime();
do {
if (!hasIncomingData && itemQueue.isEmpty()) {
break;
@ -99,12 +117,14 @@ public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
}
} while (itemBatch == null);
itemReadBatchProvider.set(itemBatch);
dequeueBlockTimeNanos.inc(System.nanoTime() - start);
}
T result = itemBatch == null ? null : itemBatch.poll();
if (result != null) {
pendingCount.decrementAndGet();
}
dequeueCountStat.inc();
return result;
}

Wyświetl plik

@ -90,7 +90,7 @@ public class FlatMapTest {
private void processOsmFeatures(FeatureGroup featureGroup, Profile profile, CommonParams config,
List<? extends ReaderElement> osmElements) throws IOException {
OsmSource elems = threads -> next -> {
OsmSource elems = (name, threads) -> next -> {
// process the same order they come in from an OSM file
osmElements.stream().filter(e -> e.getType() == ReaderElement.FILEHEADER).forEachOrdered(next);
osmElements.stream().filter(e -> e.getType() == ReaderElement.NODE).forEachOrdered(next);

Wyświetl plik

@ -25,7 +25,7 @@ import org.junit.jupiter.api.Test;
public class OpenStreetMapReaderTest {
public final OsmSource osmSource = threads -> next -> {
public final OsmSource osmSource = (name, threads) -> next -> {
};
private final Stats stats = new Stats.InMemory();
private final Profile profile = new Profile.NullProfile();

Wyświetl plik

@ -28,7 +28,7 @@ public class OsmInputFileTest {
AtomicInteger ways = new AtomicInteger(0);
AtomicInteger rels = new AtomicInteger(0);
Topology.start("test", new Stats.InMemory())
.fromGenerator("pbf", file.read(2))
.fromGenerator("pbf", file.read("test", 2))
.addBuffer("reader_queue", 1_000, 100)
.sinkToConsumer("counter", 1, elem -> {
switch (elem.getType()) {