Webhook improvements

pull/51/head
Piero Toffanin 2018-11-23 14:48:22 -05:00
rodzic 2e9e7e9f42
commit 4788bfde4f
3 zmienionych plików z 52 dodań i 33 usunięć

Wyświetl plik

@ -32,14 +32,14 @@ Options:
--log_level <logLevel> Set log level verbosity (default: info)
-d, --deamonize Set process to run as a deamon
--parallel_queue_processing <number> Number of simultaneous processing tasks (default: 2)
--cleanup_tasks_after <number> Number of days that elapse before deleting finished and canceled tasks (default: 3)
--cleanup_tasks_after <number> Number of minutes that elapse before deleting finished and canceled tasks (default: 2880)
--test Enable test mode. In test mode, no commands are sent to OpenDroneMap. This can be useful during development or testing (default: false)
--test_skip_orthophotos If test mode is enabled, skip orthophoto results when generating assets. (default: false)
--test_skip_dems If test mode is enabled, skip dems results when generating assets. (default: false)
--powercycle When set, the application exits immediately after powering up. Useful for testing launch and compilation issues.
--token <token> Sets a token that needs to be passed for every request. This can be used to limit access to the node only to token holders. (default: none)
--max_images <number> Specify the maximum number of images that this processing node supports. (default: unlimited)
--callback <url> Specify a callback URL to be invoked when a task completes processing (default: none)
--webhook <url> Specify a POST URL endpoint to be invoked when a task completes processing (default: none)
--s3_endpoint <url> Specify a S3 endpoint (for example, nyc3.digitaloceanspaces.com) to upload completed task results to. (default: do not upload to S3)
--s3_bucket <bucket> Specify a S3 bucket name where to upload completed task results to. (default: none)
--s3_access_key <key> S3 access key, required if --s3_endpoint is set. (default: none)
@ -87,14 +87,14 @@ config.logger.logDirectory = fromConfigFile("logger.logDirectory", ''); // Set t
config.port = parseInt(argv.port || argv.p || fromConfigFile("port", process.env.PORT || 3000));
config.deamon = argv.deamonize || argv.d || fromConfigFile("daemon", false);
config.parallelQueueProcessing = argv.parallel_queue_processing || fromConfigFile("parallelQueueProcessing", 2);
config.cleanupTasksAfter = argv.cleanup_tasks_after || fromConfigFile("cleanupTasksAfter", 3);
config.cleanupTasksAfter = parseInt(argv.cleanup_tasks_after || fromConfigFile("cleanupTasksAfter", 60 * 24 * 2));
config.test = argv.test || fromConfigFile("test", false);
config.testSkipOrthophotos = argv.test_skip_orthophotos || fromConfigFile("testSkipOrthophotos", false);
config.testSkipDems = argv.test_skip_dems || fromConfigFile("testSkipDems", false);
config.powercycle = argv.powercycle || fromConfigFile("powercycle", false);
config.token = argv.token || fromConfigFile("token", "");
config.maxImages = parseInt(argv.max_images || fromConfigFile("maxImages", "")) || null;
config.callback = argv.callback || fromConfigFile("callback", "");
config.webhook = argv.webhook || fromConfigFile("webhook", "");
config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", "");
config.s3Bucket = argv.s3_bucket || fromConfigFile("s3Bucket", "");
config.s3AccessKey = argv.s3_access_key || fromConfigFile("s3AccessKey", process.env.AWS_ACCESS_KEY_ID || "")

Wyświetl plik

@ -112,16 +112,20 @@ module.exports = {
let cbCalled = false;
q.drain = () => {
if (!cbCalled) cb();
cbCalled = true;
if (!cbCalled){
cbCalled = true;
cb();
}
};
if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
q.push(uploadList, err => {
if (err){
q.kill();
if (!cbCalled) cb(err);
cbCalled = true;
if (!cbCalled){
cbCalled = true;
cb(err);
}
}
});
}

Wyświetl plik

@ -29,11 +29,8 @@ let schedule = require('node-schedule');
let Directories = require('./Directories');
let request = require('request');
const TASKS_DUMP_FILE = path.join(Directories.data, "tasks.json");
const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * 60 * 24 * config.cleanupTasksAfter; // days
const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * config.cleanupTasksAfter; // minutes
module.exports = class TaskManager{
constructor(done){
@ -153,16 +150,34 @@ module.exports = class TaskManager{
if (task){
this.addToRunningQueue(task);
task.start(() => {
if(task.webhook && task.webhook.length > 3){
request({
uri: task.webhook,
method: 'POST',
json: task.getInfo()
},
function (error, response, body) {
if (error || response.statusCode != 200) logger.warn(`Call to webhook failed: ${task.webhook}`);
});
}
// Hooks can be passed via command line
// or for each individual task
const hooks = [task.webhook, config.webhook];
hooks.forEach(hook => {
if (hook && hook.length > 3){
const notifyCallback = (attempt) => {
if (attempt > 5){
logger.warn(`Callback failed, will not retry: ${hook}`);
return;
}
request.post(hook, {
json: task.getInfo()
},
(error, response) => {
if (error || response.statusCode != 200){
logger.warn(`Callback failed, will retry in a bit: ${hook}`);
setTimeout(() => {
notifyCallback(attempt + 1);
}, attempt * 5000);
}else{
logger.debug(`Callback invoked: ${hook}`);
}
});
};
notifyCallback(0);
}
});
this.removeFromRunningQueue(task);
this.processNextTask();
@ -264,16 +279,16 @@ module.exports = class TaskManager{
});
}
getQueueCount(){
let count = 0;
for (let uuid in this.tasks){
let task = this.tasks[uuid];
getQueueCount(){
let count = 0;
for (let uuid in this.tasks){
let task = this.tasks[uuid];
if ([statusCodes.QUEUED,
statusCodes.RUNNING].indexOf(task.status.code) !== -1){
count++;
}
}
return count;
}
if ([statusCodes.QUEUED,
statusCodes.RUNNING].indexOf(task.status.code) !== -1){
count++;
}
}
return count;
}
};