diff --git a/libs/Task.js b/libs/Task.js index c45644c..08114a5 100644 --- a/libs/Task.js +++ b/libs/Task.js @@ -38,9 +38,8 @@ const archiver = require('archiver'); const statusCodes = require('./statusCodes'); module.exports = class Task{ - constructor(uuid, name, options = [], webhook = null, skipPostProcessing = false, outputs = [], dateCreated = new Date().getTime(), done = () => {}){ + constructor(uuid, name, options = [], webhook = null, skipPostProcessing = false, outputs = [], dateCreated = new Date().getTime(), imagesCountEstimate = -1){ assert(uuid !== undefined, "uuid must be set"); - assert(done !== undefined, "ready must be set"); this.uuid = uuid; this.name = name !== "" ? name : "Task of " + (new Date()).toISOString(); @@ -58,14 +57,18 @@ module.exports = class Task{ this.skipPostProcessing = skipPostProcessing; this.outputs = utils.parseUnsafePathsList(outputs); this.progress = 0; - - async.series([ + this.imagesCountEstimate = imagesCountEstimate; + this.initialized = false; + } + + initialize(done, additionalSteps = []){ + async.series(additionalSteps.concat([ // Handle post-processing options logic cb => { // If we need to post process results // if pc-ept is supported (build entwine point cloud) // we automatically add the pc-ept option to the task options by default - if (skipPostProcessing) cb(); + if (this.skipPostProcessing) cb(); else{ odmInfo.supportsOption("pc-ept", (err, supported) => { if (err){ @@ -113,35 +116,37 @@ module.exports = class Task{ } }); } - ], err => { + ]), err => { + this.initialized = true; done(err, this); }); } static CreateFromSerialized(taskJson, done){ - new Task(taskJson.uuid, + const task = new Task(taskJson.uuid, taskJson.name, - taskJson.options, + taskJson.options, taskJson.webhook, taskJson.skipPostProcessing, taskJson.outputs, - taskJson.dateCreated, - (err, task) => { - if (err) done(err); - else{ - // Override default values with those - // provided in the taskJson - for (let k in taskJson){ - task[k] = taskJson[k]; - } - - // Tasks that were running should be put back to QUEUED state - if (task.status.code === statusCodes.RUNNING){ - task.status.code = statusCodes.QUEUED; - } - done(null, task); + taskJson.dateCreated); + + task.initialize((err, task) => { + if (err) done(err); + else{ + // Override default values with those + // provided in the taskJson + for (let k in taskJson){ + task[k] = taskJson[k]; } - }); + + // Tasks that were running should be put back to QUEUED state + if (task.status.code === statusCodes.RUNNING){ + task.status.code = statusCodes.QUEUED; + } + done(null, task); + } + }); } // Get path where images are stored for this task @@ -578,7 +583,7 @@ module.exports = class Task{ // Re-executes the task (by setting it's state back to QUEUED) // Only tasks that have been canceled, completed or have failed can be restarted. restart(options, cb){ - if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1){ + if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1 && this.initialized){ this.setStatus(statusCodes.QUEUED); this.dateCreated = new Date().getTime(); this.dateStarted = 0; @@ -601,7 +606,7 @@ module.exports = class Task{ processingTime: this.processingTime, status: this.status, options: this.options, - imagesCount: this.images.length, + imagesCount: this.images !== undefined ? this.images.length : this.imagesCountEstimate, progress: this.progress }; } diff --git a/libs/TaskManager.js b/libs/TaskManager.js index a0fffcb..c0976df 100644 --- a/libs/TaskManager.js +++ b/libs/TaskManager.js @@ -184,7 +184,7 @@ class TaskManager{ // Finds the first QUEUED task. findNextTaskToProcess(){ for (let uuid in this.tasks){ - if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){ + if (this.tasks[uuid].getStatus() === statusCodes.QUEUED && this.tasks[uuid].initialized){ return this.tasks[uuid]; } } diff --git a/libs/taskNew.js b/libs/taskNew.js index 9692d54..11d1d51 100644 --- a/libs/taskNew.js +++ b/libs/taskNew.js @@ -30,7 +30,7 @@ const async = require('async'); const odmInfo = require('./odmInfo'); const request = require('request'); const ziputils = require('./ziputils'); -const { cancelJob } = require('node-schedule'); +const statusCodes = require('./statusCodes'); const download = function(uri, filename, callback) { request.head(uri, function(err, res, body) { @@ -224,10 +224,6 @@ module.exports = { }, createTask: (req, res) => { - // IMPROVEMENT: consider doing the file moving in the background - // and return a response more quickly instead of a long timeout. - req.setTimeout(1000 * 60 * 20); - const srcPath = path.join("tmp", req.id); // Print error message and cleanup @@ -235,26 +231,149 @@ module.exports = { res.json({error}); removeDirectory(srcPath); }; + + let destPath = path.join(Directories.data, req.id); + let destImagesPath = path.join(destPath, "images"); + let destGcpPath = path.join(destPath, "gcp"); + + const checkMaxImageLimits = (cb) => { + if (!config.maxImages) cb(); + else{ + fs.readdir(destImagesPath, (err, files) => { + if (err) cb(err); + else if (files.length > config.maxImages) cb(new Error(`${files.length} images uploaded, but this node can only process up to ${config.maxImages}.`)); + else cb(); + }); + } + }; + + let initSteps = [ + // Check if dest directory already exists + cb => { + if (req.files && req.files.length > 0) { + fs.stat(destPath, (err, stat) => { + if (err && err.code === 'ENOENT') cb(); + else{ + // Directory already exists, this could happen + // if a previous attempt at upload failed and the user + // used set-uuid to specify the same UUID over the previous run + // Try to remove it + removeDirectory(destPath, err => { + if (err) cb(new Error(`Directory exists and we couldn't remove it.`)); + else cb(); + }); + } + }); + } else { + cb(); + } + }, + + // Unzips zip URL to tmp// (if any) + cb => { + if (req.body.zipurl) { + let archive = "zipurl.zip"; + + upload.storage.getDestination(req, archive, (err, dstPath) => { + if (err) cb(err); + else{ + let archiveDestPath = path.join(dstPath, archive); + + download(req.body.zipurl, archiveDestPath, cb); + } + }); + } else { + cb(); + } + }, + + // Move all uploads to data//images dir (if any) + cb => fs.mkdir(destPath, undefined, cb), + cb => fs.mkdir(destGcpPath, undefined, cb), + cb => mv(srcPath, destImagesPath, cb), + + // Zip files handling + cb => { + const handleSeed = (cb) => { + const seedFileDst = path.join(destPath, "seed.zip"); + + async.series([ + // Move to project root + cb => mv(path.join(destImagesPath, "seed.zip"), seedFileDst, cb), + + // Extract + cb => { + ziputils.unzip(seedFileDst, destPath, cb); + }, + + // Remove + cb => { + fs.exists(seedFileDst, exists => { + if (exists) fs.unlink(seedFileDst, cb); + else cb(); + }); + } + ], cb); + } + + const handleZipUrl = (cb) => { + // Extract images + ziputils.unzip(path.join(destImagesPath, "zipurl.zip"), + destImagesPath, + cb, true); + } + + // Find and handle zip files and extract + fs.readdir(destImagesPath, (err, entries) => { + if (err) cb(err); + else { + async.eachSeries(entries, (entry, cb) => { + if (entry === "seed.zip"){ + handleSeed(cb); + }else if (entry === "zipurl.zip") { + handleZipUrl(cb); + } else cb(); + }, cb); + } + }); + }, + + // Verify max images limit + cb => { + checkMaxImageLimits(cb); + }, + + cb => { + // Find any *.txt (GCP) file and move it to the data//gcp directory + // also remove any lingering zipurl.zip + fs.readdir(destImagesPath, (err, entries) => { + if (err) cb(err); + else { + async.eachSeries(entries, (entry, cb) => { + if (/\.txt$/gi.test(entry)) { + mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb); + }else if (/\.zip$/gi.test(entry)){ + fs.unlink(path.join(destImagesPath, entry), cb); + } else cb(); + }, cb); + } + }); + } + ]; if (req.error !== undefined){ die(req.error); }else{ - let destPath = path.join(Directories.data, req.id); - let destImagesPath = path.join(destPath, "images"); - let destGcpPath = path.join(destPath, "gcp"); - - const checkMaxImageLimits = (cb) => { - if (!config.maxImages) cb(); - else{ - fs.readdir(destImagesPath, (err, files) => { - if (err) cb(err); - else if (files.length > config.maxImages) cb(new Error(`${files.length} images uploaded, but this node can only process up to ${config.maxImages}.`)); - else cb(); - }); - } - }; + let imagesCountEstimate = -1; async.series([ + cb => { + // Basic path check + fs.exists(srcPath, exists => { + if (exists) cb(); + else cb(new Error(`Invalid UUID`)); + }); + }, cb => { odmInfo.filterOptions(req.body.options, (err, options) => { if (err) cb(err); @@ -264,134 +383,36 @@ module.exports = { } }); }, - - // Check if dest directory already exists cb => { - if (req.files && req.files.length > 0) { - fs.stat(destPath, (err, stat) => { - if (err && err.code === 'ENOENT') cb(); - else{ - // Directory already exists, this could happen - // if a previous attempt at upload failed and the user - // used set-uuid to specify the same UUID over the previous run - // Try to remove it - removeDirectory(destPath, err => { - if (err) cb(new Error(`Directory exists and we couldn't remove it.`)); - else cb(); - }); - } - }); - } else { + fs.readdir(srcPath, (err, entries) => { + if (!err) imagesCountEstimate = entries.length; cb(); - } - }, - - // Unzips zip URL to tmp// (if any) - cb => { - if (req.body.zipurl) { - let archive = "zipurl.zip"; - - upload.storage.getDestination(req, archive, (err, dstPath) => { - if (err) cb(err); - else{ - let archiveDestPath = path.join(dstPath, archive); - - download(req.body.zipurl, archiveDestPath, cb); - } - }); - } else { - cb(); - } - }, - - // Move all uploads to data//images dir (if any) - cb => fs.mkdir(destPath, undefined, cb), - cb => fs.mkdir(destGcpPath, undefined, cb), - cb => mv(srcPath, destImagesPath, cb), - - // Zip files handling - cb => { - const handleSeed = (cb) => { - const seedFileDst = path.join(destPath, "seed.zip"); - - async.series([ - // Move to project root - cb => mv(path.join(destImagesPath, "seed.zip"), seedFileDst, cb), - - // Extract - cb => { - ziputils.unzip(seedFileDst, destPath, cb); - }, - - // Remove - cb => { - fs.exists(seedFileDst, exists => { - if (exists) fs.unlink(seedFileDst, cb); - else cb(); - }); - } - ], cb); - } - - const handleZipUrl = (cb) => { - // Extract images - ziputils.unzip(path.join(destImagesPath, "zipurl.zip"), - destImagesPath, - cb, true); - } - - // Find and handle zip files and extract - fs.readdir(destImagesPath, (err, entries) => { - if (err) cb(err); - else { - async.eachSeries(entries, (entry, cb) => { - if (entry === "seed.zip"){ - handleSeed(cb); - }else if (entry === "zipurl.zip") { - handleZipUrl(cb); - } else cb(); - }, cb); - } }); }, - - // Verify max images limit cb => { - checkMaxImageLimits(cb); - }, + const task = new Task(req.id, req.body.name, req.body.options, + req.body.webhook, + req.body.skipPostProcessing === 'true', + req.body.outputs, + req.body.dateCreated, + imagesCountEstimate + ); + TaskManager.singleton().addNew(task); + res.json({ uuid: req.id }); + cb(); - cb => { - // Find any *.txt (GCP) file and move it to the data//gcp directory - // also remove any lingering zipurl.zip - fs.readdir(destImagesPath, (err, entries) => { - if (err) cb(err); - else { - async.eachSeries(entries, (entry, cb) => { - if (/\.txt$/gi.test(entry)) { - mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb); - }else if (/\.zip$/gi.test(entry)){ - fs.unlink(path.join(destImagesPath, entry), cb); - } else cb(); - }, cb); - } - }); - }, + // We return a UUID right away but continue + // doing processing in the background - // Create task - cb => { - new Task(req.id, req.body.name, req.body.options, - req.body.webhook, - req.body.skipPostProcessing === 'true', - req.body.outputs, - req.body.dateCreated, - (err, task) => { - if (err) cb(err); - else { - TaskManager.singleton().addNew(task); - res.json({ uuid: req.id }); - cb(); - } - }); + task.initialize(err => { + if (err) { + task.setStatus(statusCodes.FAILED, { errorMessage: err.message }); + + // Cleanup + removeDirectory(srcPath); + removeDirectory(destPath); + } else TaskManager.singleton().processNextTask(); + }, initSteps); } ], err => { if (err) die(err.message);