Improved taskNew creation by async initialization

pull/153/head
Piero Toffanin 2021-05-24 17:02:49 -04:00
rodzic 30e1038339
commit 18a714b2de
3 zmienionych plików z 193 dodań i 167 usunięć

Wyświetl plik

@ -38,9 +38,8 @@ const archiver = require('archiver');
const statusCodes = require('./statusCodes'); const statusCodes = require('./statusCodes');
module.exports = class Task{ module.exports = class Task{
constructor(uuid, name, options = [], webhook = null, skipPostProcessing = false, outputs = [], dateCreated = new Date().getTime(), done = () => {}){ constructor(uuid, name, options = [], webhook = null, skipPostProcessing = false, outputs = [], dateCreated = new Date().getTime(), imagesCountEstimate = -1){
assert(uuid !== undefined, "uuid must be set"); assert(uuid !== undefined, "uuid must be set");
assert(done !== undefined, "ready must be set");
this.uuid = uuid; this.uuid = uuid;
this.name = name !== "" ? name : "Task of " + (new Date()).toISOString(); this.name = name !== "" ? name : "Task of " + (new Date()).toISOString();
@ -58,14 +57,18 @@ module.exports = class Task{
this.skipPostProcessing = skipPostProcessing; this.skipPostProcessing = skipPostProcessing;
this.outputs = utils.parseUnsafePathsList(outputs); this.outputs = utils.parseUnsafePathsList(outputs);
this.progress = 0; this.progress = 0;
this.imagesCountEstimate = imagesCountEstimate;
async.series([ this.initialized = false;
}
initialize(done, additionalSteps = []){
async.series(additionalSteps.concat([
// Handle post-processing options logic // Handle post-processing options logic
cb => { cb => {
// If we need to post process results // If we need to post process results
// if pc-ept is supported (build entwine point cloud) // if pc-ept is supported (build entwine point cloud)
// we automatically add the pc-ept option to the task options by default // we automatically add the pc-ept option to the task options by default
if (skipPostProcessing) cb(); if (this.skipPostProcessing) cb();
else{ else{
odmInfo.supportsOption("pc-ept", (err, supported) => { odmInfo.supportsOption("pc-ept", (err, supported) => {
if (err){ if (err){
@ -113,35 +116,37 @@ module.exports = class Task{
} }
}); });
} }
], err => { ]), err => {
this.initialized = true;
done(err, this); done(err, this);
}); });
} }
static CreateFromSerialized(taskJson, done){ static CreateFromSerialized(taskJson, done){
new Task(taskJson.uuid, const task = new Task(taskJson.uuid,
taskJson.name, taskJson.name,
taskJson.options, taskJson.options,
taskJson.webhook, taskJson.webhook,
taskJson.skipPostProcessing, taskJson.skipPostProcessing,
taskJson.outputs, taskJson.outputs,
taskJson.dateCreated, taskJson.dateCreated);
(err, task) => {
if (err) done(err); task.initialize((err, task) => {
else{ if (err) done(err);
// Override default values with those else{
// provided in the taskJson // Override default values with those
for (let k in taskJson){ // provided in the taskJson
task[k] = taskJson[k]; for (let k in taskJson){
} task[k] = taskJson[k];
// Tasks that were running should be put back to QUEUED state
if (task.status.code === statusCodes.RUNNING){
task.status.code = statusCodes.QUEUED;
}
done(null, task);
} }
});
// Tasks that were running should be put back to QUEUED state
if (task.status.code === statusCodes.RUNNING){
task.status.code = statusCodes.QUEUED;
}
done(null, task);
}
});
} }
// Get path where images are stored for this task // Get path where images are stored for this task
@ -578,7 +583,7 @@ module.exports = class Task{
// Re-executes the task (by setting it's state back to QUEUED) // Re-executes the task (by setting it's state back to QUEUED)
// Only tasks that have been canceled, completed or have failed can be restarted. // Only tasks that have been canceled, completed or have failed can be restarted.
restart(options, cb){ restart(options, cb){
if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1){ if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1 && this.initialized){
this.setStatus(statusCodes.QUEUED); this.setStatus(statusCodes.QUEUED);
this.dateCreated = new Date().getTime(); this.dateCreated = new Date().getTime();
this.dateStarted = 0; this.dateStarted = 0;
@ -601,7 +606,7 @@ module.exports = class Task{
processingTime: this.processingTime, processingTime: this.processingTime,
status: this.status, status: this.status,
options: this.options, options: this.options,
imagesCount: this.images.length, imagesCount: this.images !== undefined ? this.images.length : this.imagesCountEstimate,
progress: this.progress progress: this.progress
}; };
} }

Wyświetl plik

@ -184,7 +184,7 @@ class TaskManager{
// Finds the first QUEUED task. // Finds the first QUEUED task.
findNextTaskToProcess(){ findNextTaskToProcess(){
for (let uuid in this.tasks){ for (let uuid in this.tasks){
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){ if (this.tasks[uuid].getStatus() === statusCodes.QUEUED && this.tasks[uuid].initialized){
return this.tasks[uuid]; return this.tasks[uuid];
} }
} }

Wyświetl plik

@ -30,7 +30,7 @@ const async = require('async');
const odmInfo = require('./odmInfo'); const odmInfo = require('./odmInfo');
const request = require('request'); const request = require('request');
const ziputils = require('./ziputils'); const ziputils = require('./ziputils');
const { cancelJob } = require('node-schedule'); const statusCodes = require('./statusCodes');
const download = function(uri, filename, callback) { const download = function(uri, filename, callback) {
request.head(uri, function(err, res, body) { request.head(uri, function(err, res, body) {
@ -224,10 +224,6 @@ module.exports = {
}, },
createTask: (req, res) => { createTask: (req, res) => {
// IMPROVEMENT: consider doing the file moving in the background
// and return a response more quickly instead of a long timeout.
req.setTimeout(1000 * 60 * 20);
const srcPath = path.join("tmp", req.id); const srcPath = path.join("tmp", req.id);
// Print error message and cleanup // Print error message and cleanup
@ -235,26 +231,149 @@ module.exports = {
res.json({error}); res.json({error});
removeDirectory(srcPath); removeDirectory(srcPath);
}; };
let destPath = path.join(Directories.data, req.id);
let destImagesPath = path.join(destPath, "images");
let destGcpPath = path.join(destPath, "gcp");
const checkMaxImageLimits = (cb) => {
if (!config.maxImages) cb();
else{
fs.readdir(destImagesPath, (err, files) => {
if (err) cb(err);
else if (files.length > config.maxImages) cb(new Error(`${files.length} images uploaded, but this node can only process up to ${config.maxImages}.`));
else cb();
});
}
};
let initSteps = [
// Check if dest directory already exists
cb => {
if (req.files && req.files.length > 0) {
fs.stat(destPath, (err, stat) => {
if (err && err.code === 'ENOENT') cb();
else{
// Directory already exists, this could happen
// if a previous attempt at upload failed and the user
// used set-uuid to specify the same UUID over the previous run
// Try to remove it
removeDirectory(destPath, err => {
if (err) cb(new Error(`Directory exists and we couldn't remove it.`));
else cb();
});
}
});
} else {
cb();
}
},
// Unzips zip URL to tmp/<uuid>/ (if any)
cb => {
if (req.body.zipurl) {
let archive = "zipurl.zip";
upload.storage.getDestination(req, archive, (err, dstPath) => {
if (err) cb(err);
else{
let archiveDestPath = path.join(dstPath, archive);
download(req.body.zipurl, archiveDestPath, cb);
}
});
} else {
cb();
}
},
// Move all uploads to data/<uuid>/images dir (if any)
cb => fs.mkdir(destPath, undefined, cb),
cb => fs.mkdir(destGcpPath, undefined, cb),
cb => mv(srcPath, destImagesPath, cb),
// Zip files handling
cb => {
const handleSeed = (cb) => {
const seedFileDst = path.join(destPath, "seed.zip");
async.series([
// Move to project root
cb => mv(path.join(destImagesPath, "seed.zip"), seedFileDst, cb),
// Extract
cb => {
ziputils.unzip(seedFileDst, destPath, cb);
},
// Remove
cb => {
fs.exists(seedFileDst, exists => {
if (exists) fs.unlink(seedFileDst, cb);
else cb();
});
}
], cb);
}
const handleZipUrl = (cb) => {
// Extract images
ziputils.unzip(path.join(destImagesPath, "zipurl.zip"),
destImagesPath,
cb, true);
}
// Find and handle zip files and extract
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else {
async.eachSeries(entries, (entry, cb) => {
if (entry === "seed.zip"){
handleSeed(cb);
}else if (entry === "zipurl.zip") {
handleZipUrl(cb);
} else cb();
}, cb);
}
});
},
// Verify max images limit
cb => {
checkMaxImageLimits(cb);
},
cb => {
// Find any *.txt (GCP) file and move it to the data/<uuid>/gcp directory
// also remove any lingering zipurl.zip
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else {
async.eachSeries(entries, (entry, cb) => {
if (/\.txt$/gi.test(entry)) {
mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb);
}else if (/\.zip$/gi.test(entry)){
fs.unlink(path.join(destImagesPath, entry), cb);
} else cb();
}, cb);
}
});
}
];
if (req.error !== undefined){ if (req.error !== undefined){
die(req.error); die(req.error);
}else{ }else{
let destPath = path.join(Directories.data, req.id); let imagesCountEstimate = -1;
let destImagesPath = path.join(destPath, "images");
let destGcpPath = path.join(destPath, "gcp");
const checkMaxImageLimits = (cb) => {
if (!config.maxImages) cb();
else{
fs.readdir(destImagesPath, (err, files) => {
if (err) cb(err);
else if (files.length > config.maxImages) cb(new Error(`${files.length} images uploaded, but this node can only process up to ${config.maxImages}.`));
else cb();
});
}
};
async.series([ async.series([
cb => {
// Basic path check
fs.exists(srcPath, exists => {
if (exists) cb();
else cb(new Error(`Invalid UUID`));
});
},
cb => { cb => {
odmInfo.filterOptions(req.body.options, (err, options) => { odmInfo.filterOptions(req.body.options, (err, options) => {
if (err) cb(err); if (err) cb(err);
@ -264,134 +383,36 @@ module.exports = {
} }
}); });
}, },
// Check if dest directory already exists
cb => { cb => {
if (req.files && req.files.length > 0) { fs.readdir(srcPath, (err, entries) => {
fs.stat(destPath, (err, stat) => { if (!err) imagesCountEstimate = entries.length;
if (err && err.code === 'ENOENT') cb();
else{
// Directory already exists, this could happen
// if a previous attempt at upload failed and the user
// used set-uuid to specify the same UUID over the previous run
// Try to remove it
removeDirectory(destPath, err => {
if (err) cb(new Error(`Directory exists and we couldn't remove it.`));
else cb();
});
}
});
} else {
cb(); cb();
}
},
// Unzips zip URL to tmp/<uuid>/ (if any)
cb => {
if (req.body.zipurl) {
let archive = "zipurl.zip";
upload.storage.getDestination(req, archive, (err, dstPath) => {
if (err) cb(err);
else{
let archiveDestPath = path.join(dstPath, archive);
download(req.body.zipurl, archiveDestPath, cb);
}
});
} else {
cb();
}
},
// Move all uploads to data/<uuid>/images dir (if any)
cb => fs.mkdir(destPath, undefined, cb),
cb => fs.mkdir(destGcpPath, undefined, cb),
cb => mv(srcPath, destImagesPath, cb),
// Zip files handling
cb => {
const handleSeed = (cb) => {
const seedFileDst = path.join(destPath, "seed.zip");
async.series([
// Move to project root
cb => mv(path.join(destImagesPath, "seed.zip"), seedFileDst, cb),
// Extract
cb => {
ziputils.unzip(seedFileDst, destPath, cb);
},
// Remove
cb => {
fs.exists(seedFileDst, exists => {
if (exists) fs.unlink(seedFileDst, cb);
else cb();
});
}
], cb);
}
const handleZipUrl = (cb) => {
// Extract images
ziputils.unzip(path.join(destImagesPath, "zipurl.zip"),
destImagesPath,
cb, true);
}
// Find and handle zip files and extract
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else {
async.eachSeries(entries, (entry, cb) => {
if (entry === "seed.zip"){
handleSeed(cb);
}else if (entry === "zipurl.zip") {
handleZipUrl(cb);
} else cb();
}, cb);
}
}); });
}, },
// Verify max images limit
cb => { cb => {
checkMaxImageLimits(cb); const task = new Task(req.id, req.body.name, req.body.options,
}, req.body.webhook,
req.body.skipPostProcessing === 'true',
req.body.outputs,
req.body.dateCreated,
imagesCountEstimate
);
TaskManager.singleton().addNew(task);
res.json({ uuid: req.id });
cb();
cb => { // We return a UUID right away but continue
// Find any *.txt (GCP) file and move it to the data/<uuid>/gcp directory // doing processing in the background
// also remove any lingering zipurl.zip
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else {
async.eachSeries(entries, (entry, cb) => {
if (/\.txt$/gi.test(entry)) {
mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb);
}else if (/\.zip$/gi.test(entry)){
fs.unlink(path.join(destImagesPath, entry), cb);
} else cb();
}, cb);
}
});
},
// Create task task.initialize(err => {
cb => { if (err) {
new Task(req.id, req.body.name, req.body.options, task.setStatus(statusCodes.FAILED, { errorMessage: err.message });
req.body.webhook,
req.body.skipPostProcessing === 'true', // Cleanup
req.body.outputs, removeDirectory(srcPath);
req.body.dateCreated, removeDirectory(destPath);
(err, task) => { } else TaskManager.singleton().processNextTask();
if (err) cb(err); }, initSteps);
else {
TaskManager.singleton().addNew(task);
res.json({ uuid: req.id });
cb();
}
});
} }
], err => { ], err => {
if (err) die(err.message); if (err) die(err.message);