From db796e17205ebc9c7e4953c8a9ef11d498262da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Bilger?= Date: Thu, 24 Aug 2023 02:24:27 +0200 Subject: [PATCH] Add support for simple file output streams: CSV, JSON, protobuf (#639) --- .../com/onthegomap/planetiler/Planetiler.java | 33 ++- .../planetiler/archive/TileArchiveConfig.java | 30 ++- .../archive/TileArchiveMetadata.java | 40 ++- .../planetiler/archive/TileArchiveWriter.java | 15 +- .../planetiler/archive/TileArchives.java | 17 +- .../planetiler/archive/TileCompression.java | 46 ++++ .../planetiler/config/PlanetilerConfig.java | 22 +- .../planetiler/mbtiles/Mbtiles.java | 21 +- .../planetiler/pmtiles/ReadablePmtiles.java | 21 +- .../planetiler/pmtiles/WriteablePmtiles.java | 10 +- .../stream/StreamArchiveConfig.java | 10 + .../planetiler/stream/StreamArchiveUtils.java | 52 ++++ .../stream/WriteableCsvArchive.java | 181 +++++++++++++ .../stream/WriteableJsonStreamArchive.java | 233 +++++++++++++++++ .../stream/WriteableProtoStreamArchive.java | 194 ++++++++++++++ .../stream/WriteableStreamArchive.java | 117 +++++++++ .../src/main/proto/stream_archive_proto.proto | 79 ++++++ .../planetiler/PlanetilerTests.java | 90 ++++++- .../com/onthegomap/planetiler/TestUtils.java | 13 +- .../archive/TileArchiveMetadataTest.java | 3 +- .../planetiler/mbtiles/MbtilesTest.java | 54 +++- .../planetiler/pmtiles/PmtilesTest.java | 120 +++++---- .../stream/InMemoryStreamArchive.java | 119 +++++++++ .../stream/StreamArchiveUtilsTest.java | 34 +++ .../stream/WriteableCsvArchiveTest.java | 174 +++++++++++++ .../WriteableJsonStreamArchiveTest.java | 244 ++++++++++++++++++ .../WriteableProtoStreamArchiveTest.java | 180 +++++++++++++ 27 files changed, 2060 insertions(+), 92 deletions(-) create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileCompression.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveConfig.java create mode 100644 
planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveUtils.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableCsvArchive.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchive.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchive.java create mode 100644 planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableStreamArchive.java create mode 100644 planetiler-core/src/main/proto/stream_archive_proto.proto create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/stream/InMemoryStreamArchive.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/stream/StreamArchiveUtilsTest.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableCsvArchiveTest.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchiveTest.java create mode 100644 planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchiveTest.java diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java index c44c7c95..5c081087 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java @@ -19,6 +19,7 @@ import com.onthegomap.planetiler.reader.osm.OsmReader; import com.onthegomap.planetiler.stats.ProcessInfo; import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.stats.Timers; +import com.onthegomap.planetiler.stream.StreamArchiveUtils; import com.onthegomap.planetiler.util.AnsiColors; import com.onthegomap.planetiler.util.BuildInfo; import com.onthegomap.planetiler.util.ByteBufferUtil; @@ -38,6 +39,7 @@ import 
java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.function.Function; +import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -650,10 +652,39 @@ public class Planetiler { System.exit(0); } else if (onlyDownloadSources) { // don't check files if not generating map + } else if (config.append()) { + if (!output.format().supportsAppend()) { + throw new IllegalArgumentException("cannot append to " + output.format().id()); + } + if (!output.exists()) { + throw new IllegalArgumentException(output.uri() + " must exist when appending"); + } } else if (overwrite || config.force()) { output.delete(); } else if (output.exists()) { - throw new IllegalArgumentException(output.uri() + " already exists, use the --force argument to overwrite."); + throw new IllegalArgumentException( + output.uri() + " already exists, use the --force argument to overwrite or --append."); + } + + if (config.tileWriteThreads() < 1) { + throw new IllegalArgumentException("require tile_write_threads >= 1"); + } + if (config.tileWriteThreads() > 1) { + if (!output.format().supportsConcurrentWrites()) { + throw new IllegalArgumentException(output.format() + " doesn't support concurrent writes"); + } + IntStream.range(1, config.tileWriteThreads()) + .mapToObj(index -> StreamArchiveUtils.constructIndexedPath(output.getLocalPath(), index)) + .forEach(p -> { + if (!config.append() && (overwrite || config.force())) { + FileUtils.delete(p); + } + if (config.append() && !Files.exists(p)) { + throw new IllegalArgumentException("indexed file \"" + p + "\" must exist when appending"); + } else if (!config.append() && Files.exists(p)) { + throw new IllegalArgumentException("indexed file \"" + p + "\" must not exist when not appending"); + } + }); } LOGGER.info("Building {} profile into {} in these phases:", profile.getClass().getSimpleName(), output.uri()); diff --git 
a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveConfig.java index 021ccc50..284adfb3 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveConfig.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveConfig.java @@ -166,18 +166,42 @@ public record TileArchiveConfig( } public enum Format { - MBTILES("mbtiles"), - PMTILES("pmtiles"); + MBTILES("mbtiles", + false /* TODO mbtiles could support append in the future by using insert statements with an "on conflict"-clause (i.e. upsert) and by creating tables only if they don't exist, yet */, + false), + PMTILES("pmtiles", false, false), + + CSV("csv", true, true), + /** identical to {@link Format#CSV} - except for the column separator */ + TSV("tsv", true, true), + + PROTO("proto", true, true), + /** identical to {@link Format#PROTO} */ + PBF("pbf", true, true), + + JSON("json", true, true); private final String id; + private final boolean supportsAppend; + private final boolean supportsConcurrentWrites; - Format(String id) { + Format(String id, boolean supportsAppend, boolean supportsConcurrentWrites) { this.id = id; + this.supportsAppend = supportsAppend; + this.supportsConcurrentWrites = supportsConcurrentWrites; } public String id() { return id; } + + public boolean supportsAppend() { + return supportsAppend; + } + + public boolean supportsConcurrentWrites() { + return supportsConcurrentWrites; + } } public enum Scheme { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveMetadata.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveMetadata.java index 5184d7cd..0e210c3f 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveMetadata.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveMetadata.java @@ -4,17 
+4,24 @@ import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_ABSENT; import static com.onthegomap.planetiler.util.Format.joinCoordinates; import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.onthegomap.planetiler.Profile; import com.onthegomap.planetiler.config.PlanetilerConfig; import com.onthegomap.planetiler.geo.GeoUtils; import com.onthegomap.planetiler.util.BuildInfo; import com.onthegomap.planetiler.util.LayerStats; +import java.io.IOException; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -37,7 +44,8 @@ public record TileArchiveMetadata( @JsonProperty(MINZOOM_KEY) Integer minzoom, @JsonProperty(MAXZOOM_KEY) Integer maxzoom, @JsonIgnore List vectorLayers, - @JsonAnyGetter Map others + @JsonAnyGetter @JsonDeserialize(using = EmptyMapIfNullDeserializer.class) Map others, + @JsonProperty(COMPRESSION_KEY) TileCompression tileCompression ) { public static final String NAME_KEY = "name"; @@ -52,6 +60,7 @@ public record TileArchiveMetadata( public static final String MINZOOM_KEY = "minzoom"; public static final String MAXZOOM_KEY = "maxzoom"; public static final String VECTOR_LAYERS_KEY = "vector_layers"; + public static final String COMPRESSION_KEY = "compression"; public static final String MVT_FORMAT = "pbf"; @@ -78,7 +87,8 @@ public record TileArchiveMetadata( 
config.minzoom(), config.maxzoom(), vectorLayers, - mapWithBuildInfo() + mapWithBuildInfo(), + config.tileCompression() ); } @@ -137,6 +147,30 @@ public record TileArchiveMetadata( /** Returns a copy of this instance with {@link #vectorLayers} set to {@code layerStats}. */ public TileArchiveMetadata withLayerStats(List layerStats) { return new TileArchiveMetadata(name, description, attribution, version, type, format, bounds, center, zoom, minzoom, - maxzoom, layerStats, others); + maxzoom, layerStats, others, tileCompression); + } + + /* + * a few workarounds to make collecting unknown fields into "others" work, + * because @JsonAnySetter does not yet work on constructor/creator arguments + * https://github.com/FasterXML/jackson-databind/issues/3439 + */ + + @JsonAnySetter + private void putUnknownFieldsToOthers(String name, String value) { + others.put(name, value); + } + + private static class EmptyMapIfNullDeserializer extends JsonDeserializer> { + @SuppressWarnings("unchecked") + @Override + public Map deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + return p.readValueAs(HashMap.class); + } + + @Override + public Map getNullValue(DeserializationContext ctxt) { + return new HashMap<>(); + } } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java index 27a34144..93afc98c 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java @@ -91,6 +91,7 @@ public class TileArchiveWriter { int readThreads = config.featureReadThreads(); int threads = config.threads(); int processThreads = threads < 10 ? threads : threads - readThreads; + int tileWriteThreads = config.tileWriteThreads(); // when using more than 1 read thread: (N read threads) -> (1 merge thread) -> ...
// when using 1 read thread we just have: (1 read & merge thread) -> ... @@ -127,6 +128,11 @@ public class TileArchiveWriter { * To emit tiles in order, fork the input queue and send features to both the encoder and writer. The writer * waits on them to be encoded in the order they were received, and the encoder processes them in parallel. * One batch might take a long time to process, so make the queues very big to avoid idle encoding CPUs. + * + * Note: + * In the future emitting tiles out of order might be especially interesting when tileWriteThreads>1, + * since when multiple threads/files are involved there's no order that needs to be preserved. + * So some of the restrictions could be lifted then. */ WorkQueue writerQueue = new WorkQueue<>("archive_writer_queue", queueSize, 1, stats); encodeBranch = pipeline @@ -144,8 +150,7 @@ public class TileArchiveWriter { // the tile writer will wait on the result of each batch to ensure tiles are written in order writeBranch = pipeline.readFromQueue(writerQueue) - // use only 1 thread since tileWriter needs to be single-threaded - .sinkTo("write", 1, writer::tileWriter); + .sinkTo("write", tileWriteThreads, writer::tileWriter); var loggers = ProgressLoggers.create() .addRatePercentCounter("features", features.numFeaturesWritten(), writer.featuresProcessed, true) @@ -251,7 +256,11 @@ public class TileArchiveWriter { bytes = null; } else { encoded = en.encode(); - bytes = gzip(encoded); + bytes = switch (config.tileCompression()) { + case GZIP -> gzip(encoded); + case NONE -> encoded; + case UNKNWON -> throw new IllegalArgumentException("cannot compress \"UNKNOWN\""); + }; if (encoded.length > config.tileWarningSizeBytes()) { LOGGER.warn("{} {}kb uncompressed", tileFeatures.tileCoord(), diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchives.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchives.java index fdd4617b..232308a2 100644 ---
a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchives.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchives.java @@ -4,6 +4,10 @@ import com.onthegomap.planetiler.config.PlanetilerConfig; import com.onthegomap.planetiler.mbtiles.Mbtiles; import com.onthegomap.planetiler.pmtiles.ReadablePmtiles; import com.onthegomap.planetiler.pmtiles.WriteablePmtiles; +import com.onthegomap.planetiler.stream.StreamArchiveConfig; +import com.onthegomap.planetiler.stream.WriteableCsvArchive; +import com.onthegomap.planetiler.stream.WriteableJsonStreamArchive; +import com.onthegomap.planetiler.stream.WriteableProtoStreamArchive; import java.io.IOException; import java.nio.file.Path; @@ -39,12 +43,19 @@ public class TileArchives { public static WriteableTileArchive newWriter(TileArchiveConfig archive, PlanetilerConfig config) throws IOException { var options = archive.applyFallbacks(config.arguments()); - return switch (archive.format()) { + var format = archive.format(); + return switch (format) { case MBTILES -> // pass-through legacy arguments for fallback Mbtiles.newWriteToFileDatabase(archive.getLocalPath(), options.orElse(config.arguments() .subset(Mbtiles.LEGACY_VACUUM_ANALYZE, Mbtiles.LEGACY_COMPACT_DB, Mbtiles.LEGACY_SKIP_INDEX_CREATION))); case PMTILES -> WriteablePmtiles.newWriteToFile(archive.getLocalPath()); + case CSV, TSV -> WriteableCsvArchive.newWriteToFile(format, archive.getLocalPath(), + new StreamArchiveConfig(config, options)); + case PROTO, PBF -> WriteableProtoStreamArchive.newWriteToFile(archive.getLocalPath(), + new StreamArchiveConfig(config, options)); + case JSON -> WriteableJsonStreamArchive.newWriteToFile(archive.getLocalPath(), + new StreamArchiveConfig(config, options)); }; } @@ -59,6 +70,9 @@ public class TileArchives { return switch (archive.format()) { case MBTILES -> Mbtiles.newReadOnlyDatabase(archive.getLocalPath(), options); case PMTILES -> 
ReadablePmtiles.newReadFromFile(archive.getLocalPath()); + case CSV, TSV -> throw new UnsupportedOperationException("reading CSV is not supported"); + case PROTO, PBF -> throw new UnsupportedOperationException("reading PROTO is not supported"); + case JSON -> throw new UnsupportedOperationException("reading JSON is not supported"); }; } @@ -71,5 +85,4 @@ public class TileArchives { public static WriteableTileArchive newWriter(Path path, PlanetilerConfig config) throws IOException { return newWriter(path.toString(), config); } - } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileCompression.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileCompression.java new file mode 100644 index 00000000..266c9621 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileCompression.java @@ -0,0 +1,46 @@ +package com.onthegomap.planetiler.archive; + +import com.fasterxml.jackson.annotation.JsonEnumDefaultValue; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public enum TileCompression { + + @JsonProperty("none") + NONE("none"), + @JsonProperty("gzip") + GZIP("gzip"), + @JsonProperty("unknown") @JsonEnumDefaultValue + UNKNWON("unknown"); + + private final String id; + + TileCompression(String id) { + this.id = id; + } + + public static TileCompression fromId(String id) { + return findById(id) + .orElseThrow(() -> new IllegalArgumentException("invalid compression ID; expected one of " + + Stream.of(TileCompression.values()).map(TileCompression::id).toList())); + } + + public static Optional findById(String id) { + return availableValues() + .stream() + .filter(tdc -> tdc.id().equals(id)) + .findFirst(); + } + + public static Set availableValues() { + return Arrays.stream(TileCompression.values()).filter(tc -> tc != 
UNKNWON).collect(Collectors.toUnmodifiableSet()); + } + + public String id() { + return id; + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java index e7ea24a4..ea2ba385 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java @@ -1,5 +1,7 @@ package com.onthegomap.planetiler.config; +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.collection.LongLongMap; import com.onthegomap.planetiler.collection.Storage; import com.onthegomap.planetiler.reader.osm.PolyFileReader; @@ -20,11 +22,13 @@ public record PlanetilerConfig( int featureWriteThreads, int featureProcessThreads, int featureReadThreads, + int tileWriteThreads, Duration logInterval, int minzoom, int maxzoom, int maxzoomForRendering, boolean force, + boolean append, boolean gzipTempStorage, boolean mmapTempStorage, int sortMaxReaders, @@ -48,7 +52,8 @@ public record PlanetilerConfig( boolean skipFilledTiles, int tileWarningSizeBytes, Boolean color, - boolean keepUnzippedSources + boolean keepUnzippedSources, + TileCompression tileCompression ) { public static final int MIN_MINZOOM = 0; @@ -119,11 +124,19 @@ public record PlanetilerConfig( featureProcessThreads, arguments.getInteger("feature_read_threads", "number of threads to use when reading features at tile write time", threads < 32 ? 
1 : 2), + arguments.getInteger("tile_write_threads", + "number of threads used to write tiles - only supported by " + Stream.of(TileArchiveConfig.Format.values()) + .filter(TileArchiveConfig.Format::supportsConcurrentWrites).map(TileArchiveConfig.Format::id).toList(), + 1), arguments.getDuration("loginterval", "time between logs", "10s"), minzoom, maxzoom, renderMaxzoom, arguments.getBoolean("force", "overwriting output file and ignore disk/RAM warnings", false), + arguments.getBoolean("append", + "append to the output file - only supported by " + Stream.of(TileArchiveConfig.Format.values()) + .filter(TileArchiveConfig.Format::supportsAppend).map(TileArchiveConfig.Format::id).toList(), + false), arguments.getBoolean("gzip_temp", "gzip temporary feature storage (uses more CPU, but less disk space)", false), arguments.getBoolean("mmap_temp", "use memory-mapped IO for temp feature files", true), arguments.getInteger("sort_max_readers", "maximum number of concurrent read threads to use when sorting chunks", @@ -172,7 +185,12 @@ public record PlanetilerConfig( 1d) * 1024 * 1024), arguments.getBooleanObject("color", "Color the terminal output"), arguments.getBoolean("keep_unzipped", - "keep unzipped sources by default after reading", false) + "keep unzipped sources by default after reading", false), + TileCompression + .fromId(arguments.getString("tile_compression", + "the tile compression, one of " + + TileCompression.availableValues().stream().map(TileCompression::id).toList(), + "gzip")) ); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/Mbtiles.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/Mbtiles.java index 723f6900..60169278 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/Mbtiles.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/mbtiles/Mbtiles.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import 
com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.onthegomap.planetiler.archive.ReadableTileArchive; import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.archive.TileEncodingResult; import com.onthegomap.planetiler.archive.WriteableTileArchive; import com.onthegomap.planetiler.config.Arguments; @@ -888,6 +889,13 @@ public final class Mbtiles implements WriteableTileArchive, ReadableTileArchive * specification */ public Metadata set(TileArchiveMetadata tileArchiveMetadata) { + + final TileCompression tileCompression = tileArchiveMetadata.tileCompression(); + if (tileCompression != null && tileCompression != TileCompression.GZIP) { + LOGGER.warn("will use {} for tile compression, but the mbtiles specification actually requires gzip", + tileCompression.id()); + } + var map = new LinkedHashMap<>(tileArchiveMetadata.toMap()); setMetadata(TileArchiveMetadata.FORMAT_KEY, tileArchiveMetadata.format()); @@ -929,6 +937,16 @@ public final class Mbtiles implements WriteableTileArchive, ReadableTileArchive String[] center = map.containsKey(TileArchiveMetadata.CENTER_KEY) ? map.remove(TileArchiveMetadata.CENTER_KEY).split(",") : null; var metadataJson = MetadataJson.fromJson(map.remove("json")); + + + String tileCompressionRaw = map.remove(TileArchiveMetadata.COMPRESSION_KEY); + TileCompression tileCompression = tileCompressionRaw == null ? TileCompression.GZIP : + TileCompression.findById(tileCompressionRaw).orElseGet(() -> { + LOGGER.warn("unknown tile compression {}", tileCompressionRaw); + return TileCompression.UNKNWON; + }); + + return new TileArchiveMetadata( map.remove(TileArchiveMetadata.NAME_KEY), map.remove(TileArchiveMetadata.DESCRIPTION_KEY), @@ -951,7 +969,8 @@ public final class Mbtiles implements WriteableTileArchive, ReadableTileArchive Parse.parseIntOrNull(map.remove(TileArchiveMetadata.MAXZOOM_KEY)), metadataJson == null ? 
null : metadataJson.vectorLayers, // any left-overs: - map + map, + tileCompression ); } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/ReadablePmtiles.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/ReadablePmtiles.java index fbabc851..51dc4d5a 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/ReadablePmtiles.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/ReadablePmtiles.java @@ -2,6 +2,7 @@ package com.onthegomap.planetiler.pmtiles; import com.onthegomap.planetiler.archive.ReadableTileArchive; import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.geo.TileCoord; import com.onthegomap.planetiler.util.CloseableIterator; import com.onthegomap.planetiler.util.Gzip; @@ -115,6 +116,18 @@ public class ReadablePmtiles implements ReadableTileArchive { @Override public TileArchiveMetadata metadata() { + + TileCompression tileCompression = switch (header.tileCompression()) { + case GZIP -> TileCompression.GZIP; + case NONE -> TileCompression.NONE; + case UNKNOWN -> TileCompression.UNKNWON; + }; + + String format = switch (header.tileType()) { + case MVT -> TileArchiveMetadata.MVT_FORMAT; + default -> null; + }; + try { var jsonMetadata = getJsonMetadata(); var map = new LinkedHashMap<>(jsonMetadata.otherMetadata()); @@ -124,17 +137,15 @@ public class ReadablePmtiles implements ReadableTileArchive { map.remove(TileArchiveMetadata.ATTRIBUTION_KEY), map.remove(TileArchiveMetadata.VERSION_KEY), map.remove(TileArchiveMetadata.TYPE_KEY), - switch (header.tileType()) { - case MVT -> TileArchiveMetadata.MVT_FORMAT; - default -> null; - }, + format, header.bounds(), header.center(), (double) header.centerZoom(), (int) header.minZoom(), (int) header.maxZoom(), jsonMetadata.vectorLayers(), - map + map, + tileCompression ); } catch (IOException e) { throw new 
UncheckedIOException(e); diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/WriteablePmtiles.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/WriteablePmtiles.java index e99b4b6b..f7be4efc 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/WriteablePmtiles.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/pmtiles/WriteablePmtiles.java @@ -150,6 +150,7 @@ public final class WriteablePmtiles implements WriteableTileArchive { otherMetadata.remove(TileArchiveMetadata.MINZOOM_KEY); otherMetadata.remove(TileArchiveMetadata.MAXZOOM_KEY); otherMetadata.remove(TileArchiveMetadata.VECTOR_LAYERS_KEY); + otherMetadata.remove(TileArchiveMetadata.COMPRESSION_KEY); byte[] jsonBytes = new Pmtiles.JsonMetadata(tileArchiveMetadata.vectorLayers(), otherMetadata).toBytes(); @@ -167,6 +168,13 @@ public final class WriteablePmtiles implements WriteableTileArchive { int maxzoom = tileArchiveMetadata.maxzoom() == null ? PlanetilerConfig.MAX_MAXZOOM : tileArchiveMetadata.maxzoom(); + + Pmtiles.Compression tileCompression = switch (tileArchiveMetadata.tileCompression()) { + case GZIP -> Pmtiles.Compression.GZIP; + case NONE -> Pmtiles.Compression.NONE; + default -> Pmtiles.Compression.UNKNOWN; + }; + Pmtiles.Header header = new Pmtiles.Header( (byte) 3, Pmtiles.HEADER_LEN, @@ -182,7 +190,7 @@ public final class WriteablePmtiles implements WriteableTileArchive { hashToOffset.size() + numUnhashedTiles, isClustered, Pmtiles.Compression.GZIP, - Pmtiles.Compression.GZIP, + tileCompression, outputFormat, (byte) minzoom, (byte) maxzoom, diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveConfig.java new file mode 100644 index 00000000..6f85ccb5 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveConfig.java @@ -0,0 +1,10 @@ +package 
com.onthegomap.planetiler.stream; + +import com.onthegomap.planetiler.config.Arguments; +import com.onthegomap.planetiler.config.PlanetilerConfig; + +public record StreamArchiveConfig(boolean appendToFile, Arguments moreOptions) { + public StreamArchiveConfig(PlanetilerConfig planetilerConfig, Arguments moreOptions) { + this(planetilerConfig.append(), moreOptions); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveUtils.java new file mode 100644 index 00000000..8d23ef6f --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/StreamArchiveUtils.java @@ -0,0 +1,52 @@ +package com.onthegomap.planetiler.stream; + +import com.google.common.net.UrlEscapers; +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.config.Arguments; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.text.StringEscapeUtils; + +public final class StreamArchiveUtils { + + private static final Pattern quotedPattern = Pattern.compile("^'(.+?)'$"); + + private StreamArchiveUtils() {} + + public static Path constructIndexedPath(Path basePath, int index) { + return index == 0 ? 
basePath : Paths.get(basePath.toString() + index); + } + + static String getEscapedString(Arguments options, TileArchiveConfig.Format format, String key, + String descriptionPrefix, String defaultValue, List examples) { + + final String cliKey = format.id() + "_" + key; + + final String fullDescription = descriptionPrefix + + " - pass it as option: " + + examples.stream().map(e -> "%s=%s".formatted(cliKey, escapeJava(e))).collect(Collectors.joining(" | ")) + + ", or append to the file: " + + examples.stream().map(e -> "?%s=%s".formatted(key, escapeJavaUri(e))).collect(Collectors.joining(" | ")); + + final String rawOptionValue = options.getString(key, fullDescription, defaultValue); + return quotedPattern.matcher(rawOptionValue) + // allow values to be wrapped by single quotes => allows to pass a space which otherwise gets trimmed + .replaceAll("$1") + // \n -> newline... + .translateEscapes(); + } + + private static String escapeJava(String s) { + if (!s.trim().equals(s)) { + s = "'" + s + "'"; + } + return StringEscapeUtils.escapeJava(s); + } + + private static String escapeJavaUri(String s) { + return UrlEscapers.urlFormParameterEscaper().escape(escapeJava(s)); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableCsvArchive.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableCsvArchive.java new file mode 100644 index 00000000..5f9d549e --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableCsvArchive.java @@ -0,0 +1,181 @@ +package com.onthegomap.planetiler.stream; + +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.geo.TileCoord; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.io.Writer; +import 
java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Base64; +import java.util.HexFormat; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Writes tile data into a CSV file (or pipe). + *

+ * A simple (not very efficient) upload to S3 using minio client could look as follows: + * + *

+ * mkfifo /tmp/data/output.csv
+ * # now run planetiler with the options --append --output=/tmp/data/output.csv
+ *
+ * # ... and start a script to upload data
+ * #! /bin/bash
+ * while IFS="," read -r x y z encoded
+ * do
+ *   echo "pushing tile z=$z x=$x y=$y"
+ *   # echo $encoded | base64 -d | gzip -d | aws s3 cp - s3://BUCKET/map/$z/$x/$y.pbf --content-type=application/x-protobuf
+ *   echo $encoded | base64 -d | aws s3 cp - s3://BUCKET/map/$z/$x/$y.pbf --content-type=application/x-protobuf --content-encoding=gzip
+ * done < "${1:-/dev/stdin}"
+ * 
+ * + * Loading data into mysql could be done like this: + * + *
+ * mkfifo /tmp/data/output.csv
+ * # now run planetiler with the options --append --output=/tmp/data/output.csv
+ *
+ * mysql> ...create tile(s) table
+ * mysql> LOAD DATA INFILE '/tmp/data/output.csv'
+ *  -> INTO TABLE tiles
+ *  -> FIELDS TERMINATED BY ','
+ *  -> LINES TERMINATED BY '\n'
+ *  -> (tile_column, tile_row, zoom_level, @var1)
+ *  -> SET tile_data = FROM_BASE64(@var1);
+ * 
+ * + * Loading data into postgres could be done like this: + * + *
+ * mkfifo /tmp/data/output_raw.csv
+ * mkfifo /tmp/data/output_transformed.csv
+ * # prefix hex-data with '\x' for the postgres import
+ * cat /tmp/data/output_raw.csv | sed -r 's/^([0-9]+,)([0-9]+,)([0-9]+,)(.*)$/\1\2\3\\x\4/' > /tmp/data/output_transformed.csv
+ * # now run planetiler with the options --append --output=/tmp/data/output_raw.csv --csv_binary_encoding=hex
+ * ...create tile(s) table
+ * postgres=# \copy tiles(tile_column, tile_row, zoom_level, tile_data) from /tmp/data/output_transformed.csv DELIMITER ',' CSV;
+ * 
+ * + * Check {@link WriteableStreamArchive} to see how to write to multiple files. This can be used to parallelize uploads. + */ +public final class WriteableCsvArchive extends WriteableStreamArchive { + + static final String OPTION_COLUMN_SEPARATOR = "column_separator"; + static final String OPTION_LINE_SEPARTATOR = "line_separator"; + static final String OPTION_BINARY_ENCODING = "binary_encoding"; + + private final String columnSeparator; + private final String lineSeparator; + private final Function tileDataEncoder; + + private WriteableCsvArchive(TileArchiveConfig.Format format, Path p, StreamArchiveConfig config) { + super(p, config); + final String defaultColumnSeparator = switch (format) { + case CSV -> "','"; + case TSV -> "'\\t'"; + default -> throw new IllegalArgumentException("supported formats are csv and tsv but got " + format.id()); + }; + this.columnSeparator = StreamArchiveUtils.getEscapedString(config.moreOptions(), format, + OPTION_COLUMN_SEPARATOR, "column separator", defaultColumnSeparator, List.of(",", " ")); + this.lineSeparator = StreamArchiveUtils.getEscapedString(config.moreOptions(), format, + OPTION_LINE_SEPARTATOR, "line separator", "'\\n'", List.of("\n", "\r\n")); + final BinaryEncoding binaryEncoding = BinaryEncoding.fromId(config.moreOptions().getString(OPTION_BINARY_ENCODING, + "binary (tile) data encoding - one of " + BinaryEncoding.ids(), "base64")); + this.tileDataEncoder = switch (binaryEncoding) { + case BASE64 -> Base64.getEncoder()::encodeToString; + case HEX -> HexFormat.of()::formatHex; + }; + } + + public static WriteableCsvArchive newWriteToFile(TileArchiveConfig.Format format, Path path, + StreamArchiveConfig config) { + return new WriteableCsvArchive(format, path, config); + } + + @Override + protected TileWriter newTileWriter(OutputStream outputStream) { + return new CsvTileWriter(outputStream, columnSeparator, lineSeparator, tileDataEncoder); + } + + private static class CsvTileWriter implements TileWriter { + + 
private final Function tileDataEncoder; + + private final Writer writer; + + private final String columnSeparator; + private final String lineSeparator; + + CsvTileWriter(Writer writer, String columnSeparator, String lineSeparator, + Function tileDataEncoder) { + this.writer = writer; + this.columnSeparator = columnSeparator; + this.lineSeparator = lineSeparator; + this.tileDataEncoder = tileDataEncoder; + + } + + CsvTileWriter(OutputStream outputStream, String columnSeparator, String lineSeparator, + Function tileDataEncoder) { + this(new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8.newEncoder())), + columnSeparator, lineSeparator, tileDataEncoder); + } + + @Override + public void write(TileEncodingResult encodingResult) { + final TileCoord coord = encodingResult.coord(); + final String tileDataEncoded = tileDataEncoder.apply(encodingResult.tileData()); + try { + // x | y | z | encoded data + writer.write("%d%s%d%s%d%s%s%s".formatted(coord.x(), columnSeparator, coord.y(), columnSeparator, coord.z(), + columnSeparator, tileDataEncoded, lineSeparator)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void close() { + try { + writer.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private enum BinaryEncoding { + + BASE64("base64"), + HEX("hex"); + + private final String id; + + private BinaryEncoding(String id) { + this.id = id; + } + + static List ids() { + return Stream.of(BinaryEncoding.values()).map(BinaryEncoding::id).toList(); + } + + static BinaryEncoding fromId(String id) { + return Stream.of(BinaryEncoding.values()) + .filter(de -> de.id().equals(id)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException( + "unexpected binary encoding - expected one of " + ids() + " but got " + id)); + } + + String id() { + return id; + } + } +} diff --git 
a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchive.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchive.java new file mode 100644 index 00000000..9ba98eb5 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchive.java @@ -0,0 +1,233 @@ +package com.onthegomap.planetiler.stream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonIncludeProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SequenceWriter; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.geo.TileCoord; +import com.onthegomap.planetiler.util.LayerStats; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.locationtech.jts.geom.CoordinateXY; +import org.locationtech.jts.geom.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Writes JSON-serialized tile data as well as meta data into file(s). 
The entries are of type + * {@link WriteableJsonStreamArchive.Entry} are separated by newline (by default). + */ +public final class WriteableJsonStreamArchive extends WriteableStreamArchive { + + private static final Logger LOGGER = LoggerFactory.getLogger(WriteableJsonStreamArchive.class); + + /** + * exposing meta data (non-tile data) might be useful for most use cases but complicates parsing for simple use cases + * => allow to output tiles, only + */ + private static final String OPTION_WRITE_TILES_ONLY = "tiles_only"; + + private static final String OPTION_ROOT_VALUE_SEPARATOR = "root_value_separator"; + + static final JsonMapper jsonMapper = JsonMapper.builder() + .serializationInclusion(Include.NON_ABSENT) + .addModule(new Jdk8Module()) + .addMixIn(TileArchiveMetadata.class, TileArchiveMetadataMixin.class) + .addMixIn(Envelope.class, EnvelopeMixin.class) + .addMixIn(CoordinateXY.class, CoordinateXYMixin.class) + .build(); + + private final boolean writeTilesOnly; + private final String rootValueSeparator; + + private WriteableJsonStreamArchive(Path p, StreamArchiveConfig config) { + super(p, config); + this.writeTilesOnly = config.moreOptions().getBoolean(OPTION_WRITE_TILES_ONLY, "write tiles, only", false); + this.rootValueSeparator = StreamArchiveUtils.getEscapedString(config.moreOptions(), TileArchiveConfig.Format.JSON, + OPTION_ROOT_VALUE_SEPARATOR, "root value separator", "'\\n'", List.of("\n", " ")); + } + + public static WriteableJsonStreamArchive newWriteToFile(Path path, StreamArchiveConfig config) { + return new WriteableJsonStreamArchive(path, config); + } + + @Override + protected TileWriter newTileWriter(OutputStream outputStream) { + return new JsonTileWriter(outputStream, rootValueSeparator); + } + + @Override + public void initialize(TileArchiveMetadata metadata) { + if (writeTilesOnly) { + return; + } + writeEntryFlush(new InitializationEntry(metadata)); + } + + @Override + public void finish(TileArchiveMetadata metadata) { + if 
(writeTilesOnly) { + return; + } + writeEntryFlush(new FinishEntry(metadata)); + } + + private void writeEntryFlush(Entry entry) { + try (var out = new OutputStreamWriter(getPrimaryOutputStream(), StandardCharsets.UTF_8.newEncoder())) { + jsonMapper + .writerFor(Entry.class) + .withoutFeatures(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .writeValue(out, entry); + out.write(rootValueSeparator); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static class JsonTileWriter implements TileWriter { + + private final OutputStream outputStream; + private final SequenceWriter jsonWriter; + private final String rootValueSeparator; + + JsonTileWriter(OutputStream out, String rootValueSeparator) { + this.outputStream = new BufferedOutputStream(out); + this.rootValueSeparator = rootValueSeparator; + try { + this.jsonWriter = + jsonMapper.writerFor(Entry.class).withRootValueSeparator(rootValueSeparator).writeValues(outputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void write(TileEncodingResult encodingResult) { + final TileCoord coord = encodingResult.coord(); + try { + jsonWriter.write(new TileEntry(coord.x(), coord.y(), coord.z(), encodingResult.tileData())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void close() { + UncheckedIOException flushOrWriteError = null; + try { + jsonWriter.flush(); + // jackson only handles newlines between entries but does not append one to the last one + for (byte b : rootValueSeparator.getBytes(StandardCharsets.UTF_8)) { + outputStream.write(b); + } + } catch (IOException e) { + LOGGER.warn("failed to finish writing", e); + flushOrWriteError = new UncheckedIOException(e); + } + + try { + jsonWriter.close(); + outputStream.close(); + } catch (IOException e) { + if (flushOrWriteError != null) { + e.addSuppressed(flushOrWriteError); + } + throw new UncheckedIOException(e); + } + + if (flushOrWriteError 
!= null) { + throw flushOrWriteError; + } + } + } + + + @JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") + @JsonSubTypes({ + @Type(value = TileEntry.class, name = "tile"), + @Type(value = InitializationEntry.class, name = "initialization"), + @Type(value = FinishEntry.class, name = "finish") + }) + sealed interface Entry { + + } + + + record TileEntry(int x, int y, int z, byte[] encodedData) implements Entry { + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(encodedData); + result = prime * result + Objects.hash(x, y, z); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof TileEntry)) { + return false; + } + TileEntry other = (TileEntry) obj; + return Arrays.equals(encodedData, other.encodedData) && x == other.x && y == other.y && z == other.z; + } + + @Override + public String toString() { + return "TileEntry [x=" + x + ", y=" + y + ", z=" + z + ", encodedData=" + Arrays.toString(encodedData) + "]"; + } + } + + record InitializationEntry(TileArchiveMetadata metadata) implements Entry {} + + + record FinishEntry(TileArchiveMetadata metadata) implements Entry {} + + private interface TileArchiveMetadataMixin { + + @JsonIgnore(false) + Envelope bounds(); + + @JsonIgnore(false) + CoordinateXY center(); + + @JsonIgnore(false) + List vectorLayers(); + } + + @JsonIncludeProperties({"minX", "maxX", "minY", "maxY"}) + private abstract static class EnvelopeMixin { + @JsonCreator + EnvelopeMixin(@JsonProperty("minX") double minX, @JsonProperty("maxX") double maxX, + @JsonProperty("minY") double minY, @JsonProperty("maxY") double maxY) {} + } + + @JsonIncludeProperties({"x", "y"}) + private interface CoordinateXYMixin {} +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchive.java 
b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchive.java new file mode 100644 index 00000000..56881247 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchive.java @@ -0,0 +1,194 @@ +package com.onthegomap.planetiler.stream; + +import com.google.protobuf.ByteString; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.geo.TileCoord; +import com.onthegomap.planetiler.proto.StreamArchiveProto; +import com.onthegomap.planetiler.util.LayerStats.VectorLayer; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.function.Consumer; +import org.locationtech.jts.geom.CoordinateXY; +import org.locationtech.jts.geom.Envelope; + +/** + * Writes protobuf-serialized tile data as well as meta data into file(s). The messages are of type + * {@link StreamArchiveProto.Entry} and are length-delimited. + *

+ * Custom plugins/integrations should prefer to use this format since - given it's binary - it's the fastest to write + and read, and once set up, it should also be the simplest to use since models and the code to parse it are generated. + * It's also the most stable and straightforward format in regards to schema evolution. + *

+ * In Java the stream could be read like this: + * + *

+ * // note: do not use nio (Files.newInputStream) for pipes
+ * try (var in = new FileInputStream(...)) {
+ *   StreamArchiveProto.Entry entry;
+ *   while ((entry = StreamArchiveProto.Entry.parseDelimitedFrom(in)) != null) {
+ *     ...
+ *   }
+ * }
+ * 
+ */ +public final class WriteableProtoStreamArchive extends WriteableStreamArchive { + + private WriteableProtoStreamArchive(Path p, StreamArchiveConfig config) { + super(p, config); + } + + public static WriteableProtoStreamArchive newWriteToFile(Path path, StreamArchiveConfig config) { + return new WriteableProtoStreamArchive(path, config); + } + + @Override + protected TileWriter newTileWriter(OutputStream outputStream) { + return new ProtoTileArchiveWriter(outputStream); + } + + @Override + public void initialize(TileArchiveMetadata metadata) { + writeEntry( + StreamArchiveProto.Entry.newBuilder() + .setInitialization( + StreamArchiveProto.InitializationEntry.newBuilder().setMetadata(toExportData(metadata)).build() + ) + .build() + ); + } + + @Override + public void finish(TileArchiveMetadata metadata) { + writeEntry( + StreamArchiveProto.Entry.newBuilder() + .setFinish( + StreamArchiveProto.FinishEntry.newBuilder().setMetadata(toExportData(metadata)).build() + ) + .build() + ); + } + + private void writeEntry(StreamArchiveProto.Entry entry) { + try { + entry.writeDelimitedTo(getPrimaryOutputStream()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static StreamArchiveProto.Metadata toExportData(TileArchiveMetadata metadata) { + var metaDataBuilder = StreamArchiveProto.Metadata.newBuilder(); + setIfNotNull(metaDataBuilder::setName, metadata.name()); + setIfNotNull(metaDataBuilder::setDescription, metadata.description()); + setIfNotNull(metaDataBuilder::setAttribution, metadata.attribution()); + setIfNotNull(metaDataBuilder::setVersion, metadata.version()); + setIfNotNull(metaDataBuilder::setType, metadata.type()); + setIfNotNull(metaDataBuilder::setFormat, metadata.format()); + setIfNotNull(metaDataBuilder::setBounds, toExportData(metadata.bounds())); + setIfNotNull(metaDataBuilder::setCenter, toExportData(metadata.center())); + setIfNotNull(metaDataBuilder::setZoom, metadata.zoom()); + 
setIfNotNull(metaDataBuilder::setMinZoom, metadata.minzoom()); + setIfNotNull(metaDataBuilder::setMaxZoom, metadata.maxzoom()); + final StreamArchiveProto.TileCompression tileCompression = switch (metadata.tileCompression()) { + case GZIP -> StreamArchiveProto.TileCompression.TILE_COMPRESSION_GZIP; + case NONE -> StreamArchiveProto.TileCompression.TILE_COMPRESSION_NONE; + case UNKNWON -> throw new IllegalArgumentException("should not produce \"UNKNOWN\" compression"); + }; + metaDataBuilder.setTileCompression(tileCompression); + if (metadata.vectorLayers() != null) { + metadata.vectorLayers().forEach(vl -> metaDataBuilder.addVectorLayers(toExportData(vl))); + } + if (metadata.others() != null) { + metadata.others().forEach(metaDataBuilder::putOthers); + } + + return metaDataBuilder.build(); + } + + private static StreamArchiveProto.Envelope toExportData(Envelope envelope) { + if (envelope == null) { + return null; + } + return StreamArchiveProto.Envelope.newBuilder() + .setMinX(envelope.getMinX()) + .setMaxX(envelope.getMaxX()) + .setMinY(envelope.getMinY()) + .setMaxY(envelope.getMaxY()) + .build(); + } + + private static StreamArchiveProto.CoordinateXY toExportData(CoordinateXY coord) { + if (coord == null) { + return null; + } + return StreamArchiveProto.CoordinateXY.newBuilder() + .setX(coord.getX()) + .setY(coord.getY()) + .build(); + } + + private static StreamArchiveProto.VectorLayer toExportData(VectorLayer vectorLayer) { + final var builder = StreamArchiveProto.VectorLayer.newBuilder(); + builder.setId(vectorLayer.id()); + vectorLayer.fields().forEach((key, value) -> { + var exportType = switch (value) { + case NUMBER -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_NUMBER; + case BOOLEAN -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_BOOLEAN; + case STRING -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_STRING; + }; + builder.putFields(key, exportType); + }); + vectorLayer.description().ifPresent(builder::setDescription); + 
vectorLayer.minzoom().ifPresent(builder::setMinZoom); + vectorLayer.maxzoom().ifPresent(builder::setMaxZoom); + return builder.build(); + } + + private static void setIfNotNull(Consumer setter, T value) { + if (value != null) { + setter.accept(value); + } + } + + private static class ProtoTileArchiveWriter implements TileWriter { + + private final OutputStream out; + + ProtoTileArchiveWriter(OutputStream out) { + this.out = out; + } + + @Override + public void write(TileEncodingResult encodingResult) { + final TileCoord coord = encodingResult.coord(); + final StreamArchiveProto.TileEntry tile = StreamArchiveProto.TileEntry.newBuilder() + .setZ(coord.z()) + .setX(coord.x()) + .setY(coord.y()) + .setEncodedData(ByteString.copyFrom(encodingResult.tileData())) + .build(); + + final StreamArchiveProto.Entry entry = StreamArchiveProto.Entry.newBuilder() + .setTile(tile) + .build(); + + try { + entry.writeDelimitedTo(out); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void close() { + try { + out.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableStreamArchive.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableStreamArchive.java new file mode 100644 index 00000000..deafe4ad --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stream/WriteableStreamArchive.java @@ -0,0 +1,117 @@ +package com.onthegomap.planetiler.stream; + +import com.onthegomap.planetiler.archive.WriteableTileArchive; +import com.onthegomap.planetiler.geo.TileOrder; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicInteger; +import 
org.apache.logging.log4j.core.util.CloseShieldOutputStream; + +/** + * Base archive for all kinds of simple file streams. This is primarily useful when the file is a named pipe. In that + * case data can directly be transformed and consumed by other programs. + *

+ * Writing can be parallelized across multiple files (tile_write_threads). For the first file the base path is used. For + consecutive files 1, 2, ... is appended to the base path. + * + *

+ * # create the pipes
+ * mkfifo /tmp/data/output.csv
+ * mkfifo /tmp/data/output.csv1
+ * mkfifo /tmp/data/output.csv2
+ * # start the consumers
+ * consumer_program < /tmp/data/output.csv
+ * consumer_program < /tmp/data/output.csv1
+ * consumer_program < /tmp/data/output.csv2
+ *
+ * # now run planetiler with the options --append --output=/tmp/data/output.csv --tile_write_threads=3
+ * 
+ */ +abstract class WriteableStreamArchive implements WriteableTileArchive { + + private final OutputStream primaryOutputStream; + private final OutputStreamSupplier outputStreamFactory; + @SuppressWarnings("unused") + private final StreamArchiveConfig config; + + private final AtomicInteger tileWriterCounter = new AtomicInteger(0); + + private WriteableStreamArchive(OutputStreamSupplier outputStreamFactory, StreamArchiveConfig config) { + this.outputStreamFactory = outputStreamFactory; + this.config = config; + + this.primaryOutputStream = outputStreamFactory.newOutputStream(0); + } + + protected WriteableStreamArchive(Path p, StreamArchiveConfig config) { + this(new FileOutputStreamSupplier(p, config.appendToFile()), config); + } + + @Override + public final void close() throws IOException { + primaryOutputStream.close(); + } + + @Override + public boolean deduplicates() { + return false; + } + + @Override + public TileOrder tileOrder() { + return TileOrder.TMS; + } + + @Override + public final TileWriter newTileWriter() { + final int tileWriterIndex = tileWriterCounter.getAndIncrement(); + if (tileWriterIndex == 0) { + return newTileWriter(getPrimaryOutputStream()); + } else { + return newTileWriter(outputStreamFactory.newOutputStream(tileWriterIndex)); + } + + } + + protected abstract TileWriter newTileWriter(OutputStream outputStream); + + protected final OutputStream getPrimaryOutputStream() { + /* + * the outputstream of the first writer must be closed by the archive and not the tile writer + * since the primary stream can be used to send meta data, as well + */ + return new CloseShieldOutputStream(primaryOutputStream); + } + + @FunctionalInterface + private interface OutputStreamSupplier { + OutputStream newOutputStream(int index); + } + + private static class FileOutputStreamSupplier implements OutputStreamSupplier { + + private final Path basePath; + private final OpenOption[] openOptions; + + FileOutputStreamSupplier(Path basePath, boolean append) { + 
this.basePath = basePath; + this.openOptions = + new OpenOption[]{StandardOpenOption.WRITE, append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE_NEW}; + } + + @Override + public OutputStream newOutputStream(int index) { + final Path p = StreamArchiveUtils.constructIndexedPath(basePath, index); + try { + return Files.newOutputStream(p, openOptions); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/planetiler-core/src/main/proto/stream_archive_proto.proto b/planetiler-core/src/main/proto/stream_archive_proto.proto new file mode 100644 index 00000000..b33d94ae --- /dev/null +++ b/planetiler-core/src/main/proto/stream_archive_proto.proto @@ -0,0 +1,79 @@ + +syntax = "proto3"; + +package com.onthegomap.planetiler.proto; + +message Entry { + oneof entry { + TileEntry tile = 1; + InitializationEntry initialization = 2; + FinishEntry finish = 3; + } +} + +message TileEntry { + int32 x = 1; + int32 y = 2; + int32 z = 3; + bytes encoded_data = 4; +} + +message InitializationEntry { + Metadata metadata = 1; +} + +message FinishEntry { + Metadata metadata = 1; +} + +message Metadata { + + string name = 1; + string description = 2; + string attribution = 3; + string version = 4; + string type = 5; + string format = 6; + Envelope bounds = 7; + CoordinateXY center = 8; + double zoom = 9; + int32 min_zoom = 10; + int32 max_zoom = 11; + repeated VectorLayer vector_layers = 12; + map others = 13; + TileCompression tile_compression = 14; +} + +message Envelope { + double min_x = 1; + double max_x = 2; + double min_y = 3; + double max_y = 4; +} + +message CoordinateXY { + double x = 1; + double y = 2; +} + +message VectorLayer { + string id = 1; + map fields = 2; + string description = 3; + int32 min_zoom = 4; + int32 max_zoom = 5; + + enum FieldType { + FIELD_TYPE_UNSPECIFIED = 0; + FIELD_TYPE_NUMBER = 1; + FIELD_TYPE_BOOLEAN = 3; + FIELD_TYPE_STRING = 4; + } +} + +enum TileCompression { + TILE_COMPRESSION_UNSPECIFIED = 0; + 
TILE_COMPRESSION_GZIP = 1; + TILE_COMPRESSION_NONE = 2; +} + diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java index 9843618b..cae697ef 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java @@ -3,8 +3,12 @@ package com.onthegomap.planetiler; import static com.onthegomap.planetiler.TestUtils.*; import static org.junit.jupiter.api.Assertions.*; +import com.onthegomap.planetiler.TestUtils.OsmXml; +import com.onthegomap.planetiler.archive.ReadableTileArchive; +import com.onthegomap.planetiler.archive.TileArchiveConfig; import com.onthegomap.planetiler.archive.TileArchiveMetadata; import com.onthegomap.planetiler.archive.TileArchiveWriter; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.collection.FeatureGroup; import com.onthegomap.planetiler.collection.LongLongMap; import com.onthegomap.planetiler.collection.LongLongMultimap; @@ -25,6 +29,7 @@ import com.onthegomap.planetiler.reader.osm.OsmElement; import com.onthegomap.planetiler.reader.osm.OsmReader; import com.onthegomap.planetiler.reader.osm.OsmRelationInfo; import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.stream.InMemoryStreamArchive; import com.onthegomap.planetiler.util.BuildInfo; import java.io.IOException; import java.nio.file.Files; @@ -35,6 +40,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -42,6 +48,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.DoubleStream; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; 
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -1758,6 +1765,33 @@ class PlanetilerTests { } } + private static TileArchiveConfig.Format extractFormat(String args) { + + final Optional format = Stream.of(TileArchiveConfig.Format.values()) + .filter(fmt -> args.contains("--output-format=" + fmt.id())) + .findFirst(); + + if (format.isPresent()) { + return format.get(); + } else if (args.contains("--output-format=")) { + throw new IllegalArgumentException("unhandled output format"); + } else { + return TileArchiveConfig.Format.MBTILES; + } + } + + private static TileCompression extractTileCompression(String args) { + if (args.contains("tile-compression=none")) { + return TileCompression.NONE; + } else if (args.contains("tile-compression=gzip")) { + return TileCompression.GZIP; + } else if (args.contains("tile-compression=")) { + throw new IllegalArgumentException("unhandled tile compression"); + } else { + return TileCompression.GZIP; + } + } + @ParameterizedTest @ValueSource(strings = { "", @@ -1765,12 +1799,32 @@ class PlanetilerTests { "--free-osm-after-read", "--osm-parse-node-bounds", "--output-format=pmtiles", + "--output-format=csv", + "--output-format=tsv", + "--output-format=proto", + "--output-format=pbf", + "--output-format=json", + "--tile-compression=none", + "--tile-compression=gzip" }) void testPlanetilerRunner(String args) throws Exception { - boolean pmtiles = args.contains("pmtiles"); Path originalOsm = TestUtils.pathToResource("monaco-latest.osm.pbf"); - Path output = tempDir.resolve(pmtiles ? "output.pmtiles" : "output.mbtiles"); Path tempOsm = tempDir.resolve("monaco-temp.osm.pbf"); + final TileCompression tileCompression = extractTileCompression(args); + + final TileArchiveConfig.Format format = extractFormat(args); + final Path output = tempDir.resolve("output." 
+ format.id()); + + final ReadableTileArchiveFactory readableTileArchiveFactory = switch (format) { + case MBTILES -> Mbtiles::newReadOnlyDatabase; + case CSV -> p -> InMemoryStreamArchive.fromCsv(p, ","); + case TSV -> p -> InMemoryStreamArchive.fromCsv(p, "\t"); + case JSON -> InMemoryStreamArchive::fromJson; + case PMTILES -> ReadablePmtiles::newReadFromFile; + case PROTO, PBF -> InMemoryStreamArchive::fromProtobuf; + }; + + Files.copy(originalOsm, tempOsm); Planetiler.create(Arguments.fromArgs( ("--tmpdir=" + tempDir.resolve("data") + " " + args).split("\\s+") @@ -1795,11 +1849,9 @@ class PlanetilerTests { assertFalse(Files.exists(tempOsm)); } - try ( - var db = pmtiles ? ReadablePmtiles.newReadFromFile(output) : Mbtiles.newReadOnlyDatabase(output) - ) { + try (var db = readableTileArchiveFactory.create(output)) { int features = 0; - var tileMap = TestUtils.getTileMap(db); + var tileMap = TestUtils.getTileMap(db, tileCompression); for (var tile : tileMap.values()) { for (var feature : tile) { feature.geometry().validate(); @@ -1809,12 +1861,21 @@ class PlanetilerTests { assertEquals(11, tileMap.size(), "num tiles"); assertEquals(2146, features, "num buildings"); - assertSubmap(Map.of( - "planetiler:version", BuildInfo.get().version(), - "planetiler:osm:osmosisreplicationtime", "2021-04-21T20:21:46Z", - "planetiler:osm:osmosisreplicationseq", "2947", - "planetiler:osm:osmosisreplicationurl", "http://download.geofabrik.de/europe/monaco-updates" - ), db.metadata().toMap()); + + final boolean checkMetadata = switch (format) { + case MBTILES -> true; + case PMTILES -> true; + default -> db.metadata() != null; + }; + + if (checkMetadata) { + assertSubmap(Map.of( + "planetiler:version", BuildInfo.get().version(), + "planetiler:osm:osmosisreplicationtime", "2021-04-21T20:21:46Z", + "planetiler:osm:osmosisreplicationseq", "2947", + "planetiler:osm:osmosisreplicationurl", "http://download.geofabrik.de/europe/monaco-updates" + ), db.metadata().toMap()); + } } } @@ -2103,4 
+2164,9 @@ class PlanetilerTests { assertFalse(polyResultz8.tiles.containsKey(TileCoord.ofXYZ(z8tiles * 3 / 4, z8tiles * 5 / 8, 8))); assertTrue(polyResultz8.tiles.containsKey(TileCoord.ofXYZ(z8tiles * 3 / 4, z8tiles * 7 / 8, 8))); } + + @FunctionalInterface + private interface ReadableTileArchiveFactory { + ReadableTileArchive create(Path p) throws IOException; + } } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/TestUtils.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/TestUtils.java index 2b7d3dfe..21a075d1 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/TestUtils.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/TestUtils.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.onthegomap.planetiler.archive.ReadableTileArchive; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.config.PlanetilerConfig; import com.onthegomap.planetiler.geo.GeoUtils; import com.onthegomap.planetiler.geo.GeometryException; @@ -201,10 +202,20 @@ public class TestUtils { } public static Map> getTileMap(ReadableTileArchive db) + throws IOException { + return getTileMap(db, TileCompression.GZIP); + } + + public static Map> getTileMap(ReadableTileArchive db, + TileCompression tileCompression) throws IOException { Map> tiles = new TreeMap<>(); for (var tile : getAllTiles(db)) { - var bytes = gunzip(tile.bytes()); + var bytes = switch (tileCompression) { + case GZIP -> gunzip(tile.bytes()); + case NONE -> tile.bytes(); + case UNKNWON -> throw new IllegalArgumentException("cannot decompress \"UNKNOWN\""); + }; var decoded = VectorTile.decode(bytes).stream() .map(feature -> feature(decodeSilently(feature.geometry()), feature.attrs())).toList(); tiles.put(tile.tile(), decoded); diff --git 
a/planetiler-core/src/test/java/com/onthegomap/planetiler/archive/TileArchiveMetadataTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/archive/TileArchiveMetadataTest.java index 194c050b..087efbb3 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/archive/TileArchiveMetadataTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/archive/TileArchiveMetadataTest.java @@ -59,7 +59,8 @@ class TileArchiveMetadataTest { "minzoom", "0", "maxzoom", "14", "bounds", "-73.6632,41.1274,-69.7598,43.0185", - "center", "-71.7115,42.07295" + "center", "-71.7115,42.07295", + "compression", "gzip" )), map ); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/mbtiles/MbtilesTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/mbtiles/MbtilesTest.java index 9ac149e1..4e6b71e1 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/mbtiles/MbtilesTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/mbtiles/MbtilesTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.*; import com.google.common.math.IntMath; import com.onthegomap.planetiler.TestUtils; import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.archive.TileEncodingResult; import com.onthegomap.planetiler.config.Arguments; import com.onthegomap.planetiler.geo.TileCoord; @@ -157,14 +158,56 @@ class MbtilesTest { 8, 9, List.of(new LayerStats.VectorLayer("MyLayer", Map.of())), - Map.of("other key", "other value") + Map.of("other key", "other value"), + TileCompression.GZIP )); } + @Test + void testMetadataWithoutCompressionAssumesGzip() throws IOException { + + final TileArchiveMetadata metadataIn = new TileArchiveMetadata( + "MyName", + "MyDescription", + "MyAttribution", + "MyVersion", + "baselayer", + TileArchiveMetadata.MVT_FORMAT, + new Envelope(1, 2, 3, 4), + new CoordinateXY(5, 6), + 7d, + 8, 
+ 9, + List.of(new LayerStats.VectorLayer("MyLayer", Map.of())), + Map.of("other key", "other value"), + null + ); + + final TileArchiveMetadata expectedMetadataOut = new TileArchiveMetadata( + "MyName", + "MyDescription", + "MyAttribution", + "MyVersion", + "baselayer", + TileArchiveMetadata.MVT_FORMAT, + new Envelope(1, 2, 3, 4), + new CoordinateXY(5, 6), + 7d, + 8, + 9, + List.of(new LayerStats.VectorLayer("MyLayer", Map.of())), + Map.of("other key", "other value"), + TileCompression.GZIP + ); + + roundTripMetadata(metadataIn, expectedMetadataOut); + } + @Test void testRoundTripMinimalMetadata() throws IOException { var empty = - new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, Map.of()); + new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, Map.of(), + TileCompression.GZIP); roundTripMetadata(empty); try (Mbtiles db = Mbtiles.newInMemoryDatabase()) { db.createTablesWithoutIndexes(); @@ -173,11 +216,16 @@ class MbtilesTest { } private static void roundTripMetadata(TileArchiveMetadata metadata) throws IOException { + roundTripMetadata(metadata, metadata); + } + + private static void roundTripMetadata(TileArchiveMetadata metadata, TileArchiveMetadata expectedOut) + throws IOException { try (Mbtiles db = Mbtiles.newInMemoryDatabase()) { db.createTablesWithoutIndexes(); var metadataTable = db.metadataTable(); metadataTable.set(metadata); - assertEquals(metadata, metadataTable.get()); + assertEquals(expectedOut, metadataTable.get()); } } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/pmtiles/PmtilesTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/pmtiles/PmtilesTest.java index d5b5f008..f0a4a178 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/pmtiles/PmtilesTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/pmtiles/PmtilesTest.java @@ -6,6 +6,7 @@ import 
com.fasterxml.jackson.databind.ObjectMapper; import com.onthegomap.planetiler.Profile; import com.onthegomap.planetiler.TestUtils; import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; import com.onthegomap.planetiler.archive.TileEncodingResult; import com.onthegomap.planetiler.config.PlanetilerConfig; import com.onthegomap.planetiler.geo.TileCoord; @@ -23,6 +24,8 @@ import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.locationtech.jts.geom.CoordinateXY; import org.locationtech.jts.geom.Envelope; @@ -188,17 +191,18 @@ class PmtilesTest { writer.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 1), new byte[]{0xa, 0x2}, OptionalLong.empty())); in.finish(metadata); - var reader = new ReadablePmtiles(bytes); - var header = reader.getHeader(); - assertEquals(1, header.numAddressedTiles()); - assertEquals(1, header.numTileContents()); - assertEquals(1, header.numTileEntries()); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); - assertNull(reader.getTile(0, 0, 0)); - assertNull(reader.getTile(0, 0, 2)); + try (var reader = new ReadablePmtiles(bytes)) { + var header = reader.getHeader(); + assertEquals(1, header.numAddressedTiles()); + assertEquals(1, header.numTileContents()); + assertEquals(1, header.numTileEntries()); + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); + assertNull(reader.getTile(0, 0, 0)); + assertNull(reader.getTile(0, 0, 2)); - Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); - assertEquals(1, coordset.size()); + Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); + assertEquals(1, coordset.size()); + } } @Test @@ -216,14 +220,17 @@ class PmtilesTest { 8, 9, List.of(new LayerStats.VectorLayer("MyLayer", 
Map.of())), - Map.of("other key", "other value") + Map.of("other key", "other value"), + TileCompression.GZIP )); } - @Test - void testRoundtripMetadataMinimal() throws IOException { + @ParameterizedTest + @EnumSource(value = TileCompression.class, names = {"GZIP", "NONE"}) + void testRoundtripMetadataMinimal(TileCompression tileCompression) throws IOException { roundTripMetadata( - new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, Map.of()), + new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, Map.of(), + tileCompression), new TileArchiveMetadata(null, null, null, null, null, null, new Envelope(-180, 180, -85.0511287, 85.0511287), new CoordinateXY(0, 0), @@ -231,7 +238,8 @@ class PmtilesTest { 0, 15, null, - Map.of() + Map.of(), + tileCompression ) ); } @@ -250,10 +258,11 @@ class PmtilesTest { writer.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0xa, 0x2}, OptionalLong.empty())); in.finish(input); - var reader = new ReadablePmtiles(channel); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); + try (var reader = new ReadablePmtiles(channel)) { + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); - assertEquals(output, reader.metadata()); + assertEquals(output, reader.metadata()); + } } } @@ -291,17 +300,18 @@ class PmtilesTest { writer.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 2), new byte[]{0xa, 0x2}, OptionalLong.of(42))); in.finish(metadata); - var reader = new ReadablePmtiles(bytes); - var header = reader.getHeader(); - assertEquals(3, header.numAddressedTiles()); - assertEquals(1, header.numTileContents()); - assertEquals(2, header.numTileEntries()); // z0 and z1 are contiguous - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 2)); + try (var reader = new 
ReadablePmtiles(bytes)) { + var header = reader.getHeader(); + assertEquals(3, header.numAddressedTiles()); + assertEquals(1, header.numTileContents()); + assertEquals(2, header.numTileEntries()); // z0 and z1 are contiguous + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 2)); - Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); - assertEquals(3, coordset.size()); + Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); + assertEquals(3, coordset.size()); + } } @Test @@ -317,17 +327,18 @@ class PmtilesTest { writer.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0xa, 0x2}, OptionalLong.of(42))); in.finish(metadata); - var reader = new ReadablePmtiles(bytes); - var header = reader.getHeader(); - assertEquals(2, header.numAddressedTiles()); - assertEquals(1, header.numTileContents()); - assertEquals(2, header.numTileEntries()); - assertFalse(header.clustered()); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); - assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); + try (var reader = new ReadablePmtiles(bytes)) { + var header = reader.getHeader(); + assertEquals(2, header.numAddressedTiles()); + assertEquals(1, header.numTileContents()); + assertEquals(2, header.numTileEntries()); + assertFalse(header.clustered()); + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 0)); + assertArrayEquals(new byte[]{0xa, 0x2}, reader.getTile(0, 0, 1)); - Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); - assertEquals(2, coordset.size()); + Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); + assertEquals(2, coordset.size()); + } } @Test @@ -348,25 +359,26 @@ class PmtilesTest { } in.finish(metadata); - var reader = new ReadablePmtiles(bytes); - var header = 
reader.getHeader(); - assertEquals(ENTRIES, header.numAddressedTiles()); - assertEquals(ENTRIES, header.numTileContents()); - assertEquals(ENTRIES, header.numTileEntries()); - assertTrue(header.leafDirectoriesLength() > 0); + try (var reader = new ReadablePmtiles(bytes)) { + var header = reader.getHeader(); + assertEquals(ENTRIES, header.numAddressedTiles()); + assertEquals(ENTRIES, header.numTileContents()); + assertEquals(ENTRIES, header.numTileEntries()); + assertTrue(header.leafDirectoriesLength() > 0); - for (int i = 0; i < ENTRIES; i++) { - var coord = TileCoord.hilbertDecode(i); - assertArrayEquals(ByteBuffer.allocate(4).putInt(i).array(), reader.getTile(coord.x(), coord.y(), coord.z()), - "tileCoord=%s did not match".formatted(coord.toString())); - } + for (int i = 0; i < ENTRIES; i++) { + var coord = TileCoord.hilbertDecode(i); + assertArrayEquals(ByteBuffer.allocate(4).putInt(i).array(), reader.getTile(coord.x(), coord.y(), coord.z()), + "tileCoord=%s did not match".formatted(coord.toString())); + } - Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); - assertEquals(ENTRIES, coordset.size()); + Set coordset = reader.getAllTileCoords().stream().collect(Collectors.toSet()); + assertEquals(ENTRIES, coordset.size()); - for (int i = 0; i < ENTRIES; i++) { - var coord = TileCoord.hilbertDecode(i); - assertTrue(coordset.contains(coord), "tileCoord=%s not in result".formatted(coord.toString())); + for (int i = 0; i < ENTRIES; i++) { + var coord = TileCoord.hilbertDecode(i); + assertTrue(coordset.contains(coord), "tileCoord=%s not in result".formatted(coord.toString())); + } } } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/InMemoryStreamArchive.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/InMemoryStreamArchive.java new file mode 100644 index 00000000..1d30afb1 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/InMemoryStreamArchive.java @@ -0,0 
+1,119 @@ +package com.onthegomap.planetiler.stream; + +import com.onthegomap.planetiler.archive.ReadableTileArchive; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.geo.TileCoord; +import com.onthegomap.planetiler.proto.StreamArchiveProto; +import com.onthegomap.planetiler.util.CloseableIterator; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +public class InMemoryStreamArchive implements ReadableTileArchive { + + private final List tileEncodings; + private final TileArchiveMetadata metadata; + + private InMemoryStreamArchive(List tileEncodings, TileArchiveMetadata metadata) { + this.tileEncodings = tileEncodings; + this.metadata = metadata; + } + + public static InMemoryStreamArchive fromCsv(Path p, String columnSepatator) throws IOException { + var base64Decoder = Base64.getDecoder(); + final List tileEncodings = new ArrayList<>(); + try (var reader = Files.newBufferedReader(p)) { + String line; + while ((line = reader.readLine()) != null) { + final String[] splits = line.split(columnSepatator); + final TileCoord tileCoord = TileCoord.ofXYZ(Integer.parseInt(splits[0]), Integer.parseInt(splits[1]), + Integer.parseInt(splits[2])); + tileEncodings.add(new TileEncodingResult(tileCoord, base64Decoder.decode(splits[3]), OptionalLong.empty())); + } + } + return new InMemoryStreamArchive(tileEncodings, null); + } + + public static InMemoryStreamArchive fromProtobuf(Path p) throws IOException { + final List tileEncodings = new ArrayList<>(); + try (var in = Files.newInputStream(p)) { + StreamArchiveProto.Entry entry; + while ((entry = StreamArchiveProto.Entry.parseDelimitedFrom(in)) != null) { + if (entry.getEntryCase() == 
StreamArchiveProto.Entry.EntryCase.TILE) { + final StreamArchiveProto.TileEntry tileProto = entry.getTile(); + final TileCoord tileCoord = TileCoord.ofXYZ(tileProto.getX(), tileProto.getY(), tileProto.getZ()); + tileEncodings + .add(new TileEncodingResult(tileCoord, tileProto.getEncodedData().toByteArray(), OptionalLong.empty())); + } + } + } + return new InMemoryStreamArchive(tileEncodings, null /* could add once the format is finalized*/); + } + + public static InMemoryStreamArchive fromJson(Path p) throws IOException { + final List tileEncodings = new ArrayList<>(); + final TileArchiveMetadata[] metadata = new TileArchiveMetadata[]{null}; + try (var reader = Files.newBufferedReader(p)) { + WriteableJsonStreamArchive.jsonMapper + .readerFor(WriteableJsonStreamArchive.Entry.class) + .readValues(reader) + .forEachRemaining(entry -> { + if (entry instanceof WriteableJsonStreamArchive.TileEntry te) { + final TileCoord tileCoord = TileCoord.ofXYZ(te.x(), te.y(), te.z()); + tileEncodings.add(new TileEncodingResult(tileCoord, te.encodedData(), OptionalLong.empty())); + } else if (entry instanceof WriteableJsonStreamArchive.FinishEntry fe) { + metadata[0] = fe.metadata(); + } + }); + } + return new InMemoryStreamArchive(tileEncodings, Objects.requireNonNull(metadata[0])); + } + + @Override + public void close() throws IOException {} + + @Override + public byte[] getTile(int x, int y, int z) { + + final TileCoord coord = TileCoord.ofXYZ(x, y, z); + + return tileEncodings.stream() + .filter(ter -> ter.coord().equals(coord)).findFirst() + .map(TileEncodingResult::tileData) + .orElse(null); + } + + @Override + public CloseableIterator getAllTileCoords() { + + final Iterator it = tileEncodings.iterator(); + + return new CloseableIterator() { + @Override + public TileCoord next() { + return it.next().coord(); + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public void close() {} + }; + } + + @Override + public TileArchiveMetadata 
metadata() { + return metadata; + } + +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/StreamArchiveUtilsTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/StreamArchiveUtilsTest.java new file mode 100644 index 00000000..7b96f9dc --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/StreamArchiveUtilsTest.java @@ -0,0 +1,34 @@ +package com.onthegomap.planetiler.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.config.Arguments; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class StreamArchiveUtilsTest { + + @ParameterizedTest + @CsvSource(value = {"a,a", "'a',a", "' ',$ $", "'\\n',$\n$"}, quoteCharacter = '$') + void testGetEscpacedString(String in, String out) { + + final Arguments options = Arguments.of(Map.of("key", in)); + + assertEquals(out, StreamArchiveUtils.getEscapedString(options, TileArchiveConfig.Format.CSV, "key", "descr.", "ex", + List.of("\n", " "))); + } + + @Test + void testConstructIndexedPath(@TempDir Path tempDir) { + final Path base = tempDir.resolve("base.test"); + assertEquals(base, StreamArchiveUtils.constructIndexedPath(base, 0)); + assertEquals(tempDir.resolve("base.test" + 1), StreamArchiveUtils.constructIndexedPath(base, 1)); + assertEquals(tempDir.resolve("base.test" + 13), StreamArchiveUtils.constructIndexedPath(base, 13)); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableCsvArchiveTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableCsvArchiveTest.java new file mode 100644 index 00000000..aef43850 --- /dev/null +++ 
b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableCsvArchiveTest.java @@ -0,0 +1,174 @@ +package com.onthegomap.planetiler.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.planetiler.archive.TileArchiveConfig; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.config.Arguments; +import com.onthegomap.planetiler.geo.TileCoord; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +class WriteableCsvArchiveTest { + + private static final StreamArchiveConfig defaultConfig = new StreamArchiveConfig(false, Arguments.of()); + private static final TileArchiveMetadata defaultMetadata = + new TileArchiveMetadata("start", null, null, null, null, null, null, null, null, null, null, null, null, null); + + @ParameterizedTest + @EnumSource(value = TileArchiveConfig.Format.class, names = {"CSV", "TSV"}) + void testWriteToSingleFile(TileArchiveConfig.Format format, @TempDir Path tempDir) throws IOException { + + final Path csvFile = tempDir.resolve("out.csv"); + + try (var archive = WriteableCsvArchive.newWriteToFile(format, csvFile, defaultConfig)) { + archive.initialize(defaultMetadata); // ignored + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(1, 2, 3), new byte[]{1}, OptionalLong.of(1))); + } + archive.finish(defaultMetadata); + } + + final String expectedFileContent = switch (format) { + case CSV -> """ + 
0,0,0,AA== + 1,2,3,AQ== + """; + case TSV -> """ + 0\t0\t0\tAA== + 1\t2\t3\tAQ== + """; + default -> throw new IllegalArgumentException("unsupported format" + format); + }; + + assertEquals(expectedFileContent, Files.readString(csvFile)); + + assertEquals(Set.of(csvFile), Files.list(tempDir).collect(Collectors.toUnmodifiableSet())); + } + + @Test + void testWriteToMultipleFiles(@TempDir Path tempDir) throws IOException { + + final Path csvFilePrimary = tempDir.resolve("out.csv"); + final Path csvFileSecondary = tempDir.resolve("out.csv1"); + final Path csvFileTertiary = tempDir.resolve("out.csv2"); + + try ( + var archive = WriteableCsvArchive.newWriteToFile(TileArchiveConfig.Format.CSV, csvFilePrimary, defaultConfig) + ) { + archive.initialize(defaultMetadata); // ignored + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(11, 12, 1), new byte[]{0}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(21, 22, 2), new byte[]{1}, OptionalLong.empty())); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(31, 32, 3), new byte[]{2}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(41, 42, 4), new byte[]{3}, OptionalLong.empty())); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(51, 55, 5), new byte[]{4}, OptionalLong.empty())); + } + archive.finish(defaultMetadata); + } + + assertEquals( + """ + 11,12,1,AA== + 21,22,2,AQ== + """, + Files.readString(csvFilePrimary) + ); + assertEquals( + """ + 31,32,3,Ag== + 41,42,4,Aw== + """, + Files.readString(csvFileSecondary) + ); + assertEquals( + """ + 51,55,5,BA== + """, + Files.readString(csvFileTertiary) + ); + + assertEquals( + Set.of(csvFilePrimary, csvFileSecondary, csvFileTertiary), + Files.list(tempDir).collect(Collectors.toUnmodifiableSet()) + ); + } + + @Test + void 
testColumnSeparator(@TempDir Path tempDir) throws IOException { + + final StreamArchiveConfig config = + new StreamArchiveConfig(false, Arguments.of(Map.of(WriteableCsvArchive.OPTION_COLUMN_SEPARATOR, "' '"))); + + final String expectedCsv = + """ + 0,0,0,AAE= + 1,1,1,AgM= + """.replace(',', ' '); + + testTileOptions(tempDir, config, expectedCsv); + } + + @Test + void testLineSeparator(@TempDir Path tempDir) throws IOException { + + final StreamArchiveConfig config = + new StreamArchiveConfig(false, Arguments.of(Map.of(WriteableCsvArchive.OPTION_LINE_SEPARTATOR, "'\\r'"))); + + final String expectedCsv = + """ + 0,0,0,AAE= + 1,1,1,AgM= + """.replace('\n', '\r'); + + testTileOptions(tempDir, config, expectedCsv); + } + + @Test + void testHexEncoding(@TempDir Path tempDir) throws IOException { + + final StreamArchiveConfig config = + new StreamArchiveConfig(false, Arguments.of(Map.of(WriteableCsvArchive.OPTION_BINARY_ENCODING, "hex"))); + + final String expectedCsv = + """ + 0,0,0,0001 + 1,1,1,0203 + """; + + testTileOptions(tempDir, config, expectedCsv); + } + + private void testTileOptions(Path tempDir, StreamArchiveConfig config, String expectedCsv) throws IOException { + + final Path csvFile = tempDir.resolve("out.csv"); + + try (var archive = WriteableCsvArchive.newWriteToFile(TileArchiveConfig.Format.CSV, csvFile, config)) { + archive.initialize(defaultMetadata); + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0, 1}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(1, 1, 1), new byte[]{2, 3}, OptionalLong.empty())); + } + archive.finish(defaultMetadata); + } + + assertEquals(expectedCsv, Files.readString(csvFile)); + + assertEquals(Set.of(csvFile), Files.list(tempDir).collect(Collectors.toUnmodifiableSet())); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchiveTest.java 
b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchiveTest.java new file mode 100644 index 00000000..31ae6c24 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableJsonStreamArchiveTest.java @@ -0,0 +1,244 @@ +package com.onthegomap.planetiler.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.config.Arguments; +import com.onthegomap.planetiler.geo.TileCoord; +import com.onthegomap.planetiler.util.LayerStats; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.locationtech.jts.geom.CoordinateXY; +import org.locationtech.jts.geom.Envelope; + +class WriteableJsonStreamArchiveTest { + + private static final StreamArchiveConfig defaultConfig = new StreamArchiveConfig(false, Arguments.of()); + private static final TileArchiveMetadata maxMetadataIn = + new TileArchiveMetadata("name", "description", "attribution", "version", "type", "format", new Envelope(0, 1, 2, 3), + new CoordinateXY(1.3, 3.7), 1.0, 2, 3, + List.of( + new LayerStats.VectorLayer("vl0", + ImmutableMap.of("1", LayerStats.FieldType.BOOLEAN, "2", LayerStats.FieldType.NUMBER, "3", + 
LayerStats.FieldType.STRING), + Optional.of("description"), OptionalInt.of(1), OptionalInt.of(2)), + new LayerStats.VectorLayer("vl1", + Map.of(), + Optional.empty(), OptionalInt.empty(), OptionalInt.empty()) + ), + ImmutableMap.of("a", "b", "c", "d"), + TileCompression.GZIP); + private static final String maxMetadataOut = """ + { + "name":"name", + "description":"description", + "attribution":"attribution", + "version":"version", + "type":"type", + "format":"format", + "zoom":1.0, + "minzoom":2, + "maxzoom":3, + "compression":"gzip", + "bounds":{ + "minX":0.0, + "maxX":1.0, + "minY":2.0, + "maxY":3.0 + }, + "center":{ + "x":1.3,"y":3.7 + }, + "vectorLayers":[ + { + "id":"vl0", + "fields":{ + "1":"Boolean", + "2":"Number", + "3":"String" + }, + "description":"description", + "minzoom":1, + "maxzoom":2 + }, + { + "id":"vl1", + "fields":{} + } + ], + "a":"b", + "c":"d" + }""".lines().map(String::trim).collect(Collectors.joining("")); + + private static final TileArchiveMetadata minMetadataIn = + new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, null, null); + private static final String MIN_METADATA_OUT = "{}"; + + @Test + void testWriteToSingleFile(@TempDir Path tempDir) throws IOException { + + final Path csvFile = tempDir.resolve("out.json"); + + try (var archive = WriteableJsonStreamArchive.newWriteToFile(csvFile, defaultConfig)) { + archive.initialize(maxMetadataIn); + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(1, 2, 3), new byte[]{1}, OptionalLong.of(1))); + } + archive.finish(minMetadataIn); + } + + assertEqualsDelimitedJson( + """ + {"type":"initialization","metadata":%s} + {"type":"tile","x":0,"y":0,"z":0,"encodedData":"AA=="} + {"type":"tile","x":1,"y":2,"z":3,"encodedData":"AQ=="} + {"type":"finish","metadata":%s} + 
""".formatted(maxMetadataOut, MIN_METADATA_OUT), + Files.readString(csvFile) + ); + + assertEquals(Set.of(csvFile), Files.list(tempDir).collect(Collectors.toUnmodifiableSet())); + } + + @Test + void testWriteToMultipleFiles(@TempDir Path tempDir) throws IOException { + + final Path csvFilePrimary = tempDir.resolve("out.json"); + final Path csvFileSecondary = tempDir.resolve("out.json1"); + final Path csvFileTertiary = tempDir.resolve("out.json2"); + + final var tile0 = new TileEncodingResult(TileCoord.ofXYZ(11, 12, 1), new byte[]{0}, OptionalLong.empty()); + final var tile1 = new TileEncodingResult(TileCoord.ofXYZ(21, 22, 2), new byte[]{1}, OptionalLong.empty()); + final var tile2 = new TileEncodingResult(TileCoord.ofXYZ(31, 32, 3), new byte[]{2}, OptionalLong.empty()); + final var tile3 = new TileEncodingResult(TileCoord.ofXYZ(41, 42, 4), new byte[]{3}, OptionalLong.empty()); + final var tile4 = new TileEncodingResult(TileCoord.ofXYZ(51, 52, 5), new byte[]{4}, OptionalLong.empty()); + try (var archive = WriteableJsonStreamArchive.newWriteToFile(csvFilePrimary, defaultConfig)) { + archive.initialize(minMetadataIn); + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile0); + tileWriter.write(tile1); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile2); + tileWriter.write(tile3); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile4); + } + archive.finish(maxMetadataIn); + } + + assertEqualsDelimitedJson( + """ + {"type":"initialization","metadata":%s} + {"type":"tile","x":11,"y":12,"z":1,"encodedData":"AA=="} + {"type":"tile","x":21,"y":22,"z":2,"encodedData":"AQ=="} + {"type":"finish","metadata":%s} + """.formatted(MIN_METADATA_OUT, maxMetadataOut), + Files.readString(csvFilePrimary) + ); + + assertEqualsDelimitedJson( + """ + {"type":"tile","x":31,"y":32,"z":3,"encodedData":"Ag=="} + {"type":"tile","x":41,"y":42,"z":4,"encodedData":"Aw=="} + """, + Files.readString(csvFileSecondary) + ); + + 
assertEqualsDelimitedJson( + """ + {"type":"tile","x":51,"y":52,"z":5,"encodedData":"BA=="} + """, + Files.readString(csvFileTertiary) + ); + + assertEquals( + Set.of(csvFilePrimary, csvFileSecondary, csvFileTertiary), + Files.list(tempDir).collect(Collectors.toUnmodifiableSet()) + ); + } + + @Test + void testTilesOnly(@TempDir Path tempDir) throws IOException { + + final StreamArchiveConfig config = new StreamArchiveConfig(false, Arguments.of(Map.of("tiles_only", "true"))); + + final String expectedCsv = """ + {"type":"tile","x":0,"y":0,"z":0,"encodedData":"AA=="} + {"type":"tile","x":1,"y":2,"z":3,"encodedData":"AQ=="} + """; + + testTileOptions(tempDir, config, expectedCsv); + } + + @Test + void testRootValueSeparator(@TempDir Path tempDir) throws IOException { + + final StreamArchiveConfig config = + new StreamArchiveConfig(false, Arguments.of(Map.of("root_value_separator", "' '"))); + + final String expectedJson = + """ + {"type":"initialization","metadata":%s} + {"type":"tile","x":0,"y":0,"z":0,"encodedData":"AA=="} + {"type":"tile","x":1,"y":2,"z":3,"encodedData":"AQ=="} + {"type":"finish","metadata":%s} + """.formatted(MIN_METADATA_OUT, maxMetadataOut) + .replace('\n', ' '); + + testTileOptions(tempDir, config, expectedJson); + + assertFalse(Files.readString(tempDir.resolve("out.json")).contains("\n")); + } + + private void testTileOptions(Path tempDir, StreamArchiveConfig config, String expectedJson) throws IOException { + + final Path csvFile = tempDir.resolve("out.json"); + + try (var archive = WriteableJsonStreamArchive.newWriteToFile(csvFile, config)) { + archive.initialize(minMetadataIn); + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0}, OptionalLong.empty())); + tileWriter.write(new TileEncodingResult(TileCoord.ofXYZ(1, 2, 3), new byte[]{1}, OptionalLong.empty())); + } + archive.finish(maxMetadataIn); + } + + assertEqualsDelimitedJson(expectedJson, 
Files.readString(csvFile)); + + assertEquals(Set.of(csvFile), Files.list(tempDir).collect(Collectors.toUnmodifiableSet())); + } + + private static void assertEqualsDelimitedJson(String expectedJson, String actualJson) { + assertEquals(readDelimitedNodes(expectedJson), readDelimitedNodes(actualJson)); + } + + private static List readDelimitedNodes(String json) { + try { + return ImmutableList.copyOf(new ObjectMapper().readerFor(JsonNode.class).readValues(json)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchiveTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchiveTest.java new file mode 100644 index 00000000..e30ff9a6 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/stream/WriteableProtoStreamArchiveTest.java @@ -0,0 +1,180 @@ +package com.onthegomap.planetiler.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.protobuf.ByteString; +import com.onthegomap.planetiler.archive.TileArchiveMetadata; +import com.onthegomap.planetiler.archive.TileCompression; +import com.onthegomap.planetiler.archive.TileEncodingResult; +import com.onthegomap.planetiler.geo.TileCoord; +import com.onthegomap.planetiler.proto.StreamArchiveProto; +import com.onthegomap.planetiler.util.LayerStats; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.locationtech.jts.geom.CoordinateXY; +import org.locationtech.jts.geom.Envelope; + +class WriteableProtoStreamArchiveTest { + + 
private static final StreamArchiveConfig defaultConfig = new StreamArchiveConfig(false, null); + private static final TileArchiveMetadata maxMetadataIn = + new TileArchiveMetadata("name", "description", "attribution", "version", "type", "format", new Envelope(0, 1, 2, 3), + new CoordinateXY(1.3, 3.7), 1.0, 2, 3, + List.of( + new LayerStats.VectorLayer("vl0", + Map.of("1", LayerStats.FieldType.BOOLEAN, "2", LayerStats.FieldType.NUMBER, "3", LayerStats.FieldType.STRING), + Optional.of("description"), OptionalInt.of(1), OptionalInt.of(2)), + new LayerStats.VectorLayer("vl1", + Map.of(), + Optional.empty(), OptionalInt.empty(), OptionalInt.empty()) + ), + Map.of("a", "b", "c", "d"), + TileCompression.GZIP); + private static final StreamArchiveProto.Metadata maxMetadataOut = StreamArchiveProto.Metadata.newBuilder() + .setName("name").setDescription("description").setAttribution("attribution").setVersion("version") + .setType("type").setFormat("format") + .setBounds(StreamArchiveProto.Envelope.newBuilder().setMinX(0).setMaxX(1).setMinY(2).setMaxY(3).build()) + .setCenter(StreamArchiveProto.CoordinateXY.newBuilder().setX(1.3).setY(3.7)) + .setZoom(1.0).setMinZoom(2).setMaxZoom(3) + .addVectorLayers( + StreamArchiveProto.VectorLayer.newBuilder() + .setId("vl0").setDescription("description").setMinZoom(1).setMaxZoom(2) + .putFields("1", StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_BOOLEAN) + .putFields("2", StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_NUMBER) + .putFields("3", StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_STRING) + .build() + ) + .addVectorLayers(StreamArchiveProto.VectorLayer.newBuilder().setId("vl1").build()) + .putOthers("a", "b").putOthers("c", "d") + .setTileCompression(StreamArchiveProto.TileCompression.TILE_COMPRESSION_GZIP) + .build(); + + private static final TileArchiveMetadata minMetadataIn = + new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, null, + TileCompression.NONE); + 
private static final StreamArchiveProto.Metadata minMetadataOut = StreamArchiveProto.Metadata.newBuilder() + .setTileCompression(StreamArchiveProto.TileCompression.TILE_COMPRESSION_NONE) + .build(); + + @Test + void testWriteSingleFile(@TempDir Path tempDir) throws IOException { + final Path csvFile = tempDir.resolve("out.proto"); + + final var tile0 = new TileEncodingResult(TileCoord.ofXYZ(0, 0, 0), new byte[]{0}, OptionalLong.empty()); + final var tile1 = new TileEncodingResult(TileCoord.ofXYZ(1, 2, 3), new byte[]{1}, OptionalLong.of(1)); + try (var archive = WriteableProtoStreamArchive.newWriteToFile(csvFile, defaultConfig)) { + archive.initialize(maxMetadataIn); + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile0); + tileWriter.write(tile1); + } + archive.finish(minMetadataIn); + } + + try (InputStream in = Files.newInputStream(csvFile)) { + assertEquals( + List.of(wrapInit(maxMetadataOut), toEntry(tile0), toEntry(tile1), wrapFinish(minMetadataOut)), + readAllEntries(in) + ); + } + } + + @Test + void testWriteToMultipleFiles(@TempDir Path tempDir) throws IOException { + + final Path csvFilePrimary = tempDir.resolve("out.proto"); + final Path csvFileSecondary = tempDir.resolve("out.proto1"); + final Path csvFileTertiary = tempDir.resolve("out.proto2"); + + final var tile0 = new TileEncodingResult(TileCoord.ofXYZ(11, 12, 1), new byte[]{0}, OptionalLong.empty()); + final var tile1 = new TileEncodingResult(TileCoord.ofXYZ(21, 22, 2), new byte[]{1}, OptionalLong.empty()); + final var tile2 = new TileEncodingResult(TileCoord.ofXYZ(31, 32, 3), new byte[]{2}, OptionalLong.empty()); + final var tile3 = new TileEncodingResult(TileCoord.ofXYZ(41, 42, 4), new byte[]{3}, OptionalLong.empty()); + final var tile4 = new TileEncodingResult(TileCoord.ofXYZ(51, 52, 5), new byte[]{4}, OptionalLong.empty()); + try (var archive = WriteableProtoStreamArchive.newWriteToFile(csvFilePrimary, defaultConfig)) { + archive.initialize(minMetadataIn); + try (var 
tileWriter = archive.newTileWriter()) { + tileWriter.write(tile0); + tileWriter.write(tile1); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile2); + tileWriter.write(tile3); + } + try (var tileWriter = archive.newTileWriter()) { + tileWriter.write(tile4); + } + archive.finish(maxMetadataIn); + } + + try (InputStream in = Files.newInputStream(csvFilePrimary)) { + assertEquals( + List.of(wrapInit(minMetadataOut), toEntry(tile0), toEntry(tile1), wrapFinish(maxMetadataOut)), + readAllEntries(in) + ); + } + try (InputStream in = Files.newInputStream(csvFileSecondary)) { + assertEquals( + List.of(toEntry(tile2), toEntry(tile3)), + readAllEntries(in) + ); + } + try (InputStream in = Files.newInputStream(csvFileTertiary)) { + assertEquals( + List.of(toEntry(tile4)), + readAllEntries(in) + ); + } + + assertEquals( + Set.of(csvFilePrimary, csvFileSecondary, csvFileTertiary), + Files.list(tempDir).collect(Collectors.toUnmodifiableSet()) + ); + } + + private static List readAllEntries(InputStream in) throws IOException { + final List result = new ArrayList<>(); + StreamArchiveProto.Entry entry; + while ((entry = StreamArchiveProto.Entry.parseDelimitedFrom(in)) != null) { + result.add(entry); + } + return result; + } + + private static StreamArchiveProto.Entry toEntry(TileEncodingResult result) { + return StreamArchiveProto.Entry.newBuilder() + .setTile( + StreamArchiveProto.TileEntry.newBuilder() + .setZ(result.coord().z()) + .setX(result.coord().x()) + .setY(result.coord().y()) + .setEncodedData(ByteString.copyFrom(result.tileData())) + .build() + ) + .build(); + } + + private static StreamArchiveProto.Entry wrapInit(StreamArchiveProto.Metadata metadata) { + return StreamArchiveProto.Entry.newBuilder() + .setInitialization(StreamArchiveProto.InitializationEntry.newBuilder().setMetadata(metadata).build()) + .build(); + } + + private static StreamArchiveProto.Entry wrapFinish(StreamArchiveProto.Metadata metadata) { + return 
StreamArchiveProto.Entry.newBuilder() + .setFinish(StreamArchiveProto.FinishEntry.newBuilder().setMetadata(metadata).build()) + .build(); + } +}