Initial geoparquet support (#888)

pull/891/head
Michael Barry 2024-05-22 05:55:57 -04:00 committed by GitHub
parent 2b878fa9ed
commit fb1d0e3bd6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
47 changed files with 3492 additions and 23 deletions


@@ -18,6 +18,7 @@ The `planetiler-core` module includes the following software:
, [EPSG](https://github.com/geotools/geotools/blob/main/licenses/EPSG.md))
- org.msgpack:msgpack-core (Apache license)
- org.xerial:sqlite-jdbc (Apache license)
- org.xerial.snappy:snappy-java (Apache license)
- com.ibm.icu:icu4j ([ICU license](https://github.com/unicode-org/icu/blob/main/icu4c/LICENSE))
- com.google.guava:guava (Apache license)
- com.google.protobuf:protobuf-java (BSD 3-Clause License)
@@ -29,6 +30,7 @@ The `planetiler-core` module includes the following software:
- org.snakeyaml:snakeyaml-engine (Apache license)
- org.commonmark:commonmark (BSD 2-clause license)
- org.tukaani:xz (public domain)
- blue.strategic.parquet:parquet-floor (Apache license)
- Adapted code:
- `DouglasPeuckerSimplifier` from [JTS](https://github.com/locationtech/jts) (EDL)
- `OsmMultipolygon` from [imposm3](https://github.com/omniscale/imposm3) (Apache license)
@@ -65,4 +67,5 @@ The `planetiler-core` module includes the following software:
| OSM Lakelines | [MIT](https://github.com/lukasmartinelli/osm-lakelines), data from OSM [ODBL](https://www.openstreetmap.org/copyright) | yes | no |
| OSM Water Polygons | [acknowledgement](https://osmdata.openstreetmap.de/info/license.html), data from OSM [ODBL](https://www.openstreetmap.org/copyright) | yes | yes |
| Wikidata name translations | [CC0](https://www.wikidata.org/wiki/Wikidata:Licensing) | no | no |
| Overture Maps | [Various](https://docs.overturemaps.org/attribution) | no | yes |


@@ -334,6 +334,8 @@ Planetiler is made possible by these awesome open source projects:
Google's [Common Expression Language](https://github.com/google/cel-spec) that powers dynamic expressions embedded in
schema config files.
- [PMTiles](https://github.com/protomaps/PMTiles) optimized tile storage format
- [Apache Parquet](https://github.com/apache/parquet-mr) to support reading geoparquet files in java (with dependencies
minimized by [parquet-floor](https://github.com/strategicblue/parquet-floor))
See [NOTICE.md](NOTICE.md) for a full list and license details.


@@ -0,0 +1,28 @@
package com.onthegomap.planetiler.benchmarks;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.reader.parquet.ParquetInputFile;
import java.nio.file.Path;
public class BenchmarkParquetRead {
public static void main(String[] args) {
var arguments = Arguments.fromArgs(args);
var path =
arguments.inputFile("parquet", "parquet file to read", Path.of("data", "sources", "locality.zstd.parquet"));
long c = 0;
var file = new ParquetInputFile("parquet", "locality", path, null, Bounds.WORLD, null, tags -> tags.get("id"));
for (int i = 0; i < 20; i++) {
long start = System.currentTimeMillis();
for (var block : file.get()) {
for (var item : block) {
c += item.tags().size();
}
}
System.err.println(System.currentTimeMillis() - start);
}
System.err.println(c);
}
}


@@ -154,12 +154,24 @@
<artifactId>geopackage</artifactId>
<version>${geopackage.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Pin transitive snappy dependency to more recent version without vulnerability -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>
<dependency>
<groupId>blue.strategic.parquet</groupId>
<artifactId>parquet-floor</artifactId>
<version>1.41</version>
</dependency>
</dependencies>
<build>


@@ -13,9 +13,11 @@ import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.GeoPackageReader;
import com.onthegomap.planetiler.reader.NaturalEarthReader;
import com.onthegomap.planetiler.reader.ShapefileReader;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmNodeBoundsProvider;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.reader.parquet.ParquetReader;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timers;
@@ -39,9 +41,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -469,6 +474,53 @@ public class Planetiler {
config, profile, stats, keepUnzipped)));
}
/**
* Adds a new <a href="https://github.com/opengeospatial/geoparquet">geoparquet</a> source that will be processed when
* {@link #run()} is called.
*
* @param name string to use in stats and logs to identify this stage
* @param paths paths to the geoparquet files to read.
* @param hivePartitioning Set to true to parse extra feature tags from the file path, for example
* {@code {theme="buildings", type="part"}} from
* {@code base/theme=buildings/type=part/file.parquet}
* @param getId function that extracts a unique vector tile feature ID from each input feature; string or
* binary values will be hashed to a {@code long}.
* @param getLayer function that extracts {@link SourceFeature#getSourceLayer()} from the properties of each
* input feature
* @return this runner instance for chaining
* @see ParquetReader
*/
public Planetiler addParquetSource(String name, List<Path> paths, boolean hivePartitioning,
Function<Map<String, Object>, Object> getId, Function<Map<String, Object>, Object> getLayer) {
// TODO handle auto-downloading
for (var path : paths) {
inputPaths.add(new InputPath(name, path, false));
}
var separator = Pattern.quote(paths.isEmpty() ? "/" : paths.getFirst().getFileSystem().getSeparator());
String prefix = StringUtils.getCommonPrefix(paths.stream().map(Path::toString).toArray(String[]::new))
.replaceAll(separator + "[^" + separator + "]*$", "");
return addStage(name, "Process features in " + (prefix.isEmpty() ? (paths.size() + " files") : prefix),
ifSourceUsed(name, () -> new ParquetReader(name, profile, stats, getId, getLayer, hivePartitioning)
.process(paths, featureGroup, config)));
}
/**
* Alias for {@link #addParquetSource(String, List, boolean, Function, Function)} using the default layer and ID
* extractors.
*/
public Planetiler addParquetSource(String name, List<Path> paths, boolean hivePartitioning) {
return addParquetSource(name, paths, hivePartitioning, null, null);
}
/**
* Alias for {@link #addParquetSource(String, List, boolean, Function, Function)} without hive partitioning and using
* the default layer and ID extractors.
*/
public Planetiler addParquetSource(String name, List<Path> paths) {
return addParquetSource(name, paths, false);
}
/**
* Adds a new stage that will be invoked when {@link #run()} is called.
*


@@ -17,6 +17,8 @@ import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.LineSegment;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.geom.LinearRing;
import org.locationtech.jts.geom.MultiLineString;
import org.locationtech.jts.geom.MultiPoint;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
@@ -27,6 +29,7 @@ import org.locationtech.jts.geom.impl.PackedCoordinateSequenceFactory;
import org.locationtech.jts.geom.util.GeometryFixer;
import org.locationtech.jts.geom.util.GeometryTransformer;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKTReader;
import org.locationtech.jts.precision.GeometryPrecisionReducer;
/**
@@ -40,8 +43,9 @@ public class GeoUtils {
/** Rounding precision for 256x256px tiles encoded using 4096 values. */
public static final PrecisionModel TILE_PRECISION = new PrecisionModel(4096d / 256d);
public static final GeometryFactory JTS_FACTORY = new GeometryFactory(PackedCoordinateSequenceFactory.DOUBLE_FACTORY);
public static final Geometry EMPTY_GEOMETRY = JTS_FACTORY.createGeometryCollection();
public static final CoordinateSequence EMPTY_COORDINATE_SEQUENCE = new PackedCoordinateSequence.Double(0, 2, 0);
public static final Point EMPTY_POINT = JTS_FACTORY.createPoint();
public static final LineString EMPTY_LINE = JTS_FACTORY.createLineString();
public static final Polygon EMPTY_POLYGON = JTS_FACTORY.createPolygon();
@@ -247,11 +251,11 @@ public class GeoUtils {
return JTS_FACTORY.createPoint(coord);
}
public static MultiLineString createMultiLineString(List<LineString> lineStrings) {
return JTS_FACTORY.createMultiLineString(lineStrings.toArray(EMPTY_LINE_STRING_ARRAY));
}
public static MultiPolygon createMultiPolygon(List<Polygon> polygon) {
return JTS_FACTORY.createMultiPolygon(polygon.toArray(EMPTY_POLYGON_ARRAY));
}
@@ -370,7 +374,7 @@ public class GeoUtils {
return new PackedCoordinateSequence.Double(coords, 2, 0);
}
public static MultiPoint createMultiPoint(List<Point> points) {
return JTS_FACTORY.createMultiPoint(points.toArray(EMPTY_POINT_ARRAY));
}
@@ -548,6 +552,14 @@ public class GeoUtils {
PlanetilerConfig.MAX_MAXZOOM);
}
public static WKBReader wkbReader() {
return new WKBReader(JTS_FACTORY);
}
public static WKTReader wktReader() {
return new WKTReader(JTS_FACTORY);
}
/** Helper class to sort polygons by area of their outer shell. */
private record PolyAndArea(Polygon poly, double area) implements Comparable<PolyAndArea> {


@@ -172,6 +172,7 @@ public class NaturalEarthReader extends SimpleReader<SimpleFeature> {
}
}
if (geometryColumn >= 0) {
var wkbReader = GeoUtils.wkbReader();
while (rs.next()) {
byte[] geometry = rs.getBytes(geometryColumn + 1);
if (geometry == null) {
@@ -179,7 +180,7 @@ public class NaturalEarthReader extends SimpleReader<SimpleFeature> {
}
// create the feature and pass to next stage
Geometry latLonGeometry = wkbReader.read(geometry);
SimpleFeature readerGeometry = SimpleFeature.create(latLonGeometry, HashMap.newHashMap(column.length - 1),
sourceName, table, ++id);
for (int c = 0; c < column.length; c++) {


@@ -0,0 +1,104 @@
package com.onthegomap.planetiler.reader.parquet;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.geom.LinearRing;
import org.locationtech.jts.geom.MultiLineString;
import org.locationtech.jts.geom.MultiPoint;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.geom.impl.PackedCoordinateSequence;
/**
* Utilities for converting nested <a href=
* "https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md#native-encodings-based-on-geoarrow">geoarrow</a>
* coordinate lists to JTS geometries.
*/
class GeoArrow {
private GeoArrow() {}
// TODO create packed coordinate arrays while reading parquet values to avoid creating so many intermediate objects
static MultiPolygon multipolygon(List<List<List<Object>>> list) {
return GeoUtils.createMultiPolygon(map(list, GeoArrow::polygon));
}
static Polygon polygon(List<List<Object>> input) {
return GeoUtils.createPolygon(ring(input.getFirst()), input.stream().skip(1).map(GeoArrow::ring).toList());
}
static MultiPoint multipoint(List<Object> input) {
return GeoUtils.createMultiPoint(map(input, GeoArrow::point));
}
static Point point(Object input) {
int dims = input instanceof List<?> l ? l.size() : input instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(1, dims, dims == 4 ? 1 : 0);
coordinate(input, result, 0);
return GeoUtils.JTS_FACTORY.createPoint(result);
}
static MultiLineString multilinestring(List<List<Object>> input) {
return GeoUtils.createMultiLineString(map(input, GeoArrow::linestring));
}
static LineString linestring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLineString(coordinateSequence(input));
}
private static CoordinateSequence coordinateSequence(List<Object> input) {
if (input.isEmpty()) {
return GeoUtils.EMPTY_COORDINATE_SEQUENCE;
}
Object first = input.getFirst();
int dims = first instanceof List<?> l ? l.size() : first instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(input.size(), dims, dims == 4 ? 1 : 0);
for (int i = 0; i < input.size(); i++) {
Object item = input.get(i);
coordinate(item, result, i);
}
return result;
}
private static LinearRing ring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLinearRing(coordinateSequence(input));
}
private static void coordinate(Object input, CoordinateSequence result, int index) {
switch (input) {
case List<?> list -> {
List<Number> l = (List<Number>) list;
for (int i = 0; i < l.size(); i++) {
result.setOrdinate(index, i, l.get(i).doubleValue());
}
}
case Map<?, ?> map -> {
Map<String, Number> m = (Map<String, Number>) map;
for (var entry : m.entrySet()) {
int ordinateIndex = switch (entry.getKey()) {
case "x" -> 0;
case "y" -> 1;
case "z" -> 2;
case "m" -> 3;
case null, default -> throw new IllegalArgumentException("Bad coordinate key: " + entry.getKey());
};
result.setOrdinate(index, ordinateIndex, entry.getValue().doubleValue());
}
}
default -> throw new IllegalArgumentException("Expecting map or list, got: " + input);
}
}
private static <I, O> List<O> map(List<I> in, Function<I, O> remap) {
return in.stream().map(remap).toList();
}
}
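For instance (values hypothetical, called from within this package since the helpers are package-private), the nested-list and struct encodings map to JTS geometries like:
// x/y pairs as nested lists of numbers -> LineString
List<Object> coords = List.of(List.of(-71.06, 42.36), List.of(-71.05, 42.37));
LineString line = GeoArrow.linestring(coords);
// a single coordinate as an {x=..., y=...} struct -> Point
Point point = GeoArrow.point(Map.of("x", -71.06, "y", 42.36));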


@@ -0,0 +1,200 @@
package com.onthegomap.planetiler.reader.parquet;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Filters;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Struct that a
* <a href="https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md#file-metadata">geoparquet
* metadata</a> JSON string is deserialized into.
*/
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record GeoParquetMetadata(
String version,
String primaryColumn,
Map<String, ColumnMetadata> columns
) {
private static final Logger LOGGER = LoggerFactory.getLogger(GeoParquetMetadata.class);
private static final ObjectMapper mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public record CoveringBbox(
List<String> xmin,
List<String> ymin,
List<String> xmax,
List<String> ymax
) {}
public record Covering(
CoveringBbox bbox
) {}
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ColumnMetadata(
String encoding,
List<String> geometryTypes,
Object crs,
String orientation,
String edges,
List<Double> bbox,
Double epoch,
Covering covering
) {
ColumnMetadata(String encoding) {
this(encoding, List.of());
}
ColumnMetadata(String encoding, List<String> geometryTypes) {
this(encoding, geometryTypes, null, null, null, null, null, null);
}
public Envelope envelope() {
return (bbox == null || bbox.size() != 4) ? GeoUtils.WORLD_LAT_LON_BOUNDS :
new Envelope(bbox.get(0), bbox.get(2), bbox.get(1), bbox.get(3));
}
/**
* Returns a parquet filter that restricts the records read to those whose covering bbox overlaps {@code bounds},
* or null if unable to infer that from the metadata.
* <p>
* If covering bbox metadata is missing from geoparquet metadata, it will try to use bbox.xmin, bbox.xmax,
* bbox.ymin, and bbox.ymax if present.
*/
public FilterPredicate bboxFilter(MessageType schema, Bounds bounds) {
if (!bounds.isWorld()) {
var covering = covering();
// if covering metadata missing, use default bbox:{xmin,xmax,ymin,ymax}
if (covering == null) {
if (hasNumericField(schema, "bbox.xmin") &&
hasNumericField(schema, "bbox.xmax") &&
hasNumericField(schema, "bbox.ymin") &&
hasNumericField(schema, "bbox.ymax")) {
covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox(
List.of("bbox.xmin"),
List.of("bbox.ymin"),
List.of("bbox.xmax"),
List.of("bbox.ymax")
));
} else if (hasNumericField(schema, "bbox", "xmin") &&
hasNumericField(schema, "bbox", "xmax") &&
hasNumericField(schema, "bbox", "ymin") &&
hasNumericField(schema, "bbox", "ymax")) {
covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox(
List.of("bbox", "xmin"),
List.of("bbox", "ymin"),
List.of("bbox", "xmax"),
List.of("bbox", "ymax")
));
}
}
if (covering != null) {
var latLonBounds = bounds.latLon();
// TODO apply projection
var coveringBbox = covering.bbox();
var coordinateType =
schema.getColumnDescription(coveringBbox.xmax().toArray(String[]::new))
.getPrimitiveType()
.getPrimitiveTypeName();
BiFunction<List<String>, Number, FilterPredicate> gtEq = switch (coordinateType) {
case DOUBLE -> (p, v) -> FilterApi.gtEq(Filters.doubleColumn(p), v.doubleValue());
case FLOAT -> (p, v) -> FilterApi.gtEq(Filters.floatColumn(p), v.floatValue());
default -> throw new UnsupportedOperationException();
};
BiFunction<List<String>, Number, FilterPredicate> ltEq = switch (coordinateType) {
case DOUBLE -> (p, v) -> FilterApi.ltEq(Filters.doubleColumn(p), v.doubleValue());
case FLOAT -> (p, v) -> FilterApi.ltEq(Filters.floatColumn(p), v.floatValue());
default -> throw new UnsupportedOperationException();
};
return FilterApi.and(
FilterApi.and(
gtEq.apply(coveringBbox.xmax(), latLonBounds.getMinX()),
ltEq.apply(coveringBbox.xmin(), latLonBounds.getMaxX())
),
FilterApi.and(
gtEq.apply(coveringBbox.ymax(), latLonBounds.getMinY()),
ltEq.apply(coveringBbox.ymin(), latLonBounds.getMaxY())
)
);
}
}
return null;
}
}
public ColumnMetadata primaryColumnMetadata() {
return Objects.requireNonNull(columns.get(primaryColumn),
"No geoparquet metadata for primary column " + primaryColumn);
}
/**
* Extracts geoparquet metadata from the file's {@code "geo"} key-value metadata field, or, if that is missing or
* invalid, tries to generate a default one that uses a geometry, wkb_geometry, or wkt_geometry column.
*/
public static GeoParquetMetadata parse(FileMetaData metadata) throws IOException {
String string = metadata.getKeyValueMetaData().get("geo");
if (string != null) {
try {
return mapper.readValue(string, GeoParquetMetadata.class);
} catch (JsonProcessingException e) {
LOGGER.warn("Invalid geoparquet metadata", e);
}
}
// fallback
for (var field : metadata.getSchema().asGroupType().getFields()) {
if (field.isPrimitive() &&
field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
switch (field.getName()) {
case "geometry", "wkb_geometry" -> {
return new GeoParquetMetadata("1.0.0", field.getName(), Map.of(
field.getName(), new ColumnMetadata("WKB")));
}
case "wkt_geometry" -> {
return new GeoParquetMetadata("1.0.0", field.getName(), Map.of(
field.getName(), new ColumnMetadata("WKT")));
}
default -> {
//ignore
}
}
}
}
throw new IOException(
"No valid geometry columns found: " + metadata.getSchema().asGroupType().getFields().stream().map(
Type::getName).toList());
}
private static boolean hasNumericField(MessageType root, String... path) {
if (root.containsPath(path)) {
var type = root.getType(path);
if (!type.isPrimitive()) {
return false;
}
var typeName = type.asPrimitiveType().getPrimitiveTypeName();
return typeName == PrimitiveType.PrimitiveTypeName.DOUBLE || typeName == PrimitiveType.PrimitiveTypeName.FLOAT;
}
return false;
}
}
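A sketch of how this metadata might be used to push bounds filtering into a parquet read (file path and bounds hypothetical; assumes the Bounds(Envelope) constructor and parquet-floor's makeInputFile helper for opening a local file):
try (var reader = ParquetFileReader.open(
    blue.strategic.parquet.ParquetReader.makeInputFile(Path.of("data.parquet").toFile()))) {
  var fileMetadata = reader.getFooter().getFileMetaData();
  GeoParquetMetadata geo = GeoParquetMetadata.parse(fileMetadata);
  // limit reads to records whose bbox columns overlap the Boston area
  FilterPredicate filter = geo.primaryColumnMetadata()
    .bboxFilter(fileMetadata.getSchema(), new Bounds(new Envelope(-71.2, -70.9, 42.2, 42.5)));
}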


@@ -0,0 +1,63 @@
package com.onthegomap.planetiler.reader.parquet;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.WithTags;
import com.onthegomap.planetiler.util.FunctionThatThrows;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.locationtech.jts.geom.Geometry;
/**
* Decodes geometries from a parquet record based on the {@link GeoParquetMetadata} provided.
*/
class GeometryReader {
private final Map<String, FunctionThatThrows<Object, Geometry>> converters = new HashMap<>();
private final String geometryColumn;
GeometryReader(GeoParquetMetadata geoparquet) {
this.geometryColumn = geoparquet.primaryColumn();
for (var entry : geoparquet.columns().entrySet()) {
String column = entry.getKey();
GeoParquetMetadata.ColumnMetadata columnInfo = entry.getValue();
FunctionThatThrows<Object, Geometry> converter = switch (columnInfo.encoding()) {
case "WKB" -> obj -> obj instanceof byte[] bytes ? GeoUtils.wkbReader().read(bytes) : null;
case "WKT" -> obj -> obj instanceof String string ? GeoUtils.wktReader().read(string) : null;
case "multipolygon", "geoarrow.multipolygon" ->
obj -> obj instanceof List<?> list ? GeoArrow.multipolygon((List<List<List<Object>>>) list) : null;
case "polygon", "geoarrow.polygon" ->
obj -> obj instanceof List<?> list ? GeoArrow.polygon((List<List<Object>>) list) : null;
case "multilinestring", "geoarrow.multilinestring" ->
obj -> obj instanceof List<?> list ? GeoArrow.multilinestring((List<List<Object>>) list) : null;
case "linestring", "geoarrow.linestring" ->
obj -> obj instanceof List<?> list ? GeoArrow.linestring((List<Object>) list) : null;
case "multipoint", "geoarrow.multipoint" ->
obj -> obj instanceof List<?> list ? GeoArrow.multipoint((List<Object>) list) : null;
case "point", "geoarrow.point" -> GeoArrow::point;
default -> throw new IllegalArgumentException("Unhandled type: " + columnInfo.encoding());
};
converters.put(column, converter);
}
}
Geometry readPrimaryGeometry(WithTags tags) throws GeometryException {
return readGeometry(tags, geometryColumn);
}
Geometry readGeometry(WithTags tags, String column) throws GeometryException {
var value = tags.getTag(column);
var converter = converters.get(column);
if (value == null) {
throw new GeometryException("no_parquet_column", "Missing geometry column column " + column);
} else if (converter == null) {
throw new GeometryException("no_converter", "No geometry converter for " + column);
}
try {
return converter.apply(value);
} catch (Exception e) {
throw new GeometryException("error_reading", "Error reading " + column, e);
}
}
}
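For example (metadata constructed by hand here; normally it comes from GeoParquetMetadata.parse, error handling is elided, and the WithTags.from factory is assumed), decoding a WKB geometry column from a record's tags might look like:
var metadata = new GeoParquetMetadata("1.0.0", "geometry",
  Map.of("geometry", new GeoParquetMetadata.ColumnMetadata("WKB")));
var geometryReader = new GeometryReader(metadata);
byte[] wkb = new WKBWriter().write(GeoUtils.point(-71.06, 42.36));
Geometry geometry = geometryReader.readPrimaryGeometry(WithTags.from(Map.of("geometry", wkb)));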


@@ -0,0 +1,42 @@
package com.onthegomap.planetiler.reader.parquet;
import java.time.Duration;
import java.time.Period;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.stream.Stream;
/**
* Represents a <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval">parquet
* interval</a> datatype which has a month, day, and millisecond part.
* <p>
* Built-in java {@link TemporalAmount} implementations can only store a period or duration amount, but not both.
*/
public record Interval(Period period, Duration duration) implements TemporalAmount {
public static Interval of(int months, long days, long millis) {
return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis));
}
@Override
public long get(TemporalUnit unit) {
return period.get(unit) + duration.get(unit);
}
@Override
public List<TemporalUnit> getUnits() {
return Stream.concat(period.getUnits().stream(), duration.getUnits().stream()).toList();
}
@Override
public Temporal addTo(Temporal temporal) {
return temporal.plus(period).plus(duration);
}
@Override
public Temporal subtractFrom(Temporal temporal) {
return temporal.minus(period).minus(duration);
}
}
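For example, adding one month, two days, and 500 milliseconds to a LocalDateTime applies the period first, then the duration:
LocalDateTime start = LocalDateTime.of(2024, 1, 1, 0, 0);
LocalDateTime end = start.plus(Interval.of(1, 2, 500)); // 2024-02-03T00:00:00.500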


@@ -0,0 +1,77 @@
package com.onthegomap.planetiler.reader.parquet;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.SourceFeature;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Lineal;
import org.locationtech.jts.geom.Polygonal;
import org.locationtech.jts.geom.Puntal;
/**
* A single record read from a geoparquet file.
*/
public class ParquetFeature extends SourceFeature {
private final GeometryReader geometryParser;
private final Path filename;
private Geometry latLon;
private Geometry world;
ParquetFeature(String source, String sourceLayer, Path filename, long id, GeometryReader geometryParser,
Map<String, Object> tags) {
super(tags, source, sourceLayer, List.of(), id);
this.geometryParser = geometryParser;
this.filename = filename;
}
public Path getFilename() {
return filename;
}
@Override
public Geometry latLonGeometry() throws GeometryException {
return latLon == null ? latLon = geometryParser.readPrimaryGeometry(this) : latLon;
}
@Override
public Geometry worldGeometry() throws GeometryException {
return world != null ? world :
(world = GeoUtils.sortPolygonsByAreaDescending(GeoUtils.latLonToWorldCoords(latLonGeometry())));
}
@Override
public boolean isPoint() {
try {
return latLonGeometry() instanceof Puntal;
} catch (GeometryException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean canBePolygon() {
try {
return latLonGeometry() instanceof Polygonal;
} catch (GeometryException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean canBeLine() {
try {
return latLonGeometry() instanceof Lineal;
} catch (GeometryException e) {
throw new IllegalStateException(e);
}
}
@Override
public String toString() {
return tags().toString();
}
}


@@ -0,0 +1,233 @@
package com.onthegomap.planetiler.reader.parquet;
import blue.strategic.parquet.ParquetReader;
import com.google.common.collect.Iterators;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.util.Hashing;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.IntStream;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reads {@link SourceFeature SourceFeatures} from a single
* <a href="https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md">geoparquet</a> file.
*/
public class ParquetInputFile {
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetInputFile.class);
private final ParquetMetadata metadata;
private final InputFile inputFile;
private final Path path;
private final FilterCompat.Filter filter;
private final String source;
private final ToLongFunction<Map<String, Object>> idGenerator;
private final String layer;
private final long count;
private final int blockCount;
private final GeometryReader geometryReader;
private final Map<String, Object> extraFields;
private Envelope postFilterBounds = null;
private boolean outOfBounds = false;
public ParquetInputFile(String source, String layer, Path path) {
this(source, layer, path, null, Bounds.WORLD, null, null);
}
public ParquetInputFile(String source, String layer, Path path, FilterPredicate filter, Bounds bounds,
Map<String, Object> extraFields, Function<Map<String, Object>, Object> idGenerator) {
this.idGenerator = idGenerator == null ? null : map -> hashToLong(idGenerator.apply(map));
this.layer = layer;
this.source = source;
this.path = path;
inputFile = ParquetReader.makeInputFile(path.toFile());
this.extraFields = extraFields;
try (var file = open()) {
metadata = file.getFooter();
var fileMetadata = metadata.getFileMetaData();
var geoparquet = GeoParquetMetadata.parse(fileMetadata);
this.geometryReader = new GeometryReader(geoparquet);
if (!bounds.isWorld()) {
if (!geoparquet.primaryColumnMetadata().envelope().intersects(bounds.latLon())) {
outOfBounds = true;
} else {
var bboxFilter = geoparquet.primaryColumnMetadata().bboxFilter(fileMetadata.getSchema(), bounds);
if (bboxFilter != null) {
filter = filter == null ? bboxFilter : FilterApi.and(filter, bboxFilter);
} else {
LOGGER.warn("No covering column specified in geoparquet metadata, fall back to post-filtering");
postFilterBounds = bounds.latLon();
}
}
}
count = outOfBounds ? 0 : file.getFilteredRecordCount();
blockCount = outOfBounds ? 0 : metadata.getBlocks().size();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.filter = filter == null ? FilterCompat.NOOP : FilterCompat.get(filter);
}
private static long hashToLong(Object o) {
return switch (o) {
case String s -> Hashing.fnv1a64(s.getBytes(StandardCharsets.UTF_8));
case byte[] bs -> Hashing.fnv1a64(bs);
case Integer i -> i;
case Long l -> l;
case Float f -> Float.floatToIntBits(f);
case Double d -> Double.doubleToLongBits(d);
case null -> 0;
default -> Hashing.fnv1a64(o.toString().getBytes(StandardCharsets.UTF_8));
};
}
public boolean hasFilter() {
return FilterCompat.isFilteringRequired(filter);
}
public boolean isOutOfBounds() {
return outOfBounds;
}
public BlockReader get() {
if (outOfBounds) {
return Collections::emptyIterator;
}
long fileHash = Hashing.fnv1a64(path.toString().getBytes(StandardCharsets.UTF_8));
var schema = metadata.getFileMetaData().getSchema();
var columnIOFactory = new ColumnIOFactory(metadata.getFileMetaData().getCreatedBy(), false);
return () -> IntStream.range(0, metadata.getBlocks().size()).mapToObj(blockIndex -> {
long blockHash = Hashing.fnv1a64(fileHash, ByteBuffer.allocate(4).putInt(blockIndex).array());
// happens in reader thread
return (Block) new Block() {
@Override
public Path getFileName() {
return path;
}
@Override
public String layer() {
return layer;
}
@Override
public Iterator<ParquetFeature> iterator() {
PageReadStore group;
try (var reader = open()) {
group = reader.readFilteredRowGroup(blockIndex);
if (group == null) {
return Collections.emptyIterator();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema);
var recordReader = columnIO.getRecordReader(group, new ParquetRecordConverter(schema), filter);
long total = group.getRowCount();
return Iterators.filter(new Iterator<>() {
long i = 0;
@Override
public boolean hasNext() {
return i < total;
}
@Override
public ParquetFeature next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
i++;
var item = recordReader.read();
if (item == null) {
return null;
}
if (extraFields != null) {
item.putAll(extraFields);
}
var feature = new ParquetFeature(
source,
layer,
path,
idGenerator != null ? idGenerator.applyAsLong(item) :
Hashing.fnv1a64(blockHash, ByteBuffer.allocate(8).putLong(i).array()),
geometryReader,
item
);
if (postFilterBounds != null) {
try {
if (!feature.latLonGeometry().getEnvelopeInternal().intersects(postFilterBounds)) {
return null;
}
} catch (GeometryException e) {
LOGGER.warn("Error reading geometry to post-filter bounds", e);
return null;
}
}
return feature;
}
}, Objects::nonNull);
}
};
}).iterator();
}
private ParquetFileReader open() throws IOException {
return ParquetFileReader.open(inputFile, ParquetReadOptions.builder()
.withRecordFilter(filter)
.build());
}
public long getCount() {
return count;
}
public long getBlockCount() {
return blockCount;
}
public interface BlockReader extends Iterable<Block>, Closeable {
@Override
default void close() throws IOException {}
}
public interface Block extends Iterable<ParquetFeature> {
Path getFileName();
String layer();
}
}
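A minimal scan over a single file, along the lines of the benchmark above (path hypothetical; IOException handling elided):
var file = new ParquetInputFile("source", "layer", Path.of("data.parquet"));
try (var blocks = file.get()) {
  for (var block : blocks) {
    for (ParquetFeature feature : block) {
      // tags are materialized eagerly; geometry is decoded lazily on first access
      System.out.println(feature.tags());
    }
  }
}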


@@ -0,0 +1,214 @@
package com.onthegomap.planetiler.reader.parquet;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.LongFunction;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
/**
* Converts typed primitive values from parquet records to java objects:
*
* <ul>
* <li>{@link PrimitiveType.PrimitiveTypeName#FLOAT} -> {@link Float}
* <li>{@link PrimitiveType.PrimitiveTypeName#DOUBLE} -> {@link Double}
* <li>{@link PrimitiveType.PrimitiveTypeName#INT32} -> {@link Integer}
* <li>{@link PrimitiveType.PrimitiveTypeName#INT64} -> {@link Long}
* <li>{@link PrimitiveType.PrimitiveTypeName#BOOLEAN} -> {@link Boolean}
* <li>{@link PrimitiveType.PrimitiveTypeName#INT96} -> {@link Instant}
* <li>{@link LogicalTypeAnnotation.DateLogicalTypeAnnotation} -> {@link LocalDate}
* <li>{@link LogicalTypeAnnotation.TimeLogicalTypeAnnotation} -> {@link LocalTime}
* <li>{@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} -> {@link Instant}
* <li>{@link LogicalTypeAnnotation.UUIDLogicalTypeAnnotation} -> {@link UUID}
* <li>{@link LogicalTypeAnnotation.DecimalLogicalTypeAnnotation} -> {@link Double}
* <li>{@link LogicalTypeAnnotation.StringLogicalTypeAnnotation} -> {@link String}
* <li>{@link LogicalTypeAnnotation.JsonLogicalTypeAnnotation} -> {@link String}
* <li>{@link LogicalTypeAnnotation.EnumLogicalTypeAnnotation} -> {@link String}
* <li>{@link PrimitiveType.PrimitiveTypeName#BINARY} -> {@code byte[]}
* </ul>
*/
class ParquetPrimitiveConverter extends PrimitiveConverter {
private final PrimitiveType.PrimitiveTypeName primitiveType;
private final ParquetRecordConverter.Context context;
private Dictionary dictionary;
ParquetPrimitiveConverter(ParquetRecordConverter.Context context) {
this.context = context;
this.primitiveType = context.type.asPrimitiveType().getPrimitiveTypeName();
}
static ParquetPrimitiveConverter of(ParquetRecordConverter.Context context) {
var primitiveType = context.type().asPrimitiveType().getPrimitiveTypeName();
return switch (primitiveType) {
case FLOAT, DOUBLE, BOOLEAN -> new ParquetPrimitiveConverter(context);
case INT64, INT32 -> switch (context.type().getLogicalTypeAnnotation()) {
case null -> new ParquetPrimitiveConverter(context);
case LogicalTypeAnnotation.IntLogicalTypeAnnotation ignored ->
new ParquetPrimitiveConverter(context);
case LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal -> {
var multiplier = Math.pow(10, -decimal.getScale());
yield new IntegerConverter(context, value -> multiplier * value);
}
case LogicalTypeAnnotation.DateLogicalTypeAnnotation ignored ->
new IntegerConverter(context, LocalDate::ofEpochDay);
case LogicalTypeAnnotation.TimeLogicalTypeAnnotation time -> {
var unit = getUnit(time.getUnit());
yield new IntegerConverter(context, value -> LocalTime.ofNanoOfDay(Duration.of(value, unit).toNanos()));
}
case LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp -> {
var unit = getUnit(timestamp.getUnit());
yield new IntegerConverter(context, value -> Instant.ofEpochMilli(Duration.of(value, unit).toMillis()));
}
default -> throw new UnsupportedOperationException(
"Unsupported logical type for " + primitiveType + ": " + context.type().getLogicalTypeAnnotation());
};
case INT96 -> new BinaryConverter(context, value -> {
var buf = value.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
LocalTime timeOfDay = LocalTime.ofNanoOfDay(buf.getLong());
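// INT96 packs nanos-of-day followed by a Julian day number; 2440588 is the Julian day of the Unix epoch (1970-01-01)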
LocalDate day = LocalDate.ofEpochDay(buf.getInt() - 2440588L);
return LocalDateTime.of(day, timeOfDay).toInstant(ZoneOffset.UTC);
});
case FIXED_LEN_BYTE_ARRAY, BINARY -> switch (context.type().getLogicalTypeAnnotation()) {
case LogicalTypeAnnotation.UUIDLogicalTypeAnnotation ignored -> new BinaryConverter(context, binary -> {
ByteBuffer byteBuffer = binary.toByteBuffer();
long msb = byteBuffer.getLong();
long lsb = byteBuffer.getLong();
return new UUID(msb, lsb);
});
case LogicalTypeAnnotation.IntervalLogicalTypeAnnotation ignored -> new BinaryConverter(context, binary -> {
ByteBuffer byteBuffer = binary.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
int months = byteBuffer.getInt();
int days = byteBuffer.getInt();
int millis = byteBuffer.getInt();
return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis));
});
case LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal -> {
int scale = decimal.getScale();
yield new BinaryConverter(context,
binary -> new BigDecimal(new BigInteger(binary.getBytes()), scale).doubleValue());
}
case LogicalTypeAnnotation.StringLogicalTypeAnnotation ignored ->
new BinaryConverter(context, Binary::toStringUsingUTF8);
case LogicalTypeAnnotation.EnumLogicalTypeAnnotation ignored ->
new BinaryConverter(context, Binary::toStringUsingUTF8);
case LogicalTypeAnnotation.JsonLogicalTypeAnnotation ignored ->
new BinaryConverter(context, Binary::toStringUsingUTF8);
case null, default -> new ParquetPrimitiveConverter(context);
};
};
}
private static ChronoUnit getUnit(LogicalTypeAnnotation.TimeUnit unit) {
return switch (unit) {
case MILLIS -> ChronoUnit.MILLIS;
case MICROS -> ChronoUnit.MICROS;
case NANOS -> ChronoUnit.NANOS;
};
}
void add(Object value) {
context.accept(value);
}
@Override
public void addFloat(float value) {
add((double) value);
}
@Override
public void addDouble(double value) {
add(value);
}
@Override
public void addInt(int value) {
add(value);
}
@Override
public void addLong(long value) {
add(value);
}
@Override
public void addBoolean(boolean value) {
add(value);
}
@Override
public void addBinary(Binary value) {
add(value.getBytes());
}
@Override
public void addValueFromDictionary(int idx) {
switch (primitiveType) {
case INT64 -> addLong(dictionary.decodeToLong(idx));
case INT32 -> addInt(dictionary.decodeToInt(idx));
case BOOLEAN -> addBoolean(dictionary.decodeToBoolean(idx));
case FLOAT -> addFloat(dictionary.decodeToFloat(idx));
case DOUBLE -> addDouble(dictionary.decodeToDouble(idx));
case BINARY, FIXED_LEN_BYTE_ARRAY, INT96 -> addBinary(dictionary.decodeToBinary(idx));
}
}
@Override
public void setDictionary(Dictionary dictionary) {
this.dictionary = dictionary;
}
@Override
public boolean hasDictionarySupport() {
return true;
}
private static class BinaryConverter extends ParquetPrimitiveConverter {
private final Function<Binary, ?> remapper;
BinaryConverter(ParquetRecordConverter.Context context, Function<Binary, ?> remapper) {
super(context);
this.remapper = remapper;
}
@Override
public void addBinary(Binary value) {
add(remapper.apply(value));
}
}
private static class IntegerConverter extends ParquetPrimitiveConverter {
private final LongFunction<?> remapper;
IntegerConverter(ParquetRecordConverter.Context context, LongFunction<?> remapper) {
super(context);
this.remapper = remapper;
}
@Override
public void addLong(long value) {
add(remapper.apply(value));
}
@Override
public void addInt(int value) {
addLong(value);
}
}
}


@@ -0,0 +1,217 @@
package com.onthegomap.planetiler.reader.parquet;
import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;
import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.collection.SortableFeature;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.render.FeatureRenderer;
import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reads {@link SourceFeature SourceFeatures} from one or more
* <a href="https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md">geoparquet</a> files.
* <p>
* If a file doesn't contain geoparquet metadata, this reader tries to get the geometry from a "geometry",
* "wkb_geometry", or "wkt_geometry" field.
*/
public class ParquetReader {
public static final String DEFAULT_LAYER = "features";
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetReader.class);
private final String sourceName;
private final Function<Map<String, Object>, Object> idGenerator;
private final Function<Map<String, Object>, Object> layerGenerator;
private final Profile profile;
private final Stats stats;
private final boolean hivePartitioning;
public ParquetReader(
String sourceName,
Profile profile,
Stats stats
) {
this(sourceName, profile, stats, null, null, false);
}
public ParquetReader(
String sourceName,
Profile profile,
Stats stats,
Function<Map<String, Object>, Object> getId,
Function<Map<String, Object>, Object> getLayer,
boolean hivePartitioning
) {
this.sourceName = sourceName;
this.layerGenerator = getLayer;
this.idGenerator = getId;
this.profile = profile;
this.stats = stats;
this.hivePartitioning = hivePartitioning;
}
static Map<String, Object> getHivePartitionFields(Path path) {
Map<String, Object> fields = new HashMap<>();
for (var part : path) {
var string = part.toString();
if (string.contains("=")) {
var parts = string.split("=");
fields.put(parts[0], parts[1]);
}
}
return fields.isEmpty() ? null : fields;
}
public void process(List<Path> sourcePath, FeatureGroup writer, PlanetilerConfig config) {
var timer = stats.startStage(sourceName);
var inputFiles = sourcePath.stream()
.filter(d -> !"_SUCCESS".equals(d.getFileName().toString()))
.map(path -> {
var hivePartitionFields = hivePartitioning ? getHivePartitionFields(path) : null;
String layer = getLayerName(path);
return new ParquetInputFile(sourceName, layer, path, null, config.bounds(), hivePartitionFields, idGenerator);
})
.filter(file -> !file.isOutOfBounds())
.toList();
// don't show % complete on features when a filter is present because to determine total # elements would
// take an expensive initial query, and % complete on blocks gives a good enough proxy
long featureCount = inputFiles.stream().anyMatch(ParquetInputFile::hasFilter) ? 0 :
inputFiles.stream().mapToLong(ParquetInputFile::getCount).sum();
long blockCount = inputFiles.stream().mapToLong(ParquetInputFile::getBlockCount).sum();
int processThreads = config.featureProcessThreads();
int writeThreads = config.featureWriteThreads();
var blocksRead = Counter.newMultiThreadCounter();
var featuresRead = Counter.newMultiThreadCounter();
var featuresWritten = Counter.newMultiThreadCounter();
Map<String, Integer> workingOn = new ConcurrentHashMap<>();
var inputBlocks = inputFiles.stream().<ParquetInputFile.Block>mapMulti((file, next) -> {
try (var blockReader = file.get()) {
for (var block : blockReader) {
next.accept(block);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).toList();
var pipeline = WorkerPipeline.start(sourceName, stats)
.readFromTiny("blocks", inputBlocks)
.<SortableFeature>addWorker("process", processThreads, (prev, next) -> {
var blocks = blocksRead.counterForThread();
var elements = featuresRead.counterForThread();
var featureCollectors = new FeatureCollector.Factory(config, stats);
try (FeatureRenderer renderer = newFeatureRenderer(writer, config, next)) {
for (var block : prev) {
String layer = block.layer();
workingOn.merge(layer, 1, Integer::sum);
for (var sourceFeature : block) {
FeatureCollector features = featureCollectors.get(sourceFeature);
try {
profile.processFeature(sourceFeature, features);
for (FeatureCollector.Feature renderable : features) {
renderer.accept(renderable);
}
} catch (Exception e) {
LOGGER.error("Error processing {}", sourceFeature, e);
}
elements.inc();
}
blocks.inc();
workingOn.merge(layer, -1, Integer::sum);
}
}
})
.addBuffer("write_queue", 50_000, 1_000)
.sinkTo("write", writeThreads, prev -> {
var features = featuresWritten.counterForThread();
try (var threadLocalWriter = writer.writerForThread()) {
for (var item : prev) {
features.inc();
threadLocalWriter.accept(item);
}
}
});
var loggers = ProgressLoggers.create()
.addRatePercentCounter("read", featureCount, featuresRead, true)
.addRatePercentCounter("blocks", blockCount, blocksRead, false)
.addRateCounter("write", featuresWritten)
.addFileSize(writer)
.newLine()
.add(() -> workingOn.entrySet().stream()
.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
.filter(d -> d.getValue() > 0)
.map(d -> d.getKey() + ": " + d.getValue())
.collect(Collectors.joining(", ")))
.newLine()
.addProcessStats()
.newLine()
.addPipelineStats(pipeline);
pipeline.awaitAndLog(loggers, config.logInterval());
if (LOGGER.isInfoEnabled()) {
var format = Format.defaultInstance();
long count = featuresRead.get();
var elapsed = timer.elapsed();
LOGGER.info("Processed {} parquet features ({}/s, {} blocks, {} files) in {}",
format.integer(count),
format.numeric(count * NANOSECONDS_PER_SECOND / elapsed.wall().toNanos()),
format.integer(blocksRead.get()),
format.integer(inputFiles.size()),
elapsed
);
}
timer.stop();
// hook for profile to do any post-processing after this source is read
try (
var threadLocalWriter = writer.writerForThread();
var featureRenderer = newFeatureRenderer(writer, config, threadLocalWriter)
) {
profile.finish(sourceName, new FeatureCollector.Factory(config, stats), featureRenderer);
} catch (IOException e) {
LOGGER.warn("Error closing writer", e);
}
}
private String getLayerName(Path path) {
String layer = DEFAULT_LAYER;
if (hivePartitioning) {
var fields = getHivePartitionFields(path);
layer = layerGenerator.apply(fields == null ? Map.of() : fields) instanceof Object o ? o.toString() : layer;
}
return layer;
}
private FeatureRenderer newFeatureRenderer(FeatureGroup writer, PlanetilerConfig config,
Consumer<SortableFeature> next) {
@SuppressWarnings("java:S2095") // closed by FeatureRenderer
var encoder = writer.newRenderedFeatureEncoder();
return new FeatureRenderer(
config,
rendered -> next.accept(encoder.apply(rendered)),
stats,
encoder
);
}
}
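For instance (path hypothetical, called from within this package), hive partitioning turns key=value path segments into extra feature tags:
Map<String, Object> fields = ParquetReader.getHivePartitionFields(
  Path.of("base", "theme=buildings", "type=building", "part-0.parquet"));
// => {theme=buildings, type=building}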


@@ -0,0 +1,391 @@
package com.onthegomap.planetiler.reader.parquet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
/**
* Simple converter for parquet datatypes that maps all structs to {@code Map<String, Object>} and handles deserializing
* <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types">list and map nested
* types</a> into java {@link List Lists} and {@link Map Maps}.
*/
public class ParquetRecordConverter extends RecordMaterializer<Map<String, Object>> {
private final StructConverter root;
private Map<String, Object> map;
ParquetRecordConverter(MessageType schema) {
root = new StructConverter(new Context(schema)) {
@Override
public void start() {
var group = new MapGroup(schema.getFieldCount());
context.current = group;
map = group.getMap();
}
};
}
@Override
public Map<String, Object> getCurrentRecord() {
return map;
}
@Override
public void skipCurrentRecord() {
root.context.current = null;
}
@Override
public GroupConverter getRootConverter() {
return root;
}
interface Group {
// TODO handle repeated when processing schema, not elements
void add(Object key, Object value, boolean repeated);
Object value();
}
private static class ListConverter extends StructConverter {
ListConverter(Context context) {
super(context);
}
@Override
protected Converter makeConverter(Context child) {
if ((child.named("list") || child.named("array")) && child.onlyField("element")) {
return new ListElementConverter(child.hoist());
}
return super.makeConverter(child);
}
@Override
public void start() {
context.current = new ListGroup();
context.acceptCurrentValue();
}
}
private static class ListElementConverter extends StructConverter {
ListElementConverter(Context context) {
super(context);
}
@Override
public void start() {
context.current = new ItemGroup();
}
@Override
public void end() {
context.acceptCurrentValue();
}
}
private static class MapConverter extends StructConverter {
MapConverter(Context context) {
super(context);
}
@Override
protected Converter makeConverter(Context child) {
if (context.getFieldCount() == 1) {
Type type = child.type;
String onlyFieldName = type.getName().toLowerCase(Locale.ROOT);
if (!type.isPrimitive() && type.asGroupType().getFieldCount() == 2 &&
(onlyFieldName.equals("key_value") || onlyFieldName.equals("map"))) {
return new MapEntryConverter(child.repeated(false));
}
}
return super.makeConverter(child);
}
@Override
public void start() {
context.current = new MapGroup();
context.acceptCurrentValue();
}
}
private static class MapEntryConverter extends StructConverter {
MapEntryGroup entry;
MapEntryConverter(Context context) {
super(context);
}
@Override
public void start() {
context.current = entry = new MapEntryGroup();
}
@Override
public void end() {
if (entry.v != null && entry.k != null) {
context.accept(entry.k, entry.v);
}
}
}
static class StructConverter extends GroupConverter {
final Context context;
private final Converter[] converters;
StructConverter(Context context) {
this.context = context;
int count = context.type.asGroupType().getFieldCount();
converters = new Converter[count];
for (int i = 0; i < count; i++) {
converters[i] = makeConverter(context.field(i));
}
}
protected Converter makeConverter(Context child) {
Type type = child.type;
LogicalTypeAnnotation logical = type.getLogicalTypeAnnotation();
if (!type.isPrimitive()) {
return switch (logical) {
case LogicalTypeAnnotation.ListLogicalTypeAnnotation ignored ->
// If the repeated field is not a group, then its type is the element type and elements are required.
// If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
// If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
// Otherwise, the repeated field's type is the element type with the repeated field's repetition.
new ListConverter(child);
case LogicalTypeAnnotation.MapLogicalTypeAnnotation ignored ->
// The outer-most level must be a group annotated with MAP that contains a single field named key_value. The repetition of this level must be either optional or required and determines whether the list is nullable.
// The middle level, named key_value, must be a repeated group with a key field for map keys and, optionally, a value field for map values.
// The key field encodes the map's key type. This field must have repetition required and must always be present.
// The value field encodes the map's value type and repetition. This field can be required, optional, or omitted.
new MapConverter(child);
case LogicalTypeAnnotation.MapKeyValueTypeAnnotation ignored ->
new MapConverter(child);
case null, default -> new StructConverter(child);
};
}
return ParquetPrimitiveConverter.of(child);
}
@Override
public Converter getConverter(int fieldIndex) {
return converters[fieldIndex];
}
@Override
public void start() {
context.current = new MapGroup(context.getFieldCount());
context.acceptCurrentValue();
}
@Override
public void end() {
// by default, don't need to do anything
}
}
private static class MapGroup implements Group {
private final Map<Object, Object> map;
MapGroup() {
this(10);
}
MapGroup(int size) {
map = HashMap.newHashMap(size * 2);
}
@Override
public void add(Object key, Object value, boolean repeated) {
if (repeated) {
List<Object> items = (List<Object>) map.computeIfAbsent(key, n -> new ArrayList<>());
items.add(value);
} else {
if (map.put(key, value) != null) {
throw new IllegalStateException("Multiple values for " + key);
}
}
}
@Override
public String toString() {
return "MapGroup" + map;
}
public Map<String, Object> getMap() {
return (Map<String, Object>) (Map<?, Object>) map;
}
@Override
public Object value() {
return map;
}
}
private static class ListGroup implements Group {
private final List<Object> list = new ArrayList<>();
@Override
public void add(Object key, Object value, boolean repeated) {
list.add(value);
}
@Override
public String toString() {
return "ListGroup" + list;
}
@Override
public Object value() {
return list;
}
}
private static class ItemGroup implements Group {
private Object item;
@Override
public void add(Object key, Object value, boolean repeated) {
if (repeated) {
if (item == null) {
item = new ArrayList<>();
}
((List<Object>) item).add(value);
} else {
item = value;
}
}
@Override
public String toString() {
return "ItemGroup{" + item + '}';
}
@Override
public Object value() {
return item;
}
}
private static class MapEntryGroup implements Group {
private Object k;
private Object v;
@Override
public void add(Object key, Object value, boolean repeated) {
if ("key".equals(key)) {
k = value;
} else if ("value".equals(key)) {
v = value;
} else if (k == null) {
k = value;
} else {
v = value;
}
}
@Override
public String toString() {
return "MapEntryGroup{" + k + '=' + v + '}';
}
@Override
public Object value() {
throw new UnsupportedOperationException();
}
}
/** Constructs java objects from parquet records at read-time. */
static final class Context {
final Context parent;
final String fieldOnParent;
final Type type;
final boolean repeated;
private final int fieldCount;
Group current;
Context(Context parent, String fieldOnParent, Type type, boolean repeated) {
this.parent = parent;
this.fieldOnParent = fieldOnParent;
this.type = type;
this.repeated = repeated;
this.fieldCount = type.isPrimitive() ? 0 : type.asGroupType().getFieldCount();
}
public Context(Context newParent, Type type) {
this(newParent, type.getName(), type, type.isRepetition(Type.Repetition.REPEATED));
}
public Context(MessageType schema) {
this(null, schema);
}
public Context field(int i) {
return new Context(this, type.asGroupType().getType(i));
}
/** Returns a new context that flattens-out this level of the hierarchy and writes values into the parent field. */
public Context hoist() {
return new Context(parent, parent.fieldOnParent, type, repeated);
}
public void acceptCurrentValue() {
accept(current.value());
}
public void accept(Object value) {
parent.current.add(fieldOnParent, value, repeated);
}
public int getFieldCount() {
return fieldCount;
}
public void accept(Object k, Object v) {
parent.current.add(k, v, repeated);
}
public Context repeated(boolean newRepeated) {
return new Context(parent, fieldOnParent, type, newRepeated);
}
public boolean named(String name) {
return type.getName().equalsIgnoreCase(name);
}
boolean onlyField(String name) {
return !type.isPrimitive() && fieldCount == 1 &&
type.asGroupType().getFieldName(0).equalsIgnoreCase(name);
}
public Type type() {
return type;
}
@Override
public String toString() {
return "Context[" +
"parent=" + parent + ", " +
"fieldOnParent=" + fieldOnParent + ", " +
"type=" + type + ", " +
"repeated=" + repeated + ']';
}
}
}

View file

@ -94,10 +94,15 @@ public interface Stats extends AutoCloseable {
LogUtil.setStage(name);
}
var timer = timers().startTimer(name, log);
return new Timers.Finishable() {
@Override
public void stop() {
timer.stop();
if (log) {
LogUtil.clearStage();
}
}
@Override
public ProcessTime elapsed() {
return timer.elapsed();
}
};
}

View file

@ -103,12 +103,20 @@ public class Timers {
LOGGER.info("");
LOGGER.info("Starting...");
}
return new Finishable() {
@Override
public void stop() {
LOGGER.info("Finished in {}", timers.get(name).timer.stop());
for (var details : getStageDetails(name, true)) {
LOGGER.info("  {}", details);
}
currentStage.set(last);
}
@Override
public ProcessTime elapsed() {
return timer.elapsed();
}
};
}
@ -129,6 +137,8 @@ public class Timers {
/** A handle that callers can use to indicate a task has finished. */
public interface Finishable {
void stop();
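/** Returns how long this task has been running, or its total runtime once stopped. */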
ProcessTime elapsed();
}
record ThreadInfo(ProcessInfo.ThreadState state, String prefix, Duration elapsed) {}

View file

@ -12,6 +12,7 @@ import java.nio.file.ClosedFileSystemException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@ -49,7 +50,7 @@ public class FileUtils {
return StreamSupport.stream(fileSystem.getRootDirectories().spliterator(), false)
.flatMap(rootDirectory -> {
try {
return Files.walk(rootDirectory, FileVisitOption.FOLLOW_LINKS);
} catch (IOException e) {
LOGGER.error("Unable to walk " + rootDirectory + " in " + fileSystem, e);
return Stream.empty();
@ -82,9 +83,9 @@ public class FileUtils {
.toList();
}
} else if (Files.isDirectory(basePath)) {
try (var walk = Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)) {
return walk
.filter(path -> matcher.matches(path.getFileName()) || matcher.matches(basePath.relativize(path)))
.flatMap(path -> {
if (FileUtils.hasExtension(path, "zip")) {
return walkZipFile.apply(path).stream();
@ -109,9 +110,10 @@ public class FileUtils {
* @param pattern pattern to match filenames against, as described in {@link FileSystem#getPathMatcher(String)}.
*/
public static List<Path> walkPathWithPattern(Path basePath, String pattern) {
return walkPathWithPattern(basePath, pattern, List::of);
}
/** Returns true if {@code path} ends with ".extension" (case-insensitive). */
public static boolean hasExtension(Path path, String extension) {
return path.toString().toLowerCase().endsWith("." + extension.toLowerCase());

View file

@ -0,0 +1,55 @@
package com.onthegomap.planetiler.util;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
/**
* Utility for constructing base+glob paths for matching many files.
*/
public record Glob(Path base, String pattern) {
private static final Pattern GLOB_PATTERN = Pattern.compile("[?*{\\[].*$");
/** Wrap a base path with no globs in it yet. */
public static Glob of(Path path) {
return new Glob(path, null);
}
/** Resolves subdirectory parts against this glob, treating each element of {@code subPath} as one path segment. */
public Glob resolve(String... subPath) {
String separator = "/";
if (pattern != null) {
return new Glob(base, pattern + separator + String.join(separator, subPath));
} else if (subPath == null || subPath.length == 0) {
return this;
} else if (GLOB_PATTERN.matcher(subPath[0]).find()) {
return new Glob(base, String.join(separator, subPath));
} else {
return of(base.resolve(subPath[0])).resolve(Arrays.copyOfRange(subPath, 1, subPath.length));
}
}
/** Parse a string containing platform-specific file separators into a base+glob pattern. */
public static Glob parse(String path) {
var matcher = GLOB_PATTERN.matcher(path);
if (!matcher.find()) {
return of(Path.of(path));
}
matcher.reset();
String base = matcher.replaceAll("");
String separator = Path.of(base).getFileSystem().getSeparator();
int idx = base.lastIndexOf(separator);
if (idx > 0) {
base = base.substring(0, idx);
}
return of(Path.of(base)).resolve(path.substring(idx + 1).split(Pattern.quote(separator)));
}
/** Search the filesystem for all files beneath {@link #base()} matching {@link #pattern()}. */
public List<Path> find() {
return pattern == null ? List.of(base) : FileUtils.walkPathWithPattern(base, pattern);
}
}
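A quick sketch of how these pieces compose — the directory and file names below are invented for illustration:

// Glob.parse splits a string into a literal base directory plus a glob pattern,
// so the filesystem walk starts as deep as possible:
var fromString = Glob.parse("data/overture/theme=buildings/*.parquet").find();
// the same search built up incrementally from a base path:
var fromParts = Glob.of(Path.of("data", "overture")).resolve("theme=buildings", "*.parquet").find();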

View file

@ -0,0 +1,16 @@
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/** Fixed copy of the interface from parquet-floor so we can extend it with {@link GzipCodec} and {@link Lz4Codec}. */
public interface CompressionCodec {
Decompressor createDecompressor();
Compressor createCompressor();
CompressionInputStream createInputStream(InputStream is, Decompressor d) throws IOException;
CompressionOutputStream createOutputStream(OutputStream os, Compressor c) throws IOException;
}

View file

@ -0,0 +1,10 @@
package org.apache.hadoop.io.compress;
import io.airlift.compress.gzip.JdkGzipCodec;
/**
* Make {@link JdkGzipCodec} available at the location expected by
* {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use gzip
* compression.
*/
public class GzipCodec extends JdkGzipCodec {}

View file

@ -0,0 +1,9 @@
package org.apache.hadoop.io.compress;
/**
* Make {@link io.airlift.compress.lz4.Lz4Codec} available at the location expected by
* {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use lz4
* compression.
*/
@SuppressWarnings("java:S2176")
public class Lz4Codec extends io.airlift.compress.lz4.Lz4Codec {}
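A hedged note on why these shims live where they do: parquet-mr resolves codecs reflectively from the hadoop class name baked into each CompressionCodecName constant, so the package and class name must match exactly. For example (assuming parquet-mr's CompressionCodecName API):

// resolves to "org.apache.hadoop.io.compress.Lz4Codec", the class defined above
String cls = org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4
.getHadoopCompressionCodecClassName();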

View file

@ -0,0 +1,20 @@
package org.apache.parquet.filter2.predicate;
import java.util.List;
import org.apache.parquet.hadoop.metadata.ColumnPath;
/**
* Create {@link Operators.DoubleColumn} and {@link Operators.FloatColumn} instances with dots in the column names since
* their constructors are package-private.
*/
public class Filters {
private Filters() {}
public static Operators.DoubleColumn doubleColumn(List<String> parts) {
return new Operators.DoubleColumn(ColumnPath.get(parts.toArray(String[]::new)));
}
public static Operators.FloatColumn floatColumn(List<String> parts) {
return new Operators.FloatColumn(ColumnPath.get(parts.toArray(String[]::new)));
}
}
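This matters for the legacy GDAL-style bbox columns exercised in the tests below: FilterApi.doubleColumn(String) parses its argument with ColumnPath.fromDotString, which splits on dots, so a flat column literally named "bbox.xmin" cannot be addressed through it. A short sketch of the difference:

// a single flat column whose name contains a literal dot stays one path part:
var flat = Filters.doubleColumn(List.of("bbox.xmin"));
// a field nested inside a group is spelled out as separate parts,
// equivalent to FilterApi.doubleColumn("bbox.xmin"):
var nested = Filters.doubleColumn(List.of("bbox", "xmin"));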

View file

@ -7,3 +7,10 @@ packages=com.onthegomap.planetiler.util.log4j
rootLogger.level=debug
rootLogger.appenderRefs=stdout
rootLogger.appenderRef.stdout.ref=STDOUT
logger.apache.name=org.apache
logger.apache.level=warn
# suppress warning about unreadable duckdb statistics
logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics
logger.apachecorrupt.level=error

View file

@ -2248,6 +2248,49 @@ class PlanetilerTests {
}
}
@ParameterizedTest
@ValueSource(strings = {
"",
"--write-threads=2 --process-threads=2 --feature-read-threads=2 --threads=4"
})
void testPlanetilerRunnerParquet(String args) throws Exception {
Path mbtiles = tempDir.resolve("output.mbtiles");
Planetiler.create(Arguments.fromArgs((args + " --tmpdir=" + tempDir.resolve("data")).split("\\s+")))
.setProfile(new Profile.NullProfile() {
@Override
public void processFeature(SourceFeature source, FeatureCollector features) {
features.polygon("buildings")
.setZoomRange(0, 14)
.setMinPixelSize(0)
.setAttr("id", source.getString("id"));
}
})
.addParquetSource("parquet", List.of(TestUtils.pathToResource("parquet").resolve("boston.parquet")))
.setOutput(mbtiles)
.run();
try (Mbtiles db = Mbtiles.newReadOnlyDatabase(mbtiles)) {
Set<String> uniqueIds = new HashSet<>();
long featureCount = 0;
var tileMap = TestUtils.getTileMap(db);
for (int z = 14; z >= 11; z--) {
var coord = TileCoord.aroundLngLat(-71.07448, 42.35626, z);
assertTrue(tileMap.containsKey(coord), "contain " + coord);
}
for (var tile : tileMap.values()) {
for (var feature : tile) {
feature.geometry().validate();
featureCount++;
uniqueIds.add((String) feature.attrs().get("id"));
}
}
assertTrue(featureCount > 0);
assertEquals(3, uniqueIds.size());
}
}
private void runWithProfile(Path tempDir, Profile profile, boolean force) throws Exception {
Planetiler.create(Arguments.of("tmpdir", tempDir, "force", Boolean.toString(force)))
.setProfile(profile)

View file

@ -0,0 +1,231 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
class GeoArrowTest {
@Test
void testPointXY() throws ParseException {
assertSame(
"POINT(1 2)",
GeoArrow.point(Map.of("x", 1, "y", 2)),
GeoArrow.point(List.of(1, 2))
);
}
@Test
void testPointXYZ() throws ParseException {
assertSame(
"POINT Z(1 2 3)",
GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3)),
GeoArrow.point(List.of(1, 2, 3))
);
}
@Test
void testPointXYZM() throws ParseException {
assertSame(
"POINT ZM(1 2 3 4)",
GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3, "m", 4)),
GeoArrow.point(List.of(1, 2, 3, 4))
);
}
@Test
void testLine() throws ParseException {
assertSame(
"LINESTRING(1 2, 3 4)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
)),
GeoArrow.linestring(List.of(
List.of(1, 2),
List.of(3, 4)
))
);
}
@Test
void testLineZ() throws ParseException {
assertSame(
"LINESTRING Z(1 2 3, 4 5 6)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2, "z", 3),
Map.of("x", 4, "y", 5, "z", 6)
)),
GeoArrow.linestring(List.of(
List.of(1, 2, 3),
List.of(4, 5, 6)
))
);
}
@Test
void testLineZM() throws ParseException {
assertSame(
"LINESTRING ZM(1 2 3 4, 5 6 7 8)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2, "z", 3, "m", 4),
Map.of("x", 5, "y", 6, "z", 7, "m", 8)
)),
GeoArrow.linestring(List.of(
List.of(1, 2, 3, 4),
List.of(5, 6, 7, 8)
))
);
}
@Test
void testPolygon() throws ParseException {
assertSame(
"POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))",
GeoArrow.polygon(List.of(
List.of(
Map.of("x", 0, "y", 0),
Map.of("x", 0, "y", 1),
Map.of("x", 1, "y", 1),
Map.of("x", 1, "y", 0),
Map.of("x", 0, "y", 0)
))),
GeoArrow.polygon(List.of(
List.of(
List.of(0, 0),
List.of(0, 1),
List.of(1, 1),
List.of(1, 0),
List.of(0, 0)
)
))
);
}
@Test
void testPolygonWithHole() throws ParseException {
assertSame(
"POLYGON((-2 -2, 2 -2, 0 2, -2 -2), (-1 -1, 1 -1, 0 1, -1 -1))",
GeoArrow.polygon(List.of(
List.of(
Map.of("x", -2, "y", -2),
Map.of("x", 2, "y", -2),
Map.of("x", 0, "y", 2),
Map.of("x", -2, "y", -2)
),
List.of(
Map.of("x", -1, "y", -1),
Map.of("x", 1, "y", -1),
Map.of("x", 0, "y", 1),
Map.of("x", -1, "y", -1)
)
)),
GeoArrow.polygon(List.of(
List.of(
List.of(-2, -2),
List.of(2, -2),
List.of(0, 2),
List.of(-2, -2)
),
List.of(
List.of(-1, -1),
List.of(1, -1),
List.of(0, 1),
List.of(-1, -1)
)
))
);
}
@Test
void testMultipoint() throws ParseException {
assertSame(
"MULTIPOINT(1 2, 3 4)",
GeoArrow.multipoint(List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
)),
GeoArrow.multipoint(List.of(
List.of(1, 2),
List.of(3, 4)
))
);
}
@Test
void testMultilinestring() throws ParseException {
assertSame(
"MULTILINESTRING((1 2, 3 4), (5 6, 7 8))",
GeoArrow.multilinestring(List.of(
List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
),
List.of(
Map.of("x", 5, "y", 6),
Map.of("x", 7, "y", 8)
)
)),
GeoArrow.multilinestring(List.of(
List.of(
List.of(1, 2),
List.of(3, 4)
),
List.of(
List.of(5, 6),
List.of(7, 8)
)
))
);
}
@Test
void testMultipolygon() throws ParseException {
assertSame(
"MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ((2 0, 3 0, 3 1, 2 1, 2 0)))",
GeoArrow.multipolygon(List.of(
List.of(List.of(
Map.of("x", 0, "y", 0),
Map.of("x", 1, "y", 0),
Map.of("x", 1, "y", 1),
Map.of("x", 0, "y", 1),
Map.of("x", 0, "y", 0)
)),
List.of(List.of(
Map.of("x", 2, "y", 0),
Map.of("x", 3, "y", 0),
Map.of("x", 3, "y", 1),
Map.of("x", 2, "y", 1),
Map.of("x", 2, "y", 0)
))
)),
GeoArrow.multipolygon(List.of(
List.of(List.of(
List.of(0, 0),
List.of(1, 0),
List.of(1, 1),
List.of(0, 1),
List.of(0, 0)
)),
List.of(List.of(
List.of(2, 0),
List.of(3, 0),
List.of(3, 1),
List.of(2, 1),
List.of(2, 0)
))
))
);
}
private static void assertSame(String wkt, Geometry... geometry) throws ParseException {
Geometry expected = GeoUtils.wktReader().read(wkt);
for (int i = 0; i < geometry.length; i++) {
assertEquals(expected, geometry[i], "geometry #" + i);
}
}
}

View file

@ -0,0 +1,447 @@
package com.onthegomap.planetiler.reader.parquet;
import static com.onthegomap.planetiler.geo.GeoUtils.createMultiPoint;
import static com.onthegomap.planetiler.geo.GeoUtils.point;
import static org.apache.parquet.filter2.predicate.FilterApi.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.WithTags;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.parquet.filter2.predicate.Filters;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.WKTWriter;
class GeoParquetMetadataTest {
// https://github.com/opengeospatial/geoparquet/blob/main/examples/example_metadata.json
private static final String EXAMPLE_METADATA = """
{
"columns": {
"geometry": {
"bbox": [
-180.0,
-90.0,
180.0,
83.6451
],
"covering": {
"bbox": {
"xmax": [
"bbox",
"xmax"
],
"xmin": [
"bbox",
"xmin"
],
"ymax": [
"bbox",
"ymax"
],
"ymin": [
"bbox",
"ymin"
]
}
},
"crs": {
"$schema": "https://proj.org/schemas/v0.6/projjson.schema.json",
"area": "World.",
"bbox": {
"east_longitude": 180,
"north_latitude": 90,
"south_latitude": -90,
"west_longitude": -180
},
"coordinate_system": {
"axis": [
{
"abbreviation": "Lon",
"direction": "east",
"name": "Geodetic longitude",
"unit": "degree"
},
{
"abbreviation": "Lat",
"direction": "north",
"name": "Geodetic latitude",
"unit": "degree"
}
],
"subtype": "ellipsoidal"
},
"datum_ensemble": {
"accuracy": "2.0",
"ellipsoid": {
"inverse_flattening": 298.257223563,
"name": "WGS 84",
"semi_major_axis": 6378137
},
"id": {
"authority": "EPSG",
"code": 6326
},
"members": [
{
"id": {
"authority": "EPSG",
"code": 1166
},
"name": "World Geodetic System 1984 (Transit)"
},
{
"id": {
"authority": "EPSG",
"code": 1152
},
"name": "World Geodetic System 1984 (G730)"
},
{
"id": {
"authority": "EPSG",
"code": 1153
},
"name": "World Geodetic System 1984 (G873)"
},
{
"id": {
"authority": "EPSG",
"code": 1154
},
"name": "World Geodetic System 1984 (G1150)"
},
{
"id": {
"authority": "EPSG",
"code": 1155
},
"name": "World Geodetic System 1984 (G1674)"
},
{
"id": {
"authority": "EPSG",
"code": 1156
},
"name": "World Geodetic System 1984 (G1762)"
},
{
"id": {
"authority": "EPSG",
"code": 1309
},
"name": "World Geodetic System 1984 (G2139)"
}
],
"name": "World Geodetic System 1984 ensemble"
},
"id": {
"authority": "OGC",
"code": "CRS84"
},
"name": "WGS 84 (CRS84)",
"scope": "Not known.",
"type": "GeographicCRS"
},
"edges": "planar",
"encoding": "WKB",
"geometry_types": [
"Polygon",
"MultiPolygon"
]
}
},
"primary_column": "geometry",
"version": "1.1.0-dev"
}
""";
@Test
void testParseBasicMetadata() throws IOException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.named("root"),
Map.of("geo", EXAMPLE_METADATA),
""));
assertEquals("geometry", parsed.primaryColumn());
assertEquals("1.1.0-dev", parsed.version());
assertEquals("planar", parsed.primaryColumnMetadata().edges());
assertEquals("WKB", parsed.primaryColumnMetadata().encoding());
assertEquals(new Envelope(-180.0, 180.0, -90.0, 83.6451), parsed.primaryColumnMetadata().envelope());
assertEquals(new GeoParquetMetadata.CoveringBbox(
List.of("bbox", "xmin"),
List.of("bbox", "ymin"),
List.of("bbox", "xmax"),
List.of("bbox", "ymax")
), parsed.primaryColumnMetadata().covering().bbox());
assertEquals(List.of("Polygon", "MultiPolygon"), parsed.primaryColumnMetadata().geometryTypes());
assertTrue(parsed.primaryColumnMetadata().crs() instanceof Map);
}
@Test
void testFailsWhenNoGeometry() {
var fileMetadata = new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("not_geometry")
.named("root"),
Map.of(),
"");
assertThrows(IOException.class, () -> GeoParquetMetadata.parse(fileMetadata));
}
@Test
void testFailsWhenBadGeometryType() {
var fileMetadata = new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32)
.named("geometry")
.named("root"),
Map.of(),
"");
assertThrows(IOException.class, () -> GeoParquetMetadata.parse(fileMetadata));
}
@Test
void testInfersDefaultGeometry() throws IOException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.named("root"),
Map.of(),
""));
assertEquals("geometry", parsed.primaryColumn());
assertEquals("WKB", parsed.primaryColumnMetadata().encoding());
assertEquals(Bounds.WORLD.latLon(), parsed.primaryColumnMetadata().envelope());
assertNull(parsed.primaryColumnMetadata().covering());
}
@Test
void testGeometryReaderFromMetadata() throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.named("root"),
Map.of("geo", EXAMPLE_METADATA),
""));
assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
"geometry", new WKBWriter().write(point(1, 2))
))));
}
@Test
void testGeometryReaderFromMetadataDifferentName() throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("other")
.named("root"),
Map.of("geo", """
{
"primary_column": "other",
"columns": {
"other": {
"encoding": "WKB"
}
}
}
"""),
""));
assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
"other", new WKBWriter().write(point(1, 2))
))));
}
@ParameterizedTest
@ValueSource(strings = {"wkb_geometry", "geometry"})
void testReadWKBGeometryNoMetadata(String name) throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named(name)
.named("root"),
Map.of(),
""));
assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
name, new WKBWriter().write(point(1, 2))
))));
}
@Test
void testReadWKTGeometryNoMetadata() throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("wkt_geometry")
.named("root"),
Map.of(),
""));
assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
"wkt_geometry", new WKTWriter().write(point(1, 2))
))));
}
@Test
void testReadGeoArrowPoint() throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage().named("root"),
Map.of("geo", """
{
"primary_column": "geoarrow",
"columns": {
"geoarrow": {
"encoding": "point"
}
}
}
"""),
""));
assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
"geoarrow", Map.of("x", 1, "y", 2)
))));
}
@Test
void testReadGeoArrowMultiPoint() throws IOException, GeometryException {
var parsed = GeoParquetMetadata.parse(new FileMetaData(
Types.buildMessage().named("root"),
Map.of("geo", """
{
"primary_column": "geoarrow",
"columns": {
"geoarrow": {
"encoding": "multipolygon"
}
}
}
"""),
""));
assertEquals(createMultiPoint(List.of(point(1, 2))),
new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of(
"geoarrow", List.of(Map.of("x", 1, "y", 2))
))));
}
@ParameterizedTest
@CsvSource({
"bbox, true, DOUBLE",
"bbox, true, FLOAT",
"custom_bbox, true, DOUBLE",
"custom_bbox, true, FLOAT",
"bbox, false, DOUBLE",
"bbox, false, FLOAT",
})
void testBboxFilterFromMetadata(String bbox, boolean hasMetadata, PrimitiveType.PrimitiveTypeName type)
throws IOException {
var schema = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.requiredGroup()
.required(type).named("xmin")
.required(type).named("xmax")
.required(type).named("ymin")
.required(type).named("ymax")
.named(bbox)
.named("root");
var parsed = GeoParquetMetadata.parse(new FileMetaData(
schema,
hasMetadata ? Map.of("geo", EXAMPLE_METADATA.replaceAll("\"bbox\",", "\"" + bbox + "\",")) : Map.of(),
""));
var expected = type == PrimitiveType.PrimitiveTypeName.FLOAT ?
and(
and(gtEq(floatColumn(bbox + ".xmax"), 1f), ltEq(floatColumn(bbox + ".xmin"), 2f)),
and(gtEq(floatColumn(bbox + ".ymax"), 3f), ltEq(floatColumn(bbox + ".ymin"), 4f))
) :
and(
and(gtEq(doubleColumn(bbox + ".xmax"), 1.0), ltEq(doubleColumn(bbox + ".xmin"), 2.0)),
and(gtEq(doubleColumn(bbox + ".ymax"), 3.0), ltEq(doubleColumn(bbox + ".ymin"), 4.0))
);
assertEquals(expected, parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4))));
}
@ParameterizedTest
@CsvSource({
"bbox, true, DOUBLE",
"bbox, true, FLOAT",
"custom_bbox, true, DOUBLE",
"custom_bbox, true, FLOAT",
"bbox, false, DOUBLE",
"bbox, false, FLOAT",
})
void testBboxFilterFromMetadataOldGdalStyle(String bbox, boolean hasMetadata, PrimitiveType.PrimitiveTypeName type)
throws IOException {
var schema = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.required(type).named(bbox + ".xmin")
.required(type).named(bbox + ".xmax")
.required(type).named(bbox + ".ymin")
.required(type).named(bbox + ".ymax")
.named("root");
var parsed = GeoParquetMetadata.parse(new FileMetaData(
schema,
hasMetadata ? Map.of("geo", """
{
"primary_column": "geometry",
"columns": {
"geometry": {
"covering": {
"bbox": {
"xmin": ["bbox.xmin"],
"xmax": ["bbox.xmax"],
"ymin": ["bbox.ymin"],
"ymax": ["bbox.ymax"]
}
}
}
}
}
""".replace("bbox.", bbox + ".")) : Map.of(),
""));
var expected = type == PrimitiveType.PrimitiveTypeName.FLOAT ?
and(
and(gtEq(Filters.floatColumn(List.of(bbox + ".xmax")), 1f),
ltEq(Filters.floatColumn(List.of(bbox + ".xmin")), 2f)),
and(gtEq(Filters.floatColumn(List.of(bbox + ".ymax")), 3f),
ltEq(Filters.floatColumn(List.of(bbox + ".ymin")), 4f))
) :
and(
and(gtEq(Filters.doubleColumn(List.of(bbox + ".xmax")), 1.0),
ltEq(Filters.doubleColumn(List.of(bbox + ".xmin")), 2.0)),
and(gtEq(Filters.doubleColumn(List.of(bbox + ".ymax")), 3.0),
ltEq(Filters.doubleColumn(List.of(bbox + ".ymin")), 4.0))
);
assertEquals(expected, parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4))));
}
@Test
void testNoBboxFilterFromDefault() throws IOException {
var schema = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.BINARY)
.named("geometry")
.named("root");
var parsed = GeoParquetMetadata.parse(new FileMetaData(
schema,
Map.of(),
""));
assertNull(parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4))));
}
}

View file

@ -0,0 +1,536 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.Period;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
class ParquetConverterTest {
@Test
void testIntPrimitive() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.INT32,
converter -> converter.addInt(1),
1
);
}
@ParameterizedTest
@CsvSource({
"32, true, 100, 100",
"32, true, 2147483647, 2147483647",
"32, true, -2147483648, -2147483648",
"32, false, 100, 100",
"16, true, 100, 100",
"8, true, 256, 256",
})
void testIntPrimitiveWithAnnotation(int bitWidth, boolean isSigned, int input, int expected) {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(bitWidth, isSigned),
converter -> converter.addInt(input),
expected
);
}
@Test
void testLongPrimitive() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
converter -> converter.addLong(1),
1L
);
}
@ParameterizedTest
@CsvSource({
"64, true, 9223372036854775807, 9223372036854775807",
"64, false, 9223372036854775807, 9223372036854775807",
"64, true, -9223372036854775808, -9223372036854775808",
"64, true, 1, 1",
})
void testLongPrimitiveWithAnnotation(int bitWidth, boolean isSigned, long input, long expected) {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.intType(bitWidth, isSigned),
converter -> converter.addLong(input),
expected
);
}
@ParameterizedTest
@CsvSource({
"0, 1, 10, 10",
"1, 9, 10, 1",
"2, 9, 10, 0.1",
})
void testIntDecimal(int scale, int precision, int value, double expected) {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.decimalType(scale, precision),
converter -> converter.addInt(value),
expected
);
}
@ParameterizedTest
@CsvSource({
"0, 1, 10, 10",
"1, 18, 10, 1",
"2, 18, 10, 0.1",
})
void testLongDecimal(int scale, int precision, long value, double expected) {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.decimalType(scale, precision),
converter -> converter.addLong(value),
expected
);
}
@Test
void testBooleanPrimitive() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.BOOLEAN,
converter -> converter.addBoolean(true),
true
);
}
@Test
void testFloatPrimitive() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.FLOAT,
converter -> converter.addFloat(1f),
1.0
);
}
@Test
void testDoublePrimitive() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.DOUBLE,
converter -> converter.addDouble(1.5),
1.5
);
}
@Test
void testInt96Timestamp() {
testPrimitive(
PrimitiveType.PrimitiveTypeName.INT96,
converter -> converter.addBinary(Binary.fromConstantByteArray(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})),
Instant.parse("-4713-11-24T00:00:00Z")
);
}
@Test
void testDate() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.dateType(),
converter -> converter.addInt(2),
LocalDate.of(1970, 1, 3)
);
}
@Test
void testTime() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS),
converter -> converter.addInt(61_000),
LocalTime.of(0, 1, 1)
);
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS),
converter -> converter.addLong(61_000_000),
LocalTime.of(0, 1, 1)
);
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS),
converter -> converter.addLong(61_000_000_000L),
LocalTime.of(0, 1, 1)
);
}
@ParameterizedTest
@CsvSource({
"true, MILLIS, 61000, 1970-01-01T00:01:01Z",
"true, MICROS, 61000000, 1970-01-01T00:01:01Z",
"true, NANOS, 61000000000, 1970-01-01T00:01:01Z",
})
void testTimestamp(boolean utc, LogicalTypeAnnotation.TimeUnit unit, long input, String output) {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.timestampType(utc, unit),
converter -> converter.addLong(input),
Instant.parse(output)
);
}
@Test
void testString() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType(),
converter -> converter.addBinary(Binary.fromString("abcdef")),
"abcdef"
);
}
@Test
void testEnum() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.enumType(),
converter -> converter.addBinary(Binary.fromString("value")),
"value"
);
}
@Test
void testJson() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.jsonType(),
converter -> converter.addBinary(Binary.fromString("[1,2,3]")),
"[1,2,3]"
);
}
@Test
void testUUID() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
LogicalTypeAnnotation.uuidType(),
16,
converter -> converter
.addBinary(Binary.fromConstantByteArray(
new byte[]{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88, (byte) 0x99, (byte) 0xaa, (byte) 0xbb,
(byte) 0xcc, (byte) 0xdd, (byte) 0xee, (byte) 0xff})),
UUID.fromString("00112233-4455-6677-8899-aabbccddeeff")
);
}
@Test
void testInterval() {
testAnnotatedPrimitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(),
12,
converter -> converter.addBinary(Binary.fromConstantByteBuffer(ByteBuffer.allocate(12)
.order(ByteOrder.LITTLE_ENDIAN)
.putInt(1)
.putInt(2)
.putInt(3)
.flip())),
new Interval(
Period.ofMonths(1).plusDays(2),
Duration.ofMillis(3)
)
);
}
@Test
void testOptionalMissing() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("value")
.named("message"));
var rootConverter = materializer.getRootConverter();
rootConverter.start();
rootConverter.end();
assertEquals(Map.of(), materializer.getCurrentRecord());
}
@Test
void testListFromSimpleRepeatedElement() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.repeated(PrimitiveType.PrimitiveTypeName.INT32).named("value")
.named("message"));
var rootConverter = materializer.getRootConverter();
rootConverter.start();
rootConverter.end();
assertEquals(Map.of(), materializer.getCurrentRecord());
rootConverter.start();
rootConverter.getConverter(0).asPrimitiveConverter().addInt(1);
rootConverter.end();
assertEquals(Map.of("value", List.of(1)), materializer.getCurrentRecord());
rootConverter.start();
rootConverter.getConverter(0).asPrimitiveConverter().addInt(1);
rootConverter.getConverter(0).asPrimitiveConverter().addInt(2);
rootConverter.end();
assertEquals(Map.of("value", List.of(1, 2)), materializer.getCurrentRecord());
}
@Test
void testListFromListElementStructs() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.requiredList().optionalElement(PrimitiveType.PrimitiveTypeName.INT32).named("value")
.named("message"));
var root = materializer.getRootConverter();
var value = root.getConverter(0).asGroupConverter();
var list = value.getConverter(0).asGroupConverter();
var element = list.getConverter(0).asPrimitiveConverter();
root.start();
value.start();
value.end();
root.end();
assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord());
root.start();
value.start();
list.start();
element.addInt(1);
list.end();
list.start();
list.end();
list.start();
element.addInt(3);
list.end();
value.end();
root.end();
assertEquals(Map.of("value", Lists.newArrayList(1, null, 3)), materializer.getCurrentRecord());
}
@Test
void testListRepeatedAtTopAndBottomLevel() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.list(Type.Repetition.REPEATED).element(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REPEATED)
.named("value")
.named("message"));
var root = materializer.getRootConverter();
var value = root.getConverter(0).asGroupConverter();
var list = value.getConverter(0).asGroupConverter();
var element = list.getConverter(0).asPrimitiveConverter();
root.start();
value.start();
value.end();
value.start();
list.start();
element.addInt(1);
element.addInt(2);
list.end();
list.start();
element.addInt(3);
list.end();
value.end();
root.end();
assertEquals(Map.of("value", List.of(List.of(), List.of(List.of(1, 2), List.of(3)))),
materializer.getCurrentRecord());
}
@Test
void testNestedList() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.optionalList()
.optionalListElement()
.optionalElement(PrimitiveType.PrimitiveTypeName.INT32)
.named("value")
.named("root"));
//message root {
// optional group value (LIST) {
// repeated group list {
// optional group element (LIST) {
// repeated group list {
// optional int32 element;
// }
// }
// }
// }
//}
var root = materializer.getRootConverter();
var value = root.getConverter(0).asGroupConverter();
var outerList = value.getConverter(0).asGroupConverter();
var outerElement = outerList.getConverter(0).asGroupConverter();
var innerList = outerElement.getConverter(0).asGroupConverter();
var innerElement = innerList.getConverter(0).asPrimitiveConverter();
root.start();
root.end();
assertEquals(Map.of(), materializer.getCurrentRecord());
root.start();
value.start();
value.end();
root.end();
assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord());
root.start();
value.start();
outerList.start();
outerList.end();
outerList.start();
outerElement.start();
innerList.start();
innerElement.addInt(1);
innerList.end();
innerList.start();
innerList.end();
innerList.start();
innerElement.addInt(2);
innerList.end();
outerElement.end();
outerList.end();
value.end();
root.end();
assertEquals(Map.of(
"value", Lists.newArrayList(null, Lists.newArrayList(1, null, 2))
), materializer.getCurrentRecord());
}
@Test
void testMapConverter() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.optionalMap()
.key(PrimitiveType.PrimitiveTypeName.INT32)
.optionalValue(PrimitiveType.PrimitiveTypeName.INT64)
.named("value")
.named("root"));
//message root {
// optional group value (MAP) {
// repeated group key_value {
// required int32 key;
// optional int64 value;
// }
// }
//}
var root = materializer.getRootConverter();
var map = root.getConverter(0).asGroupConverter();
var keyValue = map.getConverter(0).asGroupConverter();
var key = keyValue.getConverter(0).asPrimitiveConverter();
var value = keyValue.getConverter(1).asPrimitiveConverter();
root.start();
root.end();
assertEquals(Map.of(), materializer.getCurrentRecord());
root.start();
map.start();
map.end();
root.end();
assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord());
root.start();
map.start();
keyValue.start();
key.addInt(1);
keyValue.end();
map.end();
root.end();
assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord());
root.start();
map.start();
keyValue.start();
key.addInt(1);
value.addLong(2);
keyValue.end();
map.end();
root.end();
assertEquals(Map.of("value", Map.of(1, 2L)), materializer.getCurrentRecord());
root.start();
map.start();
keyValue.start();
key.addInt(1);
value.addLong(2);
keyValue.end();
keyValue.start();
key.addInt(3);
value.addLong(4);
keyValue.end();
map.end();
root.end();
assertEquals(Map.of("value", Map.of(1, 2L, 3, 4L)), materializer.getCurrentRecord());
}
@Test
void testRepeatedMap() {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.map(Type.Repetition.REPEATED)
.key(PrimitiveType.PrimitiveTypeName.INT32)
.optionalValue(PrimitiveType.PrimitiveTypeName.INT64)
.named("value")
.named("root"));
var root = materializer.getRootConverter();
var map = root.getConverter(0).asGroupConverter();
var keyValue = map.getConverter(0).asGroupConverter();
var key = keyValue.getConverter(0).asPrimitiveConverter();
root.start();
map.start();
keyValue.start();
key.addInt(1);
keyValue.end();
map.end();
root.end();
assertEquals(Map.of("value", List.of(Map.of())), materializer.getCurrentRecord());
}
private void testPrimitive(PrimitiveType.PrimitiveTypeName type, Consumer<PrimitiveConverter> consumer,
Object expected) {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.required(type).named("value")
.named("message"));
var rootConverter = materializer.getRootConverter();
rootConverter.start();
consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter());
rootConverter.end();
assertEquals(Map.of("value", expected), materializer.getCurrentRecord());
}
private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation,
Consumer<PrimitiveConverter> consumer, Object expected) {
testAnnotatedPrimitive(type, annotation, 0, consumer, expected);
}
private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation,
int length, Consumer<PrimitiveConverter> consumer, Object expected) {
var materializer = new ParquetRecordConverter(Types.buildMessage()
.required(type).as(annotation).length(length).named("value")
.named("message"));
var rootConverter = materializer.getRootConverter();
rootConverter.start();
consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter());
rootConverter.end();
assertEquals(Map.of("value", expected), materializer.getCurrentRecord());
}
}

View file

@ -0,0 +1,188 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
import com.onthegomap.planetiler.TestUtils;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.util.Glob;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.locationtech.jts.geom.Envelope;
class ParquetInputFileTest {
static List<Path> bostons() {
return Glob.of(TestUtils.pathToResource("parquet")).resolve("boston*.parquet").find();
}
@ParameterizedTest
@MethodSource("bostons")
void testReadBoston(Path path) {
for (int i = 0; i < 3; i++) {
Set<Object> ids = new HashSet<>();
for (var block : new ParquetInputFile("parquet", "layer", path)
.get()) {
for (var item : block) {
ids.add(item.getString("id"));
}
}
assertEquals(3, ids.size(), "iter " + i);
}
}
@ParameterizedTest
@MethodSource("bostons")
void testReadBostonWithBboxFilterCovering(Path path) {
Set<Object> ids = new HashSet<>();
for (var block : new ParquetInputFile("parquet", "layer", path, null,
new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null)
.get()) {
for (var item : block) {
ids.add(item.getString("id"));
}
}
assertEquals(3, ids.size());
}
@ParameterizedTest
@MethodSource("bostons")
void testReadBostonWithBboxFilterCoveringAndOtherFilter(Path path) {
Set<Object> ids = new HashSet<>();
for (var block : new ParquetInputFile("parquet", "layer", path, FilterApi.gt(FilterApi.doubleColumn("height"), 3d),
new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null)
.get()) {
for (var item : block) {
ids.add(item.getString("id"));
}
}
assertEquals(1, ids.size());
}
@ParameterizedTest
@MethodSource("bostons")
void testReadBostonWithBboxFilterNotCovering(Path path) {
Set<Object> ids = new HashSet<>();
for (var block : new ParquetInputFile("parquet", "layer", path, null,
new Bounds(new Envelope(-72.0747653629, -72.0741656634, 42.3560968301, 42.3564346282)), null, null)
.get()) {
for (var item : block) {
ids.add(item.getString("id"));
}
}
assertEquals(0, ids.size());
}
@TestFactory
@SuppressWarnings("java:S5961")
List<DynamicTest> testReadAllDataTypes() {
/*
column_name column_type null key default extra
varchar varchar varchar varchar varchar varchar
geometry BLOB YES ST_AsWKB(ST_Point(1, 2))
bigint BIGINT YES 9223372036854775807
blob BLOB YES '1011'
boolean BOOLEAN YES true
date DATE YES '2000-01-01'
decimal DECIMAL(18,3) YES 123456.789
double DOUBLE YES 123456.789
hugeint HUGEINT YES 92233720368547758079223372036854775807
integer INTEGER YES 123
interval INTERVAL YES INTERVAL 1 MONTH + INTERVAL 1 DAY + INTERVAL 1 SECOND
real FLOAT YES 123.456
smallint SMALLINT YES 1234
time TIME YES 2000-01-01 05:30:10.123
timestamp_with_tim TIMESTAMP WITH TIME ZONE YES 2000-01-01 05:30:10.123 EST
timestamp TIMESTAMP YES 2000-01-01 05:30:10.123456 EST
tinyint TINYINT YES 123
ubigint UBIGINT YES 9223372036854775807
uhugeint UHUGEINT YES 92233720368547758079223372036854775807
uinteger UINTEGER YES 123
usmallint USMALLINT YES 123
utinyint UTINYINT YES 123
uuid UUID YES 606362d9-012a-4949-b91a-1ab439951671
varchar VARCHAR YES "string"
list INTEGER[] YES [1,2,3,4]
map MAP(INTEGER, VARCHAR) YES map([1,2,3],['one','two','three'])
array INTEGER[3] YES [1,2,3]
struct STRUCT(i INTEGER, j VARCHAR) YES {'i': 42, 'j': 'a'}
complex MAP(VARCHAR, STRUCT(i INTEGE YES [MAP(['a', 'b'], [[], [{'i': 43, 'j': 'a'}, {'i': 43, 'j': 'b'}]])];
29 rows 6 columns
*/
Map<String, Object> map = null;
int i = 0;
for (var block : new ParquetInputFile("parquet", "layer",
TestUtils.pathToResource("parquet").resolve("all_data_types.parquet"))
.get()) {
for (var item : block) {
map = item.tags();
assertEquals(0, i++);
}
}
assertNotNull(map);
return List.of(
testEquals(map, "bigint", 9223372036854775807L),
test(map, "blob", v -> assertArrayEquals("1011".getBytes(), (byte[]) v)),
testEquals(map, "boolean", true),
testEquals(map, "date", LocalDate.of(2000, 1, 1)),
testEquals(map, "decimal", 123456.789),
testEquals(map, "double", 123456.789),
testEquals(map, "hugeint", 92233720368547758079223372036854775807.0),
testEquals(map, "integer", 123),
testEquals(map, "interval", Interval.of(1, 2, 3_000)),
test(map, "real", v -> assertEquals(123.456, (double) v, 1e-3)),
testEquals(map, "smallint", 1234),
testEquals(map, "time", LocalTime.parse("05:30:10.123")),
testEquals(map, "timestamp_with_timezone", Instant.parse("2000-01-01T10:30:10.123Z")),
testEquals(map, "timestamp", Instant.parse("2000-01-01T10:30:10.123Z")),
testEquals(map, "tinyint", 123),
testEquals(map, "ubigint", 9223372036854775807L),
testEquals(map, "uhugeint", 92233720368547758079223372036854775807.0),
testEquals(map, "uinteger", 123),
testEquals(map, "usmallint", 123),
testEquals(map, "utinyint", 123),
testEquals(map, "uuid", UUID.fromString("606362d9-012a-4949-b91a-1ab439951671")),
testEquals(map, "varchar", "string"),
testEquals(map, "list_of_items", List.of(1, 2, 3, 4)),
testEquals(map, "map", Map.of(1, "one", 2, "two", 3, "three")),
testEquals(map, "array", List.of(1, 2, 3)),
testEquals(map, "struct", Map.of("i", 42, "j", "a")),
testEquals(map, "complex", List.of(Map.of(
"a", List.of(),
"b", List.of(
Map.of("i", 43, "j", "a"),
Map.of("i", 43, "j", "b")
)
)))
);
}
private static DynamicTest testEquals(Map<String, Object> map, String key, Object expected) {
return test(map, key, v -> assertEquals(expected, map.get(key)));
}
private static DynamicTest test(Map<String, Object> map, String key, Consumer<Object> test) {
return dynamicTest(key, () -> test.accept(map.get(key)));
}
}

View file

@ -0,0 +1,84 @@
package com.onthegomap.planetiler.reader.parquet;
import static com.onthegomap.planetiler.TestUtils.newPoint;
import static com.onthegomap.planetiler.TestUtils.round;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.TestUtils;
import com.onthegomap.planetiler.collection.FeatureGroup;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.geo.TileOrder;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Glob;
import com.onthegomap.planetiler.util.Parse;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.locationtech.jts.geom.Geometry;
class ParquetReaderTest {
private final PlanetilerConfig config = PlanetilerConfig.defaults();
private final Stats stats = Stats.inMemory();
static List<Path> bostons() {
return Glob.of(TestUtils.pathToResource("parquet")).resolve("boston*.parquet").find();
}
@ParameterizedTest
@MethodSource("bostons")
@Timeout(30)
void testReadOvertureParquet(Path path) {
List<String> ids = new CopyOnWriteArrayList<>();
List<Geometry> geoms = new CopyOnWriteArrayList<>();
var profile = new Profile.NullProfile() {
volatile double height = 0;
@Override
public synchronized void processFeature(SourceFeature sourceFeature, FeatureCollector features) {
try {
ids.add(sourceFeature.getString("id"));
height += Parse.parseDoubleOrNull(sourceFeature.getTag("height")) instanceof Double d ? d : 0;
geoms.add(sourceFeature.latLonGeometry());
} catch (GeometryException e) {
throw new RuntimeException(e);
}
}
};
var reader = new ParquetReader("source", profile, stats);
reader.process(List.of(path),
FeatureGroup.newInMemoryFeatureGroup(TileOrder.TMS, profile, config, stats), PlanetilerConfig.defaults());
assertEquals(List.of(
"08b2a306638a0fff02001c5b97636c80",
"08b2a306638a0fff0200a75c80c3d54b",
"08b2a306638a0fff0200d1814977faca"
), ids.stream().sorted().toList());
var center = GeoUtils.combine(geoms.toArray(Geometry[]::new)).getCentroid();
assertEquals(newPoint(-71.07448, 42.35626), round(center));
assertEquals(4.7, profile.height);
}
@Test
void testHivePartitionFields() {
assertNull(ParquetReader.getHivePartitionFields(Path.of("")));
assertNull(ParquetReader.getHivePartitionFields(Path.of("a")));
assertNull(ParquetReader.getHivePartitionFields(Path.of("a", "b")));
assertEquals(Map.of("c", "d"), ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d")));
assertEquals(Map.of("c", "d", "e", "f"),
ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d", "e=f")));
assertEquals(Map.of("a", "b", "c", "d", "e", "f"),
ParquetReader.getHivePartitionFields(Path.of("a=b", "b", "c=d", "e=f")));
}
}

View file

@ -119,6 +119,13 @@ class FileUtilsTest {
txtFiles.stream().sorted().toList(),
matchingPaths.stream().sorted().toList()
);
matchingPaths = Glob.of(parent).resolve("*.txt").find();
assertEquals(
txtFiles.stream().sorted().toList(),
matchingPaths.stream().sorted().toList()
);
}
@Test
@ -140,6 +147,9 @@ class FileUtilsTest {
// Otherwise, the files inside the zip should be returned.
assertEquals(List.of(zipFile.resolve("inner.txt")),
FileUtils.walkPathWithPattern(parent, "*.zip", mockWalkZipFile));
assertEquals(List.of(zipFile), Glob.of(parent).resolve("*.zip").find());
}
@Test
@ -148,6 +158,12 @@ class FileUtilsTest {
var matchingPaths = FileUtils.walkPathWithPattern(zipPath, "stations.sh[px]");
assertEquals(
List.of("/shapefile/stations.shp", "/shapefile/stations.shx"),
matchingPaths.stream().map(Path::toString).sorted().toList());
matchingPaths = Glob.of(zipPath).resolve("stations.sh[px]").find();
assertEquals(
List.of("/shapefile/stations.shp", "/shapefile/stations.shx"),
matchingPaths.stream().map(Path::toString).sorted().toList());

View file

@ -0,0 +1,66 @@
package com.onthegomap.planetiler.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
class GlobTest {
@TempDir
Path tmpDir;
@ParameterizedTest
@CsvSource(value = {
"a/b/c; a/b/c;",
"a/b/*; a/b; *",
"a/*/b; a; */b",
"*/b/*; ; */b/*",
"/*/test; /; */test",
"a/b={c,d}/other; a; b={c,d}/other",
"./a/b=?/other; ./a; b=?/other",
}, delimiter = ';')
void testParsePathWithPattern(String input, String base, String pattern) {
var separator = FileSystems.getDefault().getSeparator();
input = input.replace("/", separator);
base = base == null ? "" : base.replace("/", separator);
assertEquals(
new Glob(Path.of(base), pattern),
Glob.parse(input)
);
}
@Test
void testWalkPathWithPattern() throws IOException {
var path = tmpDir.resolve("a").resolve("b").resolve("c.txt");
FileUtils.createParentDirectories(path);
Files.writeString(path, "test");
assertEquals(List.of(path), Glob.of(tmpDir).resolve("a", "*", "c.txt").find());
assertEquals(List.of(path), Glob.of(tmpDir).resolve("*", "*", "c.txt").find());
assertEquals(List.of(path), Glob.of(tmpDir).resolve("a", "b", "c.txt").find());
}
@Test
void testResolve() {
var base = Glob.of(Path.of("a", "b"));
assertEquals(new Glob(Path.of("a", "b", "c"), null), base.resolve("c"));
assertEquals(new Glob(Path.of("a", "b", "c", "d"), null), base.resolve("c", "d"));
assertEquals(new Glob(Path.of("a", "b"), "*/d"), base.resolve("*", "d"));
assertEquals(new Glob(tmpDir, "*/*/c.txt"),
Glob.of(tmpDir).resolve("*", "*", "c.txt"));
}
@Test
void testParseAbsoluteString() {
var base = Glob.of(Path.of("a", "b")).resolve("*", "d");
var separator = base.base().getFileSystem().getSeparator();
assertEquals(new Glob(base.base().toAbsolutePath(), base.pattern()),
Glob.parse(base.base().toAbsolutePath() + separator + base.pattern()));
}
}

View file

@ -7,3 +7,7 @@ packages=com.onthegomap.planetiler.util.log4j
rootLogger.level=warn
rootLogger.appenderRefs=stdout
rootLogger.appenderRef.stdout.ref=STDOUT
# suppress warning about unreadable duckdb statistics
logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics
logger.apachecorrupt.level=error

Binary file not shown.

View file

@ -10,6 +10,7 @@ import com.onthegomap.planetiler.examples.BikeRouteOverlay;
import com.onthegomap.planetiler.examples.OsmQaTiles;
import com.onthegomap.planetiler.examples.ToiletsOverlay;
import com.onthegomap.planetiler.examples.ToiletsOverlayLowLevelApi;
import com.onthegomap.planetiler.examples.overture.OvertureBasemap;
import com.onthegomap.planetiler.mbtiles.Verify;
import com.onthegomap.planetiler.util.CompareArchives;
import com.onthegomap.planetiler.util.TileSizeStats;
@ -54,6 +55,8 @@ public class Main {
entry("example-bikeroutes", BikeRouteOverlay::main),
entry("example-toilets", ToiletsOverlay::main),
entry("example-toilets-lowlevel", ToiletsOverlayLowLevelApi::main),
entry("example-overture", OvertureBasemap::main),
entry("overture", OvertureBasemap::main),
entry("example-qa", OsmQaTiles::main),
entry("osm-qa", OsmQaTiles::main),

View file

@ -0,0 +1,66 @@
package com.onthegomap.planetiler.examples.overture;
import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Planetiler;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.util.Glob;
import java.nio.file.Path;
/**
* Example basemap using <a href="https://overturemaps.org/">Overture Maps</a> data.
*/
public class OvertureBasemap implements Profile {
@Override
public void processFeature(SourceFeature source, FeatureCollector features) {
String layer = source.getSourceLayer();
switch (layer) {
case "building" -> features.polygon("building")
.setMinZoom(13)
.inheritAttrFromSource("height")
.inheritAttrFromSource("roof_color");
case null, default -> {
// ignore for now
}
}
}
@Override
public String name() {
return "Overture";
}
@Override
public String description() {
return "A basemap generated from Overture data";
}
@Override
public String attribution() {
return """
<a href="https://www.openstreetmap.org/copyright" target="_blank">&copy; OpenStreetMap</a>
<a href="https://docs.overturemaps.org/attribution" target="_blank">&copy; Overture Maps Foundation</a>
"""
.replace("\n", " ")
.trim();
}
public static void main(String[] args) throws Exception {
run(Arguments.fromArgsOrConfigFile(args));
}
static void run(Arguments args) throws Exception {
Path base = args.inputFile("base", "overture base directory", Path.of("data", "overture"));
Planetiler.create(args)
.setProfile(new OvertureBasemap())
.addParquetSource("overture-buildings",
Glob.of(base).resolve("*", "type=building", "*.parquet").find(),
true, // hive-partitioning
fields -> fields.get("id"), // hash the ID field to generate unique long IDs
fields -> fields.get("type")) // extract "type={}" from the filename to get layer
.overwriteOutput(Path.of("data", "overture.pmtiles"))
.run();
}
}
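With the "overture" entry registered in Main above, this example should be runnable from the shaded jar along the lines of (assuming the jar is named planetiler.jar, which this diff does not confirm):

java -jar planetiler.jar overture --base=data/overture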