diff --git a/server/utils.js b/server/utils.js index 26690b30..d4139d87 100644 --- a/server/utils.js +++ b/server/utils.js @@ -14,17 +14,57 @@ function isInBbox(position, bbox) { } function filterStreamPromise(inStream, filterFunction) { - return combine( - inStream, - es.map(function(data, callback) { - filterFunction(data).then(function(newData) { - if(newData == null) - callback(); - else - callback(null, newData); - }).catch(callback); - }) - ); + var error = false; + + var ret = new stream.Readable({ objectMode: true }); + + var running = false; + var queue = [ ]; + + function handleQueue() { + if(error || running) + return; + + if(queue.length > 0) { + var next = queue.shift(); + if(next == null) { + ret.push(null); + } else { + running = true; + Promise.resolve(filterFunction(next)).nodeify(function(err, newData) { + running = false; + + if(error) + return; + + if(err) { + error = true; + ret.emit("error", err); + } else if(newData != null) { + ret.push(newData); + } + + setImmediate(handleQueue); + }); + } + } + } + + inStream.on("data", function(data) { + if(data != null) + queue.push(data); + handleQueue(); + }).on("end", function() { + queue.push(null); + handleQueue(); + }).on("error", function(err) { + ret.emit("error", err); + }); + + ret._read = function() { + }; + + return ret; } function extend(obj1, obj2) {