diff --git a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java index 3e47e938..c3c37803 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/NaturalEarthReader.java @@ -6,10 +6,9 @@ import com.onthegomap.flatmap.monitoring.Stats; import com.onthegomap.flatmap.worker.Topology.SourceStep; import java.io.File; import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -17,7 +16,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; -import java.util.zip.ZipEntry; import java.util.zip.ZipFile; import org.locationtech.jts.geom.Geometry; import org.slf4j.Logger; @@ -45,30 +43,17 @@ public class NaturalEarthReader extends Reader { private Connection open(File file, File tmpLocation) throws IOException, SQLException { String path = "jdbc:sqlite:" + file.getAbsolutePath(); - File toOpen = file; if (file.getName().endsWith(".zip")) { - toOpen = tmpLocation == null ? File.createTempFile("sqlite", "natearth") : tmpLocation; + File toOpen = tmpLocation == null ? File.createTempFile("sqlite", "natearth") : tmpLocation; extracted = toOpen.toPath(); - toOpen.delete(); toOpen.deleteOnExit(); - String sqliteFileInZip; - try (ZipFile zip = new ZipFile(file)) { - sqliteFileInZip = zip.stream() - .map(ZipEntry::getName) - .filter(z -> z.endsWith(".sqlite")) - .findFirst().orElse(null); - } - if (sqliteFileInZip == null) { - throw new IllegalArgumentException("No .sqlite file found inside " + file.getName()); - } else { + try (ZipFile zipFile = new ZipFile(file)) { + var zipEntry = zipFile.stream() + .filter(entry -> entry.getName().endsWith(".sqlite")) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No .sqlite file found inside " + file.getName())); LOGGER.info("unzipping " + file.getAbsolutePath() + " to " + extracted); - try (FileSystem fileSystem = FileSystems.newFileSystem(file.toPath())) { - Path fileToExtract = fileSystem.getPath(sqliteFileInZip); - Files.copy( - fileToExtract, - extracted - ); - } + Files.copy(zipFile.getInputStream(zipEntry), extracted, StandardCopyOption.REPLACE_EXISTING); } path = "jdbc:sqlite:" + toOpen.getAbsolutePath(); } diff --git a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java index daec6c1d..ba846e3d 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java @@ -28,7 +28,7 @@ import org.opengis.referencing.operation.MathTransform; public class ShapefileReader extends Reader implements Closeable { private final FeatureCollection inputSource; - private String[] attributeNames; + private final String[] attributeNames; private final ShapefileDataStore dataStore; private MathTransform transform; @@ -76,17 +76,14 @@ public class ShapefileReader extends Reader implements Closeable { URI uri; if (name.endsWith(".zip")) { - String shapeFileInZip; try (ZipFile zip = new ZipFile(file)) { - shapeFileInZip = zip.stream() + String shapeFileInZip = zip.stream() .map(ZipEntry::getName) .filter(z -> z.endsWith(".shp")) - .findAny().orElse(null); + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No .shp file found inside " + name)); + uri = URI.create("jar:file:" + file.toPath().toAbsolutePath() + "!/" + shapeFileInZip); } - if (shapeFileInZip == null) { - throw new IllegalArgumentException("No .shp file found inside " + name); - } - uri = URI.create("jar:file:" + file.toPath().toAbsolutePath() + "!/" + shapeFileInZip); } else if (name.endsWith(".shp")) { uri = file.toURI(); } else { diff --git a/src/main/java/com/onthegomap/flatmap/worker/Topology.java b/src/main/java/com/onthegomap/flatmap/worker/Topology.java index 0c718e0f..9311a828 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/Topology.java +++ b/src/main/java/com/onthegomap/flatmap/worker/Topology.java @@ -23,9 +23,9 @@ public record Topology( private void doAwaitAndLog(ProgressLoggers loggers, Duration logInterval, long startNanos) { if (previous != null) { previous.doAwaitAndLog(loggers, logInterval, startNanos); - } - if (inputQueue != null) { - inputQueue.close(); + if (inputQueue != null) { + inputQueue.close(); + } } if (worker != null) { long elapsedSoFar = System.nanoTime() - startNanos; @@ -43,9 +43,9 @@ public record Topology( public void await() { if (previous != null) { previous.await(); - } - if (inputQueue != null) { - inputQueue.close(); + if (inputQueue != null) { + inputQueue.close(); + } } if (worker != null) { worker.await(); diff --git a/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java b/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java index d6c1f955..e826a9da 100644 --- a/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java +++ b/src/test/java/com/onthegomap/flatmap/worker/TopologyTest.java @@ -55,14 +55,15 @@ public class TopologyTest { }).addBuffer("writer_queue", 1) .sinkToConsumer("writer", 1, result::add); - queue.accept(0); - queue.accept(1); - queue.accept(2); - queue.close(); + new Thread(() -> { + queue.accept(0); + queue.accept(1); + queue.close(); + }).start(); topology.await(); - assertEquals(Set.of(1, 2, 3, 4, 5, 6), result); + assertEquals(Set.of(1, 2, 3, 4), result); } @Test