From b80a5763484b78d5c623c4beede45d0b7d0fe66e Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Thu, 15 Apr 2021 20:54:33 -0400 Subject: [PATCH] topology and progress loggers --- pom.xml | 6 + .../com/onthegomap/flatmap/Arguments.java | 2 +- .../com/onthegomap/flatmap/FlatMapConfig.java | 2 +- .../java/com/onthegomap/flatmap/Format.java | 83 +++++++ .../com/onthegomap/flatmap/MbtilesWriter.java | 3 +- .../com/onthegomap/flatmap/OsmInputFile.java | 2 +- .../onthegomap/flatmap/ProgressLoggers.java | 68 ------ .../java/com/onthegomap/flatmap/Wikidata.java | 3 +- .../collections/MergeSortFeatureMap.java | 2 +- .../flatmap/monitoring/ProcessInfo.java | 79 ++++++ .../flatmap/monitoring/ProgressLoggers.java | 231 ++++++++++++++++++ .../flatmap/{stats => monitoring}/Stats.java | 9 +- .../flatmap/reader/NaturalEarthReader.java | 2 +- .../flatmap/reader/OpenStreetMapReader.java | 4 +- .../com/onthegomap/flatmap/reader/Reader.java | 4 +- .../flatmap/reader/ShapefileReader.java | 2 +- .../onthegomap/flatmap/worker/Topology.java | 23 +- .../onthegomap/flatmap/worker/WorkQueue.java | 101 +++++++- .../com/onthegomap/flatmap/worker/Worker.java | 84 ++++++- .../com/onthegomap/flatmap/FormatTest.java | 71 ++++++ .../flatmap/monitoring/ProcessInfoTest.java | 24 ++ .../monitoring/ProgressLoggersTest.java | 39 +++ .../flatmap/worker/TopologyTest.java | 89 +++++++ .../flatmap/worker/WorkQueueTest.java | 108 ++++++++ 24 files changed, 941 insertions(+), 100 deletions(-) create mode 100644 src/main/java/com/onthegomap/flatmap/Format.java delete mode 100644 src/main/java/com/onthegomap/flatmap/ProgressLoggers.java create mode 100644 src/main/java/com/onthegomap/flatmap/monitoring/ProcessInfo.java create mode 100644 src/main/java/com/onthegomap/flatmap/monitoring/ProgressLoggers.java rename src/main/java/com/onthegomap/flatmap/{stats => monitoring}/Stats.java (85%) create mode 100644 src/test/java/com/onthegomap/flatmap/FormatTest.java create mode 100644 src/test/java/com/onthegomap/flatmap/monitoring/ProcessInfoTest.java create mode 100644 src/test/java/com/onthegomap/flatmap/monitoring/ProgressLoggersTest.java create mode 100644 src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java create mode 100644 src/test/java/com/onthegomap/flatmap/worker/WorkQueueTest.java diff --git a/pom.xml b/pom.xml index 19855553..85215898 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,12 @@ ${junit.version} test + + org.junit.jupiter + junit-jupiter-params + ${junit.version} + test + org.junit.jupiter junit-jupiter-engine diff --git a/src/main/java/com/onthegomap/flatmap/Arguments.java b/src/main/java/com/onthegomap/flatmap/Arguments.java index 8517e169..746b67ed 100644 --- a/src/main/java/com/onthegomap/flatmap/Arguments.java +++ b/src/main/java/com/onthegomap/flatmap/Arguments.java @@ -1,6 +1,6 @@ package com.onthegomap.flatmap; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import java.io.File; import java.time.Duration; import java.time.temporal.ChronoUnit; diff --git a/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java b/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java index c1094be2..ea7ab217 100644 --- a/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java +++ b/src/main/java/com/onthegomap/flatmap/FlatMapConfig.java @@ -1,6 +1,6 @@ package com.onthegomap.flatmap; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import java.time.Duration; import org.locationtech.jts.geom.Envelope; diff --git a/src/main/java/com/onthegomap/flatmap/Format.java b/src/main/java/com/onthegomap/flatmap/Format.java new file mode 100644 index 00000000..ef14142e --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/Format.java @@ -0,0 +1,83 @@ +package com.onthegomap.flatmap; + +import java.text.NumberFormat; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +public class Format { + + private static final NavigableMap STORAGE_SUFFIXES = new TreeMap<>(Map.ofEntries( + Map.entry(1_000L, "kB"), + Map.entry(1_000_000L, "MB"), + Map.entry(1_000_000_000L, "GB"), + Map.entry(1_000_000_000_000L, "TB"), + Map.entry(1_000_000_000_000_000L, "PB") + )); + private static final NavigableMap NUMERIC_SUFFIXES = new TreeMap<>(Map.ofEntries( + Map.entry(1_000L, "k"), + Map.entry(1_000_000L, "M"), + Map.entry(1_000_000_000L, "B"), + Map.entry(1_000_000_000_000L, "T"), + Map.entry(1_000_000_000_000_000L, "Q") + )); + + private static final NumberFormat pf = NumberFormat.getPercentInstance(); + + private static final NumberFormat nf = NumberFormat.getNumberInstance(); + + static { + pf.setMaximumFractionDigits(0); + nf.setMaximumFractionDigits(1); + } + + public static String padRight(String str, int size) { + StringBuilder strBuilder = new StringBuilder(str); + while (strBuilder.length() < size) { + strBuilder.append(" "); + } + return strBuilder.toString(); + } + + public static String padLeft(String str, int size) { + StringBuilder strBuilder = new StringBuilder(str); + while (strBuilder.length() < size) { + strBuilder.insert(0, " "); + } + return strBuilder.toString(); + } + + public static String formatStorage(Number num, boolean pad) { + return format(num, pad, STORAGE_SUFFIXES); + } + + public static String formatNumeric(Number num, boolean pad) { + return format(num, pad, NUMERIC_SUFFIXES); + } + + public static String format(Number num, boolean pad, NavigableMap suffixes) { + long value = num.longValue(); + if (value < 0) { + return "-" + format(-value, pad, suffixes); + } + if (value < 1000) { + return padLeft(Long.toString(value), pad ? 4 : 0); + } + + Map.Entry e = suffixes.floorEntry(value); + Long divideBy = e.getKey(); + String suffix = e.getValue(); + + long truncated = value / (divideBy / 10); + boolean hasDecimal = truncated < 100 && (truncated % 10 != 0); + return padLeft(hasDecimal ? (truncated / 10d) + suffix : (truncated / 10) + suffix, pad ? 4 : 0); + } + + public static String formatPercent(double value) { + return pf.format(value); + } + + public static String formatDecimal(double value) { + return nf.format(value); + } +} diff --git a/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java b/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java index d9c0da71..c08e05fe 100644 --- a/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java +++ b/src/main/java/com/onthegomap/flatmap/MbtilesWriter.java @@ -6,7 +6,8 @@ import static com.onthegomap.flatmap.GeoUtils.z; import com.onthegomap.flatmap.collections.MergeSortFeatureMap; import com.onthegomap.flatmap.collections.MergeSortFeatureMap.TileFeatures; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology; import java.io.ByteArrayOutputStream; import java.io.File; diff --git a/src/main/java/com/onthegomap/flatmap/OsmInputFile.java b/src/main/java/com/onthegomap/flatmap/OsmInputFile.java index 2a24d038..a827418b 100644 --- a/src/main/java/com/onthegomap/flatmap/OsmInputFile.java +++ b/src/main/java/com/onthegomap/flatmap/OsmInputFile.java @@ -2,7 +2,7 @@ package com.onthegomap.flatmap; import com.google.protobuf.ByteString; import com.graphhopper.reader.ReaderElement; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology; import com.onthegomap.flatmap.worker.WorkQueue; import java.io.DataInputStream; diff --git a/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java b/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java deleted file mode 100644 index 9139c178..00000000 --- a/src/main/java/com/onthegomap/flatmap/ProgressLoggers.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.onthegomap.flatmap; - -import com.onthegomap.flatmap.worker.Topology; -import com.onthegomap.flatmap.worker.WorkQueue; -import com.onthegomap.flatmap.worker.Worker; -import java.io.File; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongSupplier; - -public class ProgressLoggers { - - public ProgressLoggers(String name) { - - } - - public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value) { - return addRatePercentCounter(name, total, value::get); - } - - public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) { - return this; - } - - public ProgressLoggers addRateCounter(String name, AtomicLong featuresWritten) { - return this; - } - - public ProgressLoggers addFileSize(LongSupplier getStorageSize) { - return this; - } - - public ProgressLoggers addFileSize(File filePath) { - return this; - } - - public ProgressLoggers addProcessStats() { - return this; - } - - public ProgressLoggers addInMemoryObject(String name, LongSupplier size) { - return this; - } - - public ProgressLoggers addThreadPoolStats(String name, String prefix) { - return this; - } - - public ProgressLoggers addThreadPoolStats(String name, Worker worker) { - return addThreadPoolStats(name, worker.getPrefix()); - } - - public ProgressLoggers addQueueStats(WorkQueue queue) { - return this; - } - - public ProgressLoggers addTopologyStats(Topology topology) { - if (topology == null) { - return this; - } - return addTopologyStats(topology.previous()) - .addQueueStats(topology.inputQueue()) - .addThreadPoolStats(topology.name(), topology.worker()); - } - - public ProgressLoggers add(String s) { - return this; - } -} diff --git a/src/main/java/com/onthegomap/flatmap/Wikidata.java b/src/main/java/com/onthegomap/flatmap/Wikidata.java index 77215c60..1cbae368 100644 --- a/src/main/java/com/onthegomap/flatmap/Wikidata.java +++ b/src/main/java/com/onthegomap/flatmap/Wikidata.java @@ -12,8 +12,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.graphhopper.coll.GHLongObjectHashMap; import com.graphhopper.reader.ReaderElement; import com.graphhopper.util.StopWatch; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.profiles.OpenMapTilesProfile; -import com.onthegomap.flatmap.stats.Stats; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.BufferedWriter; diff --git a/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java b/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java index e6202430..c67b0c4f 100644 --- a/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java +++ b/src/main/java/com/onthegomap/flatmap/collections/MergeSortFeatureMap.java @@ -2,7 +2,7 @@ package com.onthegomap.flatmap.collections; import com.onthegomap.flatmap.RenderedFeature; import com.onthegomap.flatmap.VectorTile; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import java.nio.file.Path; import java.util.Iterator; import java.util.function.Consumer; diff --git a/src/main/java/com/onthegomap/flatmap/monitoring/ProcessInfo.java b/src/main/java/com/onthegomap/flatmap/monitoring/ProcessInfo.java new file mode 100644 index 00000000..b9a69ba6 --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/monitoring/ProcessInfo.java @@ -0,0 +1,79 @@ +package com.onthegomap.flatmap.monitoring; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.Map; +import java.util.TreeMap; + +public class ProcessInfo { + + public static Duration getProcessCpuTime() { + try { + return Duration.ofNanos(callLongGetter("getProcessCpuTime", ManagementFactory.getOperatingSystemMXBean())); + } catch (NoSuchMethodException | InvocationTargetException e) { + return Duration.ZERO; + } + } + + private static Long callLongGetter(String getterName, Object obj) + throws NoSuchMethodException, InvocationTargetException { + return callLongGetter(obj.getClass().getMethod(getterName), obj); + } + + + private static Long callLongGetter(Method method, Object obj) throws InvocationTargetException { + try { + return (Long) method.invoke(obj); + } catch (IllegalAccessException e) { + // Expected, the declaring class or interface might not be public. + } + + // Iterate over all implemented/extended interfaces and attempt invoking the method with the + // same name and parameters on each. + for (Class clazz : method.getDeclaringClass().getInterfaces()) { + try { + Method interfaceMethod = clazz.getMethod(method.getName(), method.getParameterTypes()); + Long result = callLongGetter(interfaceMethod, obj); + if (result != null) { + return result; + } + } catch (NoSuchMethodException e) { + // Expected, class might implement multiple, unrelated interfaces. + } + } + + return null; + } + + + public static record ThreadState(String name, long cpuTimeNanos, long id) { + + public static final ThreadState DEFAULT = new ThreadState("", 0, -1); + + } + + + public static Duration getGcTime() { + long total = 0; + for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) { + total += gc.getCollectionTime(); + } + return Duration.ofMillis(total); + } + + public static Map getThreadStats() { + Map threadState = new TreeMap<>(); + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + for (ThreadInfo thread : threadMXBean.dumpAllThreads(false, false)) { + threadState.put(thread.getThreadId(), + new ThreadState(thread.getThreadName(), threadMXBean.getThreadCpuTime(thread.getThreadId()), + thread.getThreadId())); + } + return threadState; + } +} diff --git a/src/main/java/com/onthegomap/flatmap/monitoring/ProgressLoggers.java b/src/main/java/com/onthegomap/flatmap/monitoring/ProgressLoggers.java new file mode 100644 index 00000000..e5ed8122 --- /dev/null +++ b/src/main/java/com/onthegomap/flatmap/monitoring/ProgressLoggers.java @@ -0,0 +1,231 @@ +package com.onthegomap.flatmap.monitoring; + +import static com.onthegomap.flatmap.Format.formatNumeric; +import static com.onthegomap.flatmap.Format.formatPercent; +import static com.onthegomap.flatmap.Format.formatStorage; +import static com.onthegomap.flatmap.Format.padLeft; +import static com.onthegomap.flatmap.Format.padRight; + +import com.graphhopper.util.Helper; +import com.onthegomap.flatmap.Format; +import com.onthegomap.flatmap.monitoring.ProcessInfo.ThreadState; +import com.onthegomap.flatmap.worker.Topology; +import com.onthegomap.flatmap.worker.WorkQueue; +import com.onthegomap.flatmap.worker.Worker; +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.DoubleFunction; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProgressLoggers { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProgressLoggers.class); + private final List loggers; + private final String prefix; + + public ProgressLoggers(String prefix) { + this.prefix = prefix; + loggers = new ArrayList<>(); + } + + public String getLog() { + return "[" + prefix + "]" + loggers.stream() + .map(Object::toString) + .collect(Collectors.joining("")); + } + + public ProgressLoggers addRateCounter(String name, LongSupplier getValue) { + AtomicLong last = new AtomicLong(getValue.getAsLong()); + AtomicLong lastTime = new AtomicLong(System.nanoTime()); + loggers.add(new ProgressLogger(name, () -> { + long now = System.nanoTime(); + long valueNow = getValue.getAsLong(); + double timeDiff = (now - lastTime.get()) * 1d / (1d * TimeUnit.SECONDS.toNanos(1)); + double valueDiff = valueNow - last.get(); + last.set(valueNow); + lastTime.set(now); + return "[ " + formatNumeric(valueNow, true) + " " + formatNumeric(valueDiff / timeDiff, true) + "/s ]"; + })); + return this; + } + + public ProgressLoggers addRateCounter(String name, AtomicLong value) { + return addRateCounter(name, value::get); + } + + public ProgressLoggers addRatePercentCounter(String name, long total, AtomicLong value) { + return addRatePercentCounter(name, total, value::get); + } + + public ProgressLoggers addRatePercentCounter(String name, long total, LongSupplier getValue) { + AtomicLong last = new AtomicLong(getValue.getAsLong()); + AtomicLong lastTime = new AtomicLong(System.nanoTime()); + loggers.add(new ProgressLogger(name, () -> { + long now = System.nanoTime(); + long valueNow = getValue.getAsLong(); + double timeDiff = (now - lastTime.get()) * 1d / (1d * TimeUnit.SECONDS.toNanos(1)); + double valueDiff = valueNow - last.get(); + last.set(valueNow); + lastTime.set(now); + return "[ " + formatNumeric(valueNow, true) + " " + padLeft(formatPercent(1f * valueNow / total), 4) + " " + + formatNumeric(valueDiff / timeDiff, true) + "/s ]"; + })); + return this; + } + + public ProgressLoggers addPercentCounter(String name, long total, AtomicLong getValue) { + loggers.add(new ProgressLogger(name, () -> { + long valueNow = getValue.get(); + return "[ " + padLeft("" + valueNow, 3) + " / " + padLeft("" + total, 3) + " " + padLeft( + formatPercent(1f * valueNow / total), 4) + " ]"; + })); + return this; + } + + public ProgressLoggers addQueueStats(WorkQueue queue) { + loggers.add(new TopologyLogger(() -> + " -> " + padLeft("(" + + formatNumeric(queue.getPending(), false) + + "/" + + formatNumeric(queue.getCapacity(), false) + + ")", 9) + )); + return this; + } + + public ProgressLoggers add(Object obj) { + loggers.add(obj); + return this; + } + + public ProgressLoggers addFileSize(File file) { + return addFileSize(file::length); + } + + public ProgressLoggers addFileSize(LongSupplier longSupplier) { + loggers.add(new Object() { + @Override + public String toString() { + return " " + padRight(formatBytes(longSupplier.getAsLong(), false), 5); + } + }); + return this; + } + + public ProgressLoggers addProcessStats() { + addDeltaLogger("cpus", ProcessInfo::getProcessCpuTime, Format::formatDecimal); + addDeltaLogger("gc", ProcessInfo::getGcTime, Format::formatPercent); + loggers.add(new ProgressLogger("mem", + () -> padLeft(formatMB(Helper.getUsedMB(), false) + " / " + formatMB(Helper.getTotalMB(), false), 7))); + + return this; + } + + private void addDeltaLogger(String name, Supplier supplier, DoubleFunction format) { + AtomicLong lastValue = new AtomicLong(supplier.get().toNanos()); + AtomicLong lastTime = new AtomicLong(System.nanoTime()); + loggers.add(new ProgressLogger(name, () -> { + long currentValue = supplier.get().toNanos(); + if (currentValue < 0) { + return "-"; + } + long currentTime = System.nanoTime(); + double rate = 1d * (currentValue - lastValue.get()) / (currentTime - lastTime.get()); + lastTime.set(currentTime); + lastValue.set(currentValue); + return padLeft(format.apply(rate), 3); + })); + } + + public ProgressLoggers addThreadPoolStats(String name, String prefix) { + boolean first = loggers.isEmpty() || !(loggers.get(loggers.size() - 1) instanceof TopologyLogger); + try { + Map lastThreads = ProcessInfo.getThreadStats(); + AtomicLong lastTime = new AtomicLong(System.nanoTime()); + loggers.add(new TopologyLogger(() -> { + var oldAndNewThreads = new TreeMap<>(lastThreads); + var newThreads = ProcessInfo.getThreadStats(); + oldAndNewThreads.putAll(newThreads); + + long currentTime = System.nanoTime(); + double timeDiff = 1d * (currentTime - lastTime.get()); + String percents = oldAndNewThreads.values().stream() + .filter(thread -> thread.name().startsWith(prefix)) + .map(thread -> { + if (!newThreads.containsKey(thread.id())) { + return " -%"; + } + long last = lastThreads.getOrDefault(thread.id(), ThreadState.DEFAULT).cpuTimeNanos(); + return padLeft(formatPercent(1d * (thread.cpuTimeNanos() - last) / timeDiff), 3); + }).collect(Collectors.joining(" ", "(", ")")); + + lastTime.set(currentTime); + lastThreads.putAll(newThreads); + return (first ? " | " : " -> ") + name + percents; + })); + } catch (Throwable e) { + // can't get CPU stats per-thread + } + return this; + } + + public ProgressLoggers addThreadPoolStats(String name, Worker worker) { + return addThreadPoolStats(name, worker.getPrefix()); + } + + private String formatBytes(long bytes, boolean pad) { + return formatStorage(bytes, pad); + } + + private String formatMB(long mb, boolean pad) { + return formatStorage(mb * Helper.MB, pad); + } + + public void log() { + LOGGER.info(getLog()); + } + + public ProgressLoggers addInMemoryObject(String name, LongSupplier getSize) { + loggers.add(new ProgressLogger(name, () -> formatStorage(getSize.getAsLong(), true))); + return this; + } + + public ProgressLoggers addTopologyStats(Topology topology) { + if (topology != null) { + addTopologyStats(topology.previous()); + if (topology.inputQueue() != null) { + addQueueStats(topology.inputQueue()); + } + if (topology.worker() != null) { + addThreadPoolStats(topology.name(), topology.worker()); + } + } + return this; + } + + private static record ProgressLogger(String name, Supplier fn) { + + @Override + public String toString() { + return " " + name + ": " + fn.get(); + } + } + + private static record TopologyLogger(Supplier fn) { + + @Override + public String toString() { + return fn.get(); + } + } +} diff --git a/src/main/java/com/onthegomap/flatmap/stats/Stats.java b/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java similarity index 85% rename from src/main/java/com/onthegomap/flatmap/stats/Stats.java rename to src/main/java/com/onthegomap/flatmap/monitoring/Stats.java index 42e43470..81ca9a4a 100644 --- a/src/main/java/com/onthegomap/flatmap/stats/Stats.java +++ b/src/main/java/com/onthegomap/flatmap/monitoring/Stats.java @@ -1,4 +1,4 @@ -package com.onthegomap.flatmap.stats; +package com.onthegomap.flatmap.monitoring; import com.graphhopper.util.StopWatch; import org.slf4j.Logger; @@ -16,6 +16,8 @@ public interface Stats { void encodedTile(int zoom, int length); + void gauge(String name, int value); + class InMemory implements Stats { private static final Logger LOGGER = LoggerFactory.getLogger(InMemory.class); @@ -47,5 +49,10 @@ public interface Stats { public void encodedTile(int zoom, int length) { } + + @Override + public void gauge(String name, int value) { + + } } } diff --git a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java index 50c376d2..7c144d24 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java @@ -1,7 +1,7 @@ package com.onthegomap.flatmap.reader; import com.onthegomap.flatmap.SourceFeature; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.io.File; diff --git a/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java b/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java index 10dc9f59..498058e8 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/OpenStreetMapReader.java @@ -12,7 +12,6 @@ import com.onthegomap.flatmap.FlatMapConfig; import com.onthegomap.flatmap.GeoUtils; import com.onthegomap.flatmap.OsmInputFile; import com.onthegomap.flatmap.Profile; -import com.onthegomap.flatmap.ProgressLoggers; import com.onthegomap.flatmap.RenderableFeature; import com.onthegomap.flatmap.RenderableFeatures; import com.onthegomap.flatmap.RenderedFeature; @@ -20,7 +19,8 @@ import com.onthegomap.flatmap.SourceFeature; import com.onthegomap.flatmap.collections.LongLongMap; import com.onthegomap.flatmap.collections.LongLongMultimap; import com.onthegomap.flatmap.collections.MergeSortFeatureMap; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology; import java.io.Closeable; import java.io.IOException; diff --git a/src/main/java/com/onthegomap/flatmap/reader/Reader.java b/src/main/java/com/onthegomap/flatmap/reader/Reader.java index b01ae2ab..e473ccd1 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/Reader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/Reader.java @@ -3,13 +3,13 @@ package com.onthegomap.flatmap.reader; import com.onthegomap.flatmap.FeatureRenderer; import com.onthegomap.flatmap.FlatMapConfig; import com.onthegomap.flatmap.Profile; -import com.onthegomap.flatmap.ProgressLoggers; import com.onthegomap.flatmap.RenderableFeature; import com.onthegomap.flatmap.RenderableFeatures; import com.onthegomap.flatmap.RenderedFeature; import com.onthegomap.flatmap.SourceFeature; import com.onthegomap.flatmap.collections.MergeSortFeatureMap; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology; import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.util.concurrent.atomic.AtomicLong; diff --git a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java index 2931f3d8..9ecf596c 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java @@ -1,7 +1,7 @@ package com.onthegomap.flatmap.reader; import com.onthegomap.flatmap.SourceFeature; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.io.File; diff --git a/src/main/java/com/onthegomap/flatmap/worker/Topology.java b/src/main/java/com/onthegomap/flatmap/worker/Topology.java index 5077ef7c..926fc655 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Topology.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Topology.java @@ -1,7 +1,7 @@ package com.onthegomap.flatmap.worker; -import com.onthegomap.flatmap.ProgressLoggers; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import java.time.Duration; import java.util.Iterator; import java.util.function.Consumer; @@ -21,10 +21,13 @@ public record Topology( public void awaitAndLog(ProgressLoggers loggers, Duration logInterval) { if (previous != null) { previous.awaitAndLog(loggers, logInterval); - } else { // producer is responsible for closing the initial input queue + } + if (inputQueue != null) { inputQueue.close(); } - worker.awaitAndLog(loggers, logInterval); + if (worker != null) { + worker.awaitAndLog(loggers, logInterval); + } } public interface SourceStep { @@ -56,8 +59,8 @@ public record Topology( public Bufferable fromGenerator(String name, SourceStep producer, int threads) { return (queueName, size, batchSize) -> { - var nextQueue = new WorkQueue(prefix, queueName, size, batchSize, stats); - Worker worker = new Worker(prefix, name, stats, threads, () -> producer.run(nextQueue)); + var nextQueue = new WorkQueue(prefix + "_" + queueName, size, batchSize, stats); + Worker worker = new Worker(prefix + "_" + name, stats, threads, () -> producer.run(nextQueue)); return new Builder<>(prefix, name, nextQueue, worker, stats); }; } @@ -99,8 +102,8 @@ public record Topology( public Bufferable addWorker(String name, int threads, WorkerStep step) { Builder curr = this; return (queueName, size, batchSize) -> { - var nextOutputQueue = new WorkQueue(prefix, queueName, size, batchSize, stats); - var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue, nextOutputQueue)); + var nextOutputQueue = new WorkQueue(prefix + "_" + queueName, size, batchSize, stats); + var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue, nextOutputQueue)); return new Builder<>(prefix, name, curr, outputQueue, nextOutputQueue, worker, stats); }; } @@ -111,8 +114,8 @@ public record Topology( } public Topology sinkTo(String name, int threads, SinkStep step) { - var previousTopology = previous.build(); - var worker = new Worker(prefix, name, stats, threads, () -> step.run(outputQueue)); + var previousTopology = build(); + var worker = new Worker(prefix + "_" + name, stats, threads, () -> step.run(outputQueue)); return new Topology<>(name, previousTopology, outputQueue, worker); } diff --git a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java index 7f99a397..43dd3c2f 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java +++ b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java @@ -1,29 +1,120 @@ package com.onthegomap.flatmap.worker; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.Stats; import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; public class WorkQueue implements Closeable, Supplier, Consumer { - public WorkQueue(String prefix, String name, int capacity, int maxBatch, Stats stats) { + private final ThreadLocal> itemWriteBatchProvider = new ThreadLocal<>(); + private final ThreadLocal> itemReadBatchProvider = new ThreadLocal<>(); + private final BlockingQueue> itemQueue; + private final int batchSize; + private final ConcurrentHashMap> queues = new ConcurrentHashMap<>(); + private final int pendingBatchesCapacity; + private volatile boolean hasIncomingData = true; + private final AtomicInteger pendingCount = new AtomicInteger(0); + public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { + this.pendingBatchesCapacity = capacity / maxBatch; + this.batchSize = maxBatch; + itemQueue = new ArrayBlockingQueue<>(pendingBatchesCapacity); } @Override public void close() { - + for (Queue q : queues.values()) { + try { + if (!q.isEmpty()) { + itemQueue.put(q); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + hasIncomingData = false; } @Override - public void accept(T t) { + public void accept(T item) { + // past 4-8 concurrent writers, start getting lock contention adding to the blocking queue so add to the + // queue in lass frequent, larger batches + Queue writeBatch = itemWriteBatchProvider.get(); + if (writeBatch == null) { + itemWriteBatchProvider.set(writeBatch = new ArrayDeque<>(batchSize)); + queues.put(Thread.currentThread().getId(), writeBatch); + } + writeBatch.offer(item); + pendingCount.incrementAndGet(); + + if (writeBatch.size() >= batchSize) { + flushWrites(); + } + } + + private void flushWrites() { + Queue writeBatch = itemWriteBatchProvider.get(); + if (writeBatch != null && !writeBatch.isEmpty()) { + try { + itemWriteBatchProvider.set(null); + queues.remove(Thread.currentThread().getId()); + // blocks if full + if (!itemQueue.offer(writeBatch)) { + itemQueue.put(writeBatch); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } } @Override public T get() { - return null; + Queue itemBatch = itemReadBatchProvider.get(); + + if (itemBatch == null || itemBatch.isEmpty()) { + do { + if (!hasIncomingData && itemQueue.isEmpty()) { + break; + } + + if ((itemBatch = itemQueue.poll()) == null) { + try { + itemBatch = itemQueue.poll(100, TimeUnit.MILLISECONDS); + if (itemBatch != null) { + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break;// signal EOF + } + } + } while (itemBatch == null); + itemReadBatchProvider.set(itemBatch); + } + + T result = itemBatch == null ? null : itemBatch.poll(); + if (result != null) { + pendingCount.decrementAndGet(); + } + return result; + } + + public int getPending() { + return pendingCount.get(); + } + + public int getCapacity() { + return pendingBatchesCapacity * batchSize; } } diff --git a/src/main/java/com/onthegomap/flatmap/worker/Worker.java b/src/main/java/com/onthegomap/flatmap/worker/Worker.java index 551094ea..5b29d3ff 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Worker.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Worker.java @@ -1,20 +1,96 @@ package com.onthegomap.flatmap.worker; -import com.onthegomap.flatmap.ProgressLoggers; -import com.onthegomap.flatmap.stats.Stats; +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Worker { - public Worker(String prefix, String name, Stats stats, int threads, RunnableThatThrows task) { + private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class); + private final ExecutorService es; + private final String prefix; + private final Stats stats; + private static class NamedThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + private NamedThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = name + "-"; + } + + @Override + public Thread newThread(@NotNull Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } + + public Worker(String prefix, Stats stats, int threads, RunnableThatThrows task) { + this.prefix = prefix; + this.stats = stats; + stats.gauge(prefix + "_threads", threads); + es = Executors.newFixedThreadPool(threads, new NamedThreadFactory(prefix)); + for (int i = 0; i < threads; i++) { + es.submit(() -> { + String id = Thread.currentThread().getName(); + LOGGER.debug("Starting worker"); + try { + task.run(); + } catch (Throwable e) { + System.err.println("Worker " + id + " died"); + e.printStackTrace(); + System.exit(1); + } finally { + LOGGER.debug("Finished worker"); + } + }); + } + es.shutdown(); } public String getPrefix() { - return null; + return prefix; } public void awaitAndLog(ProgressLoggers loggers, Duration longInterval) { + try { + while (!es.awaitTermination(longInterval.toNanos(), TimeUnit.NANOSECONDS)) { + loggers.log(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + loggers.log(); + } + + public void await() { + try { + es.awaitTermination(365, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } public interface RunnableThatThrows { diff --git a/src/test/java/com/onthegomap/flatmap/FormatTest.java b/src/test/java/com/onthegomap/flatmap/FormatTest.java new file mode 100644 index 00000000..51f18d11 --- /dev/null +++ b/src/test/java/com/onthegomap/flatmap/FormatTest.java @@ -0,0 +1,71 @@ +package com.onthegomap.flatmap; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class FormatTest { + + @ParameterizedTest + @CsvSource({ + "1.5,1", + "999,999", + "1000,1k", + "9999,9.9k", + "10001,10k", + "99999,99k", + "999999,999k", + "9999999,9.9M", + "-9999999,-9.9M", + "5.5e12,5.5T", + }) + public void testFormatNumeric(Double number, String formatted) { + assertEquals(Format.formatNumeric(number, false), formatted); + } + + @ParameterizedTest + @CsvSource({ + "999,999", + "1000,1kB", + "9999,9.9kB", + "5.5e9,5.5GB", + }) + public void testFormatStorage(Double number, String formatted) { + assertEquals(formatted, Format.formatStorage(number, false)); + } + + @ParameterizedTest + @CsvSource({ + "0,0%", + "1,100%", + "0.11111,11%", + }) + public void testFormatPercent(Double number, String formatted) { + assertEquals(formatted, Format.formatPercent(number)); + } + + @ParameterizedTest + @CsvSource({ + "a,0,a", + "a,1,a", + "a,2,' a'", + "a,3,' a'", + "ab,3,' ab'", + "abc,3,'abc'", + }) + public void testPad(String in, Integer size, String out) { + assertEquals(out, Format.padLeft(in, size)); + } + + @ParameterizedTest + @CsvSource({ + "0,0", + "0.1,0.1", + "0.11,0.1", + "1111.11,'1,111.1'", + }) + public void testFormatDecimal(Double in, String out) { + assertEquals(out, Format.formatDecimal(in)); + } +} diff --git a/src/test/java/com/onthegomap/flatmap/monitoring/ProcessInfoTest.java b/src/test/java/com/onthegomap/flatmap/monitoring/ProcessInfoTest.java new file mode 100644 index 00000000..54fff682 --- /dev/null +++ b/src/test/java/com/onthegomap/flatmap/monitoring/ProcessInfoTest.java @@ -0,0 +1,24 @@ +package com.onthegomap.flatmap.monitoring; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +public class ProcessInfoTest { + + @Test + public void testGC() { + assertTrue(ProcessInfo.getGcTime().toNanos() >= 0); + } + + @Test + public void testCPU() { + assertTrue(ProcessInfo.getProcessCpuTime().toNanos() > 0); + } + + @Test + public void testThreads() { + assertFalse(ProcessInfo.getThreadStats().isEmpty()); + } +} diff --git a/src/test/java/com/onthegomap/flatmap/monitoring/ProgressLoggersTest.java b/src/test/java/com/onthegomap/flatmap/monitoring/ProgressLoggersTest.java new file mode 100644 index 00000000..c0a3054f --- /dev/null +++ b/src/test/java/com/onthegomap/flatmap/monitoring/ProgressLoggersTest.java @@ -0,0 +1,39 @@ +package com.onthegomap.flatmap.monitoring; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.flatmap.monitoring.Stats.InMemory; +import com.onthegomap.flatmap.worker.Topology; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +public class ProgressLoggersTest { + + @Test + @Timeout(10) + public void testLogTopology() { + var latch = new CountDownLatch(1); + var topology = Topology.start("topo", new InMemory()) + .fromGenerator("reader", next -> latch.await()) + .addBuffer("reader_queue", 10) + .addWorker("worker", 2, (a, b) -> latch.await()) + .addBuffer("writer_queue", 10) + .sinkTo("writer", 2, a -> latch.await()); + + var loggers = new ProgressLoggers("prefix") + .addTopologyStats(topology); + + String log; + while ((log = loggers.getLog()).split("%").length < 6) { + // spin waiting for threads to start + } + + assertEquals("[prefix] | reader( 0%) -> (0/10) -> worker( 0% 0%) -> (0/10) -> writer( 0% 0%)", log); + latch.countDown(); + topology.awaitAndLog(loggers, Duration.ofSeconds(10)); + assertEquals("[prefix] | reader( -%) -> (0/10) -> worker( -% -%) -> (0/10) -> writer( -% -%)", + loggers.getLog()); + } +} diff --git a/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java b/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java new file mode 100644 index 00000000..e783075b --- /dev/null +++ b/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java @@ -0,0 +1,89 @@ +package com.onthegomap.flatmap.worker; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.flatmap.monitoring.ProgressLoggers; +import com.onthegomap.flatmap.monitoring.Stats; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +public class TopologyTest { + + Stats stats = new Stats.InMemory(); + + @Test + @Timeout(10) + public void testSimpleTopology() { + Set result = Collections.synchronizedSet(new TreeSet<>()); + var topology = Topology.start("test", stats) + .fromGenerator("reader", (next) -> { + next.accept(0); + next.accept(1); + }).addBuffer("reader_queue", 1) + .addWorker("process", 1, (prev, next) -> { + Integer item; + while ((item = prev.get()) != null) { + next.accept(item * 2 + 1); + next.accept(item * 2 + 2); + } + }).addBuffer("writer_queue", 1) + .sinkToConsumer("writer", 1, result::add); + + topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1)); + + assertEquals(Set.of(1, 2, 3, 4), result); + } + + @Test + @Timeout(10) + public void testTopologyFromQueue() { + var queue = new WorkQueue("readerqueue", 10, 1, stats); + Set result = Collections.synchronizedSet(new TreeSet<>()); + var topology = Topology.start("test", stats) + .readFromQueue(queue) + .addWorker("process", 1, (prev, next) -> { + Integer item; + while ((item = prev.get()) != null) { + next.accept(item * 2 + 1); + next.accept(item * 2 + 2); + } + }).addBuffer("writer_queue", 1) + .sinkToConsumer("writer", 1, result::add); + + new Thread(() -> { + queue.accept(0); + queue.accept(1); + queue.close(); + }).start(); + + topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1)); + + assertEquals(Set.of(1, 2, 3, 4), result); + } + + @Test + @Timeout(10) + public void testTopologyFromIterator() { + Set result = Collections.synchronizedSet(new TreeSet<>()); + var topology = Topology.start("test", stats) + .readFromIterator("reader", List.of(0, 1).iterator()) + .addBuffer("reader_queue", 1) + .addWorker("process", 1, (prev, next) -> { + Integer item; + while ((item = prev.get()) != null) { + next.accept(item * 2 + 1); + next.accept(item * 2 + 2); + } + }).addBuffer("writer_queue", 1) + .sinkToConsumer("writer", 1, result::add); + + topology.awaitAndLog(new ProgressLoggers("test"), Duration.ofSeconds(1)); + + assertEquals(Set.of(1, 2, 3, 4), result); + } +} diff --git a/src/test/java/com/onthegomap/flatmap/worker/WorkQueueTest.java b/src/test/java/com/onthegomap/flatmap/worker/WorkQueueTest.java new file mode 100644 index 00000000..103d6b0d --- /dev/null +++ b/src/test/java/com/onthegomap/flatmap/worker/WorkQueueTest.java @@ -0,0 +1,108 @@ +package com.onthegomap.flatmap.worker; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.onthegomap.flatmap.monitoring.Stats; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +public class WorkQueueTest { + + @Test + @Timeout(10) + public void testEmpty() { + WorkQueue q = newQueue(1); + q.close(); + assertNull(q.get()); + } + + @Test + @Timeout(10) + public void testOneItem() { + WorkQueue q = newQueue(1); + q.accept("a"); + q.close(); + assertEquals("a", q.get()); + assertNull(q.get()); + } + + @Test + @Timeout(10) + public void testMoreItemsThanBatchSize() { + WorkQueue q = newQueue(2); + q.accept("a"); + q.accept("b"); + q.accept("c"); + q.close(); + assertEquals("a", q.get()); + assertEquals("b", q.get()); + assertEquals("c", q.get()); + assertNull(q.get()); + } + + @Test + @Timeout(10) + public void testManyItems() { + WorkQueue q = newQueue(100); + for (int i = 0; i < 950; i++) { + q.accept(i); + } + q.close(); + for (int i = 0; i < 950; i++) { + assertEquals((Integer) i, q.get()); + } + assertNull(q.get()); + } + + @Test + @Timeout(10) + public void testTwoWriters() { + WorkQueue q = newQueue(2); + AtomicInteger ni = new AtomicInteger(0); + new Worker("worker", stats, 2, () -> { + int i = ni.getAndIncrement(); + q.accept(i); + }).await(); + q.close(); + assertEquals(2, q.getPending()); + Set found = new TreeSet<>(); + for (int i = 0; i < 2; i++) { + found.add(q.get()); + } + assertNull(q.get()); + assertEquals(Set.of(0, 1), found); + assertEquals(0, q.getPending()); + } + + @Test + @Timeout(10) + public void testTwoWritersManyElements() { + WorkQueue q = newQueue(2); + AtomicInteger ni = new AtomicInteger(0); + new Worker("worker", stats, 2, () -> { + int i = ni.getAndIncrement(); + q.accept(i * 3); + q.accept(i * 3 + 1); + q.accept(i * 3 + 2); + }).await(); + q.close(); + assertEquals(6, q.getPending()); + Set found = new TreeSet<>(); + for (int i = 0; i < 6; i++) { + found.add(q.get()); + } + assertNull(q.get()); + assertEquals(Set.of(0, 1, 2, 3, 4, 5), found); + assertEquals(0, q.getPending()); + } + + private WorkQueue newQueue(int maxBatch) { + return new WorkQueue<>("queue", 1000, maxBatch, stats); + } + + private static final Stats stats = new Stats.InMemory(); +}