osm-mirror-pgtiles
Mike Barry 2023-04-09 20:23:40 -04:00
rodzic 29b2ca9dbd
commit d7eb01faeb
13 zmienionych plików z 679 dodań i 17 usunięć

Wyświetl plik

@ -39,6 +39,18 @@
<artifactId>lmdbjava</artifactId>
<version>0.8.3</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.4</version>
</dependency>
<dependency>
<groupId>de.bytefish</groupId>
<artifactId>pgbulkinsert</artifactId>
<version>8.1.0</version>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>

Wyświetl plik

@ -4,6 +4,7 @@ import static com.onthegomap.planetiler.util.LanguageUtils.nullIfEmpty;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Pgtiles;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
@ -80,6 +81,9 @@ public record TileArchiveConfig(
if (format == null) {
format = getExtension(uri);
}
if (getScheme(uri) == Scheme.POSTGRES) {
format = Format.POSTGRES.id;
}
if (format == null) {
return TileArchiveConfig.Format.MBTILES;
}
@ -154,7 +158,8 @@ public record TileArchiveConfig(
* Returns the current size of this archive.
*/
public long size() {
return getLocalPath() == null ? 0 : FileUtils.size(getLocalPath());
return format == Format.POSTGRES ? Pgtiles.getSize(this) : getLocalPath() == null ? 0 :
FileUtils.size(getLocalPath());
}
/**
@ -167,7 +172,8 @@ public record TileArchiveConfig(
public enum Format {
MBTILES("mbtiles"),
PMTILES("pmtiles");
PMTILES("pmtiles"),
POSTGRES("postgres");
private final String id;
@ -181,7 +187,8 @@ public record TileArchiveConfig(
}
public enum Scheme {
FILE("file");
FILE("file"),
POSTGRES("postgres");
private final String id;

Wyświetl plik

@ -4,6 +4,7 @@ import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.mbtiles.Mbtiles;
import com.onthegomap.planetiler.pmtiles.ReadablePmtiles;
import com.onthegomap.planetiler.pmtiles.WriteablePmtiles;
import com.onthegomap.planetiler.util.Pgtiles;
import java.io.IOException;
import java.nio.file.Path;
@ -45,6 +46,7 @@ public class TileArchives {
Mbtiles.newWriteToFileDatabase(archive.getLocalPath(), options.orElse(config.arguments()
.subset(Mbtiles.LEGACY_VACUUM_ANALYZE, Mbtiles.LEGACY_COMPACT_DB, Mbtiles.LEGACY_SKIP_INDEX_CREATION)));
case PMTILES -> WriteablePmtiles.newWriteToFile(archive.getLocalPath());
case POSTGRES -> Pgtiles.writer(archive.uri(), options);
};
}
@ -59,6 +61,7 @@ public class TileArchives {
return switch (archive.format()) {
case MBTILES -> Mbtiles.newReadOnlyDatabase(archive.getLocalPath(), options);
case PMTILES -> ReadablePmtiles.newReadFromFile(archive.getLocalPath());
case POSTGRES -> Pgtiles.reader(archive.uri(), options);
};
}

Wyświetl plik

@ -8,6 +8,11 @@ import org.geotools.data.store.EmptyIterator;
public class DummyOsmMirror implements OsmMirror {
@Override
public long diskUsageBytes() {
return 0;
}
private class Bulk implements BulkWriter {
@Override

Wyświetl plik

@ -9,6 +9,11 @@ import java.util.NavigableMap;
import java.util.TreeMap;
public class InMemoryOsmMirror implements OsmMirror {
@Override
public long diskUsageBytes() {
return 0;
}
private final NavigableMap<Long, OsmElement.Node> nodes = new TreeMap<>();
private final NavigableMap<Long, OsmElement.Way> ways = new TreeMap<>();
private final NavigableMap<Long, LongArrayList> nodesToParentWay = new TreeMap<>();

Wyświetl plik

@ -22,6 +22,14 @@ import org.lmdbjava.KeyRange;
import org.lmdbjava.Txn;
public class LmdbOsmMirror implements OsmMirror {
private final Path path;
@Override
public long diskUsageBytes() {
return FileUtils.size(path);
}
private final Dbi<byte[]> nodes;
private final Dbi<byte[]> ways;
private final Dbi<byte[]> relations;
@ -32,6 +40,7 @@ public class LmdbOsmMirror implements OsmMirror {
private final Env<byte[]> env;
public LmdbOsmMirror(Path file) {
this.path = file;
if (!Files.exists(file)) {
FileUtils.createDirectory(file);
}

Wyświetl plik

@ -4,6 +4,7 @@ import com.carrotsearch.hppc.LongArrayList;
import com.google.common.collect.Iterators;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.util.CloseableIterator;
import com.onthegomap.planetiler.util.FileUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -20,6 +21,13 @@ import org.mapdb.Serializer;
public class MapDbOsmMirror implements OsmMirror {
private final Path path;
@Override
public long diskUsageBytes() {
return path == null ? 0 : FileUtils.size(this.path);
}
private final BTreeMap<Long, Serialized.Node> nodes;
private final BTreeMap<Long, Serialized.Way> ways;
private final BTreeMap<Long, Serialized.Relation> relations;
@ -30,7 +38,7 @@ public class MapDbOsmMirror implements OsmMirror {
private final DB db;
static MapDbOsmMirror newInMemory() {
return new MapDbOsmMirror(DBMaker.newMemoryDirectDB()
return new MapDbOsmMirror(null, DBMaker.newMemoryDirectDB()
.asyncWriteEnable()
.transactionDisable()
.closeOnJvmShutdown()
@ -38,7 +46,7 @@ public class MapDbOsmMirror implements OsmMirror {
}
static MapDbOsmMirror newWriteToFile(Path path) {
return new MapDbOsmMirror(DBMaker
return new MapDbOsmMirror(path, DBMaker
.newFileDB(path.toFile())
.asyncWriteEnable()
.transactionDisable()
@ -49,7 +57,7 @@ public class MapDbOsmMirror implements OsmMirror {
}
static MapDbOsmMirror newReadFromFile(Path path) {
return new MapDbOsmMirror(DBMaker
return new MapDbOsmMirror(path, DBMaker
.newFileDB(path.toFile())
.transactionDisable()
.compressionEnable()
@ -125,7 +133,8 @@ public class MapDbOsmMirror implements OsmMirror {
}
}
MapDbOsmMirror(DB db) {
MapDbOsmMirror(Path path, DB db) {
this.path = path;
this.db = db;
nodes = db.createTreeMap("nodes")
.keySerializer(BTreeKeySerializer.ZERO_OR_POSITIVE_LONG)

Wyświetl plik

@ -9,9 +9,14 @@ import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmPhaser;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timer;
import com.onthegomap.planetiler.util.CloseableIterator;
import com.onthegomap.planetiler.util.DiskBacked;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.worker.WorkQueue;
import com.onthegomap.planetiler.worker.Worker;
import com.onthegomap.planetiler.worker.WorkerPipeline;
import java.io.Closeable;
import java.nio.file.Path;
@ -21,8 +26,11 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.msgpack.core.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface OsmMirror extends AutoCloseable {
public interface OsmMirror extends AutoCloseable, DiskBacked {
Logger LOGGER = LoggerFactory.getLogger(OsmMirror.class);
static OsmMirror newInMemory() {
return new InMemoryOsmMirror();
@ -47,6 +55,7 @@ public interface OsmMirror extends AutoCloseable {
boolean doNodes = arguments.getBoolean("nodes", "process nodes", true);
boolean doWays = arguments.getBoolean("ways", "process ways", true);
boolean doRelations = arguments.getBoolean("relations", "process relations", true);
String password = arguments.getString("password", "password");
Path input = arguments.inputFile("input", "input", Path.of("data/sources/massachusetts.osm.pbf"));
Path output = arguments.file("output", "output", Path.of("data/tmp/output"));
FileUtils.delete(output);
@ -54,16 +63,17 @@ public interface OsmMirror extends AutoCloseable {
int processThreads = arguments.threads();
OsmInputFile in = new OsmInputFile(input);
var blockCounter = new AtomicLong();
boolean id = !type.contains("sqlite");
boolean id = !type.contains("sqlite") && !type.contains("postgres");
var timer = Timer.start();
try (
var blocks = in.get();
OsmMirror out = newWriter(type, output.resolve("test.db"), processThreads);
var writer = out.newBulkWriter()
OsmMirror out = newWriter(type, output.resolve("test.db"), processThreads, password)
) {
record Batch(CompletableFuture<List<Serialized<? extends OsmElement>>> results, OsmBlockSource.Block block) {}
var queue = new WorkQueue<Batch>("batches", processThreads * 2, 1, stats);
var phaser = new OsmPhaser(1);
int threads = type.contains("postgres") ? 4 : 1;
var phaser = new OsmPhaser(threads);
var pipeline = WorkerPipeline.start("osm2sqlite", stats);
var readBranch = pipeline.<Batch>fromGenerator("pbf", next -> {
blocks.forEachBlock(block -> {
@ -102,9 +112,12 @@ public interface OsmMirror extends AutoCloseable {
});
var writeBranch = pipeline.readFromQueue(queue)
.sinkTo("write", 1, prev -> {
.sinkTo("write", threads, prev -> {
try (var phaserForWorker = phaser.forWorker()) {
try (
var writer = out.newBulkWriter();
var phaserForWorker = phaser.forWorker()
) {
System.err.println("Using " + out.getClass().getSimpleName());
for (var batch : prev) {
for (var item : batch.results.get()) {
@ -123,7 +136,6 @@ public interface OsmMirror extends AutoCloseable {
}
phaserForWorker.arrive(OsmPhaser.Phase.DONE);
}
phaser.printSummary();
});
ProgressLoggers loggers = ProgressLoggers.create()
@ -131,18 +143,31 @@ public interface OsmMirror extends AutoCloseable {
.addRateCounter("nodes", phaser::nodes, true)
.addRateCounter("ways", phaser::ways, true)
.addRateCounter("rels", phaser::relations, true)
.addFileSize(() -> FileUtils.size(output))
.addFileSize(out)
.newLine()
.addProcessStats()
.newLine()
.addPipelineStats(readBranch)
.addPipelineStats(writeBranch);
loggers.awaitAndLog(joinFutures(readBranch.done(), writeBranch.done()), Duration.ofSeconds(10));
var worker = new Worker("finish", stats, 1, out::finish);
var pl2 = ProgressLoggers.create()
.addFileSize(out)
.newLine()
.addThreadPoolStats("worker", worker)
.newLine()
.addProcessStats();
worker.awaitAndLog(pl2, Duration.ofSeconds(10));
LOGGER.info("Finished in {} final size {}", timer.stop(), Format.defaultInstance().storage(out.diskUsageBytes()));
phaser.printSummary();
}
}
static OsmMirror newWriter(String type, Path path, int maxWorkers) {
default void finish() {}
static OsmMirror newWriter(String type, Path path, int maxWorkers, String password) {
return switch (type) {
case "mapdb" -> newMapdbWrite(path);
case "mapdb-memory" -> newMapdbMemory();
@ -151,6 +176,9 @@ public interface OsmMirror extends AutoCloseable {
case "memory" -> newInMemory();
case "lmdb" -> newLmdbWrite(path);
case "dummy" -> newDummyWriter();
case "postgres" -> PostgresOsmMirror.newMirror(
"jdbc:postgresql://localhost:54321/pgdb?user=admin&password=" + password, 1,
Stats.inMemory());
default -> throw new IllegalArgumentException("Unrecognized type: " + type);
};
}
@ -159,6 +187,9 @@ public interface OsmMirror extends AutoCloseable {
return switch (type) {
case "mapdb" -> MapDbOsmMirror.newReadFromFile(path);
case "sqlite" -> SqliteOsmMirror.newReadFromFile(path);
case "postgres" -> PostgresOsmMirror.newMirror(
"jdbc:postgresql://localhost:5432/pgdb?user=admin&password=password", 1,
Stats.inMemory());
case "lmdb" -> newLmdbWrite(path);
default -> throw new IllegalArgumentException("Unrecognized type: " + type);
};

Wyświetl plik

@ -0,0 +1,354 @@
package com.onthegomap.planetiler.osmmirror;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.CloseableIterator;
import de.bytefish.pgbulkinsert.row.SimpleRowWriter;
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import org.postgresql.PGConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PostgresOsmMirror implements OsmMirror {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresOsmMirror.class);
private final Connection connection;
private final Stats stats;
private final int maxWorkers;
private final String url;
public static void main(String[] args) throws Exception {
try (
var mirror = PostgresOsmMirror.newMirror("jdbc:postgresql://localhost:5432/pgdb?user=admin&password=password", 1,
Stats.inMemory());
var bulk = mirror.newBulkWriter()
) {
bulk.putNode(new Serialized.Node(new OsmElement.Node(1, 2, 3), false));
System.err.println("put node");
}
}
private PostgresOsmMirror(Connection connection, Stats stats, int maxWorkers, String url) {
this.connection = connection;
this.stats = stats;
this.maxWorkers = maxWorkers;
this.url = url;
}
public static PostgresOsmMirror newMirror(String url, int maxWorkers, Stats stats) {
try {
var connection = DriverManager.getConnection(url);
var result = new PostgresOsmMirror(connection, stats, maxWorkers, url);
result.execute(
// https://pgtune.leopard.in.ua/
"ALTER SYSTEM SET max_connections = '20'",
"ALTER SYSTEM SET shared_buffers = '50GB'",
"ALTER SYSTEM SET effective_cache_size = '150GB'",
"ALTER SYSTEM SET maintenance_work_mem = '2GB'",
"ALTER SYSTEM SET checkpoint_completion_target = '0.9'",
"ALTER SYSTEM SET wal_buffers = '16MB'",
"ALTER SYSTEM SET default_statistics_target = '500'",
"ALTER SYSTEM SET random_page_cost = '1.1'",
"ALTER SYSTEM SET effective_io_concurrency = '200'",
"ALTER SYSTEM SET work_mem = '64MB'",
"ALTER SYSTEM SET min_wal_size = '4GB'",
"ALTER SYSTEM SET max_wal_size = '16GB'",
"ALTER SYSTEM SET max_worker_processes = '40'",
"ALTER SYSTEM SET max_parallel_workers_per_gather = '20'",
"ALTER SYSTEM SET max_parallel_workers = '40'",
"ALTER SYSTEM SET max_parallel_maintenance_workers = '4'"
// pgconfig.org
// "ALTER SYSTEM SET shared_buffers TO '50GB'",
// "ALTER SYSTEM SET effective_cache_size TO '150GB'",
// "ALTER SYSTEM SET work_mem TO '1GB'",
// "ALTER SYSTEM SET maintenance_work_mem TO '10GB'",
// "ALTER SYSTEM SET min_wal_size TO '2GB'",
// "ALTER SYSTEM SET max_wal_size TO '3GB'",
// "ALTER SYSTEM SET checkpoint_completion_target TO '0.9'",
// "ALTER SYSTEM SET wal_buffers TO '-1'",
// "ALTER SYSTEM SET listen_addresses TO '*'",
// "ALTER SYSTEM SET max_connections TO '100'",
// "ALTER SYSTEM SET random_page_cost TO '1.1'",
// "ALTER SYSTEM SET effective_io_concurrency TO '200'",
// "ALTER SYSTEM SET max_worker_processes TO '8'",
// "ALTER SYSTEM SET max_parallel_workers_per_gather TO '2'",
// "ALTER SYSTEM SET max_parallel_workers TO '2'"
);
result.dropTables();
result.createTables();
return result;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private void execute(String... commands) {
for (String command : commands) {
try {
connection.prepareStatement(command).execute();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
void createTables() {
execute("""
CREATE TABLE IF NOT EXISTS nodes (
id BIGINT,
data BYTEA
)""");
execute("""
CREATE TABLE IF NOT EXISTS ways (
id BIGINT,
data BYTEA
)""");
execute("""
CREATE TABLE IF NOT EXISTS relations (
id BIGINT,
data BYTEA
)""");
execute("""
CREATE TABLE IF NOT EXISTS node_to_way (
child_id BIGINT,
parent_id BIGINT
)""");
execute("""
CREATE TABLE IF NOT EXISTS node_to_relation (
child_id BIGINT,
parent_id BIGINT
)""");
execute("""
CREATE TABLE IF NOT EXISTS way_to_relation (
child_id BIGINT,
parent_id BIGINT
)""");
execute("""
CREATE TABLE IF NOT EXISTS relation_to_relation (
child_id BIGINT,
parent_id BIGINT
)""");
}
void dropTables() {
for (String table : List.of("nodes", "ways", "relations", "node_to_way", "node_to_relation", "way_to_relation",
"relation_to_relation")) {
execute("DROP TABLE IF EXISTS " + table);
}
execute("VACUUM");
}
private void execute(String query) {
try (var statement = connection.createStatement()) {
LOGGER.debug("Execute: {}", query);
statement.execute(query);
} catch (SQLException throwables) {
throw new IllegalStateException("Error executing queries " + query, throwables);
}
}
@Override
public long diskUsageBytes() {
try (var statement = connection.createStatement()) {
var resultSet = statement.executeQuery("SELECT pg_database_size(current_database())");
resultSet.next();
return resultSet.getLong(1);
} catch (SQLException throwables) {
throw new IllegalStateException("Error getting DB size", throwables);
}
}
record Writer(Connection conn, SimpleRowWriter writer) implements Closeable {
@Override
public void close() {
try (conn; writer) {
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
private class Bulk implements BulkWriter {
private final Writer wayWriter;
private final Writer relWriter;
private final Writer nodeToWayWriter;
private final Writer nodeToRelWriter;
private final Writer wayToRelWriter;
private final Writer relToRelWriter;
private final Writer nodeWriter;
private Writer newWriter(String table, String... columns) {
try {
Connection conn = DriverManager.getConnection(url);
PGConnection pgConnection = PostgreSqlUtils.getPGConnection(conn);
return new Writer(conn, new SimpleRowWriter(new SimpleRowWriter.Table("public", table, columns), pgConnection));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private Bulk() {
nodeWriter = newWriter("nodes", "id", "data");
wayWriter = newWriter("ways", "id", "data");
relWriter = newWriter("relations", "id", "data");
nodeToWayWriter = newWriter("node_to_way", "child_id", "parent_id");
nodeToRelWriter = newWriter("node_to_relation", "child_id", "parent_id");
wayToRelWriter = newWriter("way_to_relation", "child_id", "parent_id");
relToRelWriter = newWriter("relation_to_relation", "child_id", "parent_id");
}
@Override
public void putNode(Serialized.Node node) {
nodeWriter.writer().startRow(row -> {
row.setLong(0, node.item().id());
row.setByteArray(1, node.bytes());
});
}
@Override
public void putWay(Serialized.Way way) {
wayWriter.writer().startRow(row -> {
row.setLong(0, way.item().id());
row.setByteArray(1, way.bytes());
});
LongSet nodes = new LongHashSet();
for (var node : way.item().nodes()) {
if (nodes.add(node.value)) {
nodeToWayWriter.writer().startRow(row -> {
row.setLong(0, node.value);
row.setLong(1, way.item().id());
});
}
}
}
@Override
public void putRelation(Serialized.Relation rel) {
relWriter.writer().startRow(row -> {
row.setLong(0, rel.item().id());
row.setByteArray(1, rel.bytes());
});
LongSet nodes = new LongHashSet();
LongSet ways = new LongHashSet();
LongSet rels = new LongHashSet();
for (var member : rel.item().members()) {
if (member.type() == OsmElement.Type.NODE && nodes.add(member.ref())) {
nodeToRelWriter.writer().startRow(row -> {
row.setLong(0, member.ref());
row.setLong(1, rel.item().id());
});
} else if (member.type() == OsmElement.Type.WAY && ways.add(member.ref())) {
wayToRelWriter.writer().startRow(row -> {
row.setLong(0, member.ref());
row.setLong(1, rel.item().id());
});
} else if (member.type() == OsmElement.Type.RELATION && rels.add(member.ref())) {
relToRelWriter.writer().startRow(row -> {
row.setLong(0, member.ref());
row.setLong(1, rel.item().id());
});
}
}
}
@Override
public void close() throws IOException {
try (
nodeWriter;
wayWriter;
relWriter;
nodeToWayWriter;
nodeToRelWriter;
wayToRelWriter;
relToRelWriter;
) {
}
}
}
@Override
public BulkWriter newBulkWriter() {
return new Bulk();
}
@Override
public void deleteNode(long nodeId, int version) {
}
@Override
public void deleteWay(long wayId, int version) {
}
@Override
public void deleteRelation(long relationId, int version) {
}
@Override
public OsmElement.Node getNode(long id) {
return null;
}
@Override
public LongArrayList getParentWaysForNode(long nodeId) {
return null;
}
@Override
public OsmElement.Way getWay(long id) {
return null;
}
@Override
public OsmElement.Relation getRelation(long id) {
return null;
}
@Override
public LongArrayList getParentRelationsForNode(long nodeId) {
return null;
}
@Override
public LongArrayList getParentRelationsForWay(long nodeId) {
return null;
}
@Override
public LongArrayList getParentRelationsForRelation(long nodeId) {
return null;
}
@Override
public CloseableIterator<OsmElement> iterator() {
return null;
}
@Override
public void close() throws Exception {
connection.close();
}
@Override
public void finish() {
for (String table : List.of("nodes", "ways", "relations")) {
execute("ALTER TABLE " + table + " ADD PRIMARY KEY (id)");
}
for (String table : List.of("node_to_way", "node_to_relation", "way_to_relation", "relation_to_relation")) {
execute("ALTER TABLE " + table + " ADD PRIMARY KEY (child_id, parent_id)");
}
}
}

Wyświetl plik

@ -491,6 +491,11 @@ public class SqliteOsmMirror implements OsmMirror {
}
}
@Override
public long diskUsageBytes() {
return FileUtils.size(path);
}
private record ParentChild(long child, long parent) {}
private class Bulk implements BulkWriter {

Wyświetl plik

@ -0,0 +1,194 @@
package com.onthegomap.planetiler.util;
import com.onthegomap.planetiler.archive.ReadableTileArchive;
import com.onthegomap.planetiler.archive.TileArchiveConfig;
import com.onthegomap.planetiler.archive.TileArchiveMetadata;
import com.onthegomap.planetiler.archive.TileEncodingResult;
import com.onthegomap.planetiler.archive.WriteableTileArchive;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.geo.TileCoord;
import com.onthegomap.planetiler.geo.TileOrder;
import de.bytefish.pgbulkinsert.row.SimpleRowWriter;
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
import java.io.IOException;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.postgresql.PGConnection;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Pgtiles implements ReadableTileArchive, WriteableTileArchive {
private static final Logger LOGGER = LoggerFactory.getLogger(Pgtiles.class);
private final String url;
private final Arguments options;
private final Connection connection;
public Pgtiles(String url, Arguments options) throws SQLException {
this.url = url;
this.options = options;
var props = new Properties();
props.putAll(options.toMap());
this.connection = DriverManager.getConnection(url, props);
}
public static WriteableTileArchive writer(URI uri, Arguments options) {
try {
return new Pgtiles("jdbc:postgresql://" + uri.getAuthority() + uri.getPath(), options);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public static ReadableTileArchive reader(URI uri, Arguments options) {
try {
return new Pgtiles("jdbc:postgresql://" + uri.getAuthority() + uri.getPath(), options);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public static long getSize(TileArchiveConfig archiveConfig) {
Properties options = new Properties();
options.putAll(archiveConfig.options());
String url = "jdbc:postgresql://" + archiveConfig.uri().getAuthority() + archiveConfig.uri().getPath();
return getSize(url, archiveConfig.options());
}
public static long getSize(String url, Map<String, String> args) {
Properties options = new Properties();
options.putAll(args);
try (
var connection = DriverManager.getConnection(url, options);
var stmt = connection.createStatement()
) {
var result = stmt.executeQuery("select pg_total_relation_size('public.tile_data')");
result.next();
return result.getLong(1);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] getTile(int x, int y, int z) {
return null;
}
private void execute(String... queries) {
for (String query : queries) {
try (var statement = connection.createStatement()) {
LOGGER.debug("Execute postgres: {}", query);
statement.execute(query);
} catch (SQLException throwables) {
throw new IllegalStateException("Error executing queries " + String.join(",", queries), throwables);
}
}
}
public static void main(String[] args) throws SQLException {
try (
var connection = DriverManager.getConnection(args[0]);
) {
LOGGER.info("Connected...");
try (
var result = connection.prepareStatement("select id from tile_data").executeQuery()
) {
LOGGER.info("Starting...");
RoaringBitmap bitmap = new RoaringBitmap();
int i = 0;
while (result.next()) {
if (i++ % 1_000_000 == 0) {
LOGGER.info("Finished {} bitmap size {}", i, Format.defaultInstance().storage(bitmap.getLongSizeInBytes()));
}
bitmap.add(result.getInt("id"));
}
LOGGER.info("Finished {} bitmap size {}", i, Format.defaultInstance().storage(bitmap.getLongSizeInBytes()));
bitmap.runOptimize();
LOGGER.info("Finished {} bitmap size {}", i, Format.defaultInstance().storage(bitmap.getLongSizeInBytes()));
}
}
}
@Override
public void initialize(TileArchiveMetadata metadata) {
execute("""
DROP TABLE IF EXISTS tile_data
""");
execute("""
CREATE TABLE tile_data (
id INTEGER PRIMARY KEY,
data BYTEA
)""");
}
@Override
public void finish(TileArchiveMetadata metadata) {
LOGGER.info("Final postgres DB size: {}", Format.defaultInstance().storage(getSize(url, options.toMap())));
}
@Override
public CloseableIterator<TileCoord> getAllTileCoords() {
return CloseableIterator.wrap(Collections.emptyIterator());
}
@Override
public TileArchiveMetadata metadata() {
return null;
}
@Override
public boolean deduplicates() {
return false;
}
@Override
public TileOrder tileOrder() {
return TileOrder.TMS;
}
@Override
public TileWriter newTileWriter() {
return new BulkLoader();
}
@Override
public void close() throws IOException {
}
private class BulkLoader implements TileWriter {
private final SimpleRowWriter writer;
private BulkLoader() {
PGConnection pgConnection = PostgreSqlUtils.getPGConnection(connection);
try {
this.writer = new SimpleRowWriter(new SimpleRowWriter.Table("public", "tile_data", new String[]{"id", "data"}),
pgConnection);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void write(TileEncodingResult encodingResult) {
this.writer.startRow(row -> {
row.setInteger(0, encodingResult.coord().encoded());
row.setByteArray(1, encodingResult.tileData());
});
}
@Override
public void close() {
this.writer.close();
}
}
}

Wyświetl plik

@ -1,6 +1,7 @@
package com.onthegomap.planetiler.archive;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.nio.file.Path;
import java.util.Map;
@ -33,4 +34,17 @@ class TileArchiveConfigTest {
assertEquals(TileArchiveConfig.Format.PMTILES,
TileArchiveConfig.from("file:///output.mbtiles?format=pmtiles").format());
}
@Test
void testPgtiles() {
var config = TileArchiveConfig.from("postgres://localhost:5432/db?user=user&password=password");
assertEquals(TileArchiveConfig.Format.POSTGRES, config.format());
assertEquals(Map.of("user", "user", "password", "password"), config.options());
assertEquals("localhost:5432", config.uri().getAuthority());
assertEquals("/db", config.uri().getPath());
assertNull(config.getLocalPath());
config = TileArchiveConfig.from("postgres://localhost:5432?user=user&password=password");
assertEquals("", config.uri().getPath());
}
}

Wyświetl plik

@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import com.carrotsearch.hppc.LongArrayList;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.stats.Stats;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@ -76,6 +77,10 @@ abstract class OsmMirrorTest {
assertEquals(LongArrayList.from(4), fixture.getParentRelationsForRelation(3));
}
// TODO: versioned update
// TODO: versioned delete
// TODO: delete cascades?
private OsmElement.Info infoVersion(int version) {
return OsmElement.Info.forVersion(version);
}
@ -131,4 +136,13 @@ abstract class OsmMirrorTest {
fixture = OsmMirror.newLmdbWrite(tmp);
}
}
static class PostgresTest extends OsmMirrorTest {
@BeforeEach
void setup() {
fixture = PostgresOsmMirror.newMirror("jdbc:postgresql://localhost:5432/pgdb?user=admin&password=password", 1,
Stats.inMemory());
((PostgresOsmMirror) fixture).dropTables();
}
}
}