Improvements to geoparquet geoarrow conversion (#933)

pull/934/head
Michael Barry 2024-06-27 08:39:01 -04:00 zatwierdzone przez GitHub
rodzic 024e387407
commit cf534c1ff4
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
11 zmienionych plików z 685 dodań i 307 usunięć

Wyświetl plik

@ -0,0 +1,69 @@
package com.onthegomap.planetiler.reader.parquet;
import com.carrotsearch.hppc.DoubleArrayList;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.CoordinateXY;
import org.locationtech.jts.geom.CoordinateXYZM;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.impl.PackedCoordinateSequence;
/**
 * A {@link CoordinateSequence} backed by a growable flat list of doubles. Calling {@link #setOrdinate} on an index
 * past the current end extends the sequence so that it includes that index.
 */
class CoordinateSequenceBuilder extends PackedCoordinateSequence {

  /** Flat ordinate storage: coordinate {@code i} starts at slot {@code i * components}. */
  private final DoubleArrayList points = new DoubleArrayList();
  /** Number of ordinates stored per coordinate. */
  private final int components;

  /**
   * @param components number of ordinates per coordinate; when greater than 3 the last one is treated as a measure
   */
  public CoordinateSequenceBuilder(int components) {
    super(components, measuresFor(components));
    this.components = components;
  }

  /** Returns the number of measure ordinates implied by a coordinate with {@code components} ordinates. */
  private static int measuresFor(int components) {
    return components > 3 ? 1 : 0;
  }

  @Override
  public double getOrdinate(int index, int ordinateIndex) {
    int slot = index * components + ordinateIndex;
    return points.get(slot);
  }

  @Override
  public int size() {
    return points.size() / components;
  }

  @Override
  protected Coordinate getCoordinateInternal(int index) {
    // reject unsupported dimensions before reading any ordinate
    if (dimension < 2 || dimension > 4) {
      throw new IllegalStateException("Unexpected value: " + dimension);
    }
    double x = getX(index);
    double y = getY(index);
    if (dimension == 2) {
      return new CoordinateXY(x, y);
    }
    double z = getZ(index);
    if (dimension == 3) {
      return new Coordinate(x, y, z);
    }
    return new CoordinateXYZM(x, y, z, getM(index));
  }

  /** Not supported; use {@link #copy()} instead. */
  @Override
  public Object clone() {
    throw new UnsupportedOperationException();
  }

  /** Returns a fixed-size snapshot of the ordinates written so far. */
  @Override
  public PackedCoordinateSequence copy() {
    double[] snapshot = points.toArray();
    return new PackedCoordinateSequence.Double(snapshot, components, measuresFor(components));
  }

  @Override
  public void setOrdinate(int index, int ordinate, double value) {
    // number of doubles needed so that coordinate #index is fully inside the list
    int requiredSize = (index + 1) * components;
    if (requiredSize > points.elementsCount) {
      points.elementsCount = requiredSize;
    }
    points.ensureCapacity(requiredSize);
    points.set(index * components + ordinate, value);
  }

  @Override
  public Envelope expandEnvelope(Envelope env) {
    // only the first two ordinates (x, y) of each coordinate contribute to the envelope
    int offset = 0;
    while (offset < points.size()) {
      env.expandToInclude(points.get(offset), points.get(offset + 1));
      offset += dimension;
    }
    return env;
  }
}

Wyświetl plik

@ -2,7 +2,6 @@ package com.onthegomap.planetiler.reader.parquet;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.LineString;
@ -12,7 +11,6 @@ import org.locationtech.jts.geom.MultiPoint;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.geom.impl.PackedCoordinateSequence;
/**
* Utilities for converting nested <a href=
@ -22,79 +20,32 @@ import org.locationtech.jts.geom.impl.PackedCoordinateSequence;
class GeoArrow {
private GeoArrow() {}
// TODO create packed coordinate arrays while reading parquet values to avoid creating so many intermediate objects
static MultiPolygon multipolygon(List<List<List<Object>>> list) {
static MultiPolygon multipolygon(List<List<CoordinateSequence>> list) {
return GeoUtils.createMultiPolygon(map(list, GeoArrow::polygon));
}
static Polygon polygon(List<List<Object>> input) {
static Polygon polygon(List<CoordinateSequence> input) {
return GeoUtils.createPolygon(ring(input.getFirst()), input.stream().skip(1).map(GeoArrow::ring).toList());
}
static MultiPoint multipoint(List<Object> input) {
static MultiPoint multipoint(List<CoordinateSequence> input) {
return GeoUtils.createMultiPoint(map(input, GeoArrow::point));
}
static Point point(Object input) {
int dims = input instanceof List<?> l ? l.size() : input instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(1, dims, dims == 4 ? 1 : 0);
coordinate(input, result, 0);
return GeoUtils.JTS_FACTORY.createPoint(result);
static Point point(CoordinateSequence input) {
return GeoUtils.JTS_FACTORY.createPoint(input);
}
static MultiLineString multilinestring(List<List<Object>> input) {
static MultiLineString multilinestring(List<CoordinateSequence> input) {
return GeoUtils.createMultiLineString(map(input, GeoArrow::linestring));
}
static LineString linestring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLineString(coordinateSequence(input));
static LineString linestring(CoordinateSequence input) {
return GeoUtils.JTS_FACTORY.createLineString(input);
}
private static CoordinateSequence coordinateSequence(List<Object> input) {
if (input.isEmpty()) {
return GeoUtils.EMPTY_COORDINATE_SEQUENCE;
}
Object first = input.getFirst();
int dims = first instanceof List<?> l ? l.size() : first instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(input.size(), dims, dims == 4 ? 1 : 0);
for (int i = 0; i < input.size(); i++) {
Object item = input.get(i);
coordinate(item, result, i);
}
return result;
}
private static LinearRing ring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLinearRing(coordinateSequence(input));
}
private static void coordinate(Object input, CoordinateSequence result, int index) {
switch (input) {
case List<?> list -> {
List<Number> l = (List<Number>) list;
for (int i = 0; i < l.size(); i++) {
result.setOrdinate(index, i, l.get(i).doubleValue());
}
}
case Map<?, ?> map -> {
Map<String, Number> m = (Map<String, Number>) map;
for (var entry : m.entrySet()) {
int ordinateIndex = switch (entry.getKey()) {
case "x" -> 0;
case "y" -> 1;
case "z" -> 2;
case "m" -> 3;
case null, default -> throw new IllegalArgumentException("Bad coordinate key: " + entry.getKey());
};
result.setOrdinate(index, ordinateIndex, entry.getValue().doubleValue());
}
}
default -> throw new IllegalArgumentException("Expecting map or list, got: " + input);
}
private static LinearRing ring(CoordinateSequence input) {
return GeoUtils.JTS_FACTORY.createLinearRing(input);
}
private static <I, O> List<O> map(List<I> in, Function<I, O> remap) {

Wyświetl plik

@ -172,6 +172,13 @@ public record GeoParquetMetadata(
}
return null;
}
/**
 * Returns this column's encoding when it contains {@code "polygon"}, {@code "point"} or {@code "line"} (treated as
 * a geoarrow type string), or {@code null} when the encoding is missing or matches none of them.
 */
public String getGeoArrowType() {
  if (encoding == null) {
    return null;
  }
  boolean geoArrow = encoding.contains("polygon") || encoding.contains("point") || encoding.contains("line");
  return geoArrow ? encoding : null;
}
}
public ColumnMetadata primaryColumnMetadata() {

Wyświetl plik

@ -6,7 +6,6 @@ import com.onthegomap.planetiler.geo.GeometryType;
import com.onthegomap.planetiler.reader.WithTags;
import com.onthegomap.planetiler.util.FunctionThatThrows;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.locationtech.jts.geom.Geometry;
@ -24,9 +23,9 @@ class GeometryReader {
Function<Object, GeometryType> sniffType
) {}
private static <L extends List<?>> FormatHandler arrowHandler(GeometryType type,
FunctionThatThrows<L, Geometry> parser) {
return new FormatHandler(obj -> obj instanceof List<?> list ? parser.apply((L) list) : null, any -> type);
private static <T> FormatHandler arrowHandler(GeometryType type,
FunctionThatThrows<T, Geometry> parser) {
return new FormatHandler(obj -> parser.apply((T) obj), any -> type);
}
GeometryReader(GeoParquetMetadata geoparquet) {
@ -43,17 +42,17 @@ class GeometryReader {
obj -> obj instanceof String string ? GeoUtils.wktReader().read(string) : null,
obj -> obj instanceof String string ? GeometryType.fromWKT(string) : GeometryType.UNKNOWN
);
case "multipolygon", "geoarrow.multipolygon" ->
case "multipolygon" ->
arrowHandler(GeometryType.POLYGON, GeoArrow::multipolygon);
case "polygon", "geoarrow.polygon" ->
case "polygon" ->
arrowHandler(GeometryType.POLYGON, GeoArrow::polygon);
case "multilinestring", "geoarrow.multilinestring" ->
case "multilinestring" ->
arrowHandler(GeometryType.LINE, GeoArrow::multilinestring);
case "linestring", "geoarrow.linestring" ->
case "linestring" ->
arrowHandler(GeometryType.LINE, GeoArrow::linestring);
case "multipoint", "geoarrow.multipoint" ->
case "multipoint" ->
arrowHandler(GeometryType.POINT, GeoArrow::multipoint);
case "point", "geoarrow.point" ->
case "point" ->
arrowHandler(GeometryType.POINT, GeoArrow::point);
default -> throw new IllegalArgumentException("Unhandled type: " + columnInfo.encoding());
};

Wyświetl plik

@ -57,6 +57,7 @@ public class ParquetInputFile {
final GeometryReader geometryReader;
private final Map<String, Object> extraFields;
private final Set<GeometryType> geometryTypes;
private final GeoParquetMetadata geoparquet;
private Envelope postFilterBounds = null;
private boolean outOfBounds = false;
@ -75,7 +76,7 @@ public class ParquetInputFile {
try (var file = open()) {
metadata = file.getFooter();
var fileMetadata = metadata.getFileMetaData();
var geoparquet = GeoParquetMetadata.parse(fileMetadata);
geoparquet = GeoParquetMetadata.parse(fileMetadata);
this.geometryReader = new GeometryReader(geoparquet);
this.geometryTypes = geoparquet.geometryTypes();
if (!bounds.isWorld()) {
@ -154,7 +155,7 @@ public class ParquetInputFile {
throw new UncheckedIOException(e);
}
MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema);
var recordReader = columnIO.getRecordReader(group, new ParquetRecordConverter(schema), filter);
var recordReader = columnIO.getRecordReader(group, new ParquetRecordConverter(schema, geoparquet), filter);
long total = group.getRowCount();
return Iterators.filter(new Iterator<>() {
long i = 0;

Wyświetl plik

@ -1,15 +1,20 @@
package com.onthegomap.planetiler.reader.parquet;
import com.onthegomap.planetiler.reader.FileFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
/**
@ -22,8 +27,8 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
private final StructConverter root;
private Map<String, Object> map;
ParquetRecordConverter(MessageType schema) {
root = new StructConverter(new Context(schema)) {
ParquetRecordConverter(MessageType schema, GeoParquetMetadata geoParquetMetadata) {
root = new StructConverter(new Context(schema, geoParquetMetadata)) {
@Override
public void start() {
var group = new MapGroup(schema.getFieldCount());
@ -31,6 +36,72 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
map = group.getMap();
}
};
if (geoParquetMetadata != null) {
validateGeometryColumn(schema, geoParquetMetadata);
}
}
/**
 * Verifies that the parquet schema of the primary geometry column is compatible with the encoding declared for it
 * in the geoparquet metadata.
 *
 * @param schema             schema of the parquet file being read
 * @param geoParquetMetadata geoparquet metadata naming the primary geometry column and its encoding
 * @throws FileFormatException if the column type does not match the encoding, or the encoding is null or unknown
 */
private void validateGeometryColumn(MessageType schema, GeoParquetMetadata geoParquetMetadata) {
  var primaryMetadata = geoParquetMetadata.primaryColumnMetadata();
  String geoColumn = geoParquetMetadata.primaryColumn();
  Type colSchema = schema.getType(geoColumn);
  var encoding = primaryMetadata.encoding();
  switch (encoding) {
    // text and binary encodings are plain BINARY primitives, distinguished by the string annotation
    case "WKT" -> require(
      isBinaryWithAnnotation(colSchema, LogicalTypeAnnotation.stringType()),
      "String type required for wkt-encoded geometry column " + geoColumn + " got: " + colSchema);
    case "WKB" -> require(
      isBinaryWithAnnotation(colSchema, null),
      "Binary type required for wkb-encoded geometry column " + geoColumn + " got: " + colSchema);
    // geoarrow encodings: the expected converter must sit below the given number of list levels
    case "point" ->
      requireConverter(encoding, geoColumn, GeoArrowCoordinateConverter.class, 0, "Coordinate");
    case "multipoint" ->
      requireConverter(encoding, geoColumn, GeoArrowCoordinateConverter.class, 1, "list<Coordinate>");
    case "linestring" ->
      requireConverter(encoding, geoColumn, GeoArrowCoordinateSequenceConverter.class, 0, "list<Coordinate>");
    case "multilinestring", "polygon" ->
      requireConverter(encoding, geoColumn, GeoArrowCoordinateSequenceConverter.class, 1,
        "list<list<Coordinate>>");
    case "multipolygon" ->
      requireConverter(encoding, geoColumn, GeoArrowCoordinateSequenceConverter.class, 2,
        "list<list<list<Coordinate>>>");
    case null, default -> throw new FileFormatException("Unexpected geoparquet geometry encoding: " + encoding);
  }
}

/** Returns true if {@code colSchema} is a BINARY primitive whose logical type annotation is {@code expected}. */
private static boolean isBinaryWithAnnotation(Type colSchema, LogicalTypeAnnotation expected) {
  return colSchema.isPrimitive() &&
    colSchema.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY &&
    colSchema.getLogicalTypeAnnotation() == expected;
}
/**
 * Requires that the converter built for {@code column}, after unwrapping {@code nesting} list levels, is an
 * instance of {@code clazz}.
 *
 * @param encoding geometry encoding name, used in the error message
 * @param column   name of the top-level column to check
 * @param clazz    converter class expected below the list levels
 * @param nesting  number of list levels to unwrap before checking
 * @param pretty   human-readable description of the expected type, used in the error message
 * @throws FileFormatException if the converter found is not an instance of {@code clazz}
 */
private void requireConverter(String encoding, String column,
  Class<?> clazz, int nesting, String pretty) {
  var rootGroup = root.context.type.asGroupType();
  var colSchema = rootGroup.getType(column);
  Converter candidate = root.converters[rootGroup.getFieldIndex(column)];
  for (int remaining = nesting; remaining > 0; remaining--) {
    candidate = getListElement(candidate);
  }
  String message = pretty + " type required for geoarrow " + encoding + " column " + column + " got: " + colSchema;
  require(clazz.isInstance(candidate), message);
}
/**
 * Returns the element converter nested inside a list converter: the first child of the
 * {@code ListElementConverter} that is the first child of {@code converter}. Returns {@code null} when
 * {@code converter} does not have that shape.
 */
private static Converter getListElement(Converter converter) {
  if (converter instanceof ListConverter list && list.getConverter(0) instanceof ListElementConverter element) {
    return element.getConverter(0);
  }
  return null;
}
/** Throws a {@link FileFormatException} carrying {@code message} unless {@code condition} holds. */
private static void require(boolean condition, String message) {
  if (condition) {
    return;
  }
  throw new FileFormatException(message);
}
/** Creates a converter for {@code schema} by delegating to the two-argument constructor with null metadata. */
ParquetRecordConverter(MessageType schema) {
this(schema, null);
}
@Override
@ -78,6 +149,114 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
}
}
/**
 * Converter for a group nested three levels deep (this group, its single child, and that child's single child)
 * whose innermost fields are named x, y, z or m. Each double read from those fields is written straight into a
 * {@link CoordinateSequenceBuilder}, one coordinate per innermost group.
 */
private static class GeoArrowCoordinateSequenceConverter extends StructConverter {
// number of fields in the innermost group; used as the ordinate count of every new sequence
private final int dims;
// sequence being filled for the value currently being read; replaced on every start()
private CoordinateSequenceBuilder currentSequence;
// index of the coordinate in currentSequence that the ordinate setters currently write to
private int idx;
GeoArrowCoordinateSequenceConverter(Context context) {
super(context);
// descend two group levels below this type and count the fields of the group found there
this.dims = context.type.asGroupType()
.getType(0).asGroupType()
.getType(0).asGroupType()
.getFieldCount();
}
@Override
protected Converter makeConverter(Context child) {
// first nested level: a group whose own start() does nothing
return new StructConverter(child) {
@Override
// intentionally empty. NOTE(review): presumably suppresses per-group work done by StructConverter.start() - confirm
public void start() {}
@Override
protected Converter makeConverter(Context child) {
// second nested level: one start()/end() pair per coordinate
return new StructConverter(child) {
@Override
// intentionally empty; ordinates are written by the primitive converters below
public void start() {}
@Override
public void end() {
// coordinate finished: subsequent ordinates belong to the next index
idx++;
}
// returns a converter that stores each double it receives at (idx, ordinate) of the current sequence
private PrimitiveConverter ordinateSetter(int ordinate) {
return new PrimitiveConverter() {
@Override
public void addDouble(double value) {
currentSequence.setOrdinate(idx, ordinate, value);
}
};
}
@Override
protected Converter makeConverter(Context child) {
// the ordinate slot is chosen by field name, not by field position
return switch (child.type.getName()) {
case "x" -> ordinateSetter(0);
case "y" -> ordinateSetter(1);
case "z" -> ordinateSetter(2);
case "m" -> ordinateSetter(3);
default -> throw new IllegalStateException("Unexpected value: " + child.type.getName());
};
}
};
}
};
}
@Override
public void start() {
// begin a new value: reset the write position and hand a fresh, still empty, sequence to the context
idx = 0;
currentSequence = new CoordinateSequenceBuilder(dims);
context.accept(currentSequence);
}
}
/**
 * Converter for a single coordinate group whose fields are named x, y, z or m. Each double read from those fields
 * is written into index 0 of a {@link CoordinateSequenceBuilder} that is handed to the context when the group
 * starts.
 */
private static class GeoArrowCoordinateConverter extends StructConverter {
  /** Number of fields in the coordinate group; used as the ordinate count of every new sequence. */
  private final int dims;
  /** Single-coordinate sequence being filled for the value currently being read; replaced on every start(). */
  private CoordinateSequenceBuilder currentSequence;

  GeoArrowCoordinateConverter(Context context) {
    super(context);
    this.dims = context.type.asGroupType().getFieldCount();
  }

  /** Returns a converter that stores each double it receives at {@code ordinate} of the only coordinate. */
  private PrimitiveConverter ordinateSetter(int ordinate) {
    return new PrimitiveConverter() {
      @Override
      public void addDouble(double value) {
        currentSequence.setOrdinate(0, ordinate, value);
      }
    };
  }

  /**
   * Picks the ordinate slot for a child field by its name.
   *
   * @throws IllegalStateException if the field is not named x, y, z or m
   */
  @Override
  protected Converter makeConverter(Context child) {
    String fieldName = child.type.getName();
    return switch (fieldName) {
      case "x" -> ordinateSetter(0);
      case "y" -> ordinateSetter(1);
      case "z" -> ordinateSetter(2);
      case "m" -> ordinateSetter(3);
      default -> throw new IllegalStateException("Unexpected value: " + fieldName);
    };
  }

  @Override
  public void start() {
    // begin a new value: hand a fresh, still empty, sequence to the context before ordinates arrive
    currentSequence = new CoordinateSequenceBuilder(dims);
    context.accept(currentSequence);
  }
}
private static class ListElementConverter extends StructConverter {
ListElementConverter(Context context) {
@ -159,6 +338,11 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
Type type = child.type;
LogicalTypeAnnotation logical = type.getLogicalTypeAnnotation();
if (!type.isPrimitive()) {
if (child.isGeoArrowCoordSeq()) {
return new GeoArrowCoordinateSequenceConverter(child);
} else if (child.isGeoArrowCoordinate()) {
return new GeoArrowCoordinateConverter(child);
}
return switch (logical) {
case LogicalTypeAnnotation.ListLogicalTypeAnnotation ignored ->
// If the repeated field is not a group, then its type is the element type and elements are required.
@ -320,6 +504,7 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
final Type type;
final boolean repeated;
private final int fieldCount;
private GeoParquetMetadata metadata;
Group current;
Context(Context parent, String fieldOnParent, Type type, boolean repeated) {
@ -331,11 +516,16 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
}
public Context(Context newParent, Type type) {
this(newParent, type.getName(), type, type.isRepetition(Type.Repetition.REPEATED));
this(newParent, type, null);
}
public Context(MessageType schema) {
this(null, schema);
public Context(Context newParent, Type type, GeoParquetMetadata metadata) {
this(newParent, type.getName(), type, type.isRepetition(Type.Repetition.REPEATED));
this.metadata = metadata;
}
public Context(MessageType schema, GeoParquetMetadata metadata) {
this(null, schema, metadata);
}
public Context field(int i) {
@ -388,5 +578,51 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
"type=" + type + ", " +
"repeated=" + repeated + ']';
}
/**
 * Returns true if this field belongs to a non-point geoarrow geometry column and its type is a list-annotated
 * group with a single repeated child group, which in turn has a single field that is a coordinate struct.
 */
public boolean isGeoArrowCoordSeq() {
  String geoArrowType = getGeoArrowType();
  boolean lineOrPolygonColumn = geoArrowType != null && !geoArrowType.contains("point");
  if (!lineOrPolygonColumn) {
    return false;
  }
  boolean singleFieldList = !type.isPrimitive() && type.asGroupType().getFieldCount() == 1 &&
    type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.listType();
  if (!singleFieldList) {
    return false;
  }
  Type repeatedElement = type.asGroupType().getType(0);
  boolean wrapsSingleField = repeatedElement.isRepetition(Type.Repetition.REPEATED) &&
    !repeatedElement.isPrimitive() && repeatedElement.asGroupType().getFieldCount() == 1;
  return wrapsSingleField && isGeoarrowCoordinate(repeatedElement.asGroupType().getType(0));
}
/** Returns true if this field belongs to a point-based geoarrow geometry column and is itself a coordinate struct. */
public boolean isGeoArrowCoordinate() {
  String geoArrowType = getGeoArrowType();
  if (geoArrowType == null || !geoArrowType.contains("point")) {
    return false;
  }
  return isGeoarrowCoordinate(type);
}
/**
 * Walks up the parent chain to the first context whose parent carries geoparquet metadata and returns the geoarrow
 * type of the column named after that context's type. Returns {@code null} when no ancestor carries metadata or the
 * metadata has no column with that name.
 */
private String getGeoArrowType() {
  for (Context node = this; node.parent != null; node = node.parent) {
    var metadataHolder = node.parent;
    if (metadataHolder.metadata != null) {
      var column = metadataHolder.metadata.columns().get(node.type.getName());
      if (column == null) {
        return null;
      }
      return column.getGeoArrowType();
    }
  }
  return null;
}
/**
 * Returns true if {@code struct} is a group whose fields are all primitive doubles and whose field names are
 * exactly {x, y}, {x, y, z} or {x, y, z, m}.
 */
private static boolean isGeoarrowCoordinate(Type struct) {
  if (struct.isPrimitive()) {
    return false;
  }
  var fields = struct.asGroupType().getFields();
  boolean allDoubles = !fields.isEmpty() && fields.stream().allMatch(field -> field.isPrimitive() &&
    field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.DOUBLE);
  if (!allDoubles) {
    return false;
  }
  Set<String> names = fields.stream().map(Type::getName).collect(Collectors.toSet());
  return List.of(
    Set.of("x", "y"),
    Set.of("x", "y", "z"),
    Set.of("x", "y", "z", "m")
  ).contains(names);
}
}
}

Wyświetl plik

@ -0,0 +1,85 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.CoordinateSequences;
import org.locationtech.jts.geom.CoordinateXY;
import org.locationtech.jts.geom.CoordinateXYZM;
import org.locationtech.jts.geom.impl.CoordinateArraySequence;
/** Unit tests for {@link CoordinateSequenceBuilder}. */
class CoordinateSequenceBuilderTest {

  /** A builder that was never written to has the expected dimension and measures and no coordinates. */
  @ParameterizedTest
  @CsvSource({
    "2, 2, 0",
    "3, 3, 0",
    "4, 4, 1",
  })
  void testEmpty(int components, int dims, int measures) {
    CoordinateSequence builder = new CoordinateSequenceBuilder(components);
    CoordinateSequence expected = new CoordinateArraySequence(new Coordinate[0], dims, measures);
    assertCoordinateSequence(expected, builder);
  }

  @Test
  void testSingleXY() {
    CoordinateSequence builder = new CoordinateSequenceBuilder(2);
    write(builder, 0, 1, 2);
    Coordinate[] expected = {
      new CoordinateXY(1, 2)
    };
    assertCoordinateSequence(new CoordinateArraySequence(expected), builder);
  }

  /** Ordinates that were never set (z of the first coordinate) read back as 0. */
  @Test
  void testDoubleXYZ() {
    CoordinateSequence builder = new CoordinateSequenceBuilder(3);
    write(builder, 0, 1, 2);
    write(builder, 1, 1, 2, 3);
    Coordinate[] expected = {
      new Coordinate(1, 2, 0),
      new Coordinate(1, 2, 3)
    };
    assertCoordinateSequence(new CoordinateArraySequence(expected), builder);
  }

  /** Ordinates that were never set (z and m of earlier coordinates) read back as 0. */
  @Test
  void testTripleXYZM() {
    CoordinateSequence builder = new CoordinateSequenceBuilder(4);
    write(builder, 0, 1, 2);
    write(builder, 1, 1, 2, 3);
    write(builder, 2, 1, 2, 3, 4);
    Coordinate[] expected = {
      new CoordinateXYZM(1, 2, 0, 0),
      new CoordinateXYZM(1, 2, 3, 0),
      new CoordinateXYZM(1, 2, 3, 4),
    };
    assertCoordinateSequence(new CoordinateArraySequence(expected), builder);
  }

  /** Sets ordinates {@code 0..values.length-1} of coordinate {@code index}, in ascending ordinate order. */
  private static void write(CoordinateSequence target, int index, double... values) {
    for (int ordinate = 0; ordinate < values.length; ordinate++) {
      target.setOrdinate(index, ordinate, values[ordinate]);
    }
  }

  /** Fails unless both sequences agree on dimension, measures and every coordinate. */
  private static void assertCoordinateSequence(CoordinateSequence expected, CoordinateSequence actual) {
    assertEquals(expected.getDimension(), actual.getDimension(), "dimension");
    assertEquals(expected.getMeasures(), actual.getMeasures(), "measures");
    if (CoordinateSequences.isEqual(expected, actual)) {
      return;
    }
    assertionFailure()
      .expected(CoordinateSequences.toString(expected))
      .actual(CoordinateSequences.toString(actual))
      .buildAndThrow();
  }
}

Wyświetl plik

@ -1,231 +0,0 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
/**
 * Tests for {@link GeoArrow}: each case builds the same geometry twice, once from coordinates given as maps keyed
 * by "x"/"y"/"z"/"m" and once from coordinates given as lists of ordinates, and expects both to equal the geometry
 * parsed from the WKT string.
 */
class GeoArrowTest {
@Test
void testPointXY() throws ParseException {
assertSame(
"POINT(1 2)",
GeoArrow.point(Map.of("x", 1, "y", 2)),
GeoArrow.point(List.of(1, 2))
);
}
@Test
void testPointXYZ() throws ParseException {
assertSame(
"POINT Z(1 2 3)",
GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3)),
GeoArrow.point(List.of(1, 2, 3))
);
}
@Test
void testPointXYZM() throws ParseException {
assertSame(
"POINT ZM(1 2 3 4)",
GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3, "m", 4)),
GeoArrow.point(List.of(1, 2, 3, 4))
);
}
@Test
void testLine() throws ParseException {
assertSame(
"LINESTRING(1 2, 3 4)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
)),
GeoArrow.linestring(List.of(
List.of(1, 2),
List.of(3, 4)
))
);
}
@Test
void testLineZ() throws ParseException {
assertSame(
"LINESTRING Z(1 2 3, 4 5 6)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2, "z", 3),
Map.of("x", 4, "y", 5, "z", 6)
)),
GeoArrow.linestring(List.of(
List.of(1, 2, 3),
List.of(4, 5, 6)
))
);
}
@Test
void testLineZM() throws ParseException {
assertSame(
"LINESTRING ZM(1 2 3 4, 5 6 7 8)",
GeoArrow.linestring(List.of(
Map.of("x", 1, "y", 2, "z", 3, "m", 4),
Map.of("x", 5, "y", 6, "z", 7, "m", 8)
)),
GeoArrow.linestring(List.of(
List.of(1, 2, 3, 4),
List.of(5, 6, 7, 8)
))
);
}
@Test
void testPolygon() throws ParseException {
assertSame(
"POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))",
GeoArrow.polygon(List.of(
List.of(
Map.of("x", 0, "y", 0),
Map.of("x", 0, "y", 1),
Map.of("x", 1, "y", 1),
Map.of("x", 1, "y", 0),
Map.of("x", 0, "y", 0)
))),
GeoArrow.polygon(List.of(
List.of(
List.of(0, 0),
List.of(0, 1),
List.of(1, 1),
List.of(1, 0),
List.of(0, 0)
)
))
);
}
// the first ring is the shell, the second ring is the hole
@Test
void testPolygonWithHole() throws ParseException {
assertSame(
"POLYGON((-2 -2, 2 -2, 0 2, -2 -2), (-1 -1, 1 -1, 0 1, -1 -1))",
GeoArrow.polygon(List.of(
List.of(
Map.of("x", -2, "y", -2),
Map.of("x", 2, "y", -2),
Map.of("x", 0, "y", 2),
Map.of("x", -2, "y", -2)
),
List.of(
Map.of("x", -1, "y", -1),
Map.of("x", 1, "y", -1),
Map.of("x", 0, "y", 1),
Map.of("x", -1, "y", -1)
)
)),
GeoArrow.polygon(List.of(
List.of(
List.of(-2, -2),
List.of(2, -2),
List.of(0, 2),
List.of(-2, -2)
),
List.of(
List.of(-1, -1),
List.of(1, -1),
List.of(0, 1),
List.of(-1, -1)
)
))
);
}
@Test
void testMultipoint() throws ParseException {
assertSame(
"MULTIPOINT(1 2, 3 4)",
GeoArrow.multipoint(List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
)),
GeoArrow.multipoint(List.of(
List.of(1, 2),
List.of(3, 4)
))
);
}
@Test
void testMultilinestring() throws ParseException {
assertSame(
"MULTILINESTRING((1 2, 3 4), (5 6, 7 8))",
GeoArrow.multilinestring(List.of(
List.of(
Map.of("x", 1, "y", 2),
Map.of("x", 3, "y", 4)
),
List.of(
Map.of("x", 5, "y", 6),
Map.of("x", 7, "y", 8)
)
)),
GeoArrow.multilinestring(List.of(
List.of(
List.of(1, 2),
List.of(3, 4)
),
List.of(
List.of(5, 6),
List.of(7, 8)
)
))
);
}
@Test
void testMultipolygon() throws ParseException {
assertSame(
"MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ((2 0, 3 0, 3 1, 2 1, 2 0)))",
GeoArrow.multipolygon(List.of(
List.of(List.of(
Map.of("x", 0, "y", 0),
Map.of("x", 1, "y", 0),
Map.of("x", 1, "y", 1),
Map.of("x", 0, "y", 1),
Map.of("x", 0, "y", 0)
)),
List.of(List.of(
Map.of("x", 2, "y", 0),
Map.of("x", 3, "y", 0),
Map.of("x", 3, "y", 1),
Map.of("x", 2, "y", 1),
Map.of("x", 2, "y", 0)
))
)),
GeoArrow.multipolygon(List.of(
List.of(List.of(
List.of(0, 0),
List.of(1, 0),
List.of(1, 1),
List.of(0, 1),
List.of(0, 0)
)),
List.of(List.of(
List.of(2, 0),
List.of(3, 0),
List.of(3, 1),
List.of(2, 1),
List.of(2, 0)
))
))
);
}
/**
 * Parses {@code wkt} and asserts that every geometry in {@code geometry} equals it; the failure message carries
 * the position of the offending argument.
 */
private static void assertSame(String wkt, Geometry... geometry) throws ParseException {
Geometry expected = GeoUtils.wktReader().read(wkt);
for (int i = 0; i < geometry.length; i++) {
assertEquals(expected, geometry[i], "geometry #" + i);
}
}
}

Wyświetl plik

@ -1,8 +1,12 @@
package com.onthegomap.planetiler.reader.parquet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.google.common.collect.Lists;
import com.onthegomap.planetiler.geo.GeometryException;
import com.onthegomap.planetiler.reader.FileFormatException;
import com.onthegomap.planetiler.reader.WithTags;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
@ -15,16 +19,60 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
class ParquetConverterTest {
private static final MessageType ARROW_SCHEMA_3 = Types.buildMessage()
.requiredList()
.requiredListElement()
.requiredListElement()
.requiredGroupElement()
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("x")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("y")
.named("geometry")
.named("root");
private static final MessageType ARROW_SCHEMA_2 = Types.buildMessage()
.requiredList()
.requiredListElement()
.requiredGroupElement()
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("x")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("y")
.named("geometry")
.named("root");
private static final MessageType ARROW_SCHEMA_1 = Types.buildMessage()
.requiredList()
.requiredGroupElement()
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("x")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("y")
.named("geometry")
.named("root");
private static final MessageType ARROW_SCHEMA_0 = Types.buildMessage()
.requiredGroup()
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("x")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("y")
.named("geometry")
.named("root");
private static final List<MessageType> ARROW_SCHEMAS = List.of(
ARROW_SCHEMA_0,
ARROW_SCHEMA_1,
ARROW_SCHEMA_2,
ARROW_SCHEMA_3
);
@Test
void testIntPrimitive() {
testPrimitive(
@ -553,6 +601,219 @@ class ParquetConverterTest {
assertEquals(Map.of("value", List.of(Map.of())), materializer.getCurrentRecord());
}
/**
 * Drives the converters built for a "multipolygon" column over ARROW_SCHEMA_3 with two polygons (the first has two
 * rings, the second has one) and checks the geometry read back from the materialized record.
 */
@Test
void testGeoArrowMultiPolygon() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("multipolygon")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_3, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, geom -> {
// first polygon: two rings
traverseOnly(geom, polys -> {
traverseOnly(polys, poly -> {
traverseOnly(poly, rings -> {
traverseOnly(rings, ring -> {
addPoint(ring, 0, 0);
addPoint(ring, 3, 0);
addPoint(ring, 3, 3);
addPoint(ring, 0, 3);
addPoint(ring, 0, 0);
});
});
traverseOnly(poly, rings -> {
traverseOnly(rings, ring -> {
addPoint(ring, 1, 1);
addPoint(ring, 2, 1);
addPoint(ring, 2, 2);
addPoint(ring, 1, 2);
addPoint(ring, 1, 1);
});
});
});
});
// second polygon: a single ring
traverseOnly(geom, polys -> {
traverseOnly(polys, poly -> {
traverseOnly(poly, rings -> {
traverseOnly(rings, ring -> {
addPoint(ring, 10, 10);
addPoint(ring, 11, 10);
addPoint(ring, 11, 11);
addPoint(ring, 10, 11);
addPoint(ring, 10, 10);
});
});
});
});
});
});
assertGeometry(
"MULTIPOLYGON (((0 0, 3 0, 3 3, 0 3, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1)), ((10 10, 11 10, 11 11, 10 11, 10 10)))",
reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * Drives the converters built for a "polygon" column over ARROW_SCHEMA_2 with a single five-point ring and checks
 * the geometry read back from the materialized record.
 */
@Test
void testGeoArrowPolygon() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("polygon")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_2, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, geom -> {
traverseOnly(geom, rings -> {
traverseOnly(rings, ring -> {
addPoint(ring, 0, 0);
addPoint(ring, 3, 0);
addPoint(ring, 3, 3);
addPoint(ring, 0, 3);
addPoint(ring, 0, 0);
});
});
});
});
assertGeometry("POLYGON ((0 0, 3 0, 3 3, 0 3, 0 0))",
reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * Drives the converters built for a "multilinestring" column over ARROW_SCHEMA_2 with two two-point lines and
 * checks the geometry read back from the materialized record.
 */
@Test
void testGeoArrowMultilinestring() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("multilinestring")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_2, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, geom -> {
// first line
traverseOnly(geom, lines -> {
traverseOnly(lines, line -> {
addPoint(line, 0, 1);
addPoint(line, 2, 3);
});
});
// second line
traverseOnly(geom, lines -> {
traverseOnly(lines, line -> {
addPoint(line, 4, 5);
addPoint(line, 6, 7);
});
});
});
});
assertGeometry("MULTILINESTRING ((0 1, 2 3), (4 5, 6 7))",
reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * Drives the converters built for a "linestring" column over ARROW_SCHEMA_1 with two points and checks the
 * geometry read back from the materialized record.
 */
@Test
void testGeoArrowLinestring() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("linestring")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_1, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, line -> {
addPoint(line, 0, 1);
addPoint(line, 2, 3);
});
});
assertGeometry("LINESTRING (0 1, 2 3)", reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * Drives the converters built for a "multipoint" column over ARROW_SCHEMA_1 (the same schema the linestring test
 * uses) with two points and checks the geometry read back from the materialized record.
 */
@Test
void testGeoArrowMultiPoint() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("multipoint")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_1, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, points -> {
addPoint(points, 0, 1);
addPoint(points, 2, 3);
});
});
assertGeometry("MULTIPOINT (0 1, 2 3)", reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * Drives the converter built for a "point" column over ARROW_SCHEMA_0, writing x and y straight into the
 * coordinate group's two primitive converters, and checks the geometry read back from the materialized record.
 */
@Test
void testGeoArrowPoint() throws GeometryException, ParseException {
var geoparquet = new GeoParquetMetadata("1.1", "geometry", Map.of(
"geometry", new GeoParquetMetadata.ColumnMetadata("point")
));
var reader = new GeometryReader(geoparquet);
var materializer = new ParquetRecordConverter(ARROW_SCHEMA_0, geoparquet);
traverse(materializer.getRootConverter(), root -> {
traverseOnly(root, point -> {
// converter 0 is the "x" field and converter 1 is the "y" field of ARROW_SCHEMA_0
point.getConverter(0).asPrimitiveConverter().addDouble(1);
point.getConverter(1).asPrimitiveConverter().addDouble(2);
});
});
assertGeometry("POINT (1 2)", reader.readPrimaryGeometry(WithTags.from(materializer.getCurrentRecord())));
}
/**
 * For each geoarrow encoding, only the schema at position {@code valid} of {@code ARROW_SCHEMAS} may be accepted by
 * the converter constructor; every other schema must be rejected with a {@link FileFormatException}.
 */
@ParameterizedTest
@CsvSource({
  "point, 0",
  "multipoint, 1",
  "linestring, 1",
  "multilinestring, 2",
  "polygon, 2",
  "multipolygon, 3",
})
void testGeoArrowSchemaValidation(String type, int valid) {
  var columns = Map.of("geometry", new GeoParquetMetadata.ColumnMetadata(type));
  var geoparquet = new GeoParquetMetadata("1.1", "geometry", columns);
  int position = 0;
  for (var schema : ARROW_SCHEMAS) {
    boolean expectedToPass = position++ == valid;
    if (expectedToPass) {
      new ParquetRecordConverter(schema, geoparquet);
    } else {
      assertThrows(FileFormatException.class, () -> new ParquetRecordConverter(schema, geoparquet));
    }
  }
}
/** Asserts that {@code actual} equals the geometry parsed from the WKT string {@code expected}. */
private static void assertGeometry(String expected, Geometry actual) throws ParseException {
  Geometry parsed = new WKTReader().read(expected);
  assertEquals(parsed, actual);
}
/**
 * Emits one coordinate below {@code ring}: opens the first child of {@code ring}, then that child's first child,
 * and sends {@code x} to its converter 0 and {@code y} to its converter 1.
 */
private static void addPoint(GroupConverter ring, double x, double y) {
  traverseOnly(ring, coords -> traverseOnly(coords, coord -> {
    PrimitiveConverter xSetter = coord.getConverter(0).asPrimitiveConverter();
    xSetter.addDouble(x);
    PrimitiveConverter ySetter = coord.getConverter(1).asPrimitiveConverter();
    ySetter.addDouble(y);
  }));
}
/** Runs {@link #traverse} on the first child converter of {@code converter}. */
private static void traverseOnly(Converter converter, Consumer<GroupConverter> fn) {
  Converter firstChild = converter.asGroupConverter().getConverter(0);
  traverse(firstChild, fn);
}
/** Treats {@code converter} as a group and calls {@code fn} on it between its {@code start()} and {@code end()}. */
private static void traverse(Converter converter, Consumer<GroupConverter> fn) {
  GroupConverter asGroup = converter.asGroupConverter();
  asGroup.start();
  fn.accept(asGroup);
  asGroup.end();
}
private void testPrimitive(PrimitiveType.PrimitiveTypeName type, Consumer<PrimitiveConverter> consumer,
Object expected) {
var materializer = new ParquetRecordConverter(Types.buildMessage()