Add download retries (#1270)

pull/1271/head v0.9.1
Michael Barry 2025-06-19 10:17:08 -04:00 zatwierdzone przez GitHub
rodzic a2a1eb4089
commit 855fa914fa
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
3 zmienionych plików z 103 dodań i 30 usunięć

Wyświetl plik

@ -172,7 +172,7 @@ public record PlanetilerConfig(
arguments.getString("http_user_agent", "User-Agent header to set when downloading files over HTTP",
"Planetiler downloader (https://github.com/onthegomap/planetiler)"),
arguments.getDuration("http_timeout", "Timeout to use when downloading files over HTTP", "30s"),
arguments.getInteger("http_retries", "Retries to use when downloading files over HTTP", 1),
arguments.getInteger("http_retries", "Retries to use when downloading files over HTTP", 5),
arguments.getDuration("http_retry_wait", "How long to wait before retrying HTTP request", "5s"),
arguments.getLong("download_chunk_size_mb", "Size of file chunks to download in parallel in megabytes", 100),
arguments.getInteger("download_threads", "Number of parallel threads to use when downloading each file", 1),

Wyświetl plik

@ -269,42 +269,65 @@ public class Downloader {
Worker.joinFutures(chunks.stream().map(range -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> {
LogUtil.setStage("download", resource.id);
perFileLimiter.acquire();
var counter = resource.progress.counterForThread();
try (
var fc = FileChannel.open(tmpPath, WRITE);
var inputStream = (ranges || range.start > 0) ?
openStreamRange(canonicalUrl, range.start, range.end) :
openStream(canonicalUrl);
) {
long offset = range.start;
byte[] buffer = new byte[16384];
int read;
while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) {
counter.incBy(read);
if (rateLimiter != null) {
rateLimiter.acquire(read);
}
int position = 0;
int remaining = read;
while (remaining > 0) {
int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset);
if (written <= 0) {
throw new IOException("Failed to write to " + tmpPath);
try {
var counter = resource.progress.counterForThread();
for (int retry = 0; retry <= config.httpRetries(); retry++) {
boolean lastTry = retry == config.httpRetries();
int retriesRemaining = config.httpRetries() - retry;
int countToRewind = 0;
try (
var fc = FileChannel.open(tmpPath, WRITE);
var inputStream = (ranges || range.start > 0) ?
openStreamRange(canonicalUrl, range.start, range.end) :
openStream(canonicalUrl);
) {
long offset = range.start;
byte[] buffer = new byte[16384];
int read;
while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) {
counter.incBy(read);
countToRewind += read;
if (rateLimiter != null) {
rateLimiter.acquire(read);
}
int position = 0;
int remaining = read;
while (remaining > 0) {
int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset);
if (written <= 0) {
throw new IOException("Failed to write to " + tmpPath);
}
position += written;
remaining -= written;
offset += written;
}
}
if (offset < range.end && range.end != Long.MAX_VALUE) {
throw new IOException("Unexpected EOF at " + offset + "/" + range.end);
}
// successfully downloaded file
break;
} catch (IOException e) {
if (lastTry) {
throw e;
} else {
counter.incBy(-countToRewind);
LOGGER.warn("Error downloading {}, retries remaining: {} {}", canonicalUrl, retriesRemaining,
e.getMessage());
retrySleep();
}
position += written;
remaining -= written;
offset += written;
}
}
if (offset < range.end && range.end != Long.MAX_VALUE) {
throw new IllegalStateException("Unexpected EOF at " + offset + " expecting " + range.end);
}
} finally {
perFileLimiter.release();
}
}), executor)).toArray(CompletableFuture[]::new)).get();
}
protected void retrySleep() throws InterruptedException {
Thread.sleep(config.httpRetryWait());
}
private HttpRequest.Builder newHttpRequest(String url) {
return HttpRequest.newBuilder(URI.create(url))
.timeout(config.httpTimeout())

Wyświetl plik

@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.*;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -13,7 +14,10 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@ -25,16 +29,22 @@ class DownloaderTest {
Path path;
private final PlanetilerConfig config = PlanetilerConfig.defaults();
private AtomicLong downloads = new AtomicLong(0);
private int slept = 0;
private Downloader mockDownloader(Map<String, byte[]> resources, boolean supportsRange,
boolean supportsContentLength) {
return mockDownloader(resources, supportsRange, supportsContentLength, UnaryOperator.identity());
}
private Downloader mockDownloader(Map<String, byte[]> resources, boolean supportsRange,
boolean supportsContentLength, UnaryOperator<byte[]> overrideBytes) {
return new Downloader(config, 2L) {
@Override
InputStream openStream(String url) {
downloads.incrementAndGet();
assertTrue(resources.containsKey(url), "no resource for " + url);
byte[] bytes = resources.get(url);
byte[] bytes = overrideBytes.apply(resources.get(url));
return new ByteArrayInputStream(bytes);
}
@ -44,7 +54,7 @@ class DownloaderTest {
downloads.incrementAndGet();
assertTrue(resources.containsKey(url), "no resource for " + url);
byte[] result = new byte[(int) (end - start)];
byte[] bytes = resources.get(url);
byte[] bytes = overrideBytes.apply(resources.get(url));
for (int i = (int) start; i < start + result.length; i++) {
result[(int) (i - start)] = bytes[i];
}
@ -64,6 +74,11 @@ class DownloaderTest {
return new ResourceMetadata(Optional.empty(), url,
supportsContentLength ? OptionalLong.of(bytes.length) : OptionalLong.empty(), supportsRange);
}
@Override
protected void retrySleep() {
slept++;
}
};
}
@ -126,6 +141,41 @@ class DownloaderTest {
assertEquals(5, resource4.bytesDownloaded());
}
@ParameterizedTest
@CsvSource({
"5, true",
"6, false"
})
void testRetry5xOK(int failures, boolean ok) throws Exception {
String url = "http://url";
Path dest = path.resolve("out");
var resource = new Downloader.ResourceToDownload("resource", url, dest);
Map<String, byte[]> resources = new ConcurrentHashMap<>();
AtomicInteger tries = new AtomicInteger(0);
String value = "abc";
String truncatedValue = "ab";
UnaryOperator<byte[]> overrideContent =
bytes -> (tries.incrementAndGet() <= failures ? truncatedValue : value).getBytes(StandardCharsets.UTF_8);
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
Downloader downloader = mockDownloader(resources, true, true, overrideContent);
resources.put(url, bytes);
var future = downloader.downloadIfNecessary(resource);
if (ok) {
future.get();
assertEquals(value, Files.readString(dest));
assertEquals(FileUtils.size(path), FileUtils.size(dest));
assertEquals(value.length(), resource.bytesDownloaded());
assertEquals(5, slept);
} else {
Throwable exception = ExceptionUtils.getRootCause(assertThrows(ExecutionException.class, future::get));
assertInstanceOf(IOException.class, exception);
assertFalse(Files.exists(dest));
assertEquals(truncatedValue.length(), resource.bytesDownloaded());
assertEquals(5, slept);
}
}
@Test
void testDownloadFailsIfTooBig() {
var downloader = new Downloader(config, 2L) {