diff --git a/config.js b/config.js index 3ecb96b..93a47de 100644 --- a/config.js +++ b/config.js @@ -32,14 +32,14 @@ Options: --log_level Set log level verbosity (default: info) -d, --deamonize Set process to run as a deamon --parallel_queue_processing Number of simultaneous processing tasks (default: 2) - --cleanup_tasks_after Number of days that elapse before deleting finished and canceled tasks (default: 3) + --cleanup_tasks_after Number of minutes that elapse before deleting finished and canceled tasks (default: 2880) --test Enable test mode. In test mode, no commands are sent to OpenDroneMap. This can be useful during development or testing (default: false) --test_skip_orthophotos If test mode is enabled, skip orthophoto results when generating assets. (default: false) --test_skip_dems If test mode is enabled, skip dems results when generating assets. (default: false) --powercycle When set, the application exits immediately after powering up. Useful for testing launch and compilation issues. --token Sets a token that needs to be passed for every request. This can be used to limit access to the node only to token holders. (default: none) --max_images Specify the maximum number of images that this processing node supports. (default: unlimited) - --callback Specify a callback URL to be invoked when a task completes processing (default: none) + --webhook Specify a POST URL endpoint to be invoked when a task completes processing (default: none) --s3_endpoint Specify a S3 endpoint (for example, nyc3.digitaloceanspaces.com) to upload completed task results to. (default: do not upload to S3) --s3_bucket Specify a S3 bucket name where to upload completed task results to. (default: none) --s3_access_key S3 access key, required if --s3_endpoint is set. (default: none) @@ -87,14 +87,14 @@ config.logger.logDirectory = fromConfigFile("logger.logDirectory", ''); // Set t config.port = parseInt(argv.port || argv.p || fromConfigFile("port", process.env.PORT || 3000)); config.deamon = argv.deamonize || argv.d || fromConfigFile("daemon", false); config.parallelQueueProcessing = argv.parallel_queue_processing || fromConfigFile("parallelQueueProcessing", 2); -config.cleanupTasksAfter = argv.cleanup_tasks_after || fromConfigFile("cleanupTasksAfter", 3); +config.cleanupTasksAfter = parseInt(argv.cleanup_tasks_after || fromConfigFile("cleanupTasksAfter", 60 * 24 * 2)); config.test = argv.test || fromConfigFile("test", false); config.testSkipOrthophotos = argv.test_skip_orthophotos || fromConfigFile("testSkipOrthophotos", false); config.testSkipDems = argv.test_skip_dems || fromConfigFile("testSkipDems", false); config.powercycle = argv.powercycle || fromConfigFile("powercycle", false); config.token = argv.token || fromConfigFile("token", ""); config.maxImages = parseInt(argv.max_images || fromConfigFile("maxImages", "")) || null; -config.callback = argv.callback || fromConfigFile("callback", ""); +config.webhook = argv.webhook || fromConfigFile("webhook", ""); config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", ""); config.s3Bucket = argv.s3_bucket || fromConfigFile("s3Bucket", ""); config.s3AccessKey = argv.s3_access_key || fromConfigFile("s3AccessKey", process.env.AWS_ACCESS_KEY_ID || "") diff --git a/libs/S3.js b/libs/S3.js index a753675..792792d 100644 --- a/libs/S3.js +++ b/libs/S3.js @@ -112,16 +112,20 @@ module.exports = { let cbCalled = false; q.drain = () => { - if (!cbCalled) cb(); - cbCalled = true; + if (!cbCalled){ + cbCalled = true; + cb(); + } }; if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`); q.push(uploadList, err => { if (err){ q.kill(); - if (!cbCalled) cb(err); - cbCalled = true; + if (!cbCalled){ + cbCalled = true; + cb(err); + } } }); } diff --git a/libs/TaskManager.js b/libs/TaskManager.js index 70a55a1..f922b19 100644 --- a/libs/TaskManager.js +++ b/libs/TaskManager.js @@ -29,11 +29,8 @@ let schedule = require('node-schedule'); let Directories = require('./Directories'); let request = require('request'); - - - const TASKS_DUMP_FILE = path.join(Directories.data, "tasks.json"); -const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * 60 * 24 * config.cleanupTasksAfter; // days +const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * config.cleanupTasksAfter; // minutes module.exports = class TaskManager{ constructor(done){ @@ -153,16 +150,34 @@ module.exports = class TaskManager{ if (task){ this.addToRunningQueue(task); task.start(() => { - if(task.webhook && task.webhook.length > 3){ - request({ - uri: task.webhook, - method: 'POST', - json: task.getInfo() - }, - function (error, response, body) { - if (error || response.statusCode != 200) logger.warn(`Call to webhook failed: ${task.webhook}`); - }); - } + // Hooks can be passed via command line + // or for each individual task + const hooks = [task.webhook, config.webhook]; + + hooks.forEach(hook => { + if (hook && hook.length > 3){ + const notifyCallback = (attempt) => { + if (attempt > 5){ + logger.warn(`Callback failed, will not retry: ${hook}`); + return; + } + request.post(hook, { + json: task.getInfo() + }, + (error, response) => { + if (error || response.statusCode != 200){ + logger.warn(`Callback failed, will retry in a bit: ${hook}`); + setTimeout(() => { + notifyCallback(attempt + 1); + }, attempt * 5000); + }else{ + logger.debug(`Callback invoked: ${hook}`); + } + }); + }; + notifyCallback(0); + } + }); this.removeFromRunningQueue(task); this.processNextTask(); @@ -264,16 +279,16 @@ module.exports = class TaskManager{ }); } - getQueueCount(){ - let count = 0; - for (let uuid in this.tasks){ - let task = this.tasks[uuid]; + getQueueCount(){ + let count = 0; + for (let uuid in this.tasks){ + let task = this.tasks[uuid]; - if ([statusCodes.QUEUED, - statusCodes.RUNNING].indexOf(task.status.code) !== -1){ - count++; - } - } - return count; - } + if ([statusCodes.QUEUED, + statusCodes.RUNNING].indexOf(task.status.code) !== -1){ + count++; + } + } + return count; + } };