diff --git a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java index b31fb996..3158cb25 100644 --- a/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java +++ b/src/main/java/com/onthegomap/flatmap/reader/ShapefileReader.java @@ -15,7 +15,6 @@ import java.util.zip.ZipFile; import org.geotools.data.FeatureSource; import org.geotools.data.shapefile.ShapefileDataStore; import org.geotools.feature.FeatureCollection; -import org.geotools.feature.FeatureIterator; import org.geotools.geometry.jts.JTS; import org.geotools.referencing.CRS; import org.locationtech.jts.geom.Geometry; @@ -29,7 +28,6 @@ import org.opengis.referencing.operation.MathTransform; public class ShapefileReader extends Reader implements Closeable { private final FeatureCollection inputSource; - final FeatureIterator featureIterator; private String[] attributeNames; private final ShapefileDataStore dataStore; private MathTransform transform; @@ -66,7 +64,6 @@ public class ShapefileReader extends Reader implements Closeable { for (int i = 0; i < attributeNames.length; i++) { attributeNames[i] = inputSource.getSchema().getDescriptor(i).getLocalName(); } - this.featureIterator = inputSource.features(); } catch (IOException | FactoryException e) { throw new RuntimeException(e); } @@ -113,20 +110,22 @@ public class ShapefileReader extends Reader implements Closeable { @Override public SourceStep read() { return next -> { - while (featureIterator.hasNext()) { - SimpleFeature feature = featureIterator.next(); - Geometry source = (Geometry) feature.getDefaultGeometry(); - Geometry transformed = source; - if (transform != null) { - transformed = JTS.transform(source, transform); - } - if (transformed != null) { - SourceFeature geom = new ReaderFeature(transformed); - // TODO - // for (int i = 1; i < attributeNames.length; i++) { - // geom.setTag(attributeNames[i], feature.getAttribute(i)); - // } - next.accept(geom); + try (var iter = inputSource.features()) { + while (iter.hasNext()) { + SimpleFeature feature = iter.next(); + Geometry source = (Geometry) feature.getDefaultGeometry(); + Geometry transformed = source; + if (transform != null) { + transformed = JTS.transform(source, transform); + } + if (transformed != null) { + SourceFeature geom = new ReaderFeature(transformed); + // TODO + // for (int i = 1; i < attributeNames.length; i++) { + // geom.setTag(attributeNames[i], feature.getAttribute(i)); + // } + next.accept(geom); + } } } }; @@ -134,7 +133,6 @@ public class ShapefileReader extends Reader implements Closeable { @Override public void close() { - featureIterator.close(); dataStore.dispose(); } } diff --git a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java index 8df4cff6..7d24002e 100644 --- a/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java +++ b/src/main/java/com/onthegomap/flatmap/worker/WorkQueue.java @@ -1,11 +1,7 @@ package com.onthegomap.flatmap.worker; import com.onthegomap.flatmap.monitoring.Stats; -import java.io.Closeable; -import java.io.IOException; import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -15,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; -public class WorkQueue implements Closeable, Supplier, Consumer { +public class WorkQueue implements AutoCloseable, Supplier, Consumer { private final ThreadLocal> itemWriteBatchProvider = new ThreadLocal<>(); private final ThreadLocal> itemReadBatchProvider = new ThreadLocal<>(); @@ -25,7 +21,6 @@ public class WorkQueue implements Closeable, Supplier, Consumer { private final int pendingBatchesCapacity; private volatile boolean hasIncomingData = true; private final AtomicInteger pendingCount = new AtomicInteger(0); - private final List closeables = new ArrayList<>(); public WorkQueue(String name, int capacity, int maxBatch, Stats stats) { this.pendingBatchesCapacity = capacity / maxBatch; @@ -42,10 +37,7 @@ public class WorkQueue implements Closeable, Supplier, Consumer { } } hasIncomingData = false; - for (Closeable closeable : closeables) { - closeable.close(); - } - } catch (InterruptedException | IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -123,9 +115,5 @@ public class WorkQueue implements Closeable, Supplier, Consumer { public int getCapacity() { return pendingBatchesCapacity * batchSize; } - - public void alsoClose(Closeable toClose) { - closeables.add(toClose); - } } diff --git a/src/test/java/com/onthegomap/flatmap/reader/ShapefileReaderTest.java b/src/test/java/com/onthegomap/flatmap/reader/ShapefileReaderTest.java index d0cdd0e3..17df1da9 100644 --- a/src/test/java/com/onthegomap/flatmap/reader/ShapefileReaderTest.java +++ b/src/test/java/com/onthegomap/flatmap/reader/ShapefileReaderTest.java @@ -31,21 +31,23 @@ public class ShapefileReaderTest { @Test @Timeout(30) - public void testReadShapefile() { - Map counts = new TreeMap<>(); - List points = new ArrayList<>(); - Topology.start("test", new InMemory()) - .fromGenerator("shapefile", reader.read()) - .addBuffer("reader_queue", 100, 1) - .sinkToConsumer("counter", 1, elem -> { - String type = elem.getGeometry().getGeometryType(); - counts.put(type, counts.getOrDefault(type, 0) + 1); - points.add(elem.getGeometry()); - }).await(); - assertEquals(86, points.size()); - var gc = GeoUtils.gf.createGeometryCollection(points.toArray(new Geometry[0])); - var centroid = gc.getCentroid(); - assertEquals(-77.0297995, centroid.getX(), 5); - assertEquals(38.9119684, centroid.getY(), 5); + public void testReadShapefileTwice() { + for (int i = 1; i <= 2; i++) { + Map counts = new TreeMap<>(); + List points = new ArrayList<>(); + Topology.start("test", new InMemory()) + .fromGenerator("shapefile", reader.read()) + .addBuffer("reader_queue", 100, 1) + .sinkToConsumer("counter", 1, elem -> { + String type = elem.getGeometry().getGeometryType(); + counts.put(type, counts.getOrDefault(type, 0) + 1); + points.add(elem.getGeometry()); + }).await(); + assertEquals(86, points.size()); + var gc = GeoUtils.gf.createGeometryCollection(points.toArray(new Geometry[0])); + var centroid = gc.getCentroid(); + assertEquals(-77.0297995, centroid.getX(), 5, "iter " + i); + assertEquals(38.9119684, centroid.getY(), 5, "iter " + i); + } } }