pull/1/head
Mike Barry 2021-04-23 05:44:29 -04:00
rodzic adcb6576d9
commit 1235d155e7
3 zmienionych plików z 36 dodań i 48 usunięć

Wyświetl plik

@ -15,7 +15,6 @@ import java.util.zip.ZipFile;
import org.geotools.data.FeatureSource;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geometry.jts.JTS;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Geometry;
@ -29,7 +28,6 @@ import org.opengis.referencing.operation.MathTransform;
public class ShapefileReader extends Reader implements Closeable {
private final FeatureCollection<SimpleFeatureType, SimpleFeature> inputSource;
final FeatureIterator<SimpleFeature> featureIterator;
private String[] attributeNames;
private final ShapefileDataStore dataStore;
private MathTransform transform;
@ -66,7 +64,6 @@ public class ShapefileReader extends Reader implements Closeable {
for (int i = 0; i < attributeNames.length; i++) {
attributeNames[i] = inputSource.getSchema().getDescriptor(i).getLocalName();
}
this.featureIterator = inputSource.features();
} catch (IOException | FactoryException e) {
throw new RuntimeException(e);
}
@ -113,20 +110,22 @@ public class ShapefileReader extends Reader implements Closeable {
@Override
public SourceStep<SourceFeature> read() {
return next -> {
while (featureIterator.hasNext()) {
SimpleFeature feature = featureIterator.next();
Geometry source = (Geometry) feature.getDefaultGeometry();
Geometry transformed = source;
if (transform != null) {
transformed = JTS.transform(source, transform);
}
if (transformed != null) {
SourceFeature geom = new ReaderFeature(transformed);
// TODO
// for (int i = 1; i < attributeNames.length; i++) {
// geom.setTag(attributeNames[i], feature.getAttribute(i));
// }
next.accept(geom);
try (var iter = inputSource.features()) {
while (iter.hasNext()) {
SimpleFeature feature = iter.next();
Geometry source = (Geometry) feature.getDefaultGeometry();
Geometry transformed = source;
if (transform != null) {
transformed = JTS.transform(source, transform);
}
if (transformed != null) {
SourceFeature geom = new ReaderFeature(transformed);
// TODO
// for (int i = 1; i < attributeNames.length; i++) {
// geom.setTag(attributeNames[i], feature.getAttribute(i));
// }
next.accept(geom);
}
}
}
};
@ -134,7 +133,6 @@ public class ShapefileReader extends Reader implements Closeable {
@Override
public void close() {
featureIterator.close();
dataStore.dispose();
}
}

Wyświetl plik

@ -1,11 +1,7 @@
package com.onthegomap.flatmap.worker;
import com.onthegomap.flatmap.monitoring.Stats;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -15,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
public class WorkQueue<T> implements AutoCloseable, Supplier<T>, Consumer<T> {
private final ThreadLocal<Queue<T>> itemWriteBatchProvider = new ThreadLocal<>();
private final ThreadLocal<Queue<T>> itemReadBatchProvider = new ThreadLocal<>();
@ -25,7 +21,6 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
private final int pendingBatchesCapacity;
private volatile boolean hasIncomingData = true;
private final AtomicInteger pendingCount = new AtomicInteger(0);
private final List<Closeable> closeables = new ArrayList<>();
public WorkQueue(String name, int capacity, int maxBatch, Stats stats) {
this.pendingBatchesCapacity = capacity / maxBatch;
@ -42,10 +37,7 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
}
}
hasIncomingData = false;
for (Closeable closeable : closeables) {
closeable.close();
}
} catch (InterruptedException | IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@ -123,9 +115,5 @@ public class WorkQueue<T> implements Closeable, Supplier<T>, Consumer<T> {
public int getCapacity() {
return pendingBatchesCapacity * batchSize;
}
public void alsoClose(Closeable toClose) {
closeables.add(toClose);
}
}

Wyświetl plik

@ -31,21 +31,23 @@ public class ShapefileReaderTest {
@Test
@Timeout(30)
public void testReadShapefile() {
Map<String, Integer> counts = new TreeMap<>();
List<Geometry> points = new ArrayList<>();
Topology.start("test", new InMemory())
.fromGenerator("shapefile", reader.read())
.addBuffer("reader_queue", 100, 1)
.sinkToConsumer("counter", 1, elem -> {
String type = elem.getGeometry().getGeometryType();
counts.put(type, counts.getOrDefault(type, 0) + 1);
points.add(elem.getGeometry());
}).await();
assertEquals(86, points.size());
var gc = GeoUtils.gf.createGeometryCollection(points.toArray(new Geometry[0]));
var centroid = gc.getCentroid();
assertEquals(-77.0297995, centroid.getX(), 5);
assertEquals(38.9119684, centroid.getY(), 5);
public void testReadShapefileTwice() {
for (int i = 1; i <= 2; i++) {
Map<String, Integer> counts = new TreeMap<>();
List<Geometry> points = new ArrayList<>();
Topology.start("test", new InMemory())
.fromGenerator("shapefile", reader.read())
.addBuffer("reader_queue", 100, 1)
.sinkToConsumer("counter", 1, elem -> {
String type = elem.getGeometry().getGeometryType();
counts.put(type, counts.getOrDefault(type, 0) + 1);
points.add(elem.getGeometry());
}).await();
assertEquals(86, points.size());
var gc = GeoUtils.gf.createGeometryCollection(points.toArray(new Geometry[0]));
var centroid = gc.getCentroid();
assertEquals(-77.0297995, centroid.getX(), 5, "iter " + i);
assertEquals(38.9119684, centroid.getY(), 5, "iter " + i);
}
}
}