Pausing and resuming stream to avoid memory overflow in database migration

before-bootstrap
Candid Dauth 2014-12-28 02:13:43 +01:00
rodzic da79d6472c
commit 972ea75153
1 zmienionych plików z 24 dodań i 29 usunięć

Wyświetl plik

@ -84,49 +84,44 @@ function migrateData(title, stream, deal, callback) {
console.log("Migrating "+title); console.log("Migrating "+title);
console.log(); console.log();
var number = 0; var queue = async.queue(function(data, next) {
var ended = false;
stream.on("data", function(data) {
data = JSON.parse(JSON.stringify(data)); data = JSON.parse(JSON.stringify(data));
data.id = data._id; data.id = data._id;
delete data._id; delete data._id;
var queries = deal(data); var queries = deal(data);
var outstandingQueries = queries.length;
if(queries.length == 0) async.each(queries, function(it, next) {
return check(); it.complete(function(err) {
number++;
for(var i=0; i<queries.length; i++) {
queries[i].complete(function(err) {
err && console.error(err); err && console.error(err);
next();
if(--outstandingQueries == 0) {
number--;
check();
}
});
}
}); });
}, next);
}, 5);
stream.on("error", function(err) { var startStop = function() {
console.error(err); if(queue.length() == 0)
ended = true; stream.resume();
check(); else
stream.pause();
};
stream.on("data", function(data) {
queue.push(data, startStop);
startStop();
}); });
stream.on("end", function() { stream.on("end", function() {
ended = true; if(queue.running() == 0 && queue.length() == 0)
check(); callback();
else
queue.drain = callback;
}); });
function check() { stream.on("error", function(err) {
if(ended && number == 0) queue.kill();
callback(); callback(err);
} });
} }
var DEFAULT_MARKER_TYPE = db2._defaultTypes[0]; var DEFAULT_MARKER_TYPE = db2._defaultTypes[0];