tile-stats-readoptim
Mike Barry 2023-09-12 05:44:42 -04:00
rodzic 3e74b94523
commit 0f9fa4963d
1 zmienionych plików z 21 dodań i 34 usunięć

Wyświetl plik

@ -4,17 +4,15 @@ import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multisets;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.geo.TileCoord;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@ -23,9 +21,9 @@ import java.nio.file.Path;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
@ -41,11 +39,14 @@ public class OsmTileStats {
Arguments arguments = Arguments.fromArgsOrConfigFile(args);
PlanetilerConfig config = PlanetilerConfig.from(arguments);
var stats = arguments.getStats();
var timer = stats.startStage("osm-tile-stats");
LocalDate date = LocalDate.now(ZoneOffset.UTC);
int days = arguments.getInteger("days", "number of days into the past to look", 90);
int maxZoom = arguments.getInteger("maxzoom", "max zoom", 15);
int topN = arguments.getInteger("top", "top n", 1_000_000);
int topN = arguments.getInteger("top", "top n", 10_000_000);
Path output = arguments.file("output", "output", Path.of("top_tiles.tsv.gz"));
int threads = arguments.getInteger("download-threads", "number of threads to use for downloading",
Math.min(10, arguments.threads()));
var toDownload = IntStream.range(0, days)
.mapToObj(i -> date.minus(Period.ofDays(i)))
@ -59,21 +60,11 @@ public class OsmTileStats {
AtomicLong downloaded = new AtomicLong();
var pipeline = WorkerPipeline.start("osm-tile-stats", stats)
.readFromTiny("urls", toDownload)
.<byte[]>addWorker("download", 10, (prev, next) -> {
for (var file : prev) {
try (var stream = downloader.openStream(file)) {
next.accept(stream.readAllBytes());
} catch (IOException e) {
LOGGER.warn("Error downloading {} {}", file, e);
}
}
})
.addBuffer("files", 30).<Map.Entry<Integer, Long>>addWorker("parse", arguments.threads(),
.readFromTiny("urls", toDownload).<Map.Entry<Integer, Long>>addWorker("parse", arguments.threads(),
(prev, next) -> {
for (var bytes : prev) {
for (var url : prev) {
try (
var inputStream = new XZInputStream(new ByteArrayInputStream(bytes));
var inputStream = new XZInputStream(new BufferedInputStream(downloader.openStream(url)));
var reader = new BufferedReader(new InputStreamReader(inputStream));
) {
String line;
@ -90,37 +81,32 @@ public class OsmTileStats {
}
}
downloaded.incrementAndGet();
} catch (IOException e) {
LOGGER.warn("Error getting file {} {}", url, e);
}
}
})
.addBuffer("lines", 100_000, 1_000)
.sinkTo("collect", 1, lines -> {
Map<Integer, Long> counts = new HashMap<>();
Multiset m;
Multisets.
for (var line : lines) {
counts.merge(line.getKey(), line.getValue(), Long::sum);
}
PriorityQueue<Long> top = new PriorityQueue<>(topN);
LOGGER.info("Extracting top {} tiles from {} tiles", topN, counts.size());
for (var cursor : counts.entrySet()) {
top.offer(cursor.getValue());
if (top.size() > topN) {
top.poll();
}
}
Long cutoff = top.poll();
var topCounts = counts.entrySet().stream()
.sorted(Comparator.<Map.Entry<Integer, Long>>comparingLong(Map.Entry::getValue).reversed())
.limit(topN)
.sorted(Comparator.comparingInt(Map.Entry::getKey))
.toList();
try (
var ouput = new GZIPOutputStream(
new BufferedOutputStream(Files.newOutputStream(output, CREATE, TRUNCATE_EXISTING, WRITE)));
var writer = new BufferedWriter(new OutputStreamWriter(ouput))
) {
writer.write("z\tx\ty\tcount\n");
for (var cursor : counts.entrySet()) {
if (cursor.getValue() >= cutoff) {
TileCoord coord = TileCoord.hilbertDecode(cursor.getKey());
writer.write("%d\t%d\t%d\t%d%n".formatted(coord.z(), coord.x(), coord.y(), cursor.getValue()));
}
for (var entry : topCounts) {
TileCoord coord = TileCoord.hilbertDecode(entry.getKey());
writer.write("%d\t%d\t%d\t%d%n".formatted(coord.z(), coord.x(), coord.y(), entry.getValue()));
}
}
});
@ -133,6 +119,7 @@ public class OsmTileStats {
.addProcessStats();
pipeline.awaitAndLog(progress, config.logInterval());
timer.stop();
stats.printSummary();
}
}