var stream = require("stream"); var util = require("util"); var Promise = require("bluebird"); var es = require("event-stream"); var combine = require("stream-combiner"); function isInBbox(position, bbox) { if(position.lat > bbox.top || position.lat < bbox.bottom) return false; if(bbox.right < bbox.left) // bbox spans over lon = 180 return (position.lon > bbox.left || position.lon < bbox.right); else return (position.lon > bbox.left && position.lon < bbox.right); } function filterStreamPromise(inStream, filterFunction) { var error = false; var ret = new stream.Readable({ objectMode: true }); var running = false; var queue = [ ]; function handleQueue() { if(error || running) return; if(queue.length > 0) { var next = queue.shift(); if(next == null) { ret.push(null); } else { running = true; Promise.resolve(filterFunction(next)).nodeify(function(err, newData) { running = false; if(error) return; if(err) { error = true; ret.emit("error", err); } else if(newData != null) { ret.push(newData); } setImmediate(handleQueue); }); } } } inStream.on("data", function(data) { if(data != null) queue.push(data); handleQueue(); }).on("end", function() { queue.push(null); handleQueue(); }).on("error", function(err) { ret.emit("error", err); }); ret._read = function() { }; return ret; } function extend(obj1, obj2) { if(obj1 == null) return null; for(var i=1; i/g, '>').replace(/"/g, '"'); } function isoDate(date) { if(!date) date = new Date(); function pad(number, length) { number = "" + number; while(number.length < length) number = "0" + number; return number; } return pad(date.getUTCFullYear(), 4) + '-' + pad(date.getUTCMonth()+1, 2) + '-' + pad(date.getUTCDate(), 2) + 'T' + pad(date.getUTCHours(), 2) + ':' + pad(date.getUTCMinutes(), 2) + ':' + pad(date.getUTCSeconds(), 2) + 'Z'; } function round(number, digits) { var fac = Math.pow(10, digits); return Math.round(number*fac)/fac; } function streamToArrayPromise(stream) { return new Promise(function(resolve, reject) { var writer = es.writeArray(function(err, array) { if(err) reject(err); else resolve(array); }); stream.pipe(writer); stream.on("error", reject); }); } function promiseAuto(obj) { var promises = { }; function _get(str) { if(!obj[str]) throw new Error("Invalid dependency '" + str + "' in promiseAuto()."); if(promises[str]) return promises[str]; if(obj[str].then) return obj[str]; var params = getFuncParams(obj[str]); return promises[str] = _getDeps(params).then(function(res) { return obj[str].apply(null, params.map(function(param) { return res[param]; })); }); } function _getDeps(arr) { var deps = { }; arr.forEach(function(it) { deps[it] = _get(it); }); return Promise.props(deps); } return _getDeps(Object.keys(obj)); } function getFuncParams(func) { // Taken from angular injector code var ARROW_ARG = /^([^\(]+?)\s*=>/; var FN_ARGS = /^[^\(]*\(\s*([^\)]*)\)/m; var FN_ARG_SPLIT = /\s*,\s*/; var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg; var fnText = (Function.prototype.toString.call(func) + ' ').replace(STRIP_COMMENTS, ''); var params = (fnText.match(ARROW_ARG) || fnText.match(FN_ARGS))[1]; return params == "" ? [ ] : params.split(FN_ARG_SPLIT); } function modifyFunction(obj, prop, before, after) { var bkp = obj[prop]; obj[prop] = function() { before && before.apply(this, arguments); var ret = bkp.apply(this, arguments); after && after.apply(this, arguments); return ret; }; } /** * Intercepts the "write" and "end" methods of the given writable stream and buffers the values written to them instead. * When the stream ends, the buffered value is returned and the original "write" and "end" methods are restored. * @param writeStream {stream.Writable} * @returns {Promise.} */ function interceptWriteStream(writeStream) { return new Promise((resolve, reject) => { let response = ""; let writeBkp = writeStream.write; let sendBkp = writeStream.send; // For express response streams let endBkp = writeStream.end; writeStream.write = function(chunk, encoding, callback) { response += chunk; if(typeof encoding == "function") { encoding(); } else if(callback) { callback(); } return true; }; writeStream.send = function(body) { writeStream.end(body); }; writeStream.end = function(chunk, encoding, callback) { if(chunk) { response += chunk; } writeStream.write = writeBkp; writeStream.send = sendBkp; writeStream.end = endBkp; if(typeof encoding == "function") { writeStream.once("finish", encoding); } else if(callback) { writeStream.once("finish", encoding); } resolve(response); }; }); } module.exports = { isInBbox : isInBbox, filterStreamPromise : filterStreamPromise, extend : extend, calculateDistance : calculateDistance, generateRandomId : generateRandomId, stripObject : stripObject, ArrayStream : ArrayStream, streamEachPromise : streamEachPromise, escapeXml : escapeXml, isoDate : isoDate, round: round, streamToArrayPromise: streamToArrayPromise, promiseAuto: promiseAuto, modifyFunction: modifyFunction, interceptWriteStream: interceptWriteStream };