// Patch summary (review notes placed ahead of the first "diff --git" header, outside every hunk):
// - PlanetilerConfig: the default passed for the "http_retries" argument changes from 1 to 5.
// - Downloader: the per-chunk download body is wrapped in a loop that runs at most
//   config.httpRetries() + 1 times. When an attempt other than the last throws IOException, the
//   progress counter is decremented by the bytes counted during that attempt, a warning is logged,
//   and retrySleep() is called; on the last attempt the IOException is rethrown.
//   perFileLimiter.release() stays in a finally block that now encloses the whole loop.
// - Downloader: a short read ("Unexpected EOF") is now thrown as IOException rather than
//   IllegalStateException, so it is caught by the same retry path.
// - Downloader.retrySleep(): new protected method that calls Thread.sleep(config.httpRetryWait()).
// - DownloaderTest: mockDownloader gains an overload taking a UnaryOperator that is applied to the
//   bytes served by openStream/openStreamRange; the mock overrides retrySleep() to increment a
//   "slept" field instead of sleeping; testRetry5xOK runs with 5 failures (expects success) and
//   6 failures (expects an IOException root cause) and asserts slept == 5 in both cases.
// NOTE(review): this copy of the patch appears to have lost its line breaks, indentation and generic
// type arguments (e.g. "Map resources") — restore it from the original before trying to apply it.
diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java index 3b5369e7..2933d38a 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/config/PlanetilerConfig.java @@ -172,7 +172,7 @@ public record PlanetilerConfig( arguments.getString("http_user_agent", "User-Agent header to set when downloading files over HTTP", "Planetiler downloader (https://github.com/onthegomap/planetiler)"), arguments.getDuration("http_timeout", "Timeout to use when downloading files over HTTP", "30s"), - arguments.getInteger("http_retries", "Retries to use when downloading files over HTTP", 1), + arguments.getInteger("http_retries", "Retries to use when downloading files over HTTP", 5), arguments.getDuration("http_retry_wait", "How long to wait before retrying HTTP request", "5s"), arguments.getLong("download_chunk_size_mb", "Size of file chunks to download in parallel in megabytes", 100), arguments.getInteger("download_threads", "Number of parallel threads to use when downloading each file", 1), diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java index 6984ea94..dca61ca2 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java @@ -269,42 +269,65 @@ public class Downloader { Worker.joinFutures(chunks.stream().map(range -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> { LogUtil.setStage("download", resource.id); perFileLimiter.acquire(); - var counter = resource.progress.counterForThread(); - try ( - var fc = FileChannel.open(tmpPath, WRITE); - var inputStream = (ranges || range.start > 0) ? 
- openStreamRange(canonicalUrl, range.start, range.end) : - openStream(canonicalUrl); - ) { - long offset = range.start; - byte[] buffer = new byte[16384]; - int read; - while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) { - counter.incBy(read); - if (rateLimiter != null) { - rateLimiter.acquire(read); - } - int position = 0; - int remaining = read; - while (remaining > 0) { - int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset); - if (written <= 0) { - throw new IOException("Failed to write to " + tmpPath); + try { + var counter = resource.progress.counterForThread(); + for (int retry = 0; retry <= config.httpRetries(); retry++) { + boolean lastTry = retry == config.httpRetries(); + int retriesRemaining = config.httpRetries() - retry; + int countToRewind = 0; + try ( + var fc = FileChannel.open(tmpPath, WRITE); + var inputStream = (ranges || range.start > 0) ? + openStreamRange(canonicalUrl, range.start, range.end) : + openStream(canonicalUrl); + ) { + long offset = range.start; + byte[] buffer = new byte[16384]; + int read; + while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) { + counter.incBy(read); + countToRewind += read; + if (rateLimiter != null) { + rateLimiter.acquire(read); + } + int position = 0; + int remaining = read; + while (remaining > 0) { + int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset); + if (written <= 0) { + throw new IOException("Failed to write to " + tmpPath); + } + position += written; + remaining -= written; + offset += written; + } + } + if (offset < range.end && range.end != Long.MAX_VALUE) { + throw new IOException("Unexpected EOF at " + offset + "/" + range.end); + } + // successfully downloaded file + break; + } catch (IOException e) { + if (lastTry) { + throw e; + } else { + counter.incBy(-countToRewind); + LOGGER.warn("Error downloading {}, retries remaining: {} {}", canonicalUrl, retriesRemaining, + e.getMessage()); + 
retrySleep(); } - position += written; - remaining -= written; - offset += written; } } - if (offset < range.end && range.end != Long.MAX_VALUE) { - throw new IllegalStateException("Unexpected EOF at " + offset + " expecting " + range.end); - } } finally { perFileLimiter.release(); } }), executor)).toArray(CompletableFuture[]::new)).get(); } + protected void retrySleep() throws InterruptedException { + Thread.sleep(config.httpRetryWait()); + } + private HttpRequest.Builder newHttpRequest(String url) { return HttpRequest.newBuilder(URI.create(url)) .timeout(config.httpTimeout()) diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java index 37ff10b7..1d79fb00 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.*; import com.onthegomap.planetiler.config.PlanetilerConfig; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -13,7 +14,10 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.UnaryOperator; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -25,16 +29,22 @@ class DownloaderTest { Path path; private final PlanetilerConfig config = PlanetilerConfig.defaults(); private AtomicLong downloads = new AtomicLong(0); + private int slept = 0; private Downloader mockDownloader(Map 
resources, boolean supportsRange, boolean supportsContentLength) { + return mockDownloader(resources, supportsRange, supportsContentLength, UnaryOperator.identity()); + } + + private Downloader mockDownloader(Map resources, boolean supportsRange, + boolean supportsContentLength, UnaryOperator overrideBytes) { return new Downloader(config, 2L) { @Override InputStream openStream(String url) { downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); - byte[] bytes = resources.get(url); + byte[] bytes = overrideBytes.apply(resources.get(url)); return new ByteArrayInputStream(bytes); } @@ -44,7 +54,7 @@ class DownloaderTest { downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); byte[] result = new byte[(int) (end - start)]; - byte[] bytes = resources.get(url); + byte[] bytes = overrideBytes.apply(resources.get(url)); for (int i = (int) start; i < start + result.length; i++) { result[(int) (i - start)] = bytes[i]; } @@ -64,6 +74,11 @@ class DownloaderTest { return new ResourceMetadata(Optional.empty(), url, supportsContentLength ? OptionalLong.of(bytes.length) : OptionalLong.empty(), supportsRange); } + + @Override + protected void retrySleep() { + slept++; + } }; } @@ -126,6 +141,41 @@ class DownloaderTest { assertEquals(5, resource4.bytesDownloaded()); } + @ParameterizedTest + @CsvSource({ + "5, true", + "6, false" + }) + void testRetry5xOK(int failures, boolean ok) throws Exception { + String url = "http://url"; + Path dest = path.resolve("out"); + var resource = new Downloader.ResourceToDownload("resource", url, dest); + Map resources = new ConcurrentHashMap<>(); + AtomicInteger tries = new AtomicInteger(0); + String value = "abc"; + String truncatedValue = "ab"; + UnaryOperator overrideContent = + bytes -> (tries.incrementAndGet() <= failures ? 
truncatedValue : value).getBytes(StandardCharsets.UTF_8); + + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + Downloader downloader = mockDownloader(resources, true, true, overrideContent); + resources.put(url, bytes); + var future = downloader.downloadIfNecessary(resource); + if (ok) { + future.get(); + assertEquals(value, Files.readString(dest)); + assertEquals(FileUtils.size(path), FileUtils.size(dest)); + assertEquals(value.length(), resource.bytesDownloaded()); + assertEquals(5, slept); + } else { + Throwable exception = ExceptionUtils.getRootCause(assertThrows(ExecutionException.class, future::get)); + assertInstanceOf(IOException.class, exception); + assertFalse(Files.exists(dest)); + assertEquals(truncatedValue.length(), resource.bytesDownloaded()); + assertEquals(5, slept); + } + } + @Test void testDownloadFailsIfTooBig() { var downloader = new Downloader(config, 2L) {