got tests passing

pull/1/head
Mike Barry 2021-04-29 06:22:41 -04:00
rodzic 14e28619e2
commit ca73e5d491
9 zmienionych plików z 758 dodań i 532 usunięć

Wyświetl plik

@ -66,7 +66,7 @@ public class OpenMapTilesMain {
FileUtils.forceMkdir(tmpDir.toFile());
File nodeDb = tmpDir.resolve("node.db").toFile();
LongLongMap nodeLocations = new LongLongMap.MapdbSortedTable(nodeDb);
MergeSort featureDb = new MergeSort(tmpDir.resolve("feature.db"), threads, stats);
MergeSort featureDb = MergeSort.newExternalMergeSort(tmpDir.resolve("feature.db"), threads, stats);
MergeSortFeatureMap featureMap = new MergeSortFeatureMap(featureDb, profile);
FlatMapConfig config = new FlatMapConfig(profile, envelope, threads, stats, logInterval);
FeatureRenderer renderer = new FeatureRenderer(config);

Wyświetl plik

@ -21,8 +21,8 @@ package com.onthegomap.flatmap;
import com.carrotsearch.hppc.DoubleArrayList;
import com.carrotsearch.hppc.IntArrayList;
import com.google.common.primitives.Ints;
import com.google.protobuf.InvalidProtocolBufferException;
import com.onthegomap.flatmap.geo.GeoUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -226,55 +226,59 @@ public class VectorTileEncoder {
return geometry;
}
public static List<DecodedFeature> decode(byte[] encoded) throws IOException {
VectorTile.Tile tile = VectorTile.Tile.parseFrom(encoded);
List<DecodedFeature> features = new ArrayList<>();
for (VectorTile.Tile.Layer layer : tile.getLayersList()) {
String layerName = layer.getName();
int extent = layer.getExtent();
List<String> keys = layer.getKeysList();
List<Object> values = new ArrayList<>();
public static List<DecodedFeature> decode(byte[] encoded) {
try {
VectorTile.Tile tile = VectorTile.Tile.parseFrom(encoded);
List<DecodedFeature> features = new ArrayList<>();
for (VectorTile.Tile.Layer layer : tile.getLayersList()) {
String layerName = layer.getName();
int extent = layer.getExtent();
List<String> keys = layer.getKeysList();
List<Object> values = new ArrayList<>();
for (VectorTile.Tile.Value value : layer.getValuesList()) {
if (value.hasBoolValue()) {
values.add(value.getBoolValue());
} else if (value.hasDoubleValue()) {
values.add(value.getDoubleValue());
} else if (value.hasFloatValue()) {
values.add(value.getFloatValue());
} else if (value.hasIntValue()) {
values.add(value.getIntValue());
} else if (value.hasSintValue()) {
values.add(value.getSintValue());
} else if (value.hasUintValue()) {
values.add(value.getUintValue());
} else if (value.hasStringValue()) {
values.add(value.getStringValue());
} else {
values.add(null);
for (VectorTile.Tile.Value value : layer.getValuesList()) {
if (value.hasBoolValue()) {
values.add(value.getBoolValue());
} else if (value.hasDoubleValue()) {
values.add(value.getDoubleValue());
} else if (value.hasFloatValue()) {
values.add(value.getFloatValue());
} else if (value.hasIntValue()) {
values.add(value.getIntValue());
} else if (value.hasSintValue()) {
values.add(value.getSintValue());
} else if (value.hasUintValue()) {
values.add(value.getUintValue());
} else if (value.hasStringValue()) {
values.add(value.getStringValue());
} else {
values.add(null);
}
}
for (VectorTile.Tile.Feature feature : layer.getFeaturesList()) {
int tagsCount = feature.getTagsCount();
Map<String, Object> attrs = new HashMap<>(tagsCount / 2);
int tagIdx = 0;
while (tagIdx < feature.getTagsCount()) {
String key = keys.get(feature.getTags(tagIdx++));
Object value = values.get(feature.getTags(tagIdx++));
attrs.put(key, value);
}
Geometry geometry = decodeCommands(feature.getType(), feature.getGeometryList());
features.add(new DecodedFeature(
layerName,
extent,
geometry,
attrs,
feature.getId()
));
}
}
for (VectorTile.Tile.Feature feature : layer.getFeaturesList()) {
int tagsCount = feature.getTagsCount();
Map<String, Object> attrs = new HashMap<>(tagsCount / 2);
int tagIdx = 0;
while (tagIdx < feature.getTagsCount()) {
String key = keys.get(feature.getTags(tagIdx++));
Object value = values.get(feature.getTags(tagIdx++));
attrs.put(key, value);
}
Geometry geometry = decodeCommands(feature.getType(), feature.getGeometryList());
features.add(new DecodedFeature(
layerName,
extent,
geometry,
attrs,
feature.getId()
));
}
return features;
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException(e);
}
return features;
}
private static Geometry decodeCommands(GeomType type, List<Integer> geometryList) {

Wyświetl plik

@ -0,0 +1,337 @@
package com.onthegomap.flatmap.collections;
import com.onthegomap.flatmap.monitoring.ProcessInfo;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ExternalMergeSort implements MergeSort {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeSort.class);
private static final long MAX_CHUNK_SIZE = 1_000_000_000; // 1GB
private final Path dir;
private final Stats stats;
private final int chunkSizeLimit;
private final int workers;
private final List<Chunk> chunks = new ArrayList<>();
private Chunk current;
private volatile boolean sorted = false;
ExternalMergeSort(Path tempDir, int threads, Stats stats) {
this(
tempDir,
threads,
(int) Math.min(
MAX_CHUNK_SIZE,
(ProcessInfo.getMaxMemoryBytes() / 2) / threads
),
stats
);
}
ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
this.dir = dir;
this.stats = stats;
this.chunkSizeLimit = chunkSizeLimit;
long memory = ProcessInfo.getMaxMemoryBytes();
if (chunkSizeLimit > memory / 2) {
throw new IllegalStateException(
"Not enough memory to use chunk size " + chunkSizeLimit + " only have " + memory);
}
this.workers = workers;
LOGGER.info("Using merge sort feature map, chunk size=" + (chunkSizeLimit / 1_000_000) + "mb workers=" + workers);
try {
FileUtils.deleteDirectory(dir.toFile());
Files.createDirectories(dir);
newChunk();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public void add(Entry item) {
try {
assert !sorted;
current.add(item);
if (current.bytesInMemory > chunkSizeLimit) {
newChunk();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public long getStorageSize() {
return FileUtils.sizeOfDirectory(dir.toFile());
}
private static <T> T time(AtomicLong timer, Supplier<T> func) {
long start = System.nanoTime();
try {
return func.get();
} finally {
timer.addAndGet(System.nanoTime() - start);
}
}
@Override
public void sort() {
assert !sorted;
if (current != null) {
try {
current.close();
} catch (IOException e) {
// ok
}
}
long start = System.nanoTime();
AtomicLong reading = new AtomicLong(0);
AtomicLong writing = new AtomicLong(0);
AtomicLong sorting = new AtomicLong(0);
AtomicLong doneCounter = new AtomicLong(0);
CompletableFuture<ProgressLoggers> logger = new CompletableFuture<>();
var topology = Topology.start("sort", stats)
.readFromTiny("item_queue", chunks)
.sinkToConsumer("worker", workers, chunk -> {
var toSort = time(reading, chunk::readAll);
time(sorting, toSort::sort);
time(writing, toSort::flush);
doneCounter.incrementAndGet();
try {
logger.get().log();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
});
ProgressLoggers loggers = new ProgressLoggers("sort")
.addPercentCounter("chunks", chunks.size(), doneCounter)
.addFileSize(this::getStorageSize)
.addProcessStats()
.addTopologyStats(topology);
logger.complete(loggers);
topology.await();
sorted = true;
LOGGER.info("Sorted all chunks " + Duration.ofNanos(System.nanoTime() - start).toSeconds() +
"s read:" + Duration.ofNanos(reading.get()).toSeconds() +
"s write:" + Duration.ofNanos(writing.get()).toSeconds() +
"s sort:" + Duration.ofNanos(sorting.get()).toSeconds() + "s");
}
@NotNull
@Override
public Iterator<Entry> iterator() {
assert sorted;
PriorityQueue<PeekableScanner> queue = new PriorityQueue<>(chunks.size());
for (Chunk chunk : chunks) {
if (chunk.itemCount > 0) {
queue.add(chunk.newReader());
}
}
return new Iterator<>() {
@Override
public boolean hasNext() {
return !queue.isEmpty();
}
@Override
public Entry next() {
PeekableScanner scanner = queue.poll();
assert scanner != null;
Entry next = scanner.next();
if (scanner.hasNext()) {
queue.add(scanner);
}
return next;
}
};
}
private void newChunk() throws IOException {
Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1));
chunkPath.toFile().deleteOnExit();
if (current != null) {
current.close();
}
chunks.add(current = new Chunk(chunkPath));
}
class Chunk implements Closeable {
private final Path path;
private final DataOutputStream outputStream;
private int bytesInMemory = 0;
private int itemCount = 0;
private Chunk(Path path) throws IOException {
this.path = path;
this.outputStream = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(path), 50_000));
}
public PeekableScanner newReader() {
return new PeekableScanner(path, itemCount);
}
public void add(Entry entry) throws IOException {
write(outputStream, entry);
bytesInMemory +=
// pointer to feature
8 +
// Feature class overhead
16 +
// long sort member of feature
8 +
// byte array pointer
8 +
// byte array size
24 + entry.value().length;
itemCount++;
}
public class SortableChunk {
private Entry[] featuresToSort;
private SortableChunk(Entry[] featuresToSort) {
this.featuresToSort = featuresToSort;
}
public SortableChunk sort() {
Arrays.sort(featuresToSort);
return this;
}
public SortableChunk flush() {
try (DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(path), 50_000))) {
for (Entry feature : featuresToSort) {
write(out, feature);
}
featuresToSort = null;
return this;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
public SortableChunk readAll() {
try (PeekableScanner scanner = newReader()) {
Entry[] featuresToSort = new Entry[itemCount];
int i = 0;
while (scanner.hasNext()) {
featuresToSort[i] = scanner.next();
i++;
}
if (i != itemCount) {
throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i);
}
return new SortableChunk(featuresToSort);
}
}
public static void write(DataOutputStream out, Entry entry) throws IOException {
out.writeLong(entry.sortKey());
out.writeInt(entry.value().length);
out.write(entry.value());
}
@Override
public void close() throws IOException {
outputStream.close();
}
}
class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private final int count;
private int read = 0;
private final DataInputStream input;
private Entry next;
PeekableScanner(Path path, int count) {
this.count = count;
try {
input = new DataInputStream(new BufferedInputStream(Files.newInputStream(path), 50_000));
next = readNextFeature();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Entry next() {
Entry current = next;
if ((next = readNextFeature()) == null) {
close();
}
return current;
}
private Entry readNextFeature() {
if (read < count) {
try {
long nextSort = input.readLong();
int length = input.readInt();
byte[] bytes = input.readNBytes(length);
read++;
return new Entry(nextSort, bytes);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
return null;
}
}
@Override
public void close() {
try {
input.close();
} catch (IOException e) {
LOGGER.warn("Error closing chunk", e);
}
}
@Override
public int compareTo(@NotNull PeekableScanner o) {
return next.compareTo(o.next);
}
}
}

Wyświetl plik

@ -1,348 +1,37 @@
package com.onthegomap.flatmap.collections;
import com.google.common.annotations.VisibleForTesting;
import com.onthegomap.flatmap.monitoring.ProcessInfo;
import com.onthegomap.flatmap.monitoring.ProgressLoggers;
import com.onthegomap.flatmap.monitoring.Stats;
import com.onthegomap.flatmap.worker.Topology;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergeSort implements Iterable<MergeSort.Entry> {
public interface MergeSort extends Iterable<MergeSort.Entry> {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeSort.class);
private static final long MAX_CHUNK_SIZE = 1_000_000_000; // 1GB
private final Path dir;
private final Stats stats;
private final int chunkSizeLimit;
private final int workers;
private final List<Chunk> chunks = new ArrayList<>();
private Chunk current;
private volatile boolean sorted = false;
public MergeSort(Path tempDir, int threads, Stats stats) {
this(
tempDir,
threads,
(int) Math.min(
MAX_CHUNK_SIZE,
(ProcessInfo.getMaxMemoryBytes() / 2) / threads
),
stats
);
static MergeSort newExternalMergeSort(Path tempDir, int threads, Stats stats) {
return new ExternalMergeSort(tempDir, threads, stats);
}
public MergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
this.dir = dir;
this.stats = stats;
this.chunkSizeLimit = chunkSizeLimit;
long memory = ProcessInfo.getMaxMemoryBytes();
if (chunkSizeLimit > memory / 2) {
throw new IllegalStateException(
"Not enough memory to use chunk size " + chunkSizeLimit + " only have " + memory);
}
this.workers = workers;
LOGGER.info("Using merge sort feature map, chunk size=" + (chunkSizeLimit / 1_000_000) + "mb workers=" + workers);
try {
FileUtils.deleteDirectory(dir.toFile());
Files.createDirectories(dir);
newChunk();
} catch (IOException e) {
throw new IllegalStateException(e);
}
static MergeSort newExternalMergeSort(Path dir, int workers, int chunkSizeLimit, Stats stats) {
return new ExternalMergeSort(dir, workers, chunkSizeLimit, stats);
}
public void add(Entry item) {
try {
assert !sorted;
current.add(item);
if (current.bytesInMemory > chunkSizeLimit) {
newChunk();
}
} catch (IOException e) {
throw new IllegalStateException(e);
void sort();
default List<Entry> toList() {
List<Entry> list = new ArrayList<>();
for (Entry entry : this) {
list.add(entry);
}
return list;
}
public long getStorageSize() {
return FileUtils.sizeOfDirectory(dir.toFile());
}
void add(Entry newEntry);
private static <T> T time(AtomicLong timer, Supplier<T> func) {
long start = System.nanoTime();
try {
return func.get();
} finally {
timer.addAndGet(System.nanoTime() - start);
}
}
long getStorageSize();
public void sort() {
assert !sorted;
if (current != null) {
try {
current.close();
} catch (IOException e) {
// ok
}
}
long start = System.nanoTime();
AtomicLong reading = new AtomicLong(0);
AtomicLong writing = new AtomicLong(0);
AtomicLong sorting = new AtomicLong(0);
AtomicLong doneCounter = new AtomicLong(0);
CompletableFuture<ProgressLoggers> logger = new CompletableFuture<>();
var topology = Topology.start("sort", stats)
.readFromTiny("item_queue", chunks)
.sinkToConsumer("worker", workers, chunk -> {
var toSort = time(reading, chunk::readAll);
time(sorting, toSort::sort);
time(writing, toSort::flush);
doneCounter.incrementAndGet();
try {
logger.get().log();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
});
ProgressLoggers loggers = new ProgressLoggers("sort")
.addPercentCounter("chunks", chunks.size(), doneCounter)
.addFileSize(this::getStorageSize)
.addProcessStats()
.addTopologyStats(topology);
logger.complete(loggers);
topology.await();
sorted = true;
LOGGER.info("Sorted all chunks " + Duration.ofNanos(System.nanoTime() - start).toSeconds() +
"s read:" + Duration.ofNanos(reading.get()).toSeconds() +
"s write:" + Duration.ofNanos(writing.get()).toSeconds() +
"s sort:" + Duration.ofNanos(sorting.get()).toSeconds() + "s");
}
@NotNull
@Override
public Iterator<Entry> iterator() {
assert sorted;
PriorityQueue<PeekableScanner> queue = new PriorityQueue<>(chunks.size());
for (Chunk chunk : chunks) {
if (chunk.itemCount > 0) {
queue.add(chunk.newReader());
}
}
return new Iterator<>() {
@Override
public boolean hasNext() {
return !queue.isEmpty();
}
@Override
public Entry next() {
PeekableScanner scanner = queue.poll();
assert scanner != null;
Entry next = scanner.next();
if (scanner.hasNext()) {
queue.add(scanner);
}
return next;
}
};
}
private void newChunk() throws IOException {
Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1));
chunkPath.toFile().deleteOnExit();
if (current != null) {
current.close();
}
chunks.add(current = new Chunk(chunkPath));
}
@VisibleForTesting
List<Entry> toList() {
List<Entry> result = new ArrayList<>();
for (Entry item : this) {
result.add(item);
}
return result;
}
class Chunk implements Closeable {
private final Path path;
private final DataOutputStream outputStream;
private int bytesInMemory = 0;
private int itemCount = 0;
private Chunk(Path path) throws IOException {
this.path = path;
this.outputStream = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(path), 50_000));
}
public PeekableScanner newReader() {
return new PeekableScanner(path, itemCount);
}
public void add(Entry entry) throws IOException {
write(outputStream, entry);
bytesInMemory +=
// pointer to feature
8 +
// Feature class overhead
16 +
// long sort member of feature
8 +
// byte array pointer
8 +
// byte array size
24 + entry.value.length;
itemCount++;
}
public class SortableChunk {
private Entry[] featuresToSort;
private SortableChunk(Entry[] featuresToSort) {
this.featuresToSort = featuresToSort;
}
public SortableChunk sort() {
Arrays.sort(featuresToSort);
return this;
}
public SortableChunk flush() {
try (DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(path), 50_000))) {
for (Entry feature : featuresToSort) {
write(out, feature);
}
featuresToSort = null;
return this;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
public SortableChunk readAll() {
try (PeekableScanner scanner = newReader()) {
Entry[] featuresToSort = new Entry[itemCount];
int i = 0;
while (scanner.hasNext()) {
featuresToSort[i] = scanner.next();
i++;
}
if (i != itemCount) {
throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i);
}
return new SortableChunk(featuresToSort);
}
}
public static void write(DataOutputStream out, Entry entry) throws IOException {
out.writeLong(entry.sortKey);
out.writeInt(entry.value.length);
out.write(entry.value);
}
@Override
public void close() throws IOException {
outputStream.close();
}
}
class PeekableScanner implements Closeable, Comparable<PeekableScanner>, Iterator<Entry> {
private final int count;
private int read = 0;
private final DataInputStream input;
private Entry next;
PeekableScanner(Path path, int count) {
this.count = count;
try {
input = new DataInputStream(new BufferedInputStream(Files.newInputStream(path), 50_000));
next = readNextFeature();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Entry next() {
Entry current = next;
if ((next = readNextFeature()) == null) {
close();
}
return current;
}
private Entry readNextFeature() {
if (read < count) {
try {
long nextSort = input.readLong();
int length = input.readInt();
byte[] bytes = input.readNBytes(length);
read++;
return new Entry(nextSort, bytes);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
return null;
}
}
@Override
public void close() {
try {
input.close();
} catch (IOException e) {
LOGGER.warn("Error closing chunk", e);
}
}
@Override
public int compareTo(@NotNull PeekableScanner o) {
return next.compareTo(o.next);
}
}
public static record Entry(long sortKey, byte[] value) implements Comparable<Entry> {
record Entry(long sortKey, byte[] value) implements Comparable<Entry> {
@Override
public int compareTo(@NotNull Entry o) {

Wyświetl plik

@ -1,6 +1,5 @@
package com.onthegomap.flatmap.collections;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongLongHashMap;
import com.graphhopper.coll.GHLongLongHashMap;
import com.onthegomap.flatmap.LayerFeature;
@ -18,6 +17,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;
import org.locationtech.jts.geom.Geometry;
@ -38,6 +38,22 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
this(mergeSort, profile, new CommonStringEncoder());
}
public MergeSort.Entry encode(
long featureId,
TileCoord tile,
String layer,
Map<String, Object> attrs,
Geometry geom,
int zOrder,
boolean hasGroup,
int groupLimit
) {
return new MergeSort.Entry(
FeatureMapKey.encode(tile.encoded(), commonStrings.encode(layer), zOrder, hasGroup),
FeatureMapValue.from(featureId, attrs, geom, hasGroup, groupLimit).encode(commonStrings)
);
}
@Override
public void accept(MergeSort.Entry entry) {
mergeSort.add(entry);
@ -50,33 +66,31 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
return Collections.emptyIterator();
}
MergeSort.Entry firstFeature = entries.next();
byte[] firstData = firstFeature.value();
long firstSort = firstFeature.sortKey();
return new Iterator<>() {
private byte[] last = firstData;
private long lastSortKey = firstSort;
private int lastTileId = FeatureMapKey.extractTileFromKey(firstSort);
private MergeSort.Entry lastFeature = firstFeature;
private int lastTileId = FeatureMapKey.extractTileFromKey(firstFeature.sortKey());
@Override
public boolean hasNext() {
return last != null;
return lastFeature != null;
}
@Override
public TileFeatures next() {
TileFeatures result = new TileFeatures(lastTileId);
result.add(lastSortKey, last);
result.accept(lastFeature);
int lastTile = lastTileId;
while (entries.hasNext()) {
MergeSort.Entry next = entries.next();
last = next.value();
lastSortKey = next.sortKey();
lastTileId = FeatureMapKey.extractTileFromKey(lastSortKey);
lastFeature = next;
lastTileId = FeatureMapKey.extractTileFromKey(lastFeature.sortKey());
if (lastTile != lastTileId) {
return result;
}
result.add(next.sortKey(), last);
result.accept(next);
}
lastFeature = null;
return result;
}
};
@ -89,8 +103,7 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
public class TileFeatures implements Consumer<MergeSort.Entry> {
private final TileCoord tile;
private final LongArrayList sortKeys = new LongArrayList();
private final List<byte[]> entries = new ArrayList<>();
private final List<MergeSort.Entry> entries = new ArrayList<>();
private LongLongHashMap counts = null;
private byte layer = Byte.MAX_VALUE;
@ -112,8 +125,8 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
return false;
}
for (int i = 0; i < entries.size(); i++) {
byte[] a = entries.get(i);
byte[] b = other.entries.get(i);
byte[] a = entries.get(i).value();
byte[] b = other.entries.get(i).value();
if (!Arrays.equals(a, b)) {
return false;
}
@ -126,11 +139,10 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
List<VectorTileFeature> items = new ArrayList<>(entries.size());
String currentLayer = null;
for (int index = entries.size() - 1; index >= 0; index--) {
byte[] entry = entries.get(index);
long sortKey = sortKeys.get(index);
MergeSort.Entry entry = entries.get(index);
FeatureMapKey key = FeatureMapKey.decode(sortKey);
FeatureMapValue value = FeatureMapValue.decode(entry, key.hasGroup(), commonStrings);
FeatureMapKey key = FeatureMapKey.decode(entry.sortKey());
FeatureMapValue value = FeatureMapValue.decode(entry.value(), key.hasGroup(), commonStrings);
String layer = commonStrings.decode(key.layer);
if (currentLayer == null) {
@ -153,7 +165,9 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
return encoder;
}
public TileFeatures add(long sortKey, byte[] entry) {
@Override
public void accept(MergeSort.Entry entry) {
long sortKey = entry.sortKey();
if (FeatureMapKey.extractHasGroupFromKey(sortKey)) {
byte thisLayer = FeatureMapKey.extractLayerIdFromKey(sortKey);
if (counts == null) {
@ -163,29 +177,21 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
layer = thisLayer;
counts.clear();
}
var groupInfo = FeatureMapValue.decodeGroupInfo(entry);
var groupInfo = FeatureMapValue.decodeGroupInfo(entry.value());
long old = counts.getOrDefault(groupInfo.group, 0);
if (old >= groupInfo.limit && groupInfo.limit > 0) {
return this;
if (groupInfo.limit > 0 && old >= groupInfo.limit) {
return;
}
counts.put(groupInfo.group, old + 1);
}
sortKeys.add(sortKey);
entries.add(entry);
return this;
}
@Override
public void accept(MergeSort.Entry renderedFeature) {
add(renderedFeature.sortKey(), renderedFeature.value());
}
@Override
public String toString() {
return "TileFeatures{" +
"tile=" + tile +
", sortKeys=" + sortKeys +
", entries=" + entries +
", num entries=" + entries.size() +
'}';
}
}
@ -193,6 +199,16 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
private static final ThreadLocal<MessageBufferPacker> messagePackers = ThreadLocal
.withInitial(MessagePack::newDefaultBufferPacker);
public record RenderedFeature(
long featureId,
TileCoord tile,
String layer,
int zOrder,
Optional<FeatureMapValue.GroupInfo> groupInfo
) {
}
public record FeatureMapKey(long encoded, TileCoord tile, byte layer, int zOrder, boolean hasGroup) implements
Comparable<FeatureMapKey> {
@ -215,6 +231,7 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
}
public static long encode(int tile, byte layer, int zOrder, boolean hasGroup) {
zOrder = -zOrder - 1;
return ((long) tile << 32L) | ((long) (layer & 0xff) << 24L) | (((zOrder - Z_ORDER_MIN) & Z_ORDER_MASK) << 1L) | (
hasGroup ? 1 : 0);
}
@ -232,7 +249,7 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
}
public static int extractZorderFromKey(long sortKey) {
return ((int) ((sortKey >> 1) & Z_ORDER_MASK) + Z_ORDER_MIN);
return Z_ORDER_MAX - ((int) ((sortKey >> 1) & Z_ORDER_MASK));
}
@Override
@ -291,7 +308,7 @@ public record MergeSortFeatureMap(MergeSort mergeSort, Profile profile, CommonSt
boolean hasGrouping,
int groupLimit
) {
long group = geom.getUserData() instanceof Long longValue ? longValue : 0;
long group = geom.getUserData() instanceof Number number ? number.longValue() : 0;
byte geomType = (byte) VectorTileEncoder.toGeomType(geom).getNumber();
int[] commands = VectorTileEncoder.getCommands(geom);
return new FeatureMapValue(

Wyświetl plik

@ -1,6 +1,8 @@
package com.onthegomap.flatmap.geo;
public record TileCoord(int encoded, int x, int y, int z) {
import org.jetbrains.annotations.NotNull;
public record TileCoord(int encoded, int x, int y, int z) implements Comparable<TileCoord> {
public TileCoord {
assert z <= 14;
@ -70,4 +72,9 @@ public record TileCoord(int encoded, int x, int y, int z) {
", encoded=" + encoded +
'}';
}
@Override
public int compareTo(@NotNull TileCoord o) {
return Long.compare(encoded, o.encoded);
}
}

Wyświetl plik

@ -36,6 +36,12 @@ public class TestUtils {
return GeoUtils.gf.createPoint(new CoordinateXY(x, y));
}
public static Point newPointWithUserData(double x, double y, Object userData) {
Point point = GeoUtils.gf.createPoint(new CoordinateXY(x, y));
point.setUserData(userData);
return point;
}
public static MultiPoint newMultiPoint(Point... points) {
return GeoUtils.gf.createMultiPoint(points);
}

Wyświetl plik

@ -1,134 +1,300 @@
package com.onthegomap.flatmap.collections;
import static com.onthegomap.flatmap.TestUtils.newPoint;
import static com.onthegomap.flatmap.TestUtils.newPointWithUserData;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
import com.onthegomap.flatmap.Profile;
import com.onthegomap.flatmap.VectorTileEncoder;
import com.onthegomap.flatmap.geo.TileCoord;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.locationtech.jts.geom.Geometry;
public abstract class MergeSortFeatureMapTest {
public class MergeSortFeatureMapTest {
@TempDir
tmp
private final List<MergeSort.Entry> list = new ArrayList<>();
private final MergeSort sorter = new MergeSort() {
@Override
public void sort() {
list.sort(Comparator.naturalOrder());
}
@Override
public void add(Entry newEntry) {
list.add(newEntry);
}
@Override
public long getStorageSize() {
return 0;
}
@NotNull
@Override
public Iterator<Entry> iterator() {
return list.iterator();
}
};
private MergeSortFeatureMap features = new MergeSortFeatureMap(sorter, new Profile.NullProfile());
@Test
public void testEmpty() {
sorter.sort();
assertFalse(features.iterator().hasNext());
}
public void test() {
long id = 0;
private void put(int tile, String layer, Map<String, Object> attrs, Geometry geom) {
MergeSort.Entry key = features.encode(id++, TileCoord.decode(tile), layer, attrs, geom, 0, false, 0);
features.accept(key);
}
private void putWithZorder(int tile, String layer, Map<String, Object> attrs, Geometry geom, int zOrder) {
MergeSort.Entry key = features.encode(id++, TileCoord.decode(tile), layer, attrs, geom, zOrder, false, 0);
features.accept(key);
}
private void putWithGroup(int tile, String layer, Map<String, Object> attrs, Geometry geom, int zOrder, int limit) {
MergeSort.Entry key = features.encode(id++, TileCoord.decode(tile), layer, attrs, geom, zOrder, true, limit);
features.accept(key);
}
private Map<Integer, Map<String, List<Feature>>> getFeatures() {
Map<Integer, Map<String, List<Feature>>> map = new TreeMap<>();
for (MergeSortFeatureMap.TileFeatures tile : features) {
for (var feature : VectorTileEncoder.decode(tile.getTile().encode())) {
map.computeIfAbsent(tile.coord().encoded(), (i) -> new TreeMap<>())
.computeIfAbsent(feature.layerName(), l -> new ArrayList<>())
.add(new Feature(feature.attributes(), feature.geometry()));
}
}
return map;
}
private static record Feature(Map<String, Object> attrs, Geometry geom) {
}
// private MergeSortFeatureMap features;
//
// @Before
// public void setup() {
// this.features = getMap(new Profile.NullProfile());
// }
//
// protected abstract MergeSortFeatureMap getMap(Profile profile);
//
// @TempDir
// Path tmpDir;
//
// @Test
// public void testEmpty() {
// features.sort();
// assertFalse(features.iterator().hasNext());
// }
//
// @Test
// public void testThrowsWhenPreparedOutOfOrder() {
// features.accept(new RenderedFeature(1, new byte[]{}));
// assertThrows(IllegalStateException.class, features::iterator);
// features.sort();
// assertThrows(IllegalStateException.class, () -> features.accept(new RenderedFeature(1, new byte[]{})));
// }
//
// @Test
// public void test() {
// features.accept(FeatureMapKey.);
// features.sort();
// var actual = StreamSupport.stream(features.spliterator(), false).toList();
// assertEquals(List.of(
// new TileFeatures().add
// ), actual);
// }
//
// public static class TwoWorkers extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 2, 1_000, new InMemory());
// }
// }
//
// public static class MergeSortOnePerFileFeatureMapTest extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 2, 1, new InMemory());
// }
// }
//
//
// public static class MergeSortOnePerFileOneWorkerFeatureMapTest extends MergeSortFeatureMapTest {
//
// @Override
// protected MergeSortFeatureMap getMap(Profile profile) {
// return new MergeSortFeatureMap(tmpDir, profile, 1, 1_000_00, new InMemory());
// }
// }
//
// public static class FeatureMapKeyTest {
//
// @TestFactory
// public List<DynamicTest> testEncodeLongKey() {
// List<TileCoord> tiles = List.of(
// TileCoord.ofXYZ(0, 0, 14),
// TileCoord.ofXYZ((1 << 14) - 1, (1 << 14) - 1, 14),
// TileCoord.ofXYZ(0, 0, 0),
// TileCoord.ofXYZ(0, 0, 7),
// TileCoord.ofXYZ((1 << 7) - 1, (1 << 7) - 1, 7)
// );
// List<Byte> layers = List.of((byte) 0, (byte) 1, (byte) 255);
// List<Integer> zOrders = List.of((1 << 22) - 1, 0, -(1 << 22));
// List<Boolean> hasGroups = List.of(false, true);
// List<DynamicTest> result = new ArrayList<>();
// for (TileCoord tile : tiles) {
// for (byte layer : layers) {
// for (int zOrder : zOrders) {
// for (boolean hasGroup : hasGroups) {
// FeatureMapKey key = FeatureMapKey.of(tile.encoded(), layer, zOrder, hasGroup);
// result.add(dynamicTest(key.toString(), () -> {
// FeatureMapKey decoded = FeatureMapKey.decode(key.encoded());
// assertEquals(decoded.tile(), tile.encoded(), "tile");
// assertEquals(decoded.layer(), layer, "layer");
// assertEquals(decoded.zOrder(), zOrder, "zOrder");
// assertEquals(decoded.hasGroup(), hasGroup, "hasGroup");
// }));
// }
// }
// }
// }
// return result;
// }
//
// @ParameterizedTest
// @CsvSource({
// "0,0,-2,true, 0,0,-1,false",
// "0,0,1,false, 0,0,2,false",
// "0,0,-1,false, 0,0,1,false",
// "-1,0,-2,false, -1,0,-1,false",
// "-1,0,1,false, -1,0,2,false",
// "-1,0,-1,false, -1,0,1,false",
// "-1,0,-1,false, -1,0,-1,true",
// "1,0,1,false, 1,0,1,true"
// })
// public void testEncodeLongKeyOrdering(
// int tileA, byte layerA, int zOrderA, boolean hasGroupA,
// int tileB, byte layerB, int zOrderB, boolean hasGroupB
// ) {
// assertTrue(
// FeatureMapKey.encode(tileA, layerA, zOrderA, hasGroupA)
// <
// FeatureMapKey.encode(tileB, layerB, zOrderB, hasGroupB)
// );
// }
// }
@Test
public void testPutPoints() {
put(3, "layer3", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(3, "layer4", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(2, "layer", Map.of("a", 1.5d, "b", "string"), newPoint(5, 6));
put(1, "layer", Map.of("a", 1, "b", 2L), newPoint(1, 2));
put(1, "layer2", Map.of("c", 3d, "d", true), newPoint(3, 4));
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
new Feature(Map.of("a", 1L, "b", 2L), newPoint(1, 2))
),
"layer2", List.of(
new Feature(Map.of("c", 3d, "d", true), newPoint(3, 4))
)
)), 2, new TreeMap<>(Map.of(
"layer", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
)
)), 3, new TreeMap<>(Map.of(
"layer3", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
),
"layer4", List.of(
new Feature(Map.of("a", 1.5d, "b", "string"), newPoint(5, 6))
)
)))), getFeatures());
}
@Test
public void testPutPointsWithZorder() {
putWithZorder(
1, "layer", Map.of("id", 1), newPoint(1, 2), 2
);
putWithZorder(
1, "layer", Map.of("id", 2), newPoint(3, 4), 1
);
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
// order reversed because of z-order
new Feature(Map.of("id", 2L), newPoint(3, 4)),
new Feature(Map.of("id", 1L), newPoint(1, 2))
)
)))), getFeatures());
}
@Test
public void testLimitPoints() {
int x = 5, y = 6;
putWithGroup(
1, "layer", Map.of("id", 3), newPointWithUserData(x, y, 1), 0, 2
);
putWithGroup(
1, "layer", Map.of("id", 1), newPointWithUserData(1, 2, 1), 2, 2
);
putWithGroup(
1, "layer", Map.of("id", 2), newPointWithUserData(3, 4, 1), 1, 2
);
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
// order reversed because of z-order
// id=3 omitted because past limit
new Feature(Map.of("id", 2L), newPoint(3, 4)),
new Feature(Map.of("id", 1L), newPoint(1, 2))
)
)))), getFeatures());
}
@Test
public void testLimitPointsInDifferentGroups() {
int x = 5, y = 6;
putWithGroup(
1, "layer", Map.of("id", 3), newPointWithUserData(x, y, 2), 0, 2
);
putWithGroup(
1, "layer", Map.of("id", 1), newPointWithUserData(1, 2, 1), 2, 2
);
putWithGroup(
1, "layer", Map.of("id", 2), newPointWithUserData(3, 4, 1), 1, 2
);
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
// order reversed because of z-order
new Feature(Map.of("id", 3L), newPoint(x, y)),
new Feature(Map.of("id", 2L), newPoint(3, 4)),
new Feature(Map.of("id", 1L), newPoint(1, 2))
)
)))), getFeatures());
}
@Test
public void testDontLimitPointsWithGroup() {
int x = 5, y = 6;
putWithGroup(
1, "layer", Map.of("id", 3), newPointWithUserData(x, y, 1), 0, 0
);
putWithGroup(
1, "layer", Map.of("id", 1), newPointWithUserData(1, 2, 1), 2, 0
);
putWithGroup(
1, "layer", Map.of("id", 2), newPointWithUserData(3, 4, 1), 1, 0
);
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
// order reversed because of z-order,
new Feature(Map.of("id", 3L), newPoint(x, y)),
new Feature(Map.of("id", 2L), newPoint(3, 4)),
new Feature(Map.of("id", 1L), newPoint(1, 2))
)
)))), getFeatures());
}
@Test
public void testProfileChangesGeometry() {
features = new MergeSortFeatureMap(sorter, new Profile.NullProfile() {
@Override
public List<VectorTileEncoder.VectorTileFeature> postProcessLayerFeatures(String layer, int zoom,
List<VectorTileEncoder.VectorTileFeature> items) {
Collections.reverse(items);
return items;
}
});
int x = 5, y = 6;
putWithGroup(
1, "layer", Map.of("id", 3), newPointWithUserData(x, y, 1), 0, 2
);
putWithGroup(
1, "layer", Map.of("id", 1), newPointWithUserData(1, 2, 1), 2, 2
);
putWithGroup(
1, "layer", Map.of("id", 2), newPointWithUserData(3, 4, 1), 1, 2
);
sorter.sort();
assertEquals(new TreeMap<>(Map.of(
1, new TreeMap<>(Map.of(
"layer", List.of(
// back to same order because profile reversed
new Feature(Map.of("id", 1L), newPoint(1, 2)),
new Feature(Map.of("id", 2L), newPoint(3, 4))
)
)))), getFeatures());
}
@TestFactory
public List<DynamicTest> testEncodeLongKey() {
List<TileCoord> tiles = List.of(
TileCoord.ofXYZ(0, 0, 14),
TileCoord.ofXYZ((1 << 14) - 1, (1 << 14) - 1, 14),
TileCoord.ofXYZ(0, 0, 0),
TileCoord.ofXYZ(0, 0, 7),
TileCoord.ofXYZ((1 << 7) - 1, (1 << 7) - 1, 7)
);
List<Byte> layers = List.of((byte) 0, (byte) 1, (byte) 255);
List<Integer> zOrders = List.of((1 << 22) - 1, 0, -(1 << 22));
List<Boolean> hasGroups = List.of(false, true);
List<DynamicTest> result = new ArrayList<>();
for (TileCoord tile : tiles) {
for (byte layer : layers) {
for (int zOrder : zOrders) {
for (boolean hasGroup : hasGroups) {
MergeSortFeatureMap.FeatureMapKey key = MergeSortFeatureMap.FeatureMapKey
.of(tile.encoded(), layer, zOrder, hasGroup);
result.add(dynamicTest(key.toString(), () -> {
MergeSortFeatureMap.FeatureMapKey decoded = MergeSortFeatureMap.FeatureMapKey.decode(key.encoded());
assertEquals(decoded.tile(), tile, "tile");
assertEquals(decoded.layer(), layer, "layer");
assertEquals(decoded.zOrder(), zOrder, "zOrder");
assertEquals(decoded.hasGroup(), hasGroup, "hasGroup");
}));
}
}
}
}
return result;
}
@ParameterizedTest
@CsvSource({
"0,0,-1,true, 0,0,-2,false",
"0,0,2,false, 0,0,1,false",
"0,0,1,false, 0,0,-1,false",
"-1,0,-1,false, -1,0,-2,false",
"-1,0,2,false, -1,0,1,false",
"-1,0,1,false, -1,0,-1,false",
"-1,0,-1,false, -1,0,-1,true",
"1,0,1,false, 1,0,1,true"
})
public void testEncodeLongKeyOrdering(
int tileA, byte layerA, int zOrderA, boolean hasGroupA,
int tileB, byte layerB, int zOrderB, boolean hasGroupB
) {
assertTrue(
MergeSortFeatureMap.FeatureMapKey.encode(tileA, layerA, zOrderA, hasGroupA)
<
MergeSortFeatureMap.FeatureMapKey.encode(tileB, layerB, zOrderB, hasGroupB)
);
}
}

Wyświetl plik

@ -21,7 +21,7 @@ public class MergeSortTest {
}
private MergeSort newSorter(int workers, int chunkSizeLimit) {
return new MergeSort(tmpDir, workers, chunkSizeLimit, new Stats.InMemory());
return MergeSort.newExternalMergeSort(tmpDir, workers, chunkSizeLimit, new Stats.InMemory());
}
@Test