From 93d59292dd905b3efc2c4068ebfd06467cc852cd Mon Sep 17 00:00:00 2001 From: Lee Pepper Date: Fri, 7 Apr 2017 10:55:24 -0600 Subject: [PATCH] Modify: Dockerfile for local build Add: Webhook callback option Modify: .gitignore for vscode --- .dockerignore | 3 + .gitignore | 2 + Dockerfile | 7 +- index.js | 833 ++++++++++++++++++++++---------------------- libs/Task.js | 788 ++++++++++++++++++++--------------------- libs/TaskManager.js | 471 ++++++++++++++----------- libs/odmOptions.js | 421 +++++++++++----------- package.json | 1 + 8 files changed, 1320 insertions(+), 1206 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0b1d14f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +node_modules +tests +tmp diff --git a/.gitignore b/.gitignore index 3eb8312..333246f 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,5 @@ jspm_packages # Elastic Beanstalk .elasticbeanstalk + +.vscode diff --git a/Dockerfile b/Dockerfile index bf79487..441bea7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,8 +30,13 @@ RUN cd /staging/PotreeConverter && \ RUN mkdir /var/www WORKDIR "/var/www" -RUN git clone https://github.com/OpenDroneMap/node-OpenDroneMap . +# RUN git clone https://github.com/OpenDroneMap/node-OpenDroneMap . + +COPY . /var/www + + RUN npm install +RUN mkdir tmp # Fix old version of gdal2tiles.py # RUN (cd / && patch -p0) { - let dstPath = path.join("tmp", req.id); - fs.exists(dstPath, exists => { - if (!exists){ - fs.mkdir(dstPath, undefined, () => { - cb(null, dstPath); - }); - }else{ - cb(null, dstPath); - } - }); - }, - filename: (req, file, cb) => { - cb(null, file.originalname); - } - }) + storage: multer.diskStorage({ + destination: (req, file, cb) => { + let dstPath = path.join("tmp", req.id); + fs.exists(dstPath, exists => { + if (!exists) { + fs.mkdir(dstPath, undefined, () => { + cb(null, dstPath); + }); + } else { + cb(null, dstPath); + } + }); + }, + filename: (req, file, cb) => { + cb(null, file.originalname); + } + }) }); let taskManager; let server; /** @swagger -* /task/new: -* post: -* description: Creates a new task and places it at the end of the processing queue -* tags: [task] -* consumes: -* - multipart/form-data -* parameters: -* - -* name: images -* in: formData -* description: Images to process, plus an optional GPC file. If included, the GPC file should have .txt extension -* required: true -* type: file -* - -* name: name -* in: formData -* description: An optional name to be associated with the task -* required: false -* type: string -* - -* name: options -* in: formData -* description: 'Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, ...]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options' -* required: false -* type: string -* responses: -* 200: -* description: Success -* schema: -* type: object -* required: [uuid] -* properties: -* uuid: -* type: string -* description: UUID of the newly created task -* default: -* description: Error -* schema: -* $ref: '#/definitions/Error' -*/ + * /task/new: + * post: + * description: Creates a new task and places it at the end of the processing queue + * tags: [task] + * consumes: + * - multipart/form-data + * parameters: + * - + * name: images + * in: formData + * description: Images to process, plus an optional GPC file. If included, the GPC file should have .txt extension + * required: true + * type: file + * - + * name: name + * in: formData + * description: An optional name to be associated with the task + * required: false + * type: string + * - + * name: options + * in: formData + * description: 'Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, ...]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options' + * required: false + * type: string + * responses: + * 200: + * description: Success + * schema: + * type: object + * required: [uuid] + * properties: + * uuid: + * type: string + * description: UUID of the newly created task + * default: + * description: Error + * schema: + * $ref: '#/definitions/Error' + */ app.post('/task/new', addRequestId, upload.array('images'), (req, res) => { - if (!req.files || req.files.length === 0) res.json({error: "Need at least 1 file."}); - else{ - let srcPath = path.join("tmp", req.id); - let destPath = path.join(Directories.data, req.id); - let destImagesPath = path.join(destPath, "images"); - let destGpcPath = path.join(destPath, "gpc"); + if (!req.files || req.files.length === 0) res.json({ error: "Need at least 1 file." }); + else { + let srcPath = path.join("tmp", req.id); + let destPath = path.join(Directories.data, req.id); + let destImagesPath = path.join(destPath, "images"); + let destGpcPath = path.join(destPath, "gpc"); - async.series([ - cb => { - odmOptions.filterOptions(req.body.options, (err, options) => { - if (err) cb(err); - else{ - req.body.options = options; - cb(null); - } - }); - }, + async.series([ + cb => { + odmOptions.filterOptions(req.body.options, (err, options) => { + if (err) cb(err); + else { + req.body.options = options; + cb(null); + } + }); + }, - // Move all uploads to data//images dir - cb => { - fs.stat(destPath, (err, stat) => { - if (err && err.code === 'ENOENT') cb(); - else cb(new Error(`Directory exists (should not have happened: ${err.code})`)); - }); - }, - cb => fs.mkdir(destPath, undefined, cb), - cb => fs.mkdir(destGpcPath, undefined, cb), - cb => fs.rename(srcPath, destImagesPath, cb), - cb => { - // Find any *.txt (GPC) file and move it to the data//gpc directory - fs.readdir(destImagesPath, (err, entries) => { - if (err) cb(err); - else{ - async.eachSeries(entries, (entry, cb) => { - if (/\.txt$/gi.test(entry)){ - fs.rename(path.join(destImagesPath, entry), path.join(destGpcPath, entry), cb); - }else cb(); - }, cb); - } - }); - }, + // Move all uploads to data//images dir + cb => { + setTimeout(function() { + fs.stat(destPath, (err, stat) => { + if (err && err.code === 'ENOENT') cb(); + else cb(new Error(`Directory exists (should not have happened: ${err.code})`)); + }); + }, 300); + }, + cb => fs.mkdir(destPath, undefined, cb), + cb => fs.mkdir(destGpcPath, undefined, cb), + cb => fs.rename(srcPath, destImagesPath, cb), + cb => { + // Find any *.txt (GPC) file and move it to the data//gpc directory + fs.readdir(destImagesPath, (err, entries) => { + if (err) cb(err); + else { + async.eachSeries(entries, (entry, cb) => { + if (/\.txt$/gi.test(entry)) { + fs.rename(path.join(destImagesPath, entry), path.join(destGpcPath, entry), cb); + } else cb(); + }, cb); + } + }); + }, - // Create task - cb => { - new Task(req.id, req.body.name, (err, task) => { - if (err) cb(err); - else{ - taskManager.addNew(task); - res.json({uuid: req.id}); - cb(); - } - }, req.body.options); - } - ], err => { - if (err) res.json({error: err.message}); - }); - } + // Create task + cb => { + new Task(req.id, req.body.name, (err, task) => { + if (err) cb(err); + else { + taskManager.addNew(task); + res.json({ uuid: req.id }); + cb(); + } + }, req.body.options); + } + ], err => { + if (err) res.json({ error: err.message }); + }); + } }); let getTaskFromUuid = (req, res, next) => { - let task = taskManager.find(req.params.uuid); - if (task){ - req.task = task; - next(); - }else res.json({error: `${req.params.uuid} not found`}); + let task = taskManager.find(req.params.uuid); + if (task) { + req.task = task; + next(); + } else res.json({ error: `${req.params.uuid} not found` }); }; /** @swagger -* /task/{uuid}/info: -* get: -* description: Gets information about this task, such as name, creation date, processing time, status, command line options and number of images being processed. See schema definition for a full list. -* tags: [task] -* parameters: -* - -* name: uuid -* in: path -* description: UUID of the task -* required: true -* type: string -* responses: -* 200: -* description: Task Information -* schema: -* title: TaskInfo -* type: object -* required: [uuid, name, dateCreated, processingTime, status, options, imagesCount] -* properties: -* uuid: -* type: string -* description: UUID -* name: -* type: string -* description: Name -* dateCreated: -* type: integer -* description: Timestamp -* processingTime: -* type: integer -* description: Milliseconds that have elapsed since the task started being processed. -* status: -* type: integer -* description: Status code (10 = QUEUED, 20 = RUNNING, 30 = FAILED, 40 = COMPLETED, 50 = CANCELED) -* enum: [10, 20, 30, 40, 50] -* options: -* type: array -* description: List of options used to process this task -* items: -* type: object -* required: [name, value] -* properties: -* name: -* type: string -* description: 'Option name (example: "odm_meshing-octreeDepth")' -* value: -* type: string -* description: 'Value (example: 9)' -* imagesCount: -* type: integer -* description: Number of images -* default: -* description: Error -* schema: -* $ref: '#/definitions/Error' -*/ + * /task/{uuid}/info: + * get: + * description: Gets information about this task, such as name, creation date, processing time, status, command line options and number of images being processed. See schema definition for a full list. + * tags: [task] + * parameters: + * - + * name: uuid + * in: path + * description: UUID of the task + * required: true + * type: string + * responses: + * 200: + * description: Task Information + * schema: + * title: TaskInfo + * type: object + * required: [uuid, name, dateCreated, processingTime, status, options, imagesCount] + * properties: + * uuid: + * type: string + * description: UUID + * name: + * type: string + * description: Name + * dateCreated: + * type: integer + * description: Timestamp + * processingTime: + * type: integer + * description: Milliseconds that have elapsed since the task started being processed. + * status: + * type: integer + * description: Status code (10 = QUEUED, 20 = RUNNING, 30 = FAILED, 40 = COMPLETED, 50 = CANCELED) + * enum: [10, 20, 30, 40, 50] + * options: + * type: array + * description: List of options used to process this task + * items: + * type: object + * required: [name, value] + * properties: + * name: + * type: string + * description: 'Option name (example: "odm_meshing-octreeDepth")' + * value: + * type: string + * description: 'Value (example: 9)' + * imagesCount: + * type: integer + * description: Number of images + * default: + * description: Error + * schema: + * $ref: '#/definitions/Error' + */ app.get('/task/:uuid/info', getTaskFromUuid, (req, res) => { - res.json(req.task.getInfo()); + res.json(req.task.getInfo()); }); /** @swagger -* /task/{uuid}/output: -* get: -* description: Retrieves the console output of the OpenDroneMap's process. Useful for monitoring execution and to provide updates to the user. -* tags: [task] -* parameters: -* - -* name: uuid -* in: path -* description: UUID of the task -* required: true -* type: string -* - -* name: line -* in: query -* description: Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Defaults to 0 (retrieve all console output). -* default: 0 -* required: false -* type: integer -* responses: -* 200: -* description: Console Output -* schema: -* type: string -* default: -* description: Error -* schema: -* $ref: '#/definitions/Error' -*/ + * /task/{uuid}/output: + * get: + * description: Retrieves the console output of the OpenDroneMap's process. Useful for monitoring execution and to provide updates to the user. + * tags: [task] + * parameters: + * - + * name: uuid + * in: path + * description: UUID of the task + * required: true + * type: string + * - + * name: line + * in: query + * description: Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Defaults to 0 (retrieve all console output). + * default: 0 + * required: false + * type: integer + * responses: + * 200: + * description: Console Output + * schema: + * type: string + * default: + * description: Error + * schema: + * $ref: '#/definitions/Error' + */ app.get('/task/:uuid/output', getTaskFromUuid, (req, res) => { - res.json(req.task.getOutput(req.query.line)); + res.json(req.task.getOutput(req.query.line)); }); /** @swagger -* /task/{uuid}/download/{asset}: -* get: -* description: Retrieves an asset (the output of OpenDroneMap's processing) associated with a task -* tags: [task] -* produces: [application/zip] -* parameters: -* - name: uuid -* in: path -* type: string -* description: UUID of the task -* required: true -* - name: asset -* in: path -* type: string -* description: Type of asset to download. Use "all.zip" for zip file containing all assets. -* required: true -* enum: -* - all.zip -* - orthophoto.tif -* responses: -* 200: -* description: Asset File -* schema: -* type: file -* default: -* description: Error message -* schema: -* $ref: '#/definitions/Error' -*/ + * /task/{uuid}/download/{asset}: + * get: + * description: Retrieves an asset (the output of OpenDroneMap's processing) associated with a task + * tags: [task] + * produces: [application/zip] + * parameters: + * - name: uuid + * in: path + * type: string + * description: UUID of the task + * required: true + * - name: asset + * in: path + * type: string + * description: Type of asset to download. Use "all.zip" for zip file containing all assets. + * required: true + * enum: + * - all.zip + * - orthophoto.tif + * responses: + * 200: + * description: Asset File + * schema: + * type: file + * default: + * description: Error message + * schema: + * $ref: '#/definitions/Error' + */ app.get('/task/:uuid/download/:asset', getTaskFromUuid, (req, res) => { - let asset = req.params.asset !== undefined ? req.params.asset : "all.zip"; - let filePath = req.task.getAssetsArchivePath(asset); - if (filePath){ - if (fs.existsSync(filePath)){ - res.setHeader('Content-Disposition', `attachment; filename=${asset}`); - res.setHeader('Content-Type', mime.lookup(asset)); - res.setHeader('Content-Length', fs.statSync(filePath)["size"]); + let asset = req.params.asset !== undefined ? req.params.asset : "all.zip"; + let filePath = req.task.getAssetsArchivePath(asset); + if (filePath) { + if (fs.existsSync(filePath)) { + res.setHeader('Content-Disposition', `attachment; filename=${asset}`); + res.setHeader('Content-Type', mime.lookup(asset)); + res.setHeader('Content-Length', fs.statSync(filePath)["size"]); - const filestream = fs.createReadStream(filePath); - filestream.pipe(res); - }else{ - res.json({error: "Asset not ready"}); - } - }else{ - res.json({error: "Invalid asset"}); - } + const filestream = fs.createReadStream(filePath); + filestream.pipe(res); + } else { + res.json({ error: "Asset not ready" }); + } + } else { + res.json({ error: "Invalid asset" }); + } }); /** @swagger -* definition: -* Error: -* type: object -* required: -* - error -* properties: -* error: -* type: string -* description: Description of the error -* Response: -* type: object -* required: -* - success -* properties: -* success: -* type: boolean -* description: true if the command succeeded, false otherwise -* error: -* type: string -* description: Error message if an error occured -*/ + * definition: + * Error: + * type: object + * required: + * - error + * properties: + * error: + * type: string + * description: Description of the error + * Response: + * type: object + * required: + * - success + * properties: + * success: + * type: boolean + * description: true if the command succeeded, false otherwise + * error: + * type: string + * description: Error message if an error occured + */ let uuidCheck = (req, res, next) => { - if (!req.body.uuid) res.json({error: "uuid param missing."}); + if (!req.body.uuid) res.json({ error: "uuid param missing." }); else next(); }; let successHandler = res => { - return err => { - if (!err) res.json({success: true}); - else res.json({success: false, error: err.message}); - }; + return err => { + if (!err) res.json({ success: true }); + else res.json({ success: false, error: err.message }); + }; }; /** @swagger -* /task/cancel: -* post: -* description: Cancels a task (stops its execution, or prevents it from being executed) -* parameters: -* - -* name: uuid -* in: body -* description: UUID of the task -* required: true -* schema: -* type: string -* responses: -* 200: -* description: Command Received -* schema: -* $ref: "#/definitions/Response" -*/ + * /task/cancel: + * post: + * description: Cancels a task (stops its execution, or prevents it from being executed) + * parameters: + * - + * name: uuid + * in: body + * description: UUID of the task + * required: true + * schema: + * type: string + * responses: + * 200: + * description: Command Received + * schema: + * $ref: "#/definitions/Response" + */ app.post('/task/cancel', uuidCheck, (req, res) => { - taskManager.cancel(req.body.uuid, successHandler(res)); + taskManager.cancel(req.body.uuid, successHandler(res)); }); /** @swagger -* /task/remove: -* post: -* description: Removes a task and deletes all of its assets -* parameters: -* - -* name: uuid -* in: body -* description: UUID of the task -* required: true -* schema: -* type: string -* responses: -* 200: -* description: Command Received -* schema: -* $ref: "#/definitions/Response" -*/ + * /task/remove: + * post: + * description: Removes a task and deletes all of its assets + * parameters: + * - + * name: uuid + * in: body + * description: UUID of the task + * required: true + * schema: + * type: string + * responses: + * 200: + * description: Command Received + * schema: + * $ref: "#/definitions/Response" + */ app.post('/task/remove', uuidCheck, (req, res) => { - taskManager.remove(req.body.uuid, successHandler(res)); + taskManager.remove(req.body.uuid, successHandler(res)); }); /** @swagger -* /task/restart: -* post: -* description: Restarts a task that was previously canceled or that had failed to process -* parameters: -* - -* name: uuid -* in: body -* description: UUID of the task -* required: true -* schema: -* type: string -* responses: -* 200: -* description: Command Received -* schema: -* $ref: "#/definitions/Response" -*/ + * /task/restart: + * post: + * description: Restarts a task that was previously canceled or that had failed to process + * parameters: + * - + * name: uuid + * in: body + * description: UUID of the task + * required: true + * schema: + * type: string + * responses: + * 200: + * description: Command Received + * schema: + * $ref: "#/definitions/Response" + */ app.post('/task/restart', uuidCheck, (req, res) => { - taskManager.restart(req.body.uuid, successHandler(res)); + taskManager.restart(req.body.uuid, successHandler(res)); }); /** @swagger -* /options: -* get: -* description: Retrieves the command line options that can be passed to process a task -* tags: [server] -* responses: -* 200: -* description: Options -* schema: -* type: array -* items: -* title: Option -* type: object -* required: [name, type, value, domain, help] -* properties: -* name: -* type: string -* description: Command line option (exactly as it is passed to the OpenDroneMap process, minus the leading '--') -* type: -* type: string -* description: Datatype of the value of this option -* enum: -* - int -* - float -* - string -* - bool -* value: -* type: string -* description: Default value of this option -* domain: -* type: string -* description: Valid range of values (for example, "positive integer" or "float > 0.0") -* help: -* type: string -* description: Description of what this option does -*/ + * /options: + * get: + * description: Retrieves the command line options that can be passed to process a task + * tags: [server] + * responses: + * 200: + * description: Options + * schema: + * type: array + * items: + * title: Option + * type: object + * required: [name, type, value, domain, help] + * properties: + * name: + * type: string + * description: Command line option (exactly as it is passed to the OpenDroneMap process, minus the leading '--') + * type: + * type: string + * description: Datatype of the value of this option + * enum: + * - int + * - float + * - string + * - bool + * value: + * type: string + * description: Default value of this option + * domain: + * type: string + * description: Valid range of values (for example, "positive integer" or "float > 0.0") + * help: + * type: string + * description: Description of what this option does + */ app.get('/options', (req, res) => { - odmOptions.getOptions((err, options) => { - if (err) res.json({error: err.message}); - else res.json(options); - }); + odmOptions.getOptions((err, options) => { + if (err) res.json({ error: err.message }); + else res.json(options); + }); }); /** @swagger -* /info: -* get: -* description: Retrieves information about this node -* tags: [server] -* responses: -* 200: -* description: Info -* schema: -* type: object -* required: [version, taskQueueCount] -* properties: -* version: -* type: string -* description: Current version -* taskQueueCount: -* type: integer -* description: Number of tasks currently being processed or waiting to be processed -*/ + * /info: + * get: + * description: Retrieves information about this node + * tags: [server] + * responses: + * 200: + * description: Info + * schema: + * type: object + * required: [version, taskQueueCount] + * properties: + * version: + * type: string + * description: Current version + * taskQueueCount: + * type: integer + * description: Number of tasks currently being processed or waiting to be processed + */ app.get('/info', (req, res) => { res.json({ version: packageJson.version, @@ -495,46 +497,47 @@ app.get('/info', (req, res) => { }); let gracefulShutdown = done => { - async.series([ - cb => taskManager.dumpTaskList(cb), - cb => { - logger.info("Closing server"); - server.close(); - logger.info("Exiting..."); - process.exit(0); - } - ], done); + async.series([ + cb => taskManager.dumpTaskList(cb), + cb => { + logger.info("Closing server"); + server.close(); + logger.info("Exiting..."); + process.exit(0); + } + ], done); }; // listen for TERM signal .e.g. kill -process.on ('SIGTERM', gracefulShutdown); +process.on('SIGTERM', gracefulShutdown); // listen for INT signal e.g. Ctrl-C -process.on ('SIGINT', gracefulShutdown); +process.on('SIGINT', gracefulShutdown); // Startup if (config.test) logger.info("Running in test mode"); let commands = [ - cb => odmOptions.initialize(cb), - cb => { taskManager = new TaskManager(cb); }, - cb => { server = app.listen(config.port, err => { - if (!err) logger.info('Server has started on port ' + String(config.port)); - cb(err); - }); - } + cb => odmOptions.initialize(cb), + cb => { taskManager = new TaskManager(cb); }, + cb => { + server = app.listen(config.port, err => { + if (!err) logger.info('Server has started on port ' + String(config.port)); + cb(err); + }); + } ]; -if (config.powercycle){ - commands.push(cb => { - logger.info("Power cycling is set, application will shut down..."); - process.exit(0); - }); +if (config.powercycle) { + commands.push(cb => { + logger.info("Power cycling is set, application will shut down..."); + process.exit(0); + }); } async.series(commands, err => { - if (err){ - logger.error("Error during startup: " + err.message); - process.exit(1); - } -}); + if (err) { + logger.error("Error during startup: " + err.message); + process.exit(1); + } +}); \ No newline at end of file diff --git a/libs/Task.js b/libs/Task.js index 1f47d02..840e5c2 100644 --- a/libs/Task.js +++ b/libs/Task.js @@ -33,449 +33,453 @@ let Directories = require('./Directories'); let statusCodes = require('./statusCodes'); -module.exports = class Task{ - constructor(uuid, name, done, options = []){ - assert(uuid !== undefined, "uuid must be set"); - assert(done !== undefined, "ready must be set"); +module.exports = class Task { + constructor(uuid, name, done, options = []) { + assert(uuid !== undefined, "uuid must be set"); + assert(done !== undefined, "ready must be set"); - this.uuid = uuid; - this.name = name !== "" ? name : "Task of " + (new Date()).toISOString(); - this.dateCreated = new Date().getTime(); - this.processingTime = -1; - this.setStatus(statusCodes.QUEUED); - this.options = options; - this.gpcFiles = []; - this.output = []; - this.runningProcesses = []; + this.uuid = uuid; + this.name = name !== "" ? name : "Task of " + (new Date()).toISOString(); + this.dateCreated = new Date().getTime(); + this.processingTime = -1; + this.setStatus(statusCodes.QUEUED); + this.options = options; + this.gpcFiles = []; + this.output = []; + this.runningProcesses = []; - async.series([ - // Read images info - cb => { - fs.readdir(this.getImagesFolderPath(), (err, files) => { - if (err) cb(err); - else{ - this.images = files; - logger.debug(`Found ${this.images.length} images for ${this.uuid}`); - cb(null); - } - }); - }, + async.series([ + // Read images info + cb => { + fs.readdir(this.getImagesFolderPath(), (err, files) => { + if (err) cb(err); + else { + this.images = files; + logger.debug(`Found ${this.images.length} images for ${this.uuid}`); + cb(null); + } + }); + }, - // Find GCP (if any) - cb => { - fs.readdir(this.getGpcFolderPath(), (err, files) => { - if (err) cb(err); - else{ - files.forEach(file => { - if (/\.txt$/gi.test(file)){ - this.gpcFiles.push(file); - } - }); - logger.debug(`Found ${this.gpcFiles.length} GPC files (${this.gpcFiles.join(" ")}) for ${this.uuid}`); - cb(null); - } - }); - } - ], err => { - done(err, this); - }); - } + // Find GCP (if any) + cb => { + fs.readdir(this.getGpcFolderPath(), (err, files) => { + if (err) cb(err); + else { + files.forEach(file => { + if (/\.txt$/gi.test(file)) { + this.gpcFiles.push(file); + } + }); + logger.debug(`Found ${this.gpcFiles.length} GPC files (${this.gpcFiles.join(" ")}) for ${this.uuid}`); + cb(null); + } + }); + } + ], err => { + done(err, this); + }); + } - static CreateFromSerialized(taskJson, done){ - new Task(taskJson.uuid, taskJson.name, (err, task) => { - if (err) done(err); - else{ - // Override default values with those - // provided in the taskJson - for (let k in taskJson){ - task[k] = taskJson[k]; - } + static CreateFromSerialized(taskJson, done) { + new Task(taskJson.uuid, taskJson.name, (err, task) => { + if (err) done(err); + else { + // Override default values with those + // provided in the taskJson + for (let k in taskJson) { + task[k] = taskJson[k]; + } - // Tasks that were running should be put back to QUEUED state - if (task.status.code === statusCodes.RUNNING){ - task.status.code = statusCodes.QUEUED; - } - done(null, task); - } - }, taskJson.options); - } + // Tasks that were running should be put back to QUEUED state + if (task.status.code === statusCodes.RUNNING) { + task.status.code = statusCodes.QUEUED; + } + done(null, task); + } + }, taskJson.options); + } - // Get path where images are stored for this task - // (relative to nodejs process CWD) - getImagesFolderPath(){ - return path.join(this.getProjectFolderPath(), "images"); - } + // Get path where images are stored for this task + // (relative to nodejs process CWD) + getImagesFolderPath() { + return path.join(this.getProjectFolderPath(), "images"); + } - // Get path where GPC file(s) are stored - // (relative to nodejs process CWD) - getGpcFolderPath(){ - return path.join(this.getProjectFolderPath(), "gpc"); - } + // Get path where GPC file(s) are stored + // (relative to nodejs process CWD) + getGpcFolderPath() { + return path.join(this.getProjectFolderPath(), "gpc"); + } - // Get path of project (where all images and assets folder are contained) - // (relative to nodejs process CWD) - getProjectFolderPath(){ - return path.join(Directories.data, this.uuid); - } + // Get path of project (where all images and assets folder are contained) + // (relative to nodejs process CWD) + getProjectFolderPath() { + return path.join(Directories.data, this.uuid); + } - // Get the path of the archive where all assets - // outputted by this task are stored. - getAssetsArchivePath(filename){ - if (filename == 'all.zip'){ - // OK, do nothing - }else if (filename == 'orthophoto.tif'){ - if (config.test){ - if (config.testSkipOrthophotos) return false; - else filename = path.join('..', '..', 'processing_results', 'odm_orthophoto', `odm_${filename}`); - }else{ - filename = path.join('odm_orthophoto', `odm_${filename}`); - } - }else{ - return false; // Invalid - } - - return path.join(this.getProjectFolderPath(), filename); - } + // Get the path of the archive where all assets + // outputted by this task are stored. + getAssetsArchivePath(filename) { + if (filename == 'all.zip') { + // OK, do nothing + } else if (filename == 'orthophoto.tif') { + if (config.test) { + if (config.testSkipOrthophotos) return false; + else filename = path.join('..', '..', 'processing_results', 'odm_orthophoto', `odm_${filename}`); + } else { + filename = path.join('odm_orthophoto', `odm_${filename}`); + } + } else { + return false; // Invalid + } - // Deletes files and folders related to this task - cleanup(cb){ - rmdir(this.getProjectFolderPath(), cb); - } + return path.join(this.getProjectFolderPath(), filename); + } - setStatus(code, extra){ - this.status = { - code: code - }; - for (let k in extra){ - this.status[k] = extra[k]; - } - } + // Deletes files and folders related to this task + cleanup(cb) { + rmdir(this.getProjectFolderPath(), cb); + } - updateProcessingTime(resetTime){ - this.processingTime = resetTime ? - -1 : - new Date().getTime() - this.dateCreated; - } + setStatus(code, extra) { + this.status = { + code: code + }; + for (let k in extra) { + this.status[k] = extra[k]; + } + } - startTrackingProcessingTime(){ - this.updateProcessingTime(); - if (!this._updateProcessingTimeInterval){ - this._updateProcessingTimeInterval = setInterval(() => { - this.updateProcessingTime(); - }, 1000); - } - } + updateProcessingTime(resetTime) { + this.processingTime = resetTime ? + -1 : + new Date().getTime() - this.dateCreated; + } - stopTrackingProcessingTime(resetTime){ - this.updateProcessingTime(resetTime); - if (this._updateProcessingTimeInterval){ - clearInterval(this._updateProcessingTimeInterval); - this._updateProcessingTimeInterval = null; - } - } + startTrackingProcessingTime() { + this.updateProcessingTime(); + if (!this._updateProcessingTimeInterval) { + this._updateProcessingTimeInterval = setInterval(() => { + this.updateProcessingTime(); + }, 1000); + } + } - getStatus(){ - return this.status.code; - } + stopTrackingProcessingTime(resetTime) { + this.updateProcessingTime(resetTime); + if (this._updateProcessingTimeInterval) { + clearInterval(this._updateProcessingTimeInterval); + this._updateProcessingTimeInterval = null; + } + } - isCanceled(){ - return this.status.code === statusCodes.CANCELED; - } + getStatus() { + return this.status.code; + } - // Cancels the current task (unless it's already canceled) - cancel(cb){ - if (this.status.code !== statusCodes.CANCELED){ - let wasRunning = this.status.code === statusCodes.RUNNING; - this.setStatus(statusCodes.CANCELED); + isCanceled() { + return this.status.code === statusCodes.CANCELED; + } - if (wasRunning){ - this.runningProcesses.forEach(proc => { - // TODO: this does NOT guarantee that - // the process will immediately terminate. - // For eaxmple in the case of the ODM process, the process will continue running for a while - // This might need to be fixed on ODM's end. - proc.kill('SIGINT'); - }); - this.runningProcesses = []; - } + // Cancels the current task (unless it's already canceled) + cancel(cb) { + if (this.status.code !== statusCodes.CANCELED) { + let wasRunning = this.status.code === statusCodes.RUNNING; + this.setStatus(statusCodes.CANCELED); - this.stopTrackingProcessingTime(true); - cb(null); - }else{ - cb(new Error("Task already cancelled")); - } - } + if (wasRunning) { + this.runningProcesses.forEach(proc => { + // TODO: this does NOT guarantee that + // the process will immediately terminate. + // For eaxmple in the case of the ODM process, the process will continue running for a while + // This might need to be fixed on ODM's end. + proc.kill('SIGINT'); + }); + this.runningProcesses = []; + } - // Starts processing the task with OpenDroneMap - // This will spawn a new process. - start(done){ - const finished = err => { - this.stopTrackingProcessingTime(); - done(err); - }; - - const postProcess = () => { - const createZipArchive = (outputFilename, files) => { - return (done) => { - this.output.push(`Compressing ${outputFilename}\n`); + this.stopTrackingProcessingTime(true); + cb(null); + } else { + cb(new Error("Task already cancelled")); + } + } - let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename)); - let archive = archiver.create('zip', {}); + // Starts processing the task with OpenDroneMap + // This will spawn a new process. + start(done) { + const finished = err => { + this.stopTrackingProcessingTime(); + done(err); + }; - archive.on('finish', () => { - // TODO: is this being fired twice? - done(); - }); + const postProcess = () => { + const createZipArchive = (outputFilename, files) => { + return (done) => { + this.output.push(`Compressing ${outputFilename}\n`); - archive.on('error', err => { - logger.error(`Could not archive .zip file: ${err.message}`); - done(err); - }); + let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename)); + let archive = archiver.create('zip', {}); - archive.pipe(output); - let globs = []; + archive.on('finish', () => { + // TODO: is this being fired twice? + done(); + }); - // Process files and directories first - files.forEach(file => { - let sourcePath = !config.test ? - this.getProjectFolderPath() : - path.join("tests", "processing_results"); - let filePath = path.join(sourcePath, file); - - // Skip non-existing items - if (!fs.existsSync(filePath)) return; + archive.on('error', err => { + logger.error(`Could not archive .zip file: ${err.message}`); + done(err); + }); - let isGlob = /\*/.test(file), - isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory(); + archive.pipe(output); + let globs = []; - if (isDirectory){ - archive.directory(filePath, file); - }else if (isGlob){ - globs.push(filePath); - }else{ - archive.file(filePath, {name: path.basename(file)}); - } - }); + // Process files and directories first + files.forEach(file => { + let sourcePath = !config.test ? + this.getProjectFolderPath() : + path.join("tests", "processing_results"); + let filePath = path.join(sourcePath, file); - // Check for globs - if (globs.length !== 0){ - let pending = globs.length; + // Skip non-existing items + if (!fs.existsSync(filePath)) return; - globs.forEach(pattern => { - glob(pattern, (err, files) => { - if (err) done(err); - else{ - files.forEach(file => { - if (fs.lstatSync(file).isFile()){ - archive.file(file, {name: path.basename(file)}); - }else{ - logger.debug(`Could not add ${file} from glob`); - } - }); + let isGlob = /\*/.test(file), + isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory(); - if (--pending === 0){ - archive.finalize(); - } - } - }); - }); - }else{ - archive.finalize(); - } - }; - }; + if (isDirectory) { + archive.directory(filePath, file); + } else if (isGlob) { + globs.push(filePath); + } else { + archive.file(filePath, { name: path.basename(file) }); + } + }); - const handleProcessExit = (done) => { - return (err, code, signal) => { - if (err) done(err); - else{ - // Don't evaluate if we caused the process to exit via SIGINT? - if (code === 0) done(); - else done(new Error(`Process exited with code ${code}`)); - } - }; - }; + // Check for globs + if (globs.length !== 0) { + let pending = globs.length; - const handleOutput = output => { - this.output.push(output); - }; + globs.forEach(pattern => { + glob(pattern, (err, files) => { + if (err) done(err); + else { + files.forEach(file => { + if (fs.lstatSync(file).isFile()) { + archive.file(file, { name: path.basename(file) }); + } else { + logger.debug(`Could not add ${file} from glob`); + } + }); - const generateTiles = (inputFile, outputDir) => { - return (done) => { - const inputFilePath = path.join(this.getProjectFolderPath(), inputFile); - - // Not all datasets generate an orthophoto, so we skip - // tiling if the orthophoto is missing - if (fs.existsSync(inputFilePath)){ - this.runningProcesses.push(processRunner.runTiler({ - zoomLevels: "12-21", - inputFile: inputFilePath, - outputDir: path.join(this.getProjectFolderPath(), outputDir) - }, handleProcessExit(done), handleOutput)); - }else{ - handleOutput(`${inputFilePath} file not found, skipping tiles generation\n`); - done(); - } - }; - }; + if (--pending === 0) { + archive.finalize(); + } + } + }); + }); + } else { + archive.finalize(); + } + }; + }; - const generatePotreeCloud = (inputFile, outputDir) => { - return (done) => { - this.runningProcesses.push(processRunner.runPotreeConverter({ - inputFile: path.join(this.getProjectFolderPath(), inputFile), - outputDir: path.join(this.getProjectFolderPath(), outputDir) - }, handleProcessExit(done), handleOutput)); - }; - }; + const handleProcessExit = (done) => { + return (err, code, signal) => { + if (err) done(err); + else { + // Don't evaluate if we caused the process to exit via SIGINT? + if (code === 0) done(); + else done(new Error(`Process exited with code ${code}`)); + } + }; + }; - const pdalTranslate = (inputPath, outputPath, filters) => { - return (done) => { - this.runningProcesses.push(processRunner.runPdalTranslate({ - inputFile: inputPath, - outputFile: outputPath, - filters: filters - }, handleProcessExit(done), handleOutput)); - }; - }; + const handleOutput = output => { + this.output.push(output); + }; - // All paths are relative to the project directory (./data//) - let allFolders = ['odm_orthophoto', 'odm_georeferencing', 'odm_texturing', 'odm_meshing', 'orthophoto_tiles', 'potree_pointcloud']; - - if (config.test && config.testSkipOrthophotos){ - logger.info("Test mode will skip orthophoto generation"); + const generateTiles = (inputFile, outputDir) => { + return (done) => { + const inputFilePath = path.join(this.getProjectFolderPath(), inputFile); - // Exclude these folders from the all.zip archive - ['odm_orthophoto', 'orthophoto_tiles'].forEach(dir => { - allFolders.splice(allFolders.indexOf(dir), 1); - }); - } + // Not all datasets generate an orthophoto, so we skip + // tiling if the orthophoto is missing + if (fs.existsSync(inputFilePath)) { + this.runningProcesses.push(processRunner.runTiler({ + zoomLevels: "12-21", + inputFile: inputFilePath, + outputDir: path.join(this.getProjectFolderPath(), outputDir) + }, handleProcessExit(done), handleOutput)); + } else { + handleOutput(`${inputFilePath} file not found, skipping tiles generation\n`); + done(); + } + }; + }; - let orthophotoPath = path.join('odm_orthophoto', 'odm_orthophoto.tif'), - lasPointCloudPath = path.join('odm_georeferencing', 'odm_georeferenced_model.ply.las'), - projectFolderPath = this.getProjectFolderPath(); + const generatePotreeCloud = (inputFile, outputDir) => { + return (done) => { + this.runningProcesses.push(processRunner.runPotreeConverter({ + inputFile: path.join(this.getProjectFolderPath(), inputFile), + outputDir: path.join(this.getProjectFolderPath(), outputDir) + }, handleProcessExit(done), handleOutput)); + }; + }; - let commands = [ + const pdalTranslate = (inputPath, outputPath, filters) => { + return (done) => { + this.runningProcesses.push(processRunner.runPdalTranslate({ + inputFile: inputPath, + outputFile: outputPath, + filters: filters + }, handleProcessExit(done), handleOutput)); + }; + }; + + // All paths are relative to the project directory (./data//) + let allFolders = ['odm_orthophoto', 'odm_georeferencing', 'odm_texturing', 'odm_meshing', 'orthophoto_tiles', 'potree_pointcloud']; + + if (config.test && config.testSkipOrthophotos) { + logger.info("Test mode will skip orthophoto generation"); + + // Exclude these folders from the all.zip archive + ['odm_orthophoto', 'orthophoto_tiles'].forEach(dir => { + allFolders.splice(allFolders.indexOf(dir), 1); + }); + } + + let orthophotoPath = path.join('odm_orthophoto', 'odm_orthophoto.tif'), + lasPointCloudPath = path.join('odm_georeferencing', 'odm_georeferenced_model.ply.las'), + projectFolderPath = this.getProjectFolderPath(); + + let commands = [ generateTiles(orthophotoPath, 'orthophoto_tiles'), generatePotreeCloud(lasPointCloudPath, 'potree_pointcloud'), createZipArchive('all.zip', allFolders) - ]; + ]; - // If point cloud file does not exist, it's likely because location (GPS/GPC) information - // was missing and the file was not generated. - let fullLasPointCloudPath = path.join(projectFolderPath, lasPointCloudPath); - if (!fs.existsSync(fullLasPointCloudPath)){ - let unreferencedPointCloudPath = path.join(projectFolderPath, "opensfm", "depthmaps", "merged.ply"); - if (fs.existsSync(unreferencedPointCloudPath)){ - logger.info(`${lasPointCloudPath} is missing, will attempt to generate it from ${unreferencedPointCloudPath}`); - commands.unshift(pdalTranslate(unreferencedPointCloudPath, fullLasPointCloudPath, [ - { - // opensfm's ply files map colors with the diffuse_ prefix - dimensions: "diffuse_red = red, diffuse_green = green, diffuse_blue = blue", - type: "filters.ferry" - } - ])); - } - } + // If point cloud file does not exist, it's likely because location (GPS/GPC) information + // was missing and the file was not generated. + let fullLasPointCloudPath = path.join(projectFolderPath, lasPointCloudPath); + if (!fs.existsSync(fullLasPointCloudPath)) { + let unreferencedPointCloudPath = path.join(projectFolderPath, "opensfm", "depthmaps", "merged.ply"); + if (fs.existsSync(unreferencedPointCloudPath)) { + logger.info(`${lasPointCloudPath} is missing, will attempt to generate it from ${unreferencedPointCloudPath}`); + commands.unshift(pdalTranslate(unreferencedPointCloudPath, fullLasPointCloudPath, [{ + // opensfm's ply files map colors with the diffuse_ prefix + dimensions: "diffuse_red = red, diffuse_green = green, diffuse_blue = blue", + type: "filters.ferry" + }])); + } + } - async.series(commands, (err) => { - if (!err){ - this.setStatus(statusCodes.COMPLETED); - finished(); - }else{ - this.setStatus(statusCodes.FAILED); - finished(err); - } - }); - }; + async.series(commands, (err) => { + if (!err) { + this.setStatus(statusCodes.COMPLETED); + finished(); + } else { + this.setStatus(statusCodes.FAILED); + finished(err); + } + }); + }; - if (this.status.code === statusCodes.QUEUED){ - this.startTrackingProcessingTime(); - this.setStatus(statusCodes.RUNNING); + if (this.status.code === statusCodes.QUEUED) { + this.startTrackingProcessingTime(); + this.setStatus(statusCodes.RUNNING); + // rmeove webhoook + var optionsToRun = this.options.filter(function(opt) { + if (opt.name !== 'webhook') { + return opt; + } - let runnerOptions = this.options.reduce((result, opt) => { - result[opt.name] = opt.value; - return result; - }, {}); + }); - runnerOptions["project-path"] = fs.realpathSync(Directories.data); - runnerOptions["pmvs-num-cores"] = os.cpus().length; + let runnerOptions = optionsToRun.reduce((result, opt) => { + result[opt.name] = opt.value; + return result; + }, {}); - if (this.gpcFiles.length > 0){ - runnerOptions.gcp = fs.realpathSync(path.join(this.getGpcFolderPath(), this.gpcFiles[0])); - } + runnerOptions["project-path"] = fs.realpathSync(Directories.data); + runnerOptions["pmvs-num-cores"] = os.cpus().length; - this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => { - if (err){ - this.setStatus(statusCodes.FAILED, {errorMessage: `Could not start process (${err.message})`}); - finished(err); - }else{ - // Don't evaluate if we caused the process to exit via SIGINT? - if (this.status.code !== statusCodes.CANCELED){ - if (code === 0){ - postProcess(); - }else{ - this.setStatus(statusCodes.FAILED, {errorMessage: `Process exited with code ${code}`}); - finished(); - } - }else{ - finished(); - } - } - }, output => { - // Replace console colors - output = output.replace(/\x1b\[[0-9;]*m/g, ""); - this.output.push(output); - }) - ); + if (this.gpcFiles.length > 0) { + runnerOptions.gcp = fs.realpathSync(path.join(this.getGpcFolderPath(), this.gpcFiles[0])); + } - return true; - }else{ - return false; - } - } + this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => { + if (err) { + this.setStatus(statusCodes.FAILED, { errorMessage: `Could not start process (${err.message})` }); + finished(err); + } else { + // Don't evaluate if we caused the process to exit via SIGINT? + if (this.status.code !== statusCodes.CANCELED) { + if (code === 0) { + postProcess(); + } else { + this.setStatus(statusCodes.FAILED, { errorMessage: `Process exited with code ${code}` }); + finished(); + } + } else { + finished(); + } + } + }, output => { + // Replace console colors + output = output.replace(/\x1b\[[0-9;]*m/g, ""); + this.output.push(output); + })); - // Re-executes the task (by setting it's state back to QUEUED) - // Only tasks that have been canceled, completed or have failed can be restarted. - restart(cb){ - if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1){ - this.setStatus(statusCodes.QUEUED); - this.dateCreated = new Date().getTime(); - this.output = []; - this.stopTrackingProcessingTime(true); - cb(null); - }else{ - cb(new Error("Task cannot be restarted")); - } - } + return true; + } else { + return false; + } + } - // Returns the description of the task. - getInfo(){ - return { - uuid: this.uuid, - name: this.name, - dateCreated: this.dateCreated, - processingTime: this.processingTime, - status: this.status, - options: this.options, - imagesCount: this.images.length - }; - } + // Re-executes the task (by setting it's state back to QUEUED) + // Only tasks that have been canceled, completed or have failed can be restarted. + restart(cb) { + if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1) { + this.setStatus(statusCodes.QUEUED); + this.dateCreated = new Date().getTime(); + this.output = []; + this.stopTrackingProcessingTime(true); + cb(null); + } else { + cb(new Error("Task cannot be restarted")); + } + } - // Returns the output of the OpenDroneMap process - // Optionally starting from a certain line number - getOutput(startFromLine = 0){ - return this.output.slice(startFromLine, this.output.length); - } + // Returns the description of the task. + getInfo() { + return { + uuid: this.uuid, + name: this.name, + dateCreated: this.dateCreated, + processingTime: this.processingTime, + status: this.status, + options: this.options, + imagesCount: this.images.length + }; + } - // Returns the data necessary to serialize this - // task to restore it later. - serialize(){ - return { - uuid: this.uuid, - name: this.name, - dateCreated: this.dateCreated, - status: this.status, - options: this.options - }; - } -}; + // Returns the output of the OpenDroneMap process + // Optionally starting from a certain line number + getOutput(startFromLine = 0) { + return this.output.slice(startFromLine, this.output.length); + } + + // Returns the data necessary to serialize this + // task to restore it later. + serialize() { + return { + uuid: this.uuid, + name: this.name, + dateCreated: this.dateCreated, + status: this.status, + options: this.options + }; + } +}; \ No newline at end of file diff --git a/libs/TaskManager.js b/libs/TaskManager.js index 8231b3d..3875428 100644 --- a/libs/TaskManager.js +++ b/libs/TaskManager.js @@ -27,237 +27,316 @@ let statusCodes = require('./statusCodes'); let async = require('async'); let schedule = require('node-schedule'); let Directories = require('./Directories'); +// webhook reqs +let request = require('request'); + const TASKS_DUMP_FILE = path.join(Directories.data, "tasks.json"); const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * 60 * 24 * config.cleanupTasksAfter; // days -module.exports = class TaskManager{ - constructor(done){ - this.tasks = {}; - this.runningQueue = []; +module.exports = class TaskManager { + constructor(done) { + this.tasks = {}; + this.runningQueue = []; - async.series([ - cb => this.restoreTaskListFromDump(cb), - cb => this.removeOldTasks(cb), - cb => this.removeOrphanedDirectories(cb), - cb => { - this.processNextTask(); - cb(); - }, - cb => { - // Every hour - schedule.scheduleJob('0 * * * *', () => { - this.removeOldTasks(); - this.dumpTaskList(); - }); + async.series([ + cb => this.restoreTaskListFromDump(cb), + cb => this.removeOldTasks(cb), + cb => this.removeOrphanedDirectories(cb), + cb => { + this.processNextTask(); + cb(); + }, + cb => { + // Every hour + schedule.scheduleJob('0 * * * *', () => { + this.removeOldTasks(); + this.dumpTaskList(); + }); - cb(); - } - ], done); - } + cb(); + } + ], done); + } - // Removes old tasks that have either failed, are completed, or - // have been canceled. - removeOldTasks(done){ - let list = []; - let now = new Date().getTime(); - logger.debug("Checking for old tasks to be removed..."); + // Removes old tasks that have either failed, are completed, or + // have been canceled. + removeOldTasks(done) { + let list = []; + let now = new Date().getTime(); + logger.debug("Checking for old tasks to be removed..."); - for (let uuid in this.tasks){ - let task = this.tasks[uuid]; + for (let uuid in this.tasks) { + let task = this.tasks[uuid]; - if ([statusCodes.FAILED, - statusCodes.COMPLETED, - statusCodes.CANCELED].indexOf(task.status.code) !== -1 && - now - task.dateCreated > CLEANUP_TASKS_IF_OLDER_THAN){ - list.push(task.uuid); - } - } + if ([statusCodes.FAILED, + statusCodes.COMPLETED, + statusCodes.CANCELED + ].indexOf(task.status.code) !== -1 && + now - task.dateCreated > CLEANUP_TASKS_IF_OLDER_THAN) { + list.push(task.uuid); + } + } - async.eachSeries(list, (uuid, cb) => { - logger.info(`Cleaning up old task ${uuid}`); - this.remove(uuid, cb); - }, done); - } + async.eachSeries(list, (uuid, cb) => { + logger.info(`Cleaning up old task ${uuid}`); + this.remove(uuid, cb); + }, done); + } - // Removes directories that don't have a corresponding - // task associated with it (maybe as a cause of an abrupt exit) - removeOrphanedDirectories(done){ - logger.info("Checking for orphaned directories to be removed..."); + // Removes directories that don't have a corresponding + // task associated with it (maybe as a cause of an abrupt exit) + removeOrphanedDirectories(done) { + logger.info("Checking for orphaned directories to be removed..."); - fs.readdir(Directories.data, (err, entries) => { - if (err) done(err); - else{ - async.eachSeries(entries, (entry, cb) => { - let dirPath = path.join(Directories.data, entry); - if (fs.statSync(dirPath).isDirectory() && - entry.match(/^[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+$/) && - !this.tasks[entry]){ - logger.info(`Found orphaned directory: ${entry}, removing...`); - rmdir(dirPath, cb); - }else cb(); - }, done); - } - }); - } + fs.readdir(Directories.data, (err, entries) => { + if (err) done(err); + else { + async.eachSeries(entries, (entry, cb) => { + let dirPath = path.join(Directories.data, entry); + if (fs.statSync(dirPath).isDirectory() && + entry.match(/^[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+$/) && + !this.tasks[entry]) { + logger.info(`Found orphaned directory: ${entry}, removing...`); + rmdir(dirPath, cb); + } else cb(); + }, done); + } + }); + } - // Load tasks that already exists (if any) - restoreTaskListFromDump(done){ - fs.readFile(TASKS_DUMP_FILE, (err, data) => { - if (!err){ - let tasks; - try{ - tasks = JSON.parse(data.toString()); - }catch(e){ - done(new Error(`Could not load task list. It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}). Please manually delete the file and try again.`)); - return; - } + // Load tasks that already exists (if any) + restoreTaskListFromDump(done) { + fs.readFile(TASKS_DUMP_FILE, (err, data) => { + if (!err) { + let tasks; + try { + tasks = JSON.parse(data.toString()); + } catch (e) { + done(new Error(`Could not load task list. It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}). Please manually delete the file and try again.`)); + return; + } - async.each(tasks, (taskJson, done) => { - Task.CreateFromSerialized(taskJson, (err, task) => { - if (err) done(err); - else{ - this.tasks[task.uuid] = task; - done(); - } - }); - }, err => { - logger.info(`Initialized ${tasks.length} tasks`); - if (done !== undefined) done(); - }); - }else{ - logger.info("No tasks dump found"); - if (done !== undefined) done(); - } - }); - } + async.each(tasks, (taskJson, done) => { + Task.CreateFromSerialized(taskJson, (err, task) => { + if (err) done(err); + else { + this.tasks[task.uuid] = task; + done(); + } + }); + }, err => { + logger.info(`Initialized ${tasks.length} tasks`); + if (done !== undefined) done(); + }); + } else { + logger.info("No tasks dump found"); + if (done !== undefined) done(); + } + }); + } - // Finds the first QUEUED task. - findNextTaskToProcess(){ - for (let uuid in this.tasks){ - if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){ - return this.tasks[uuid]; - } - } - } + // Finds the first QUEUED task. + findNextTaskToProcess() { + for (let uuid in this.tasks) { + if (this.tasks[uuid].getStatus() === statusCodes.QUEUED) { + return this.tasks[uuid]; + } + } + } - // Finds the next tasks, adds them to the running queue, - // and starts the tasks (up to the limit). - processNextTask(){ - if (this.runningQueue.length < config.parallelQueueProcessing){ - let task = this.findNextTaskToProcess(); - if (task){ - this.addToRunningQueue(task); - task.start(() => { - this.removeFromRunningQueue(task); - this.processNextTask(); - }); + // Finds the next tasks, adds them to the running queue, + // and starts the tasks (up to the limit). + processNextTask() { + if (this.runningQueue.length < config.parallelQueueProcessing) { + let task = this.findNextTaskToProcess(); + if (task) { + this.addToRunningQueue(task); + for (var i = 0; i < task.options.length; i++) { + if (task.options[i].name === "webhook") { + // call back webhook + request({ + url: task.options[i].value, + method: 'POST', + headers: { + 'User-Agent': 'node-OpenDroneMap', + 'Content-Type': 'application/json' + }, + json: task.getInfo(), + }, function(error, response, body) { - if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask(); - } - }else{ - // Do nothing - } - } + // ignore error handling for now - addToRunningQueue(task){ - assert(task.constructor.name === "Task", "Must be a Task object"); - this.runningQueue.push(task); - } + // if (!error && response.statusCode == 200) { - removeFromRunningQueue(task){ - assert(task.constructor.name === "Task", "Must be a Task object"); - this.runningQueue = this.runningQueue.filter(t => t !== task); - } - addNew(task){ - assert(task.constructor.name === "Task", "Must be a Task object"); - this.tasks[task.uuid] = task; + // } + }) - this.processNextTask(); - } + } + } - // Stops the execution of a task - // (without removing it from the system). - cancel(uuid, cb){ - let task = this.find(uuid, cb); - if (task){ - if (!task.isCanceled()){ - task.cancel(err => { - this.removeFromRunningQueue(task); - this.processNextTask(); - cb(err); - }); - }else{ - cb(null); // Nothing to be done - } - } - } + task.start(() => { + for (var i = 0; i < task.options.length; i++) { + if (task.options[i].name === "webhook") { + // call back webhook + request({ + url: task.options[i].value, + method: 'POST', + headers: { + 'User-Agent': 'node-OpenDroneMap', + 'Content-Type': 'application/json' + }, + json: task.getInfo(), + }, function(error, response, body) { - // Removes a task from the system. - // Before being removed, the task is canceled. - remove(uuid, cb){ - this.cancel(uuid, err => { - if (!err){ - let task = this.find(uuid, cb); - if (task){ - task.cleanup(err => { - if (!err){ - delete(this.tasks[uuid]); - this.processNextTask(); - cb(null); - }else cb(err); - }); - }else; // cb is called by find on error - }else cb(err); - }); - } + // ignore error handling for now - // Restarts (puts back into QUEUED state) - // a task that is either in CANCELED or FAILED state. - restart(uuid, cb){ - let task = this.find(uuid, cb); - if (task){ - task.restart(err => { - if (!err) this.processNextTask(); - cb(err); - }); - } - } + // if (!error && response.statusCode == 200) { - // Finds a task by its UUID string. - find(uuid, cb){ - let task = this.tasks[uuid]; - if (!task && cb) cb(new Error(`${uuid} not found`)); - return task; - } - // Serializes the list of tasks and saves it - // to disk - dumpTaskList(done){ - let output = []; + // } + }) - for (let uuid in this.tasks){ - output.push(this.tasks[uuid].serialize()); - } + } + } - fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => { - if (err) logger.error(`Could not dump tasks: ${err.message}`); - else logger.debug("Dumped tasks list."); - if (done !== undefined) done(); - }); - } - getQueueCount(){ + this.removeFromRunningQueue(task); + this.processNextTask(); + }); + + if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask(); + } + } else { + // Do nothing + } + } + + addToRunningQueue(task) { + assert(task.constructor.name === "Task", "Must be a Task object"); + this.runningQueue.push(task); + } + + removeFromRunningQueue(task) { + assert(task.constructor.name === "Task", "Must be a Task object"); + this.runningQueue = this.runningQueue.filter(t => t !== task); + } + + addNew(task) { + assert(task.constructor.name === "Task", "Must be a Task object"); + this.tasks[task.uuid] = task; + + this.processNextTask(); + } + + // Stops the execution of a task + // (without removing it from the system). + cancel(uuid, cb) { + let task = this.find(uuid, cb); + if (task) { + if (!task.isCanceled()) { + task.cancel(err => { + this.removeFromRunningQueue(task); + this.processNextTask(); + for (var i = 0; i < task.options.length; i++) { + if (task.options[i].name === "webhook") { + // call back webhook + request({ + url: task.options[i].value, + method: 'POST', + headers: { + 'User-Agent': 'node-OpenDroneMap', + 'Content-Type': 'application/json' + }, + json: task.getInfo(), + }, function(error, response, body) { + + // ignore error handling for now + + // if (!error && response.statusCode == 200) { + + + // } + }) + + } + } + + + cb(err); + }); + } else { + cb(null); // Nothing to be done + } + } + } + + // Removes a task from the system. + // Before being removed, the task is canceled. + remove(uuid, cb) { + this.cancel(uuid, err => { + if (!err) { + let task = this.find(uuid, cb); + if (task) { + task.cleanup(err => { + if (!err) { + delete(this.tasks[uuid]); + this.processNextTask(); + cb(null); + } else cb(err); + }); + } else; // cb is called by find on error + } else cb(err); + }); + } + + // Restarts (puts back into QUEUED state) + // a task that is either in CANCELED or FAILED state. + restart(uuid, cb) { + let task = this.find(uuid, cb); + if (task) { + task.restart(err => { + if (!err) this.processNextTask(); + cb(err); + }); + } + } + + // Finds a task by its UUID string. + find(uuid, cb) { + let task = this.tasks[uuid]; + if (!task && cb) cb(new Error(`${uuid} not found`)); + return task; + } + + // Serializes the list of tasks and saves it + // to disk + dumpTaskList(done) { + let output = []; + + for (let uuid in this.tasks) { + output.push(this.tasks[uuid].serialize()); + } + + fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => { + if (err) logger.error(`Could not dump tasks: ${err.message}`); + else logger.debug("Dumped tasks list."); + if (done !== undefined) done(); + }); + } + + getQueueCount() { let count = 0; - for (let uuid in this.tasks){ + for (let uuid in this.tasks) { let task = this.tasks[uuid]; if ([statusCodes.QUEUED, - statusCodes.RUNNING].indexOf(task.status.code) !== -1){ + statusCodes.RUNNING + ].indexOf(task.status.code) !== -1) { count++; } } return count; } -}; +}; \ No newline at end of file diff --git a/libs/odmOptions.js b/libs/odmOptions.js index 6c5bf6f..3ef8211 100644 --- a/libs/odmOptions.js +++ b/libs/odmOptions.js @@ -23,229 +23,246 @@ let logger = require('./logger'); let odmOptions = null; module.exports = { - initialize: function(done){ - this.getOptions(done); - }, + initialize: function(done) { + this.getOptions(done); + }, - getOptions: function(done){ - if (odmOptions){ - done(null, odmOptions); - return; - } + getOptions: function(done) { + if (odmOptions) { + done(null, odmOptions); + return; + } - odmRunner.getJsonOptions((err, json) => { - if (err) done(err); - else{ - odmOptions = []; - for (let option in json){ - // Not all options are useful to the end user - // (num cores can be set programmatically, so can gcpFile, etc.) - if (["-h", "--project-path", "--cmvs-maxImages", "--time", - "--zip-results", "--pmvs-num-cores", - "--start-with", "--gcp", "--end-with", "--images", - "--slam-config", "--video"].indexOf(option) !== -1) continue; + odmRunner.getJsonOptions((err, json) => { + if (err) done(err); + else { + odmOptions = []; + for (let option in json) { + // Not all options are useful to the end user + // (num cores can be set programmatically, so can gcpFile, etc.) + if (["-h", "--project-path", "--cmvs-maxImages", "--time", + "--zip-results", "--pmvs-num-cores", + "--start-with", "--gcp", "--end-with", "--images", + "--slam-config", "--video" + ].indexOf(option) !== -1) continue; - let values = json[option]; + let values = json[option]; - let name = option.replace(/^--/, ""); - let type = ""; - let value = ""; - let help = values.help || ""; - let domain = values.metavar !== undefined ? - values.metavar.replace(/^[<>]/g, "") - .replace(/[<>]$/g, "") - .trim() : - ""; + let name = option.replace(/^--/, ""); + let type = ""; + let value = ""; + let help = values.help || ""; + let domain = values.metavar !== undefined ? + values.metavar.replace(/^[<>]/g, "") + .replace(/[<>]$/g, "") + .trim() : + ""; - switch((values.type || "").trim()){ - case "": - type = "int"; - value = values['default'] !== undefined ? - parseInt(values['default']) : - 0; - break; - case "": - type = "float"; - value = values['default'] !== undefined ? - parseFloat(values['default']) : - 0.0; - break; - default: - type = "string"; - value = values['default'] !== undefined ? - values['default'].trim() : - ""; - } + switch ((values.type || "").trim()) { + case "": + type = "int"; + value = values['default'] !== undefined ? + parseInt(values['default']) : + 0; + break; + case "": + type = "float"; + value = values['default'] !== undefined ? + parseFloat(values['default']) : + 0.0; + break; + default: + type = "string"; + value = values['default'] !== undefined ? + values['default'].trim() : + ""; + } - if (values['default'] === "True"){ - type = "bool"; - value = true; - }else if (values['default'] === "False"){ - type = "bool"; - value = false; - } + if (values['default'] === "True") { + type = "bool"; + value = true; + } else if (values['default'] === "False") { + type = "bool"; + value = false; + } - // If 'choices' is specified, try to convert it to array - if (values.choices){ - try{ - values.choices = JSON.parse(values.choices.replace(/'/g, '"')); // Convert ' to " - }catch(e){ - logger.warn(`Cannot parse choices: ${values.choices}`); - } - } + // If 'choices' is specified, try to convert it to array + if (values.choices) { + try { + values.choices = JSON.parse(values.choices.replace(/'/g, '"')); // Convert ' to " + } catch (e) { + logger.warn(`Cannot parse choices: ${values.choices}`); + } + } - if (Array.isArray(values.choices)){ - type = "string"; // TODO: change to enum - domain = values.choices; - } + if (Array.isArray(values.choices)) { + type = "string"; // TODO: change to enum + domain = values.choices; + } - help = help.replace(/\%\(default\)s/g, value); + help = help.replace(/\%\(default\)s/g, value); // In the end, all values must be converted back // to strings (per OpenAPI spec which doesn't allow mixed types) value = String(value); - odmOptions.push({ - name, type, value, domain, help - }); - } - done(null, odmOptions); - } - }); - }, + odmOptions.push({ + name, + type, + value, + domain, + help + }); - // Checks that the options (as received from the rest endpoint) - // Are valid and within proper ranges. - // The result of filtering is passed back via callback - // @param options[] - filterOptions: function(options, done){ - assert(odmOptions !== null, "odmOptions is not set. Have you initialized odmOptions properly?"); - - try{ - if (typeof options === "string") options = JSON.parse(options); - if (!Array.isArray(options)) options = []; - - let result = []; - let errors = []; - let addError = function(opt, descr){ - errors.push({ - name: opt.name, - error: descr - }); - }; + } - let typeConversion = { - 'float': Number.parseFloat, - 'int': Number.parseInt, - 'bool': function(value){ - if (value === 'true') return true; - else if (value === 'false') return false; - else if (typeof value === 'boolean') return value; - else throw new Error(`Cannot convert ${value} to boolean`); - }, - 'string': function(value){ - return value; // No conversion needed - }, - 'path': function(value){ - return value; // No conversion needed - } - }; - - let domainChecks = [ - { - regex: /^(positive |negative )?(integer|float)$/, - validate: function(matches, value){ - if (matches[1] === 'positive ') return value >= 0; - else if (matches[1] === 'negative ') return value <= 0; - - else if (matches[2] === 'integer') return Number.isInteger(value); - else if (matches[2] === 'float') return Number.isFinite(value); - } - }, - { - regex: /^percent$/, - validate: function(matches, value){ - return value >= 0 && value <= 100; - } - }, - { - regex: /^(float): ([\-\+\.\d]+) <= x <= ([\-\+\.\d]+)$/, - validate: function(matches, value){ - let [str, type, lower, upper] = matches; - lower = parseFloat(lower); - upper = parseFloat(upper); - return value >= lower && value <= upper; - } - }, - { - regex: /^(float) (>=|>|<|<=) ([\-\+\.\d]+)$/, - validate: function(matches, value){ - let [str, type, oper, bound] = matches; - bound = parseFloat(bound); - switch(oper){ - case '>=': - return value >= bound; - case '>': - return value > bound; - case '<=': - return value <= bound; - case '<': - return value < bound; - default: - return false; - } - } - }, - { - regex: /^(string|path)$/, - validate: function(){ - return true; // All strings/paths are fine - } - } - // TODO: handle enum - ]; + odmOptions.push({ + name: 'webhook', + type: 'string', + value: '', + domain: '', + help: 'On task complete, fail, etc. Call the above url with a post of the task serialized' + }); - let checkDomain = function(domain, value){ - let matches, - dc = domainChecks.find(dc => matches = domain.match(dc.regex)); + // local (non odm) options - if (dc){ - if (!dc.validate(matches, value)) throw new Error(`Invalid value ${value} (out of range)`); - }else{ - throw new Error(`Domain value cannot be handled: '${domain}' : '${value}'`); - } - }; + done(null, odmOptions); + } + }); + }, - // Scan through all possible options - for (let odmOption of odmOptions){ - // Was this option selected by the user? - /*jshint loopfunc: true */ - let opt = options.find(o => o.name === odmOption.name); - if (opt){ - try{ - // Convert to proper data type - let value = typeConversion[odmOption.type](opt.value); + // Checks that the options (as received from the rest endpoint) + // Are valid and within proper ranges. + // The result of filtering is passed back via callback + // @param options[] + filterOptions: function(options, done) { + assert(odmOptions !== null, "odmOptions is not set. Have you initialized odmOptions properly?"); - // Domain check - if (odmOption.domain){ - checkDomain(odmOption.domain, value); - } + try { + if (typeof options === "string") options = JSON.parse(options); + if (!Array.isArray(options)) options = []; - result.push({ - name: odmOption.name, - value: value - }); - }catch(e){ - addError(opt, e.message); - } - } - } + let result = []; + let errors = []; + let addError = function(opt, descr) { + errors.push({ + name: opt.name, + error: descr + }); + }; - if (errors.length > 0) done(new Error(JSON.stringify(errors))); - else done(null, result); - }catch(e){ - done(e); - } - } + let typeConversion = { + 'float': Number.parseFloat, + 'int': Number.parseInt, + 'bool': function(value) { + if (value === 'true') return true; + else if (value === 'false') return false; + else if (typeof value === 'boolean') return value; + else throw new Error(`Cannot convert ${value} to boolean`); + }, + 'string': function(value) { + return value; // No conversion needed + }, + 'path': function(value) { + return value; // No conversion needed + } + }; + + let domainChecks = [{ + regex: /^(positive |negative )?(integer|float)$/, + validate: function(matches, value) { + if (matches[1] === 'positive ') return value >= 0; + else if (matches[1] === 'negative ') return value <= 0; + + else if (matches[2] === 'integer') return Number.isInteger(value); + else if (matches[2] === 'float') return Number.isFinite(value); + } + }, + { + regex: /^percent$/, + validate: function(matches, value) { + return value >= 0 && value <= 100; + } + }, + { + regex: /^(float): ([\-\+\.\d]+) <= x <= ([\-\+\.\d]+)$/, + validate: function(matches, value) { + let [str, type, lower, upper] = matches; + lower = parseFloat(lower); + upper = parseFloat(upper); + return value >= lower && value <= upper; + } + }, + { + regex: /^(float) (>=|>|<|<=) ([\-\+\.\d]+)$/, + validate: function(matches, value) { + let [str, type, oper, bound] = matches; + bound = parseFloat(bound); + switch (oper) { + case '>=': + return value >= bound; + case '>': + return value > bound; + case '<=': + return value <= bound; + case '<': + return value < bound; + default: + return false; + } + } + }, + { + regex: /^(string|path)$/, + validate: function() { + return true; // All strings/paths are fine + } + } + + // TODO: handle enum + ]; + + let checkDomain = function(domain, value) { + let matches, + dc = domainChecks.find(dc => matches = domain.match(dc.regex)); + + if (dc) { + if (!dc.validate(matches, value)) throw new Error(`Invalid value ${value} (out of range)`); + } else { + throw new Error(`Domain value cannot be handled: '${domain}' : '${value}'`); + } + }; + + // Scan through all possible options + for (let odmOption of odmOptions) { + // Was this option selected by the user? + /*jshint loopfunc: true */ + let opt = options.find(o => o.name === odmOption.name); + if (opt) { + try { + // Convert to proper data type + let value = typeConversion[odmOption.type](opt.value); + + // Domain check + if (odmOption.domain) { + checkDomain(odmOption.domain, value); + } + + result.push({ + name: odmOption.name, + value: value + }); + } catch (e) { + addError(opt, e.message); + } + } + } + + if (errors.length > 0) done(new Error(JSON.stringify(errors))); + else done(null, result); + } catch (e) { + done(e); + } + } }; \ No newline at end of file diff --git a/package.json b/package.json index 1b245db..3a3b945 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "multer": "^1.1.0", "node-schedule": "^1.1.1", "node-uuid": "^1.4.7", + "request": "^2.81.0", "rimraf": "^2.5.3", "swagger-jsdoc": "^1.3.1", "winston": "^2.2.0"