topology and progress loggers

pull/1/head
Mike Barry 2021-04-15 20:54:33 -04:00
rodzic 3a08af2be0
commit b80a576348
24 zmienionych plików z 941 dodań i 100 usunięć

Wyświetl plik

@ -118,6 +118,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>

Wyświetl plik

@ -1,6 +1,6 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import java.io.File;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

Wyświetl plik

@ -1,6 +1,6 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import java.time.Duration;
import org.locationtech.jts.geom.Envelope;

Wyświetl plik

@ -0,0 +1,83 @@
package com.onthegomap.flatmap;
import java.text.NumberFormat;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
public class Format {
private static final NavigableMap<Long, String> STORAGE_SUFFIXES = new TreeMap<>(Map.ofEntries(
Map.entry(1_000L, "kB"),
Map.entry(1_000_000L, "MB"),
Map.entry(1_000_000_000L, "GB"),
Map.entry(1_000_000_000_000L, "TB"),
Map.entry(1_000_000_000_000_000L, "PB")
));
private static final NavigableMap<Long, String> NUMERIC_SUFFIXES = new TreeMap<>(Map.ofEntries(
Map.entry(1_000L, "k"),
Map.entry(1_000_000L, "M"),
Map.entry(1_000_000_000L, "B"),
Map.entry(1_000_000_000_000L, "T"),
Map.entry(1_000_000_000_000_000L, "Q")
));
private static final NumberFormat pf = NumberFormat.getPercentInstance();
private static final NumberFormat nf = NumberFormat.getNumberInstance();
static {
pf.setMaximumFractionDigits(0);
nf.setMaximumFractionDigits(1);
}
public static String padRight(String str, int size) {
StringBuilder strBuilder = new StringBuilder(str);
while (strBuilder.length() < size) {
strBuilder.append(" ");
}
return strBuilder.toString();
}
public static String padLeft(String str, int size) {
StringBuilder strBuilder = new StringBuilder(str);
while (strBuilder.length() < size) {
strBuilder.insert(0, " ");
}
return strBuilder.toString();
}
public static String formatStorage(Number num, boolean pad) {
return format(num, pad, STORAGE_SUFFIXES);
}
public static String formatNumeric(Number num, boolean pad) {
return format(num, pad, NUMERIC_SUFFIXES);
}
public static String format(Number num, boolean pad, NavigableMap<Long, String> suffixes) {
long value = num.longValue();
if (value < 0) {
return "-" + format(-value, pad, suffixes);
}
if (value < 1000) {
return padLeft(Long.toString(value), pad ? 4 : 0);
}
Map.Entry<Long, String> e = suffixes.floorEntry(value);
Long divideBy = e.getKey();
String suffix = e.getValue();
long truncated = value / (divideBy / 10);
boolean hasDecimal = truncated < 100 && (truncated % 10 != 0);
return padLeft(hasDecimal ? (truncated / 10d) + suffix : (truncated / 10) + suffix, pad ? 4 : 0);
}
public static String formatPercent(double value) {
return pf.format(value);
}
public static String formatDecimal(double value) {
return nf.format(value);
}
}

Wyświetl plik

@ -6,7 +6,8 @@ import static com.onthegomap.flatmap.GeoUtils.z;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap.TileFeatures;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.ByteArrayOutputStream;
import java.io.File;

Wyświetl plik

@ -2,7 +2,7 @@ package com.onthegomap.flatmap;
import com.google.protobuf.ByteString;
import com.graphhopper.reader.ReaderElement;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import java.io.DataInputStream;

Wyświetl plik

@ -1,68 +0,0 @@
package com.onthegomap.flatmap;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import com.onthegomap.flatmap.worker.Worker;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
public class ProgressLoggers {
public ProgressLoggers(String name) {
}
public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value) {
return addRatePercentCounter(name, total, value::get);
}
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) {
return this;
}
public ProgressLoggers addRateCounter(String name, AtomicLong featuresWritten) {
return this;
}
public ProgressLoggers addFileSize(LongSupplier getStorageSize) {
return this;
}
public ProgressLoggers addFileSize(File filePath) {
return this;
}
public ProgressLoggers addProcessStats() {
return this;
}
public ProgressLoggers addInMemoryObject(String name, LongSupplier size) {
return this;
}
public ProgressLoggers addThreadPoolStats(String name, String prefix) {
return this;
}
public ProgressLoggers addThreadPoolStats(String name, Worker worker) {
return addThreadPoolStats(name, worker.getPrefix());
}
public ProgressLoggers addQueueStats(WorkQueue<?> queue) {
return this;
}
public ProgressLoggers addTopologyStats(Topology<?> topology) {
if (topology == null) {
return this;
}
return addTopologyStats(topology.previous())
.addQueueStats(topology.inputQueue())
.addThreadPoolStats(topology.name(), topology.worker());
}
public ProgressLoggers add(String s) {
return this;
}
}

Wyświetl plik

@ -12,8 +12,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.graphhopper.coll.GHLongObjectHashMap;
import com.graphhopper.reader.ReaderElement;
import com.graphhopper.util.StopWatch;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.profiles.OpenMapTilesProfile;
import com.onthegomap.flatmap.stats.Stats;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;

Wyświetl plik

@ -2,7 +2,7 @@ package com.onthegomap.flatmap.collections;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.VectorTile;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.function.Consumer;

Wyświetl plik

@ -0,0 +1,79 @@
package com.onthegomap.flatmap.monitoring;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
public class ProcessInfo {
public static Duration getProcessCpuTime() {
try {
return Duration.ofNanos(callLongGetter("getProcessCpuTime", ManagementFactory.getOperatingSystemMXBean()));
} catch (NoSuchMethodException | InvocationTargetException e) {
return Duration.ZERO;
}
}
private static Long callLongGetter(String getterName, Object obj)
throws NoSuchMethodException, InvocationTargetException {
return callLongGetter(obj.getClass().getMethod(getterName), obj);
}
private static Long callLongGetter(Method method, Object obj) throws InvocationTargetException {
try {
return (Long) method.invoke(obj);
} catch (IllegalAccessException e) {
// Expected, the declaring class or interface might not be public.
}
// Iterate over all implemented/extended interfaces and attempt invoking the method with the
// same name and parameters on each.
for (Class<?> clazz : method.getDeclaringClass().getInterfaces()) {
try {
Method interfaceMethod = clazz.getMethod(method.getName(), method.getParameterTypes());
Long result = callLongGetter(interfaceMethod, obj);
if (result != null) {
return result;
}
} catch (NoSuchMethodException e) {
// Expected, class might implement multiple, unrelated interfaces.
}
}
return null;
}
public static record ThreadState(String name, long cpuTimeNanos, long id) {
public static final ThreadState DEFAULT = new ThreadState("", 0, -1);
}
public static Duration getGcTime() {
long total = 0;
for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
total += gc.getCollectionTime();
}
return Duration.ofMillis(total);
}
public static Map<Long, ThreadState> getThreadStats() {
Map<Long, ThreadState> threadState = new TreeMap<>();
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo thread : threadMXBean.dumpAllThreads(false, false)) {
threadState.put(thread.getThreadId(),
new ThreadState(thread.getThreadName(), threadMXBean.getThreadCpuTime(thread.getThreadId()),
thread.getThreadId()));
}
return threadState;
}
}

Wyświetl plik

@ -0,0 +1,231 @@
package com.onthegomap.flatmap.monitoring;
import static com.onthegomap.flatmap.Format.formatNumeric;
import static com.onthegomap.flatmap.Format.formatPercent;
import static com.onthegomap.flatmap.Format.formatStorage;
import static com.onthegomap.flatmap.Format.padLeft;
import static com.onthegomap.flatmap.Format.padRight;
import com.graphhopper.util.Helper;
import com.onthegomap.flatmap.Format;
import com.onthegomap.flatmap.monitoring.ProcessInfo.ThreadState;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.WorkQueue;
import com.onthegomap.flatmap.worker.Worker;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.DoubleFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProgressLoggers {
private static final Logger LOGGER = LoggerFactory.getLogger(ProgressLoggers.class);
private final List<Object> loggers;
private final String prefix;
public ProgressLoggers(String prefix) {
this.prefix = prefix;
loggers = new ArrayList<>();
}
public String getLog() {
return "[" + prefix + "]" + loggers.stream()
.map(Object::toString)
.collect(Collectors.joining(""));
}
public ProgressLoggers addRateCounter(String name, LongSupplier getValue) {
AtomicLong last = new AtomicLong(getValue.getAsLong());
AtomicLong lastTime = new AtomicLong(System.nanoTime());
loggers.add(new ProgressLogger(name, () -> {
long now = System.nanoTime();
long valueNow = getValue.getAsLong();
double timeDiff = (now - lastTime.get()) * 1d / (1d * TimeUnit.SECONDS.toNanos(1));
double valueDiff = valueNow - last.get();
last.set(valueNow);
lastTime.set(now);
return "[ " + formatNumeric(valueNow, true) + " " + formatNumeric(valueDiff / timeDiff, true) + "/s ]";
}));
return this;
}
public ProgressLoggers addRateCounter(String name, AtomicLong value) {
return addRateCounter(name, value::get);
}
public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value) {
return addRatePercentCounter(name, total, value::get);
}
public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) {
AtomicLong last = new AtomicLong(getValue.getAsLong());
AtomicLong lastTime = new AtomicLong(System.nanoTime());
loggers.add(new ProgressLogger(name, () -> {
long now = System.nanoTime();
long valueNow = getValue.getAsLong();
double timeDiff = (now - lastTime.get()) * 1d / (1d * TimeUnit.SECONDS.toNanos(1));
double valueDiff = valueNow - last.get();
last.set(valueNow);
lastTime.set(now);
return "[ " + formatNumeric(valueNow, true) + " " + padLeft(formatPercent(1f * valueNow / total), 4) + " "
+ formatNumeric(valueDiff / timeDiff, true) + "/s ]";
}));
return this;
}
public ProgressLoggers addPercentCounter(String name, long total, AtomicLong getValue) {
loggers.add(new ProgressLogger(name, () -> {
long valueNow = getValue.get();
return "[ " + padLeft("" + valueNow, 3) + " / " + padLeft("" + total, 3) + " " + padLeft(
formatPercent(1f * valueNow / total), 4) + " ]";
}));
return this;
}
public ProgressLoggers addQueueStats(WorkQueue<?> queue) {
loggers.add(new TopologyLogger(() ->
" -> " + padLeft("(" +
formatNumeric(queue.getPending(), false)
+ "/" +
formatNumeric(queue.getCapacity(), false)
+ ")", 9)
));
return this;
}
public ProgressLoggers add(Object obj) {
loggers.add(obj);
return this;
}
public ProgressLoggers addFileSize(File file) {
return addFileSize(file::length);
}
public ProgressLoggers addFileSize(LongSupplier longSupplier) {
loggers.add(new Object() {
@Override
public String toString() {
return " " + padRight(formatBytes(longSupplier.getAsLong(), false), 5);
}
});
return this;
}
public ProgressLoggers addProcessStats() {
addDeltaLogger("cpus", ProcessInfo::getProcessCpuTime, Format::formatDecimal);
addDeltaLogger("gc", ProcessInfo::getGcTime, Format::formatPercent);
loggers.add(new ProgressLogger("mem",
() -> padLeft(formatMB(Helper.getUsedMB(), false) + " / " + formatMB(Helper.getTotalMB(), false), 7)));
return this;
}
private void addDeltaLogger(String name, Supplier<Duration> supplier, DoubleFunction<String> format) {
AtomicLong lastValue = new AtomicLong(supplier.get().toNanos());
AtomicLong lastTime = new AtomicLong(System.nanoTime());
loggers.add(new ProgressLogger(name, () -> {
long currentValue = supplier.get().toNanos();
if (currentValue < 0) {
return "-";
}
long currentTime = System.nanoTime();
double rate = 1d * (currentValue - lastValue.get()) / (currentTime - lastTime.get());
lastTime.set(currentTime);
lastValue.set(currentValue);
return padLeft(format.apply(rate), 3);
}));
}
public ProgressLoggers addThreadPoolStats(String name, String prefix) {
boolean first = loggers.isEmpty() || !(loggers.get(loggers.size() - 1) instanceof TopologyLogger);
try {
Map<Long, ThreadState> lastThreads = ProcessInfo.getThreadStats();
AtomicLong lastTime = new AtomicLong(System.nanoTime());
loggers.add(new TopologyLogger(() -> {
var oldAndNewThreads = new TreeMap<>(lastThreads);
var newThreads = ProcessInfo.getThreadStats();
oldAndNewThreads.putAll(newThreads);
long currentTime = System.nanoTime();
double timeDiff = 1d * (currentTime - lastTime.get());
String percents = oldAndNewThreads.values().stream()
.filter(thread -> thread.name().startsWith(prefix))
.map(thread -> {
if (!newThreads.containsKey(thread.id())) {
return " -%";
}
long last = lastThreads.getOrDefault(thread.id(), ThreadState.DEFAULT).cpuTimeNanos();
return padLeft(formatPercent(1d * (thread.cpuTimeNanos() - last) / timeDiff), 3);
}).collect(Collectors.joining(" ", "(", ")"));
lastTime.set(currentTime);
lastThreads.putAll(newThreads);
return (first ? " | " : " -> ") + name + percents;
}));
} catch (Throwable e) {
// can't get CPU stats per-thread
}
return this;
}
public ProgressLoggers addThreadPoolStats(String name, Worker worker) {
return addThreadPoolStats(name, worker.getPrefix());
}
private String formatBytes(long bytes, boolean pad) {
return formatStorage(bytes, pad);
}
private String formatMB(long mb, boolean pad) {
return formatStorage(mb * Helper.MB, pad);
}
public void log() {
LOGGER.info(getLog());
}
public ProgressLoggers addInMemoryObject(String name, LongSupplier getSize) {
loggers.add(new ProgressLogger(name, () -> formatStorage(getSize.getAsLong(), true)));
return this;
}
public ProgressLoggers addTopologyStats(Topology<?> topology) {
if (topology != null) {
addTopologyStats(topology.previous());
if (topology.inputQueue() != null) {
addQueueStats(topology.inputQueue());
}
if (topology.worker() != null) {
addThreadPoolStats(topology.name(), topology.worker());
}
}
return this;
}
private static record ProgressLogger(String name, Supplier<String> fn) {
@Override
public String toString() {
return " " + name + ": " + fn.get();
}
}
private static record TopologyLogger(Supplier<String> fn) {
@Override
public String toString() {
return fn.get();
}
}
}

Wyświetl plik

@ -1,4 +1,4 @@
package com.onthegomap.flatmap.stats;
package com.onthegomap.flatmap.monitoring;
import com.graphhopper.util.StopWatch;
import org.slf4j.Logger;
@ -16,6 +16,8 @@ public interface Stats {
void encodedTile(int zoom, int length);
void gauge(String name, int value);
class InMemory implements Stats {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemory.class);
@ -47,5 +49,10 @@ public interface Stats {
public void encodedTile(int zoom, int length) {
}
@Override
public void gauge(String name, int value) {
}
}
}

Wyświetl plik

@ -1,7 +1,7 @@
package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.io.File;

Wyświetl plik

@ -12,7 +12,6 @@ import com.onthegomap.flatmap.FlatMapConfig;
import com.onthegomap.flatmap.GeoUtils;
import com.onthegomap.flatmap.OsmInputFile;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
@ -20,7 +19,8 @@ import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.LongLongMap;
import com.onthegomap.flatmap.collections.LongLongMultimap;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.Closeable;
import java.io.IOException;

Wyświetl plik

@ -3,13 +3,13 @@ package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.FeatureRenderer;
import com.onthegomap.flatmap.FlatMapConfig;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.RenderableFeature;
import com.onthegomap.flatmap.RenderableFeatures;
import com.onthegomap.flatmap.RenderedFeature;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.collections.MergeSortFeatureMap;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.util.concurrent.atomic.AtomicLong;

Wyświetl plik

@ -1,7 +1,7 @@
package com.onthegomap.flatmap.reader;
import com.onthegomap.flatmap.SourceFeature;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology.SourceStep;
import java.io.File;

Wyświetl plik

@ -1,7 +1,7 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import java.time.Duration;
import java.util.Iterator;
import java.util.function.Consumer;
@ -21,10 +21,13 @@ public record Topology<T>(
public void awaitAndLog(ProgressLoggers loggers, Duration logInterval) {
if (previous != null) {
previous.awaitAndLog(loggers, logInterval);
} else { // producer is responsible for closing the initial input queue
}
if (inputQueue != null) {
inputQueue.close();
}
worker.awaitAndLog(loggers, logInterval);
if (worker != null) {
worker.awaitAndLog(loggers, logInterval);
}
}
public interface SourceStep<O> {
@ -56,8 +59,8 @@ public record Topology<T>(
public <T> Bufferable<?, T> fromGenerator(String name, SourceStep<T> producer, int threads) {
return (queueName, size, batchSize) -> {
var nextQueue = new WorkQueue<T>(prefix, queueName, size, batchSize, stats);
Worker worker = new Worker(prefix, name, stats, threads, () -> producer.run(nextQueue));
var nextQueue = new WorkQueue<T>(prefix + "_" + queueName, size, batchSize, stats);
Worker worker = new Worker(prefix + "_" + name, stats, threads, () -> producer.run(nextQueue));
return new Builder<>(prefix, name, nextQueue, worker, stats);
};
}
@ -99,8 +102,8 @@ public record Topology<T>(
public <O2> Bufferable<O, O2> addWorker(String name, int threads, WorkerStep<O, O2> step) {
Builder<I, O> curr = this;
return (queueName, size, batchSize) -> {
var nextOutputQueue = new WorkQueue<O2>(prefix, queueName, size, batchSize, stats);
var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
var nextOutputQueue = new WorkQueue<O2>(prefix + "_" + queueName, size, batchSize, stats);
var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue, nextOutputQueue));
return new Builder<>(prefix, name, curr, outputQueue, nextOutputQueue, worker, stats);
};
}
@ -111,8 +114,8 @@ public record Topology<T>(
}
public Topology<O> sinkTo(String name, int threads, SinkStep<O> step) {
var previousTopology = previous.build();
var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue));
var previousTopology = build();
var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue));
return new Topology<>(name, previousTopology, outputQueue, worker);
}

Wyświetl plik

@ -1,29 +1,120 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.Stats;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
public WorkQueue(String prefix, String name, int capacity, int maxBatch, Stats stats) {
private final ThreadLocal<Queue<T>> itemWriteBatchProvider = new ThreadLocal<>();
private final ThreadLocal<Queue<T>> itemReadBatchProvider = new ThreadLocal<>();
private final BlockingQueue<Queue<T>> itemQueue;
private final int batchSize;
private final ConcurrentHashMap<Long, Queue<T>> queues = new ConcurrentHashMap<>();
private final int pendingBatchesCapacity;
private volatile boolean hasIncomingData = true;
private final AtomicInteger pendingCount = new AtomicInteger(0);
public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
this.pendingBatchesCapacity = capacity / maxBatch;
this.batchSize = maxBatch;
itemQueue = new ArrayBlockingQueue<>(pendingBatchesCapacity);
}
@Override
public void close() {
for (Queue<T> q : queues.values()) {
try {
if (!q.isEmpty()) {
itemQueue.put(q);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
hasIncomingData = false;
}
@Override
public void accept(T t) {
public void accept(T item) {
// past 4-8 concurrent writers, start getting lock contention adding to the blocking queue so add to the
// queue in lass frequent, larger batches
Queue<T> writeBatch = itemWriteBatchProvider.get();
if (writeBatch == null) {
itemWriteBatchProvider.set(writeBatch = new ArrayDeque<>(batchSize));
queues.put(Thread.currentThread().getId(), writeBatch);
}
writeBatch.offer(item);
pendingCount.incrementAndGet();
if (writeBatch.size() >= batchSize) {
flushWrites();
}
}
private void flushWrites() {
Queue<T> writeBatch = itemWriteBatchProvider.get();
if (writeBatch != null && !writeBatch.isEmpty()) {
try {
itemWriteBatchProvider.set(null);
queues.remove(Thread.currentThread().getId());
// blocks if full
if (!itemQueue.offer(writeBatch)) {
itemQueue.put(writeBatch);
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
@Override
public T get() {
return null;
Queue<T> itemBatch = itemReadBatchProvider.get();
if (itemBatch == null || itemBatch.isEmpty()) {
do {
if (!hasIncomingData && itemQueue.isEmpty()) {
break;
}
if ((itemBatch = itemQueue.poll()) == null) {
try {
itemBatch = itemQueue.poll(100, TimeUnit.MILLISECONDS);
if (itemBatch != null) {
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;// signal EOF
}
}
} while (itemBatch == null);
itemReadBatchProvider.set(itemBatch);
}
T result = itemBatch == null ? null : itemBatch.poll();
if (result != null) {
pendingCount.decrementAndGet();
}
return result;
}
public int getPending() {
return pendingCount.get();
}
public int getCapacity() {
return pendingBatchesCapacity * batchSize;
}
}

Wyświetl plik

@ -1,20 +1,96 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.ProgressLoggers;
import com.onthegomap.flatmap.stats.Stats;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Worker {
public Worker(String prefix, String name, Stats stats, int threads, RunnableThatThrows task) {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
private final ExecutorService es;
private final String prefix;
private final Stats stats;
private static class NamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private NamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name + "-";
}
@Override
public Thread newThread(@NotNull Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
public Worker(String prefix, Stats stats, int threads, RunnableThatThrows task) {
this.prefix = prefix;
this.stats = stats;
stats.gauge(prefix + "_threads", threads);
es = Executors.newFixedThreadPool(threads, new NamedThreadFactory(prefix));
for (int i = 0; i < threads; i++) {
es.submit(() -> {
String id = Thread.currentThread().getName();
LOGGER.debug("Starting worker");
try {
task.run();
} catch (Throwable e) {
System.err.println("Worker " + id + " died");
e.printStackTrace();
System.exit(1);
} finally {
LOGGER.debug("Finished worker");
}
});
}
es.shutdown();
}
public String getPrefix() {
return null;
return prefix;
}
public void awaitAndLog(ProgressLoggers loggers, Duration longInterval) {
try {
while (!es.awaitTermination(longInterval.toNanos(), TimeUnit.NANOSECONDS)) {
loggers.log();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
loggers.log();
}
public void await() {
try {
es.awaitTermination(365, TimeUnit.DAYS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public interface RunnableThatThrows {

Wyświetl plik

@ -0,0 +1,71 @@
package com.onthegomap.flatmap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
public class FormatTest {
@ParameterizedTest
@CsvSource({
"1.5,1",
"999,999",
"1000,1k",
"9999,9.9k",
"10001,10k",
"99999,99k",
"999999,999k",
"9999999,9.9M",
"-9999999,-9.9M",
"5.5e12,5.5T",
})
public void testFormatNumeric(Double number, String formatted) {
assertEquals(Format.formatNumeric(number, false), formatted);
}
@ParameterizedTest
@CsvSource({
"999,999",
"1000,1kB",
"9999,9.9kB",
"5.5e9,5.5GB",
})
public void testFormatStorage(Double number, String formatted) {
assertEquals(formatted, Format.formatStorage(number, false));
}
@ParameterizedTest
@CsvSource({
"0,0%",
"1,100%",
"0.11111,11%",
})
public void testFormatPercent(Double number, String formatted) {
assertEquals(formatted, Format.formatPercent(number));
}
@ParameterizedTest
@CsvSource({
"a,0,a",
"a,1,a",
"a,2,' a'",
"a,3,' a'",
"ab,3,' ab'",
"abc,3,'abc'",
})
public void testPad(String in, Integer size, String out) {
assertEquals(out, Format.padLeft(in, size));
}
@ParameterizedTest
@CsvSource({
"0,0",
"0.1,0.1",
"0.11,0.1",
"1111.11,'1,111.1'",
})
public void testFormatDecimal(Double in, String out) {
assertEquals(out, Format.formatDecimal(in));
}
}

Wyświetl plik

@ -0,0 +1,24 @@
package com.onthegomap.flatmap.monitoring;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
public class ProcessInfoTest {
@Test
public void testGC() {
assertTrue(ProcessInfo.getGcTime().toNanos() >= 0);
}
@Test
public void testCPU() {
assertTrue(ProcessInfo.getProcessCpuTime().toNanos() > 0);
}
@Test
public void testThreads() {
assertFalse(ProcessInfo.getThreadStats().isEmpty());
}
}

Wyświetl plik

@ -0,0 +1,39 @@
package com.onthegomap.flatmap.monitoring;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.onthegomap.flatmap.monitoring.Stats.InMemory;
import com.onthegomap.flatmap.worker.Topology;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class ProgressLoggersTest {
@Test
@Timeout(10)
public void testLogTopology() {
var latch = new CountDownLatch(1);
var topology = Topology.start("topo", new InMemory())
.fromGenerator("reader", next -> latch.await())
.addBuffer("reader_queue", 10)
.addWorker("worker", 2, (a, b) -> latch.await())
.addBuffer("writer_queue", 10)
.sinkTo("writer", 2, a -> latch.await());
var loggers = new ProgressLoggers("prefix")
.addTopologyStats(topology);
String log;
while ((log = loggers.getLog()).split("%").length < 6) {
// spin waiting for threads to start
}
assertEquals("[prefix] | reader( 0%) -> (0/10) -> worker( 0% 0%) -> (0/10) -> writer( 0% 0%)", log);
latch.countDown();
topology.awaitAndLog(loggers, Duration.ofSeconds(10));
assertEquals("[prefix] | reader( -%) -> (0/10) -> worker( -% -%) -> (0/10) -> writer( -% -%)",
loggers.getLog());
}
}

Wyświetl plik

@ -0,0 +1,89 @@
package com.onthegomap.flatmap.worker;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class TopologyTest {
Stats stats = new Stats.InMemory();
@Test
@Timeout(10)
public void testSimpleTopology() {
Set<Integer> result = Collections.synchronizedSet(new TreeSet<>());
var topology = Topology.start("test", stats)
.<Integer>fromGenerator("reader", (next) -> {
next.accept(0);
next.accept(1);
}).addBuffer("reader_queue", 1)
.<Integer>addWorker("process", 1, (prev, next) -> {
Integer item;
while ((item = prev.get()) != null) {
next.accept(item * 2 + 1);
next.accept(item * 2 + 2);
}
}).addBuffer("writer_queue", 1)
.sinkToConsumer("writer", 1, result::add);
topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1));
assertEquals(Set.of(1, 2, 3, 4), result);
}
@Test
@Timeout(10)
public void testTopologyFromQueue() {
var queue = new WorkQueue<Integer>("readerqueue", 10, 1, stats);
Set<Integer> result = Collections.synchronizedSet(new TreeSet<>());
var topology = Topology.start("test", stats)
.<Integer>readFromQueue(queue)
.<Integer>addWorker("process", 1, (prev, next) -> {
Integer item;
while ((item = prev.get()) != null) {
next.accept(item * 2 + 1);
next.accept(item * 2 + 2);
}
}).addBuffer("writer_queue", 1)
.sinkToConsumer("writer", 1, result::add);
new Thread(() -> {
queue.accept(0);
queue.accept(1);
queue.close();
}).start();
topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1));
assertEquals(Set.of(1, 2, 3, 4), result);
}
@Test
@Timeout(10)
public void testTopologyFromIterator() {
Set<Integer> result = Collections.synchronizedSet(new TreeSet<>());
var topology = Topology.start("test", stats)
.readFromIterator("reader", List.of(0, 1).iterator())
.addBuffer("reader_queue", 1)
.<Integer>addWorker("process", 1, (prev, next) -> {
Integer item;
while ((item = prev.get()) != null) {
next.accept(item * 2 + 1);
next.accept(item * 2 + 2);
}
}).addBuffer("writer_queue", 1)
.sinkToConsumer("writer", 1, result::add);
topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1));
assertEquals(Set.of(1, 2, 3, 4), result);
}
}

Wyświetl plik

@ -0,0 +1,108 @@
package com.onthegomap.flatmap.worker;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import com.onthegomap.flatmap.monitoring.Stats;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class WorkQueueTest {
@Test
@Timeout(10)
public void testEmpty() {
WorkQueue<String> q = newQueue(1);
q.close();
assertNull(q.get());
}
@Test
@Timeout(10)
public void testOneItem() {
WorkQueue<String> q = newQueue(1);
q.accept("a");
q.close();
assertEquals("a", q.get());
assertNull(q.get());
}
@Test
@Timeout(10)
public void testMoreItemsThanBatchSize() {
WorkQueue<String> q = newQueue(2);
q.accept("a");
q.accept("b");
q.accept("c");
q.close();
assertEquals("a", q.get());
assertEquals("b", q.get());
assertEquals("c", q.get());
assertNull(q.get());
}
@Test
@Timeout(10)
public void testManyItems() {
WorkQueue<Integer> q = newQueue(100);
for (int i = 0; i < 950; i++) {
q.accept(i);
}
q.close();
for (int i = 0; i < 950; i++) {
assertEquals((Integer) i, q.get());
}
assertNull(q.get());
}
@Test
@Timeout(10)
public void testTwoWriters() {
WorkQueue<Integer> q = newQueue(2);
AtomicInteger ni = new AtomicInteger(0);
new Worker("worker", stats, 2, () -> {
int i = ni.getAndIncrement();
q.accept(i);
}).await();
q.close();
assertEquals(2, q.getPending());
Set<Integer> found = new TreeSet<>();
for (int i = 0; i < 2; i++) {
found.add(q.get());
}
assertNull(q.get());
assertEquals(Set.of(0, 1), found);
assertEquals(0, q.getPending());
}
@Test
@Timeout(10)
public void testTwoWritersManyElements() {
WorkQueue<Integer> q = newQueue(2);
AtomicInteger ni = new AtomicInteger(0);
new Worker("worker", stats, 2, () -> {
int i = ni.getAndIncrement();
q.accept(i * 3);
q.accept(i * 3 + 1);
q.accept(i * 3 + 2);
}).await();
q.close();
assertEquals(6, q.getPending());
Set<Integer> found = new TreeSet<>();
for (int i = 0; i < 6; i++) {
found.add(q.get());
}
assertNull(q.get());
assertEquals(Set.of(0, 1, 2, 3, 4, 5), found);
assertEquals(0, q.getPending());
}
private <T> WorkQueue<T> newQueue(int maxBatch) {
return new WorkQueue<>("queue", 1000, maxBatch, stats);
}
private static final Stats stats = new Stats.InMemory();
}