var stream = require("stream");
var util = require("util");
var Promise = require("bluebird");
var es = require("event-stream");
var combine = require("stream-combiner");
function isInBbox(position, bbox) {
if(position.lat > bbox.top || position.lat < bbox.bottom)
return false;
if(bbox.right < bbox.left) // bbox spans over lon = 180
return (position.lon > bbox.left || position.lon < bbox.right);
else
return (position.lon > bbox.left && position.lon < bbox.right);
}
function filterStreamPromise(inStream, filterFunction) {
var error = false;
var ret = new stream.Readable({ objectMode: true });
var running = false;
var queue = [ ];
function handleQueue() {
if(error || running)
return;
if(queue.length > 0) {
var next = queue.shift();
if(next == null) {
ret.push(null);
} else {
running = true;
Promise.resolve(filterFunction(next)).nodeify(function(err, newData) {
running = false;
if(error)
return;
if(err) {
error = true;
ret.emit("error", err);
} else if(newData != null) {
ret.push(newData);
}
setImmediate(handleQueue);
});
}
}
}
inStream.on("data", function(data) {
if(data != null)
queue.push(data);
handleQueue();
}).on("end", function() {
queue.push(null);
handleQueue();
}).on("error", function(err) {
ret.emit("error", err);
});
ret._read = function() {
};
return ret;
}
function extend(obj1, obj2) {
if(obj1 == null)
return null;
for(var i=1; i/g, '>').replace(/"/g, '"');
}
function isoDate(date) {
if(!date)
date = new Date();
function pad(number, length) {
number = "" + number;
while(number.length < length)
number = "0" + number;
return number;
}
return pad(date.getUTCFullYear(), 4) + '-' + pad(date.getUTCMonth()+1, 2) + '-' + pad(date.getUTCDate(), 2) + 'T' + pad(date.getUTCHours(), 2) + ':' + pad(date.getUTCMinutes(), 2) + ':' + pad(date.getUTCSeconds(), 2) + 'Z';
}
function round(number, digits) {
var fac = Math.pow(10, digits);
return Math.round(number*fac)/fac;
}
function streamToArrayPromise(stream) {
return new Promise(function(resolve, reject) {
var writer = es.writeArray(function(err, array) {
if(err)
reject(err);
else
resolve(array);
});
stream.pipe(writer);
stream.on("error", reject);
});
}
function promiseAuto(obj) {
var promises = { };
function _get(str) {
if(!obj[str])
throw new Error("Invalid dependency '" + str + "' in promiseAuto().");
if(promises[str])
return promises[str];
if(obj[str].then)
return obj[str];
var params = getFuncParams(obj[str]);
return promises[str] = _getDeps(params).then(function(res) {
return obj[str].apply(null, params.map(function(param) { return res[param]; }));
});
}
function _getDeps(arr) {
var deps = { };
arr.forEach(function(it) {
deps[it] = _get(it);
});
return Promise.props(deps);
}
return _getDeps(Object.keys(obj));
}
function getFuncParams(func) {
// Taken from angular injector code
var ARROW_ARG = /^([^\(]+?)\s*=>/;
var FN_ARGS = /^[^\(]*\(\s*([^\)]*)\)/m;
var FN_ARG_SPLIT = /\s*,\s*/;
var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg;
var fnText = (Function.prototype.toString.call(func) + ' ').replace(STRIP_COMMENTS, '');
var params = (fnText.match(ARROW_ARG) || fnText.match(FN_ARGS))[1];
return params == "" ? [ ] : params.split(FN_ARG_SPLIT);
}
function modifyFunction(obj, prop, before, after) {
var bkp = obj[prop];
obj[prop] = function() {
before && before.apply(this, arguments);
var ret = bkp.apply(this, arguments);
after && after.apply(this, arguments);
return ret;
};
}
/**
* Intercepts the "write" and "end" methods of the given writable stream and buffers the values written to them instead.
* When the stream ends, the buffered value is returned and the original "write" and "end" methods are restored.
* @param writeStream {stream.Writable}
* @returns {Promise.}
*/
function interceptWriteStream(writeStream) {
return new Promise((resolve, reject) => {
let response = "";
let writeBkp = writeStream.write;
let sendBkp = writeStream.send; // For express response streams
let endBkp = writeStream.end;
writeStream.write = function(chunk, encoding, callback) {
response += chunk;
if(typeof encoding == "function") {
encoding();
} else if(callback) {
callback();
}
return true;
};
writeStream.send = function(body) {
writeStream.end(body);
};
writeStream.end = function(chunk, encoding, callback) {
if(chunk) {
response += chunk;
}
writeStream.write = writeBkp;
writeStream.send = sendBkp;
writeStream.end = endBkp;
if(typeof encoding == "function") {
writeStream.once("finish", encoding);
} else if(callback) {
writeStream.once("finish", encoding);
}
resolve(response);
};
});
}
module.exports = {
isInBbox : isInBbox,
filterStreamPromise : filterStreamPromise,
extend : extend,
calculateDistance : calculateDistance,
generateRandomId : generateRandomId,
stripObject : stripObject,
ArrayStream : ArrayStream,
streamEachPromise : streamEachPromise,
escapeXml : escapeXml,
isoDate : isoDate,
round: round,
streamToArrayPromise: streamToArrayPromise,
promiseAuto: promiseAuto,
modifyFunction: modifyFunction,
interceptWriteStream: interceptWriteStream
};