osm2parquet
Mike Barry 2025-06-21 15:51:41 -04:00
rodzic 595c824c68
commit a0b7bb8a57
3 zmienionych plików z 429 dodań i 10 usunięć

Wyświetl plik

@ -2,6 +2,9 @@ package com.onthegomap.planetiler.reader.osm;
import static com.onthegomap.planetiler.util.MemoryEstimator.estimateSize;
import static com.onthegomap.planetiler.worker.Worker.joinFutures;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.LongArrayList;
@ -24,6 +27,7 @@ import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.util.MemoryEstimator;
import com.onthegomap.planetiler.util.ResourceUsage;
@ -37,12 +41,26 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateList;
import org.locationtech.jts.geom.CoordinateSequence;
@ -54,6 +72,7 @@ import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.geom.impl.CoordinateArraySequence;
import org.locationtech.jts.geom.impl.PackedCoordinateSequence;
import org.locationtech.jts.io.WKBWriter;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -346,7 +365,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
});
for (var block : prev) {
for (var element : block.decodeElements()) {
SourceFeature feature = null;
OsmFeature feature = null;
if (element instanceof OsmElement.Node node) {
phaser.arrive(OsmPhaser.Phase.NODES);
feature = processNodePass2(node);
@ -413,6 +432,182 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
}
}
/**
 * Parquet schema for the rows written by {@code pass2parquet}: one row per OSM element.
 * <ul>
 * <li>{@code type} (string), {@code id} (int64) and {@code geom} (binary) are required</li>
 * <li>{@code changeset}, {@code version}, {@code timestamp}, {@code user_id} and {@code user} are optional
 * element metadata</li>
 * <li>{@code way_nodes} is an optional list of int64 values</li>
 * <li>{@code relation_members} is an optional list of {@code (ref, type, role)} groups</li>
 * <li>{@code tags} is an optional string-to-string map</li>
 * </ul>
 */
static final MessageType SCHEMA =
Types.buildMessage()
.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("type")
.required(INT64).named("id")
.required(BINARY).named("geom")
.optional(INT64).named("changeset")
.optional(INT32).named("version")
.optional(INT64).named("timestamp")
.optional(INT32).named("user_id")
.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("user")
.list(Type.Repetition.OPTIONAL).element(Types.required(INT64).named("element")).named("way_nodes")
.list(Type.Repetition.OPTIONAL)
.element(Types.requiredGroup()
.addField(Types.required(INT64).named("ref"))
.addField(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("type"))
.addField(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("role"))
.named("element")
).named("relation_members")
.map(Type.Repetition.OPTIONAL)
.key(BINARY).as(LogicalTypeAnnotation.stringType())
.value(BINARY, Type.Repetition.REQUIRED).as(LogicalTypeAnnotation.stringType())
.named("tags")
.named("root");
/**
 * Mutable holder for a {@link Closeable} so that a try-with-resources statement closes whichever resource is held
 * when the block exits, even if {@link #thing} was swapped for a different instance inside the block.
 *
 * @param <T> type of the wrapped resource
 */
static class CloseWrapper<T extends Closeable> implements Closeable {

  /** The resource that {@link #close()} will close; callers may reassign it. */
  T thing;

  /** Starts out holding {@code initial}. */
  CloseWrapper(T initial) {
    thing = initial;
  }

  /** Closes the resource currently held in {@link #thing}. */
  @Override
  public void close() throws IOException {
    final T current = thing;
    current.close();
  }
}
/**
 * Opens a ZSTD-compressed parquet writer for {@link #SCHEMA} rows at a local path, configured with a row group
 * size of 128 MiB.
 *
 * @param path local file to write to
 * @return a new writer that the caller must close
 * @throws IOException if building the writer fails
 */
private ParquetWriter<Group> createParquetWriter(Path path) throws IOException {
  final long rowGroupBytes = 128 * 1024 * 1024L;
  var destination = new LocalOutputFile(path);
  return ExampleParquetWriter.builder(destination)
    .withType(SCHEMA)
    .withCompressionCodec(CompressionCodecName.ZSTD)
    .withRowGroupSize(rowGroupBytes)
    .build();
}
/**
 * Reads the OSM input again, builds a geometry for each element that the {@code process*Pass2} methods return a
 * feature for, and writes one row per feature to parquet files inside {@code output}.
 * <p>
 * Each worker thread writes to its own sequence of {@code <n>.parquet} files and rolls over to a new file once
 * the current one exceeds 1GB on disk (checked every 10,000 elements). Features whose geometry can't be built
 * are logged through {@link GeometryException#log} and skipped.
 *
 * @param config user-provided arguments to control the number of threads and log interval
 * @param output directory to write parquet files to; it is deleted and re-created before writing starts
 */
public void pass2parquet(PlanetilerConfig config, Path output) {
  FileUtils.deleteDirectory(output);
  FileUtils.createDirectory(output);
  var timer = stats.startStage("osm_pass2");
  int processThreads = config.featureProcessThreads();
  Counter.MultiThreadCounter blocksProcessed = Counter.newMultiThreadCounter();
  // relations get their own counter that each worker increments as it handles one
  Counter.MultiThreadCounter relationsProcessed = Counter.newMultiThreadCounter();
  OsmPhaser pass2Phaser = new OsmPhaser(processThreads);
  stats.counter("osm_pass2_elements_processed", "type", () -> Map.of(
    "blocks", blocksProcessed::get,
    "nodes", pass2Phaser::nodes,
    "ways", pass2Phaser::ways,
    "relations", relationsProcessed
  ));
  // shared across workers so that every output file gets a unique number
  AtomicInteger num = new AtomicInteger(0);
  var pipeline = WorkerPipeline.start("osm_pass2", stats)
    .fromGenerator("read", osmBlockSource::forEachBlock)
    .addBuffer("pbf_blocks", Math.max(10, processThreads / 2))
    .sinkTo("process", processThreads, prev -> {
      // avoid contention trying to get the thread-local counters by getting them once when thread starts
      Counter blocks = blocksProcessed.counterForThread();
      Counter rels = relationsProcessed.counterForThread();
      var phaser = pass2Phaser.forWorker();
      Path path = output.resolve(num.incrementAndGet() + ".parquet");
      // the wrapper lets try-with-resources close whichever writer is current after a rollover
      try (var writer = new CloseWrapper<>(createParquetWriter(path))) {
        long idx = 0;
        final NodeLocationProvider nodeLocations = newNodeLocationProvider();
        // one WKB encoder per worker instead of one per feature
        final WKBWriter wkbWriter = new WKBWriter();
        for (var block : prev) {
          for (var element : block.decodeElements()) {
            OsmFeature feature = null;
            if (element instanceof OsmElement.Node node) {
              phaser.arrive(OsmPhaser.Phase.NODES);
              feature = processNodePass2(node);
            } else if (element instanceof OsmElement.Way way) {
              phaser.arrive(OsmPhaser.Phase.WAYS);
              feature = processWayPass2(way, nodeLocations);
            } else if (element instanceof OsmElement.Relation relation) {
              phaser.arriveAndWaitForOthers(OsmPhaser.Phase.RELATIONS);
              feature = processRelationPass2(relation, nodeLocations);
              rels.inc();
            }
            // encode the feature as a parquet row and append it to this worker's current file
            if (feature != null) {
              try {
                writer.thing.write(toParquetRow(feature, wkbWriter));
              } catch (GeometryException e) {
                e.log(stats, "pass2", "pass2");
              }
            }
            // only look at the file size occasionally, and start a new file once this one is too big
            if (idx++ % 10_000 == 0 && FileUtils.size(path) > 1_000_000_000L) {
              writer.thing.close();
              path = output.resolve(num.incrementAndGet() + ".parquet");
              writer.thing = createParquetWriter(path);
            }
          }
          blocks.inc();
        }
      }
      phaser.close();
    });
  var logger = ProgressLoggers.create()
    .addRatePercentCounter("nodes", pass1Phaser.nodes(), pass2Phaser::nodes, true)
    .addFileSizeAndRam(nodeLocationDb)
    .addRatePercentCounter("ways", pass1Phaser.ways(), pass2Phaser::ways, true)
    .addRatePercentCounter("rels", pass1Phaser.relations(), relationsProcessed, true)
    .addFileSize("output", () -> FileUtils.directorySize(output))
    .addRatePercentCounter("blocks", PASS1_BLOCKS.get(), blocksProcessed, false)
    .newLine()
    .addProcessStats()
    .addInMemoryObject("relInfo", this)
    .addFileSizeAndRam("mpGeoms", multipolygonWayGeometries)
    .newLine()
    .addPipelineStats(pipeline);
  pipeline.awaitAndLog(logger, config.logInterval());
  LOGGER.debug("Processed " + FORMAT.integer(blocksProcessed.get()) + " blocks:");
  pass2Phaser.printSummary();
  timer.stop();
}

/**
 * Builds the parquet row for one feature according to {@link #SCHEMA}.
 * <p>
 * The geometry is computed first so that a feature with an invalid geometry fails before any work is done on the
 * row.
 *
 * @param feature   feature returned by one of the {@code process*Pass2} methods
 * @param wkbWriter encoder used to serialize the feature's world geometry into the {@code geom} column
 * @return a populated row ready to hand to the parquet writer
 * @throws GeometryException if the feature's geometry can't be constructed
 */
private Group toParquetRow(OsmFeature feature, WKBWriter wkbWriter) throws GeometryException {
  Geometry geom = feature.worldGeometry();
  var element = feature.originalElement();
  var group = new SimpleGroup(SCHEMA);
  var info = element.info();
  if (info != null) {
    group.add("changeset", info.changeset());
    group.add("version", info.version());
    group.add("timestamp", info.timestamp());
    group.add("user_id", info.userId());
    if (!StringUtils.isBlank(info.user())) {
      group.add("user", info.user());
    }
  }
  group.add("id", element.id());
  group.add("type", element.type().name().toLowerCase(Locale.ROOT));
  group.add("geom", Binary.fromConstantByteArray(wkbWriter.write(geom)));
  // optional collections are left out entirely when empty rather than written as empty groups
  if (feature.tags() != null && !feature.tags().isEmpty()) {
    var tags = group.addGroup("tags");
    feature.tags().forEach((k, v) -> {
      var tag = tags.addGroup("key_value");
      tag.add("key", k);
      tag.add("value", (String) v);
    });
  }
  if (element instanceof OsmElement.Way way && !way.nodes().isEmpty()) {
    var nodes = group.addGroup("way_nodes");
    for (var node : way.nodes()) {
      nodes.addGroup("list").add("element", node.value);
    }
  }
  if (element instanceof OsmElement.Relation rel && !rel.members().isEmpty()) {
    var members = group.addGroup("relation_members");
    for (var member : rel.members()) {
      var g = members.addGroup("list").addGroup("element");
      g.add("ref", member.ref());
      g.add("type", member.type().name().toLowerCase(Locale.ROOT));
      g.add("role", member.role());
    }
  }
  return group;
}
/** Estimates the resource requirements for a nodemap but parses the type/storage from strings. */
public static ResourceUsage estimateNodeLocationUsage(String type, String storage, long osmFileSize, Path path) {
return estimateNodeLocationUsage(LongLongMap.Type.from(type), Storage.from(storage), osmFileSize, path);
@ -490,12 +685,12 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
);
}
SourceFeature processNodePass2(OsmElement.Node node) {
/**
 * Wraps a node read during pass 2 in a {@code NodeSourceFeature}.
 *
 * @param node the OSM node element
 * @return a new feature backed by {@code node}
 */
OsmFeature processNodePass2(OsmElement.Node node) {
// nodes are simple because they already contain their location
return new NodeSourceFeature(node);
}
SourceFeature processWayPass2(OsmElement.Way way, NodeLocationProvider nodeLocations) {
OsmFeature processWayPass2(OsmElement.Way way, NodeLocationProvider nodeLocations) {
// ways contain an ordered list of node IDs, so we need to join that with node locations
// from pass1 to reconstruct the geometry.
LongArrayList nodes = way.nodes();
@ -513,7 +708,7 @@ public class OsmReader implements Closeable, MemoryEstimator.HasEstimate {
return new WaySourceFeature(way, closed, area, nodeLocations, rels);
}
SourceFeature processRelationPass2(OsmElement.Relation rel, NodeLocationProvider nodeLocations) {
OsmFeature processRelationPass2(OsmElement.Relation rel, NodeLocationProvider nodeLocations) {
// Relation info gets used during way processing, except multipolygons which we have to process after we've
// stored all the node IDs for each way.
if (isMultipolygon(rel)) {

Wyświetl plik

@ -0,0 +1,224 @@
package com.onthegomap.planetiler.util;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.collection.LongLongMultimap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.reader.osm.OsmSourceFeature;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Locale;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
public class OsmToGeoparquet implements Profile, Closeable {
private static MessageType SCHEMA =
Types.buildMessage()
.required(INT64)
.named("id")
.required(BINARY).as(LogicalTypeAnnotation.stringType())
.named("geom")
.map(Type.Repetition.REQUIRED)
.key(BINARY).as(LogicalTypeAnnotation.stringType())
.value(BINARY, Type.Repetition.REQUIRED).as(LogicalTypeAnnotation.stringType())
.named("tags")
.repeated(INT64).named("nodes")
.named("root");
// TODO make encoded geometries
private static MessageType NODE_SCHEMA =
Types.buildMessage()
.required(INT64)
.named("id")
.map(Type.Repetition.REQUIRED)
.key(BINARY).as(LogicalTypeAnnotation.stringType())
.value(BINARY, Type.Repetition.REQUIRED).as(LogicalTypeAnnotation.stringType())
.named("tags")
.required(BINARY).named("geometry")
.named("root");
private static MessageType WAY_SCHEMA =
Types.buildMessage()
.required(INT64)
.named("id")
.map(Type.Repetition.REQUIRED)
.key(BINARY).as(LogicalTypeAnnotation.stringType())
.value(BINARY, Type.Repetition.REQUIRED).as(LogicalTypeAnnotation.stringType())
.named("tags")
.repeated(INT64).named("nodes")
.named("root");
private static MessageType REL_SCHEMA =
Types.buildMessage()
.required(INT64)
.named("id")
.map(Type.Repetition.REQUIRED)
.key(BINARY).as(LogicalTypeAnnotation.stringType())
.value(BINARY, Type.Repetition.REQUIRED).as(LogicalTypeAnnotation.stringType())
.named("tags")
.repeatedGroup()
.addField(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("role"))
.addField(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("type"))
.addField(Types.required(INT64).named("ref"))
.named("members")
.named("root");
private final ParquetWriter<Group> nodeWriter;
private final ParquetWriter<Group> wayWriter;
private final ParquetWriter<Group> relWriter;
public OsmToGeoparquet(Path file) throws IOException {
FileUtils.deleteDirectory(file);
FileUtils.createDirectory(file);
this.nodeWriter = ExampleParquetWriter.builder(new LocalOutputFile(file.resolve("nodes.parquet")))
.withType(NODE_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withRowGroupSize(100_000L)
.build();
this.wayWriter = ExampleParquetWriter.builder(new LocalOutputFile(file.resolve("ways.parquet")))
.withType(WAY_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withRowGroupSize(100_000L)
.build();
this.relWriter = ExampleParquetWriter.builder(new LocalOutputFile(file.resolve("relations.parquet")))
.withType(REL_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withRowGroupSize(100_000L)
.build();
}
public static void main(String... args) throws Exception {
Path dataDir = Path.of("data");
Arguments arguments = Arguments.fromArgs(args);
PlanetilerConfig config = PlanetilerConfig.from(arguments);
Path output = arguments.file("output", "output dir", Path.of("parquet-out"));
Path sourcesDir = arguments.file("download_dir", "download directory", dataDir.resolve("sources"));
// use --area=... argument, AREA=... env var or area=... in config to set the region of the world to use
// will be ignored if osm_path or osm_url are set
String area = arguments.getString(
"area",
"name of the extract to download if osm_url/osm_path not specified (i.e. 'monaco' 'rhode island' 'australia' or 'planet')",
"massachusetts"
);
var tmpDir = config.tmpDir();
Path nodeDbPath = arguments.file("temp_nodes", "temp node db location", tmpDir.resolve("node.db"));
Path multipolygonPath =
arguments.file("temp_multipolygons", "temp multipolygon db location", tmpDir.resolve("multipolygon.db"));
// Path osmDefaultPath = sourcesDir.resolve(area.replaceAll("[^a-zA-Z]+", "_") + ".osm.pbf");
// // TODO download
String osmDefaultUrl = "planet".equalsIgnoreCase(area) ? ("aws:latest") : ("geofabrik:" + area);
Path path = arguments.inputFile("osm_path", "OSM input file", Path.of("data", "sources", "massachusetts.osm.pbf"));
var thisInputFile = new OsmInputFile(path, config.osmLazyReads());
var stats = arguments.getStats();
try (
var nodeLocations =
LongLongMap.from(config.nodeMapType(), config.nodeMapStorage(), nodeDbPath, config.nodeMapMadvise());
var multipolygonGeometries = LongLongMultimap.newReplaceableMultimap(
config.multipolygonGeometryStorage(), multipolygonPath, config.multipolygonGeometryMadvise());
var osmReader =
new OsmReader("osm", thisInputFile, nodeLocations, multipolygonGeometries, (a, b) -> {
}, stats)
) {
osmReader.pass1(config);
osmReader.pass2parquet(config, output);
} finally {
FileUtils.delete(nodeDbPath);
FileUtils.delete(multipolygonPath);
}
stats.printSummary();
}
@Override
public void processFeature(SourceFeature sourceFeature, FeatureCollector features) {
if (sourceFeature instanceof OsmSourceFeature osm) {
switch (osm.originalElement()) {
case OsmElement.Node node -> writeNode(node);
case OsmElement.Way way -> writeWay(way);
case OsmElement.Relation rel -> writeRel(rel);
default -> throw new IllegalStateException("Unexpected value: " + osm.originalElement());
}
}
}
synchronized void writeNode(OsmElement.Node node) {
var group = new SimpleGroup(NODE_SCHEMA);
group.add("id", node.id());
group.add("lat", node.lat());
group.add("lon", node.lon());
var tags = group.addGroup("tags");
node.tags().forEach((k, v) -> {
var tag = tags.addGroup("key_value");
tag.add("key", k);
tag.add("value", (String) v);
});
try {
nodeWriter.write(group);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private synchronized void writeRel(OsmElement.Relation rel) {
var group = new SimpleGroup(REL_SCHEMA);
group.add("id", rel.id());
var tags = group.addGroup("tags");
rel.tags().forEach((k, v) -> {
var tag = tags.addGroup("key_value");
tag.add("key", k);
tag.add("value", (String) v);
});
rel.members().forEach((member) -> {
var m = group.addGroup("members");
m.add("role", member.role());
m.add("ref", member.ref());
m.add("type", member.type().name().toLowerCase(Locale.ROOT));
});
try {
relWriter.write(group);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private synchronized void writeWay(OsmElement.Way way) {
var group = new SimpleGroup(WAY_SCHEMA);
group.add("id", way.id());
var tags = group.addGroup("tags");
way.tags().forEach((k, v) -> {
var tag = tags.addGroup("key_value");
tag.add("key", k);
tag.add("value", (String) v);
});
for (var node : way.nodes()) {
group.add("nodes", node.value);
}
try {
wayWriter.write(group);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
try (nodeWriter; wayWriter; relWriter) {
}
}
}

Wyświetl plik

@ -376,7 +376,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertFalse(feature.canBeLine());
assertFalse(feature.isPoint());
@ -448,7 +448,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertFalse(feature.canBeLine());
assertFalse(feature.isPoint());
@ -514,7 +514,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertFalse(feature.canBeLine());
assertFalse(feature.isPoint());
@ -580,7 +580,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertNull(feature);
}
@ -611,7 +611,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertThrows(GeometryException.class, feature::worldGeometry);
assertThrows(GeometryException.class, feature::polygon);
@ -642,7 +642,7 @@ class OsmReaderTest {
var nodeCache = reader.newNodeLocationProvider();
elements.stream().flatMap(ways).forEach(way -> reader.processWayPass2(way, nodeCache));
var feature = reader.processRelationPass2(relation, nodeCache);
SourceFeature feature = reader.processRelationPass2(relation, nodeCache);
assertThrows(GeometryException.class, feature::worldGeometry);
assertThrows(GeometryException.class, feature::polygon);