Some refactoring; MbtilesWriter will need an end-to-end test

pull/1/head
Mike Barry 2021-05-03 05:57:26 -04:00
rodzic 531a7e9bd1
commit 11e296a9cb
12 zmienionych plików z 107 dodań i 86 usunięć

Wyświetl plik

@ -25,6 +25,10 @@ public record CommonParams(
}
}
/**
 * Builds a {@link CommonParams} from an empty command-line argument array.
 *
 * @return the parameters produced by {@link #from(Arguments)} when no arguments are supplied
 */
public static CommonParams defaultParams() {
  Arguments noArguments = new Arguments(new String[0]);
  return from(noArguments);
}
/**
 * Builds a {@link CommonParams} from parsed arguments, using {@link BoundsProvider#WORLD}
 * as the bounds provider.
 *
 * @param arguments the parsed arguments to read settings from
 * @return the result of the two-argument {@code from} overload with world bounds
 */
public static CommonParams from(Arguments arguments) {
  BoundsProvider worldBounds = BoundsProvider.WORLD;
  return from(arguments, worldBounds);
}

Wyświetl plik

@ -13,7 +13,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -85,10 +84,9 @@ public class OpenMapTilesMain {
.process("natural_earth", renderer, featureMap, config)
);
AtomicLong featureCount = new AtomicLong(0);
try (var osmReader = new OpenStreetMapReader(osmInputFile, nodeLocations, profile, stats)) {
stats.time("osm_pass1", () -> osmReader.pass1(config));
stats.time("osm_pass2", () -> featureCount.set(osmReader.pass2(renderer, featureMap, config)));
stats.time("osm_pass2", () -> osmReader.pass2(renderer, featureMap, config));
}
LOGGER.info("Deleting node.db to make room for mbtiles");
@ -96,8 +94,7 @@ public class OpenMapTilesMain {
Files.delete(nodeDb);
stats.time("sort", featureDb::sort);
stats
.time("mbtiles", () -> MbtilesWriter.writeOutput(featureCount.get(), featureMap, output, profile, config, stats));
stats.time("mbtiles", () -> MbtilesWriter.writeOutput(featureMap, output, profile, config, stats));
stats.stopTimer("import");

Wyświetl plik

@ -37,6 +37,7 @@ class ExternalMergeSort implements FeatureSort {
private final Stats stats;
private final int chunkSizeLimit;
private final int workers;
private final AtomicLong features = new AtomicLong(0);
private final List<Chunk> chunks = new ArrayList<>();
private Chunk current;
@ -78,6 +79,7 @@ class ExternalMergeSort implements FeatureSort {
public void add(Entry item) {
try {
assert !sorted;
features.incrementAndGet();
current.add(item);
if (current.bytesInMemory > chunkSizeLimit) {
newChunk();
@ -148,6 +150,11 @@ class ExternalMergeSort implements FeatureSort {
"s sort:" + Duration.ofNanos(sorting.get()).toSeconds() + "s");
}
/**
 * Reports how many times {@code add} has been invoked on this sorter.
 *
 * @return the current value of the {@code features} counter
 */
@Override
public long size() {
  return this.features.longValue();
}
@NotNull
@Override
public Iterator<Entry> iterator() {
@ -187,7 +194,7 @@ class ExternalMergeSort implements FeatureSort {
chunks.add(current = new Chunk(chunkPath));
}
class Chunk implements Closeable {
private static class Chunk implements Closeable {
private final Path path;
private final DataOutputStream outputStream;
@ -273,7 +280,7 @@ class ExternalMergeSort implements FeatureSort {
}
}
class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private static class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private final int count;
private int read = 0;

Wyświetl plik

@ -71,6 +71,10 @@ public record FeatureGroup(FeatureSort sorter, Profile profile, CommonStringEnco
}
}
/**
 * Reports the number of features held by the backing {@link FeatureSort}.
 *
 * @return whatever {@code sorter.size()} currently returns
 */
public long numFeatures() {
  final long featureCount = this.sorter.size();
  return featureCount;
}
public FeatureSort.Entry encode(RenderedFeature feature) {
return new FeatureSort.Entry(
encodeSortKey(feature),
@ -235,7 +239,7 @@ public record FeatureGroup(FeatureSort sorter, Profile profile, CommonStringEnco
}
public long getNumFeatures() {
return 0;
return entries.size();
}
public TileCoord coord() {

Wyświetl plik

@ -4,6 +4,8 @@ import com.onthegomap.flatmap.monitoring.Stats;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.jetbrains.annotations.NotNull;
@ -17,8 +19,41 @@ public interface FeatureSort extends Iterable<FeatureSort.Entry> {
return new ExternalMergeSort(dir, workers, chunkSizeLimit, stats);
}
/**
 * Creates a {@link FeatureSort} that keeps every added entry in a plain {@link ArrayList}.
 *
 * <p>The returned instance sorts with the entries' natural ordering and iterates over the
 * backing list directly. No synchronization is performed by this implementation.
 *
 * @return a new, empty list-backed sorter
 */
static FeatureSort newInMemory() {
  final List<Entry> entries = new ArrayList<>();
  return new FeatureSort() {

    @Override
    public void add(Entry newEntry) {
      entries.add(newEntry);
    }

    @Override
    public long size() {
      return entries.size();
    }

    @Override
    public long getStorageSize() {
      // Always reports 0 - presumably because this implementation never writes to disk.
      return 0;
    }

    @Override
    public void sort() {
      entries.sort(Comparator.naturalOrder());
    }

    @NotNull
    @Override
    public Iterator<Entry> iterator() {
      return entries.iterator();
    }
  };
}
void sort();
long size();
default List<Entry> toList() {
List<Entry> list = new ArrayList<>();
for (Entry entry : this) {

Wyświetl plik

@ -107,13 +107,12 @@ public class OpenStreetMapReader implements Closeable {
topology.awaitAndLog(loggers, config.logInterval());
}
public long pass2(FeatureRenderer renderer, FeatureGroup writer, CommonParams config) {
public void pass2(FeatureRenderer renderer, FeatureGroup writer, CommonParams config) {
int readerThreads = Math.max(config.threads() / 4, 1);
int processThreads = config.threads() - 1;
AtomicLong nodesProcessed = new AtomicLong(0);
AtomicLong waysProcessed = new AtomicLong(0);
AtomicLong relsProcessed = new AtomicLong(0);
AtomicLong featuresWritten = new AtomicLong(0);
CountDownLatch waysDone = new CountDownLatch(processThreads);
var topology = Topology.start("osm_pass2", stats)
@ -153,17 +152,14 @@ public class OpenStreetMapReader implements Closeable {
// just in case a worker skipped over all relations
waysDone.countDown();
}).addBuffer("feature_queue", 50_000, 1_000)
.sinkToConsumer("write", 1, (item) -> {
featuresWritten.incrementAndGet();
writer.accept(item);
});
.sinkToConsumer("write", 1, writer);
var logger = new ProgressLoggers("osm_pass2")
.addRatePercentCounter("nodes", TOTAL_NODES.get(), nodesProcessed)
.addFileSize(nodeDb.filePath())
.addRatePercentCounter("ways", TOTAL_WAYS.get(), waysProcessed)
.addRatePercentCounter("rels", TOTAL_RELATIONS.get(), relsProcessed)
.addRateCounter("features", featuresWritten)
.addRateCounter("features", () -> writer.sorter().size())
.addFileSize(writer::getStorageSize)
.addProcessStats()
.addInMemoryObject("hppc", this::getBigObjectSizeBytes)
@ -171,8 +167,6 @@ public class OpenStreetMapReader implements Closeable {
.addTopologyStats(topology);
topology.awaitAndLog(logger, config.logInterval());
return featuresWritten.get();
}
private long getBigObjectSizeBytes() {

Wyświetl plik

@ -26,6 +26,7 @@ import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import org.jetbrains.annotations.NotNull;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
@ -377,7 +378,7 @@ public record Mbtiles(Connection connection) implements Closeable {
}
}
public static record TileEntry(TileCoord tile, byte[] bytes) {
public static record TileEntry(TileCoord tile, byte[] bytes) implements Comparable<TileEntry> {
@Override
public boolean equals(Object o) {
@ -410,5 +411,10 @@ public record Mbtiles(Connection connection) implements Closeable {
", bytes=" + Arrays.toString(bytes) +
'}';
}
/**
 * Orders entries by tile coordinate, breaking ties on the tile contents.
 *
 * <p>Comparing on {@code tile} alone would make two entries with the same coordinate but
 * different bytes compare as equal, so sorted collections such as {@code TreeSet} would treat
 * them as duplicates and ignore the payload when checking membership. The tie-break on
 * {@code bytes} keeps the ordering usable for content-sensitive comparisons.
 * NOTE(review): this assumes the overridden {@code equals} also compares {@code bytes} by
 * content - confirm against its body.
 *
 * @param o the entry to compare against
 * @return a negative, zero, or positive value per the {@link Comparable} contract
 */
@Override
public int compareTo(@NotNull TileEntry o) {
  int byTile = tile.compareTo(o.tile);
  if (byTile != 0) {
    return byTile;
  }
  // Lexicographic, null-tolerant comparison of the encoded tile contents.
  return Arrays.compare(bytes, o.bytes);
}
}
}

Wyświetl plik

@ -4,7 +4,6 @@ import com.onthegomap.flatmap.CommonParams;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.VectorTileEncoder;
import com.onthegomap.flatmap.collections.FeatureGroup;
import com.onthegomap.flatmap.geo.TileCoord;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
@ -31,19 +30,15 @@ public class MbtilesWriter {
private final Profile profile;
private final Stats stats;
private MbtilesWriter(Path path, CommonParams config, Profile profile, Stats stats) {
MbtilesWriter(Path path, CommonParams config, Profile profile, Stats stats) {
this.path = path;
this.config = config;
this.profile = profile;
this.stats = stats;
}
private static record RenderedTile(TileCoord tile, byte[] contents) {
}
public static void writeOutput(long featureCount, FeatureGroup features, Path output, Profile profile,
CommonParams config, Stats stats) {
public static void writeOutput(FeatureGroup features, Path output, Profile profile, CommonParams config,
Stats stats) {
try {
Files.deleteIfExists(output);
} catch (IOException e) {
@ -59,7 +54,7 @@ public class MbtilesWriter {
.sinkTo("writer", 1, writer::tileWriter);
var loggers = new ProgressLoggers("mbtiles")
.addRatePercentCounter("features", featureCount, writer.featuresProcessed)
.addRatePercentCounter("features", features.numFeatures(), writer.featuresProcessed)
.addRateCounter("tiles", writer.tiles)
.addFileSize(output)
.add(" features ").addFileSize(features::getStorageSize)
@ -69,7 +64,7 @@ public class MbtilesWriter {
topology.awaitAndLog(loggers, config.logInterval());
}
public void tileEncoder(Supplier<FeatureGroup.TileFeatures> prev, Consumer<RenderedTile> next) throws Exception {
void tileEncoder(Supplier<FeatureGroup.TileFeatures> prev, Consumer<Mbtiles.TileEntry> next) throws IOException {
FeatureGroup.TileFeatures tileFeatures, last = null;
byte[] lastBytes = null, lastEncoded = null;
while ((tileFeatures = prev.get()) != null) {
@ -91,11 +86,11 @@ public class MbtilesWriter {
}
}
stats.encodedTile(tileFeatures.coord().z(), encoded.length);
next.accept(new RenderedTile(tileFeatures.coord(), bytes));
next.accept(new Mbtiles.TileEntry(tileFeatures.coord(), bytes));
}
}
private void tileWriter(Supplier<RenderedTile> tiles) throws Exception {
private void tileWriter(Supplier<Mbtiles.TileEntry> tiles) throws Exception {
try (Mbtiles db = Mbtiles.newFileDatabase(path)) {
db.setupSchema();
db.tuneForWrites();
@ -118,9 +113,9 @@ public class MbtilesWriter {
.setJson(stats.getTileStats());
try (var batchedWriter = db.newBatchedTileWriter()) {
RenderedTile tile;
Mbtiles.TileEntry tile;
while ((tile = tiles.get()) != null) {
batchedWriter.write(tile.tile(), tile.contents);
batchedWriter.write(tile.tile(), tile.bytes());
}
}

Wyświetl plik

@ -12,13 +12,10 @@ import com.onthegomap.flatmap.VectorTileEncoder;
import com.onthegomap.flatmap.geo.TileCoord;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
@ -28,29 +25,7 @@ import org.locationtech.jts.geom.Geometry;
public class FeatureGroupTest {
private final List<FeatureSort.Entry> list = new ArrayList<>();
private final FeatureSort sorter = new FeatureSort() {
@Override
public void sort() {
list.sort(Comparator.naturalOrder());
}
@Override
public void add(Entry newEntry) {
list.add(newEntry);
}
@Override
public long getStorageSize() {
return 0;
}
@NotNull
@Override
public Iterator<Entry> iterator() {
return list.iterator();
}
};
private final FeatureSort sorter = FeatureSort.newInMemory();
private FeatureGroup features = new FeatureGroup(sorter, new Profile.NullProfile());
@Test

Wyświetl plik

@ -6,13 +6,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.onthegomap.flatmap.geo.GeoUtils;
import com.onthegomap.flatmap.geo.TileCoord;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -31,7 +29,7 @@ public class MbtilesTest {
if (!deferIndexCreation) {
db.addIndex();
}
Set<Mbtiles.TileEntry> expected = new HashSet<>();
Set<Mbtiles.TileEntry> expected = new TreeSet<>();
try (var writer = db.newBatchedTileWriter()) {
for (int i = 0; i < howMany; i++) {
var entry = new Mbtiles.TileEntry(TileCoord.ofXYZ(i, i, 14), new byte[]{
@ -50,7 +48,7 @@ public class MbtilesTest {
if (optimize) {
db.vacuumAnalyze();
}
var all = getAll(db);
var all = MbtilesTestUtils.getAll(db);
assertEquals(howMany, all.size());
assertEquals(expected, all);
}
@ -196,22 +194,4 @@ public class MbtilesTest {
}
""");
}
/**
 * Reads every row of the {@code tiles} table into a set of {@link Mbtiles.TileEntry}.
 *
 * @param db the database whose connection is queried
 * @return one entry per row, built from the column, row, zoom level and tile data
 * @throws SQLException if creating the statement, running the query, or reading a row fails
 */
private static Set<Mbtiles.TileEntry> getAll(Mbtiles db) throws SQLException {
  Set<Mbtiles.TileEntry> result = new HashSet<>();
  // Manage the ResultSet explicitly instead of relying on Statement.close() to release it.
  try (
    Statement statement = db.connection().createStatement();
    ResultSet rs = statement.executeQuery("select zoom_level, tile_column, tile_row, tile_data from tiles")
  ) {
    while (rs.next()) {
      TileCoord coord = TileCoord.ofXYZ(
        rs.getInt("tile_column"),
        rs.getInt("tile_row"),
        rs.getInt("zoom_level")
      );
      result.add(new Mbtiles.TileEntry(coord, rs.getBytes("tile_data")));
    }
  }
  return result;
}
}

Wyświetl plik

@ -0,0 +1,29 @@
package com.onthegomap.flatmap.write;
import com.onthegomap.flatmap.geo.TileCoord;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
/**
 * Test helpers for inspecting the contents of an {@link Mbtiles} database.
 */
public class MbtilesTestUtils {

  /**
   * Reads every row of the {@code tiles} table into a set of {@link Mbtiles.TileEntry}.
   *
   * @param db the database whose connection is queried
   * @return one entry per row, built from the column, row, zoom level and tile data
   * @throws SQLException if creating the statement, running the query, or reading a row fails
   */
  static Set<Mbtiles.TileEntry> getAll(Mbtiles db) throws SQLException {
    Set<Mbtiles.TileEntry> result = new HashSet<>();
    // Manage the ResultSet explicitly instead of relying on Statement.close() to release it.
    try (
      Statement statement = db.connection().createStatement();
      ResultSet rs = statement.executeQuery("select zoom_level, tile_column, tile_row, tile_data from tiles")
    ) {
      while (rs.next()) {
        TileCoord coord = TileCoord.ofXYZ(
          rs.getInt("tile_column"),
          rs.getInt("tile_row"),
          rs.getInt("zoom_level")
        );
        result.add(new Mbtiles.TileEntry(coord, rs.getBytes("tile_data")));
      }
    }
    return result;
  }
}

Wyświetl plik

@ -1,5 +0,0 @@
package com.onthegomap.flatmap.write;
public class MbtilesWriterTest {
}