Restore non-event-stream utils.filterStreamPromise, as event-stream couldn’t deal with our streams

pull/54/merge
Candid Dauth 2016-10-30 17:34:02 +03:00
rodzic 3369ace90d
commit 5d66ffaf59
1 zmienionych plików z 51 dodań i 11 usunięć

Wyświetl plik

@ -14,17 +14,57 @@ function isInBbox(position, bbox) {
}
function filterStreamPromise(inStream, filterFunction) {
return combine(
inStream,
es.map(function(data, callback) {
filterFunction(data).then(function(newData) {
if(newData == null)
callback();
else
callback(null, newData);
}).catch(callback);
})
);
var error = false;
var ret = new stream.Readable({ objectMode: true });
var running = false;
var queue = [ ];
function handleQueue() {
if(error || running)
return;
if(queue.length > 0) {
var next = queue.shift();
if(next == null) {
ret.push(null);
} else {
running = true;
Promise.resolve(filterFunction(next)).nodeify(function(err, newData) {
running = false;
if(error)
return;
if(err) {
error = true;
ret.emit("error", err);
} else if(newData != null) {
ret.push(newData);
}
setImmediate(handleQueue);
});
}
}
}
inStream.on("data", function(data) {
if(data != null)
queue.push(data);
handleQueue();
}).on("end", function() {
queue.push(null);
handleQueue();
}).on("error", function(err) {
ret.emit("error", err);
});
ret._read = function() {
};
return ret;
}
function extend(obj1, obj2) {