pull/1/head
Mike Barry 2021-04-16 07:30:33 -04:00
rodzic f26ecce4f2
commit f9e3d1153d
5 zmienionych plików z 19 dodań i 10 usunięć

Wyświetl plik

@ -71,7 +71,7 @@ public class OsmInputFile {
public void readTo(Consumer<ReaderElement> next, int threads) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try (var stream = new BufferedInputStream(new FileInputStream(file), 50000)) {
try (var stream = new BufferedInputStream(new FileInputStream(file), 50_000)) {
PbfStreamSplitter streamSplitter = new PbfStreamSplitter(new DataInputStream(stream));
var sink = new ReaderElementSink(next);
PbfDecoder pbfDecoder = new PbfDecoder(streamSplitter, executorService, threads + 1, sink);

Wyświetl plik

@ -156,7 +156,6 @@ public class Wikidata {
.addRateCounter("wiki", fetcher.wikidatas)
.addFileSize(outfile)
.addProcessStats()
.addThreadPoolStats("pbf", "PBF")
.addThreadPoolStats("parse", "pool-")
.addTopologyStats(topology);

Wyświetl plik

@ -100,7 +100,6 @@ public class OpenStreetMapReader implements Closeable {
.addRateCounter("rels", TOTAL_RELATIONS)
.addProcessStats()
.addInMemoryObject("hppc", this::getBigObjectSizeBytes)
.addThreadPoolStats("pbf", "PBF")
.addThreadPoolStats("parse", "pool-")
.addTopologyStats(topology);
topology.awaitAndLog(loggers, config.logInterval());
@ -166,7 +165,6 @@ public class OpenStreetMapReader implements Closeable {
.addFileSize(writer::getStorageSize)
.addProcessStats()
.addInMemoryObject("hppc", this::getBigObjectSizeBytes)
.addThreadPoolStats("pbf", "PBF")
.addThreadPoolStats("parse", "pool-")
.addTopologyStats(topology);

Wyświetl plik

@ -18,18 +18,28 @@ public record Topology<T>(
return new Empty(prefix, stats);
}
public void awaitAndLog(ProgressLoggers loggers, Duration logInterval) {
// track time since last log and stagger initial log interval for each step to keep logs
// coming at consistent intervals
private void doAwaitAndLog(ProgressLoggers loggers, Duration logInterval, long startNanos) {
if (previous != null) {
previous.awaitAndLog(loggers, logInterval);
previous.doAwaitAndLog(loggers, logInterval, startNanos);
}
if (inputQueue != null) {
inputQueue.close();
}
if (worker != null) {
worker.awaitAndLog(loggers, logInterval);
long elapsedSoFar = System.nanoTime() - startNanos;
Duration sinceLastLog = Duration.ofNanos(elapsedSoFar % logInterval.toNanos());
Duration untilNextLog = logInterval.minus(sinceLastLog);
worker.awaitAndLog(loggers, untilNextLog, logInterval);
}
}
public void awaitAndLog(ProgressLoggers loggers, Duration logInterval) {
doAwaitAndLog(loggers, logInterval, System.nanoTime());
loggers.log();
}
public void await() {
if (previous != null) {
previous.await();

Wyświetl plik

@ -74,15 +74,17 @@ public class Worker {
return prefix;
}
public void awaitAndLog(ProgressLoggers loggers, Duration longInterval) {
public void awaitAndLog(ProgressLoggers loggers, Duration initialLogInterval, Duration logInterval) {
try {
while (!es.awaitTermination(longInterval.toNanos(), TimeUnit.NANOSECONDS)) {
if (!es.awaitTermination(initialLogInterval.toNanos(), TimeUnit.NANOSECONDS)) {
loggers.log();
while (!es.awaitTermination(logInterval.toNanos(), TimeUnit.NANOSECONDS)) {
loggers.log();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
loggers.log();
}
public void await() {