Mirror of https://github.com/onthegomap/planetiler
Add `--mmap-temp` option to use memory-mapped IO for temp feature files (#209)
parent 891537e2bc
commit 01b52f9812
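This commit touches five files, shown below in order: a new `BenchmarkExternalMergeSort` harness, the `ExternalMergeSort` implementation itself, a new `mmap_temp` entry in `PlanetilerConfig`, expanded coverage in `FeatureSortTest`, and a Sonar rule suppression in `sonar-project.properties`. Note that the commit title spells the option `--mmap-temp` while the argument key registered in `PlanetilerConfig` is `mmap_temp`; the argument parser presumably accepts either spelling, and the option defaults to off.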
BenchmarkExternalMergeSort.java (new file)
@@ -0,0 +1,136 @@
package com.onthegomap.planetiler.collection;

import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;

import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timer;
import com.onthegomap.planetiler.util.FileUtils;
import com.onthegomap.planetiler.util.Format;
import com.onthegomap.planetiler.worker.Worker;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Performance tests for {@link ExternalMergeSort}. Times how long it takes to write temp features, sort, then read them
 * back with different parameters.
 * <p>
 * Usage: {@code BenchmarkExternalMergeSort <number of GB of features to write>}
 */
public class BenchmarkExternalMergeSort {
  private static final Format FORMAT = Format.defaultInstance();
  private static final int ITEM_SIZE_BYTES = 76;
  private static final byte[] TEST_DATA = new byte[ITEM_SIZE_BYTES - Long.BYTES - Integer.BYTES];
  static {
    ThreadLocalRandom.current().nextBytes(TEST_DATA);
  }

  public static void main(String[] args) {
    double gb = args.length == 0 ? 1 : Double.parseDouble(args[0]);
    long number = (long) (gb * 1_000_000_000 / ITEM_SIZE_BYTES);
    Path path = Path.of("./featuretest");
    FileUtils.delete(path);
    FileUtils.deleteOnExit(path);
    var config = PlanetilerConfig.defaults();
    try {
      List<Results> results = new ArrayList<>();
      for (int limit : List.of(500_000_000, 2_000_000_000)) {
        results.add(run(path, number, limit, false, true, true, config));
        results.add(run(path, number, limit, true, true, true, config));
      }
      for (var result : results) {
        System.err.println(result);
      }
    } finally {
      FileUtils.delete(path);
    }
  }


  private record Results(
    String write, String read, String sort,
    int chunks, long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort,
    boolean madvise
  ) {}

  private static Results run(Path tmpDir, long items, int chunkSizeLimit, boolean mmap, boolean parallelSort,
    boolean madvise, PlanetilerConfig config) {
    boolean gzip = false;
    int writeWorkers = 1;
    int sortWorkers = Runtime.getRuntime().availableProcessors();
    int readWorkers = 1;
    FileUtils.delete(tmpDir);
    var sorter =
      new ExternalMergeSort(tmpDir, sortWorkers, chunkSizeLimit, gzip, mmap, parallelSort, madvise, config,
        Stats.inMemory());

    var writeTimer = Timer.start();
    doWrites(writeWorkers, items, sorter);
    writeTimer.stop();

    var sortTimer = Timer.start();
    sorter.sort();
    sortTimer.stop();

    var readTimer = Timer.start();
    doReads(readWorkers, items, sorter);
    readTimer.stop();

    return new Results(
      FORMAT.numeric(items * NANOSECONDS_PER_SECOND / writeTimer.elapsed().wall().toNanos()) + "/s",
      FORMAT.numeric(items * NANOSECONDS_PER_SECOND / readTimer.elapsed().wall().toNanos()) + "/s",
      FORMAT.duration(sortTimer.elapsed().wall()),
      sorter.chunks(),
      items,
      chunkSizeLimit,
      gzip,
      mmap,
      parallelSort,
      madvise
    );
  }

  private static void doReads(int readWorkers, long items, ExternalMergeSort sorter) {
    var counters = Counter.newMultiThreadCounter();
    var reader = new Worker("read", Stats.inMemory(), readWorkers, () -> {
      var counter = counters.counterForThread();
      for (var ignored : sorter) {
        counter.inc();
      }
    });
    ProgressLoggers loggers = ProgressLoggers.create()
      .addRatePercentCounter("items", items, counters, false)
      .addFileSize(sorter)
      .newLine()
      .addProcessStats()
      .newLine()
      .addThreadPoolStats("reader", reader);
    reader.awaitAndLog(loggers, Duration.ofSeconds(1));
  }

  private static void doWrites(int writeWorkers, long items, ExternalMergeSort sorter) {
    var counters = Counter.newMultiThreadCounter();
    var writer = new Worker("write", Stats.inMemory(), writeWorkers, () -> {
      var counter = counters.counterForThread();
      var random = ThreadLocalRandom.current();
      long toWrite = items / writeWorkers;
      for (long i = 0; i < toWrite; i++) {
        sorter.add(new SortableFeature(random.nextLong(), TEST_DATA));
        counter.inc();
      }
    });
    ProgressLoggers loggers = ProgressLoggers.create()
      .addRatePercentCounter("items", items, counters, false)
      .addFileSize(sorter)
      .newLine()
      .addProcessStats()
      .newLine()
      .addThreadPoolStats("writer", writer);
    writer.awaitAndLog(loggers, Duration.ofSeconds(1));
  }
}
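As a usage note: per its javadoc, the benchmark takes a single optional argument, the number of GB of synthetic features to write (default 1), and prints one `Results` line per configuration to stderr, comparing buffered vs memory-mapped temp IO at roughly 500 MB and 2 GB chunk-size limits. The constant `ITEM_SIZE_BYTES = 76` matches the on-disk record layout used by the sorter below: an 8-byte key, a 4-byte length, and 64 bytes of payload (`TEST_DATA`).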
ExternalMergeSort.java
@@ -6,6 +6,8 @@ import com.onthegomap.planetiler.config.PlanetilerConfig;
 import com.onthegomap.planetiler.stats.ProcessInfo;
 import com.onthegomap.planetiler.stats.ProgressLoggers;
 import com.onthegomap.planetiler.stats.Stats;
+import com.onthegomap.planetiler.stats.Timer;
+import com.onthegomap.planetiler.util.ByteBufferUtil;
 import com.onthegomap.planetiler.util.FileUtils;
 import com.onthegomap.planetiler.worker.WorkerPipeline;
 import java.io.BufferedInputStream;
@@ -16,8 +18,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,6 +33,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.zip.Deflater;
@@ -39,9 +47,9 @@ import org.slf4j.LoggerFactory;
  * A utility that writes {@link SortableFeature SortableFeatures} to disk and uses merge sort to efficiently sort much
  * more data than fits in RAM.
  * <p>
- * Writes append features to a "chunk" file that can be sorted with 1GB or RAM until it is full, then starts writing to
- * a new chunk. The sort process sorts the chunks, limiting the number of parallel threads by CPU cores and available
- * RAM. Reads do a k-way merge of the sorted chunks using a priority queue of minimum values from each.
+ * Writes append features to a "chunk" file that can be sorted with a fixed amount of RAM, then starts writing to a new
+ * chunk. The sort process sorts the chunks, limiting the number of parallel threads by CPU cores and available RAM.
+ * Reads do a k-way merge of the sorted chunks using a priority queue of minimum values from each.
  * <p>
  * Only supports single-threaded writes and reads.
  */
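As an aside, the k-way merge described in the javadoc above is independent of Planetiler's own classes. A minimal self-contained sketch of the idea (class and method names here are hypothetical, not the project's API): each sorted chunk contributes its smallest unread value to a priority queue, and each poll re-inserts the next value from the same chunk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {

  /** The next unread value from one sorted chunk, plus an iterator over the rest of that chunk. */
  private record Head(long value, Iterator<Long> rest) {}

  /** Merges already-sorted lists into one sorted list using a priority queue of per-chunk minimums. */
  static List<Long> merge(List<List<Long>> sortedChunks) {
    PriorityQueue<Head> queue = new PriorityQueue<>(Comparator.comparingLong(Head::value));
    for (List<Long> chunk : sortedChunks) {
      Iterator<Long> it = chunk.iterator();
      if (it.hasNext()) {
        queue.add(new Head(it.next(), it));
      }
    }
    List<Long> result = new ArrayList<>();
    while (!queue.isEmpty()) {
      Head smallest = queue.poll();      // globally smallest remaining value
      result.add(smallest.value());
      if (smallest.rest().hasNext()) {   // refill the queue from the same chunk
        queue.add(new Head(smallest.rest().next(), smallest.rest()));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // prints [1, 2, 3, 4, 5, 9, 10]
    System.out.println(merge(List.of(List.of(1L, 4L, 9L), List.of(2L, 3L, 10L), List.of(5L))));
  }
}
```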
@@ -60,6 +68,10 @@ class ExternalMergeSort implements FeatureSort {
   private final PlanetilerConfig config;
   private final int readerLimit;
   private final int writerLimit;
+  private final boolean mmapIO;
+  private final boolean parallelSort;
+  private final boolean madvise;
+  private final AtomicBoolean madviseFailed = new AtomicBoolean(false);
   private Chunk currentChunk;
   private volatile boolean sorted = false;
 
@@ -72,17 +84,28 @@ class ExternalMergeSort implements FeatureSort {
         ProcessInfo.getMaxMemoryBytes() / 3
       ),
       config.gzipTempStorage(),
+      config.mmapTempStorage(),
+      true,
+      true,
       config,
       stats
     );
   }
 
-  ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, boolean gzip, PlanetilerConfig config, Stats stats) {
+  ExternalMergeSort(Path dir, int workers, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort,
+    boolean madvise, PlanetilerConfig config, Stats stats) {
     this.config = config;
+    this.madvise = madvise;
     this.dir = dir;
     this.stats = stats;
+    this.parallelSort = parallelSort;
     this.chunkSizeLimit = chunkSizeLimit;
+    if (gzip && mmap) {
+      LOGGER.warn("--gzip-temp option not supported with --mmap-temp, falling back to --gzip-temp=false");
+      gzip = false;
+    }
     this.gzip = gzip;
+    this.mmapIO = mmap;
     long memLimit = ProcessInfo.getMaxMemoryBytes() / 2;
     if (chunkSizeLimit > memLimit) {
       throw new IllegalStateException("Not enough memory for chunkSize=" + chunkSizeLimit + " limit=" + memLimit);
@@ -97,37 +120,19 @@ class ExternalMergeSort implements FeatureSort {
       Files.createDirectories(dir);
       newChunk();
     } catch (IOException e) {
-      throw new IllegalStateException(e);
+      throw new UncheckedIOException(e);
     }
   }
 
-  private static <T> T time(AtomicLong timer, Supplier<T> func) {
-    long start = System.nanoTime();
+  private static <T> T time(AtomicLong total, Supplier<T> func) {
+    var timer = Timer.start();
     try {
       return func.get();
     } finally {
-      timer.addAndGet(System.nanoTime() - start);
+      total.addAndGet(timer.stop().elapsed().wall().toNanos());
     }
   }
 
-  private DataInputStream newInputStream(Path path) throws IOException {
-    @SuppressWarnings("java:S2095") // DataInputStream closes inputStream
-    InputStream inputStream = new BufferedInputStream(Files.newInputStream(path), 50_000);
-    if (gzip) {
-      inputStream = new GZIPInputStream(inputStream);
-    }
-    return new DataInputStream(inputStream);
-  }
-
-  private DataOutputStream newOutputStream(Path path) throws IOException {
-    @SuppressWarnings("java:S2095") // DataInputStream closes inputStream
-    OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(path), 50_000);
-    if (gzip) {
-      outputStream = new FastGzipOutputStream(outputStream);
-    }
-    return new DataOutputStream(outputStream);
-  }
-
   @Override
   public void add(SortableFeature item) {
     try {
@@ -138,7 +143,7 @@ class ExternalMergeSort implements FeatureSort {
         newChunk();
       }
     } catch (IOException e) {
-      throw new IllegalStateException(e);
+      throw new UncheckedIOException(e);
     }
   }
 
@@ -219,7 +224,7 @@ class ExternalMergeSort implements FeatureSort {
     assert sorted;
 
     // k-way merge to interleave all the sorted chunks
-    PriorityQueue<ChunkIterator> queue = new PriorityQueue<>(chunks.size());
+    PriorityQueue<Reader<?>> queue = new PriorityQueue<>(chunks.size());
     for (Chunk chunk : chunks) {
       if (chunk.itemCount > 0) {
         queue.add(chunk.newReader());
@@ -234,7 +239,7 @@ class ExternalMergeSort implements FeatureSort {
 
       @Override
      public SortableFeature next() {
-        ChunkIterator iterator = queue.poll();
+        Reader<?> iterator = queue.poll();
         assert iterator != null;
         SortableFeature next = iterator.next();
         if (iterator.hasNext()) {
@@ -254,6 +259,31 @@ class ExternalMergeSort implements FeatureSort {
     chunks.add(currentChunk = new Chunk(chunkPath));
   }
 
+  public int chunks() {
+    return chunks.size();
+  }
+
+  private void tryMadviseSequential(ByteBuffer buffer) {
+    try {
+      ByteBufferUtil.posixMadvise(buffer, ByteBufferUtil.Madvice.SEQUENTIAL);
+    } catch (IOException e) {
+      if (madviseFailed.compareAndSet(false, true)) { // log once
+        LOGGER.info("madvise not available on this system to speed up temporary feature IO.");
+      }
+    }
+  }
+
+  private interface Writer extends Closeable {
+    void write(SortableFeature feature) throws IOException;
+  }
+
+  private interface Reader<T extends Reader<?>>
+    extends Closeable, Iterator<SortableFeature>, Comparable<T> {
+
+    @Override
+    void close();
+  }
+
   /** Compresses bytes with minimal impact on write performance. Equivalent to {@code gzip -1} */
   private static class FastGzipOutputStream extends GZIPOutputStream {
 
@@ -263,28 +293,166 @@ class ExternalMergeSort implements FeatureSort {
     }
   }
 
+  /** Read all features from a chunk file using a {@link BufferedInputStream}. */
+  private static class ReaderBuffered extends BaseReader<ReaderBuffered> {
+
+    private final int count;
+    private final DataInputStream input;
+    private int read = 0;
+
+    ReaderBuffered(Path path, int count, boolean gzip) {
+      this.count = count;
+      try {
+        InputStream inputStream = new BufferedInputStream(Files.newInputStream(path));
+        if (gzip) {
+          inputStream = new GZIPInputStream(inputStream);
+        }
+        input = new DataInputStream(inputStream);
+        next = readNextFeature();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    SortableFeature readNextFeature() {
+      if (read < count) {
+        try {
+          long nextSort = input.readLong();
+          int length = input.readInt();
+          byte[] bytes = input.readNBytes(length);
+          read++;
+          return new SortableFeature(nextSort, bytes);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public void close() {
+      try {
+        input.close();
+      } catch (IOException e) {
+        LOGGER.warn("Error closing chunk", e);
+      }
+    }
+  }
+
+  /** Write features to the chunk file using a {@link BufferedOutputStream}. */
+  private static class WriterBuffered implements Writer {
+
+    private final DataOutputStream out;
+
+    WriterBuffered(Path path, boolean gzip) {
+      try {
+        OutputStream rawOutputStream = new BufferedOutputStream(Files.newOutputStream(path));
+        if (gzip) {
+          rawOutputStream = new FastGzipOutputStream(rawOutputStream);
+        }
+        this.out = new DataOutputStream(rawOutputStream);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void write(SortableFeature feature) throws IOException {
+      out.writeLong(feature.key());
+      out.writeInt(feature.value().length);
+      out.write(feature.value());
+    }
+  }
+
+  /** Common functionality between {@link ReaderMmap} and {@link ReaderBuffered}. */
+  private abstract static class BaseReader<T extends BaseReader<?>> implements Reader<T> {
+    SortableFeature next;
+
+    @Override
+    public final boolean hasNext() {
+      return next != null;
+    }
+
+    @Override
+    public final SortableFeature next() {
+      SortableFeature current = next;
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      if ((next = readNextFeature()) == null) {
+        close();
+      }
+      return current;
+    }
+
+    @Override
+    public final int compareTo(T o) {
+      return next.compareTo(o.next);
+    }
+
+    abstract SortableFeature readNextFeature();
+  }
+
+  /** Write features to the chunk file through a memory-mapped file. */
+  private class WriterMmap implements Writer {
+    private final FileChannel channel;
+    private final MappedByteBuffer buffer;
+
+    WriterMmap(Path path) {
+      try {
+        this.channel =
+          FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
+        this.buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, chunkSizeLimit);
+        if (madvise) {
+          tryMadviseSequential(buffer);
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // on windows, truncating throws an exception if the file is still mapped
+      ByteBufferUtil.free(buffer);
+      channel.truncate(buffer.position());
+      channel.close();
+    }
+
+    @Override
+    public void write(SortableFeature feature) throws IOException {
+      buffer.putLong(feature.key());
+      buffer.putInt(feature.value().length);
+      buffer.put(feature.value());
+    }
+  }
+
   /**
-   * An output segment that can be sorted in ~1GB RAM.
+   * An output segment that can be sorted with a fixed amount of RAM.
    */
   private class Chunk implements Closeable {
 
     private final Path path;
-    private final DataOutputStream outputStream;
+    private final Writer writer;
     // estimate how much RAM it would take to sort this chunk
     private int bytesInMemory = 0;
     private int itemCount = 0;
 
-    private Chunk(Path path) throws IOException {
+    private Chunk(Path path) {
       this.path = path;
-      this.outputStream = newOutputStream(path);
-    }
-
-    public ChunkIterator newReader() {
-      return new ChunkIterator(path, itemCount);
+      this.writer = newWriter(path);
     }
 
     public void add(SortableFeature entry) throws IOException {
-      write(outputStream, entry);
+      writer.write(entry);
       bytesInMemory +=
         // pointer to feature
         8 +
@@ -300,7 +468,7 @@ class ExternalMergeSort implements FeatureSort {
     }
 
     private SortableChunk readAll() {
-      try (ChunkIterator iterator = newReader()) {
+      try (var iterator = newReader()) {
         SortableFeature[] featuresToSort = new SortableFeature[itemCount];
         int i = 0;
         while (iterator.hasNext()) {
@@ -314,17 +482,17 @@ class ExternalMergeSort implements FeatureSort {
       }
     }
 
-    private static void write(DataOutputStream out, SortableFeature entry) throws IOException {
-      // feature header
-      out.writeLong(entry.key());
-      out.writeInt(entry.value().length);
-      // value
-      out.write(entry.value());
+    private Writer newWriter(Path path) {
+      return mmapIO ? new WriterMmap(path) : new WriterBuffered(path, gzip);
     }
 
+    private Reader<?> newReader() {
+      return mmapIO ? new ReaderMmap(path, itemCount) : new ReaderBuffered(path, itemCount, gzip);
+    }
+
     @Override
     public void close() throws IOException {
-      outputStream.close();
+      writer.close();
     }
 
     /**
@@ -339,73 +507,59 @@ class ExternalMergeSort implements FeatureSort {
       }
 
      public SortableChunk sort() {
-        Arrays.parallelSort(featuresToSort);
+        if (parallelSort) {
+          Arrays.parallelSort(featuresToSort);
+        } else {
+          Arrays.sort(featuresToSort);
+        }
         return this;
       }
 
       public SortableChunk flush() {
-        try (DataOutputStream out = newOutputStream(path)) {
+        try (Writer out = newWriter(path)) {
           for (SortableFeature feature : featuresToSort) {
-            write(out, feature);
+            out.write(feature);
           }
           featuresToSort = null;
           return this;
         } catch (IOException e) {
-          throw new IllegalStateException(e);
+          throw new UncheckedIOException(e);
         }
       }
     }
   }
 
-  /**
-   * Iterator through all features of a sorted chunk that peeks at the next item before returning it to support k-way
-   * merge using a {@link PriorityQueue}.
-   */
-  private class ChunkIterator implements Closeable, Comparable<ChunkIterator>, Iterator<SortableFeature> {
-
+  /** Memory-map the chunk file, then iterate through all features in it. */
+  private class ReaderMmap extends BaseReader<ReaderMmap> {
     private final int count;
-    private final DataInputStream input;
+    private final FileChannel channel;
+    private final MappedByteBuffer buffer;
     private int read = 0;
-    private SortableFeature next;
 
-    ChunkIterator(Path path, int count) {
+    ReaderMmap(Path path, int count) {
      this.count = count;
      try {
-        input = newInputStream(path);
+        channel = FileChannel.open(path, StandardOpenOption.READ);
+        buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
+        if (madvise) {
+          // give the OS a hint that pages will be read sequentially so it can read-ahead and drop as soon as we're done
+          tryMadviseSequential(buffer);
+        }
        next = readNextFeature();
      } catch (IOException e) {
-        throw new IllegalStateException(e);
+        throw new UncheckedIOException(e);
      }
    }
 
     @Override
-    public boolean hasNext() {
-      return next != null;
-    }
-
-    @Override
-    public SortableFeature next() {
-      SortableFeature current = next;
-      if (current == null) {
-        throw new NoSuchElementException();
-      }
-      if ((next = readNextFeature()) == null) {
-        close();
-      }
-      return current;
-    }
-
-    private SortableFeature readNextFeature() {
+    SortableFeature readNextFeature() {
       if (read < count) {
-        try {
-          long nextSort = input.readLong();
-          int length = input.readInt();
-          byte[] bytes = input.readNBytes(length);
-          read++;
-          return new SortableFeature(nextSort, bytes);
-        } catch (IOException e) {
-          throw new IllegalStateException(e);
-        }
+        long nextSort = buffer.getLong();
+        int length = buffer.getInt();
+        byte[] bytes = new byte[length];
+        buffer.get(bytes);
+        read++;
+        return new SortableFeature(nextSort, bytes);
       } else {
         return null;
       }
@@ -414,15 +568,15 @@ class ExternalMergeSort implements FeatureSort {
     @Override
     public void close() {
       try {
-        input.close();
+        ByteBufferUtil.free(buffer);
+      } catch (IOException e) {
+        LOGGER.info("Unable to unmap chunk", e);
+      }
+      try {
+        channel.close();
       } catch (IOException e) {
         LOGGER.warn("Error closing chunk", e);
       }
     }
-
-    @Override
-    public int compareTo(ChunkIterator o) {
-      return next.compareTo(o.next);
-    }
   }
 }
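Two observations on the ExternalMergeSort changes above, the first of which is an inference: gzip and mmap are treated as mutually exclusive (the constructor logs a warning and disables gzip), presumably because the memory-mapped writer maps a fixed `chunkSizeLimit`-byte region up front and truncates it to the bytes actually written on close, which does not fit a streaming compressed output. Second, per the comment in `WriterMmap#close`, the mapped buffer must be freed before the channel is truncated because Windows refuses to truncate a file that is still mapped.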
PlanetilerConfig.java
@@ -20,6 +20,7 @@ public record PlanetilerConfig(
   boolean emitTilesInOrder,
   boolean force,
   boolean gzipTempStorage,
+  boolean mmapTempStorage,
   int sortMaxReaders,
   int sortMaxWriters,
   String nodeMapType,
@@ -86,6 +87,7 @@ public record PlanetilerConfig(
       arguments.getBoolean("emit_tiles_in_order", "emit tiles in index order", true),
       arguments.getBoolean("force", "overwriting output file and ignore disk/RAM warnings", false),
       arguments.getBoolean("gzip_temp", "gzip temporary feature storage (uses more CPU, but less disk space)", false),
+      arguments.getBoolean("mmap_temp", "use memory-mapped IO for temp feature files", false),
       arguments.getInteger("sort_max_readers", "maximum number of concurrent read threads to use when sorting chunks",
         6),
       arguments.getInteger("sort_max_writers", "maximum number of concurrent write threads to use when sorting chunks",
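For illustration, a minimal sketch of reading the new flag through the same `Arguments` API used above. The standalone usage is an assumption on my part (including `Arguments.fromArgs`); in Planetiler proper the value arrives via `PlanetilerConfig.from(arguments)` as shown in the diff.

```java
import com.onthegomap.planetiler.config.Arguments;

class MmapTempFlagSketch {
  public static void main(String[] args) {
    // parse an explicit argument list for illustration; a real run would pass the JVM's args through
    Arguments arguments = Arguments.fromArgs("--mmap_temp=true");
    boolean mmapTempStorage =
      arguments.getBoolean("mmap_temp", "use memory-mapped IO for temp feature files", false);
    System.out.println("mmap temp storage enabled: " + mmapTempStorage);
  }
}
```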
FeatureSortTest.java
@@ -12,7 +12,7 @@ import java.util.Random;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 class FeatureSortTest {
 
@@ -25,20 +25,21 @@ class FeatureSortTest {
     return new SortableFeature(Long.MIN_VALUE + i, new byte[]{(byte) i, (byte) (1 + i)});
   }
 
-  private FeatureSort newSorter(int workers, int chunkSizeLimit, boolean gzip) {
-    return new ExternalMergeSort(tmpDir, workers, chunkSizeLimit, gzip, config, Stats.inMemory());
+  private FeatureSort newSorter(int workers, int chunkSizeLimit, boolean gzip, boolean mmap) {
+    return new ExternalMergeSort(tmpDir, workers, chunkSizeLimit, gzip, mmap, true, true, config,
+      Stats.inMemory());
   }
 
   @Test
   void testEmpty() {
-    FeatureSort sorter = newSorter(1, 100, false);
+    FeatureSort sorter = newSorter(1, 100, false, false);
     sorter.sort();
     assertEquals(List.of(), sorter.toList());
   }
 
   @Test
   void testSingle() {
-    FeatureSort sorter = newSorter(1, 100, false);
+    FeatureSort sorter = newSorter(1, 100, false, false);
     sorter.add(newEntry(1));
     sorter.sort();
     assertEquals(List.of(newEntry(1)), sorter.toList());
@@ -46,7 +47,7 @@ class FeatureSortTest {
 
   @Test
   void testTwoItemsOneChunk() {
-    FeatureSort sorter = newSorter(1, 100, false);
+    FeatureSort sorter = newSorter(1, 100, false, false);
     sorter.add(newEntry(2));
     sorter.add(newEntry(1));
     sorter.sort();
@@ -55,7 +56,7 @@ class FeatureSortTest {
 
   @Test
   void testTwoItemsTwoChunks() {
-    FeatureSort sorter = newSorter(1, 0, false);
+    FeatureSort sorter = newSorter(1, 0, false, false);
     sorter.add(newEntry(2));
     sorter.add(newEntry(1));
     sorter.sort();
@@ -64,7 +65,7 @@ class FeatureSortTest {
 
   @Test
   void testTwoWorkers() {
-    FeatureSort sorter = newSorter(2, 0, false);
+    FeatureSort sorter = newSorter(2, 0, false, false);
     sorter.add(newEntry(4));
     sorter.add(newEntry(3));
     sorter.add(newEntry(2));
@@ -74,8 +75,13 @@ class FeatureSortTest {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  void testManyItems(boolean gzip) {
+  @CsvSource({
+    "false,false",
+    "false,true",
+    "true,false",
+    "true,true",
+  })
+  void testManyItems(boolean gzip, boolean mmap) {
     List<SortableFeature> sorted = new ArrayList<>();
     List<SortableFeature> shuffled = new ArrayList<>();
     for (int i = 0; i < 10_000; i++) {
@@ -83,7 +89,7 @@ class FeatureSortTest {
       sorted.add(newEntry(i));
     }
     Collections.shuffle(shuffled, new Random(0));
-    FeatureSort sorter = newSorter(2, 20_000, gzip);
+    FeatureSort sorter = newSorter(2, 20_000, gzip, mmap);
     shuffled.forEach(sorter::add);
     sorter.sort();
     assertEquals(sorted, sorter.toList());
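A side note on the test change above: switching from `@ValueSource` to `@CsvSource` lets `testManyItems` cover every gzip × mmap combination. Per the new constructor logic in `ExternalMergeSort`, the `gzip=true, mmap=true` case is expected to log a warning and fall back to uncompressed storage rather than fail.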
sonar-project.properties
@@ -1,4 +1,4 @@
-sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699,js3776,js1121
+sonar.issue.ignore.multicriteria=js1659,js3358,js1172,js106,js125,js2699,js3776,js1121,js107
 # subjective
 sonar.issue.ignore.multicriteria.js1659.ruleKey=java:S1659
 sonar.issue.ignore.multicriteria.js1659.resourceKey=**/*.java
@@ -12,6 +12,8 @@ sonar.issue.ignore.multicriteria.js3776.ruleKey=java:S3776
 sonar.issue.ignore.multicriteria.js3776.resourceKey=**/*.java
 sonar.issue.ignore.multicriteria.js1121.ruleKey=java:S1121
 sonar.issue.ignore.multicriteria.js1121.resourceKey=**/*.java
+sonar.issue.ignore.multicriteria.js107.ruleKey=java:S107
+sonar.issue.ignore.multicriteria.js107.resourceKey=**/*.java
 
 # layer constructors need same signatures
 sonar.issue.ignore.multicriteria.js1172.ruleKey=java:S1172
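Regarding the sonar-project.properties change: `java:S107` is the Sonar rule that flags methods with too many parameters; suppressing it project-wide is presumably needed because the expanded `ExternalMergeSort` constructor (and the benchmark's `run(...)` helper) now take more arguments than the rule's default threshold.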