2016-07-06 18:44:20 +00:00
|
|
|
"use strict";
|
|
|
|
let assert = require('assert');
|
2016-07-15 21:19:50 +00:00
|
|
|
let fs = require('fs');
|
2016-07-06 18:44:20 +00:00
|
|
|
let Task = require('./Task');
|
2016-07-08 20:44:48 +00:00
|
|
|
let statusCodes = require('./statusCodes');
|
2016-07-15 21:19:50 +00:00
|
|
|
let async = require('async');
|
2016-07-17 23:01:38 +00:00
|
|
|
let schedule = require('node-schedule');
|
2016-07-08 20:44:48 +00:00
|
|
|
|
2016-07-15 21:19:50 +00:00
|
|
|
// Maximum number of tasks allowed in the running queue at the same time.
const PARALLEL_QUEUE_PROCESS_LIMIT = 2;

// File the serialized task list is written to and restored from.
const TASKS_DUMP_FILE = "data/tasks.json";

// Age in milliseconds (3 days) after which finished tasks are cleaned up.
const CLEANUP_TASKS_IF_OLDER_THAN = 3 * 24 * 60 * 60 * 1000;
|
2016-07-06 18:44:20 +00:00
|
|
|
|
|
|
|
module.exports = class TaskManager{
|
2016-07-15 21:19:50 +00:00
|
|
|
constructor(done){
|
2016-07-06 18:44:20 +00:00
|
|
|
this.tasks = {};
|
2016-07-08 20:44:48 +00:00
|
|
|
this.runningQueue = [];
|
2016-07-15 21:19:50 +00:00
|
|
|
|
|
|
|
async.series([
|
|
|
|
cb => { this.restoreTaskListFromDump(cb); },
|
2016-07-17 23:01:38 +00:00
|
|
|
cb => { this.removeOldTasks(cb); },
|
2016-07-15 21:19:50 +00:00
|
|
|
cb => {
|
|
|
|
this.processNextTask();
|
2016-07-17 23:01:38 +00:00
|
|
|
cb();
|
|
|
|
},
|
|
|
|
cb => {
|
|
|
|
// Every hour
|
|
|
|
schedule.scheduleJob('* 0 * * * *', () => {
|
|
|
|
this.removeOldTasks();
|
|
|
|
this.dumpTaskList();
|
|
|
|
});
|
|
|
|
|
2016-07-15 21:19:50 +00:00
|
|
|
cb();
|
|
|
|
}
|
2016-07-17 23:01:38 +00:00
|
|
|
], done);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Removes old tasks that have either failed, are completed, or
|
|
|
|
// have been canceled.
|
|
|
|
removeOldTasks(done){
|
|
|
|
let list = [];
|
|
|
|
let now = new Date().getTime();
|
|
|
|
console.log("Checking for old tasks to be removed...");
|
|
|
|
|
|
|
|
for (let uuid in this.tasks){
|
|
|
|
let task = this.tasks[uuid];
|
|
|
|
|
|
|
|
if ([statusCodes.FAILED,
|
|
|
|
statusCodes.COMPLETED,
|
|
|
|
statusCodes.CANCELED].indexOf(task.status.code) !== -1 &&
|
|
|
|
now - task.dateCreated > CLEANUP_TASKS_IF_OLDER_THAN){
|
|
|
|
list.push(task.uuid);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async.eachSeries(list, (uuid, cb) => {
|
|
|
|
console.log(`Cleaning up old task ${uuid}`)
|
|
|
|
this.remove(uuid, cb);
|
|
|
|
}, done);
|
2016-07-15 21:19:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Load tasks that already exists (if any)
|
|
|
|
restoreTaskListFromDump(done){
|
|
|
|
fs.readFile(TASKS_DUMP_FILE, (err, data) => {
|
|
|
|
if (!err){
|
|
|
|
let tasks = JSON.parse(data.toString());
|
|
|
|
|
|
|
|
async.each(tasks, (taskJson, done) => {
|
|
|
|
Task.CreateFromSerialized(taskJson, (err, task) => {
|
|
|
|
if (err) done(err);
|
|
|
|
else{
|
|
|
|
this.tasks[task.uuid] = task;
|
|
|
|
done();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}, err => {
|
|
|
|
console.log(`Initialized ${tasks.length} tasks`);
|
|
|
|
if (done !== undefined) done();
|
|
|
|
});
|
|
|
|
}else{
|
|
|
|
console.log("No tasks dump found");
|
|
|
|
if (done !== undefined) done();
|
|
|
|
}
|
|
|
|
});
|
2016-07-08 20:44:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Finds the first QUEUED task.
|
|
|
|
findNextTaskToProcess(){
|
|
|
|
for (let uuid in this.tasks){
|
|
|
|
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){
|
|
|
|
return this.tasks[uuid];
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Finds the next tasks, adds them to the running queue,
|
|
|
|
// and starts the tasks (up to the limit).
|
|
|
|
processNextTask(){
|
|
|
|
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT){
|
|
|
|
let task = this.findNextTaskToProcess();
|
|
|
|
if (task){
|
|
|
|
this.addToRunningQueue(task);
|
|
|
|
task.start(() => {
|
|
|
|
this.removeFromRunningQueue(task);
|
|
|
|
this.processNextTask();
|
|
|
|
});
|
|
|
|
|
|
|
|
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT) this.processNextTask();
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
// Do nothing
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
addToRunningQueue(task){
|
|
|
|
assert(task.constructor.name === "Task", "Must be a Task object");
|
|
|
|
this.runningQueue.push(task);
|
|
|
|
}
|
|
|
|
|
|
|
|
removeFromRunningQueue(task){
|
|
|
|
assert(task.constructor.name === "Task", "Must be a Task object");
|
|
|
|
|
|
|
|
this.runningQueue = this.runningQueue.filter(t => {
|
|
|
|
return t !== task;
|
|
|
|
});
|
2016-07-06 18:44:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
addNew(task){
|
|
|
|
assert(task.constructor.name === "Task", "Must be a Task object");
|
|
|
|
this.tasks[task.uuid] = task;
|
2016-07-08 20:44:48 +00:00
|
|
|
|
|
|
|
this.processNextTask();
|
2016-07-06 18:44:20 +00:00
|
|
|
}
|
|
|
|
|
2016-07-08 20:44:48 +00:00
|
|
|
// Stops the execution of a task
|
|
|
|
// (without removing it from the system).
|
2016-07-07 22:07:17 +00:00
|
|
|
cancel(uuid, cb){
|
|
|
|
let task;
|
|
|
|
if (task = this.find(uuid, cb)){
|
2016-07-14 21:42:12 +00:00
|
|
|
if (!task.isCanceled()){
|
|
|
|
task.cancel(err => {
|
|
|
|
this.removeFromRunningQueue(task);
|
|
|
|
this.processNextTask();
|
|
|
|
cb(err);
|
|
|
|
});
|
|
|
|
}else{
|
|
|
|
cb(null); // Nothing to be done
|
|
|
|
}
|
2016-07-07 22:07:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-08 20:44:48 +00:00
|
|
|
// Removes a task from the system.
|
|
|
|
// Before being removed, the task is canceled.
|
2016-07-07 22:07:17 +00:00
|
|
|
remove(uuid, cb){
|
|
|
|
this.cancel(uuid, err => {
|
|
|
|
if (!err){
|
2016-07-14 21:42:12 +00:00
|
|
|
let task;
|
|
|
|
if (task = this.find(uuid, cb)){
|
|
|
|
task.cleanup(err => {
|
|
|
|
if (!err){
|
|
|
|
delete(this.tasks[uuid]);
|
|
|
|
this.processNextTask();
|
|
|
|
cb(null);
|
|
|
|
}else cb(err);
|
|
|
|
});
|
|
|
|
}else; // cb is called by find on error
|
2016-07-07 22:07:17 +00:00
|
|
|
}else cb(err);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2016-07-08 20:44:48 +00:00
|
|
|
// Restarts (puts back into QUEUED state)
|
|
|
|
// a task that is either in CANCELED or FAILED state.
|
2016-07-07 22:07:17 +00:00
|
|
|
restart(uuid, cb){
|
|
|
|
let task;
|
|
|
|
if (task = this.find(uuid, cb)){
|
2016-07-09 16:58:14 +00:00
|
|
|
task.restart(err => {
|
2016-07-14 21:42:12 +00:00
|
|
|
if (!err) this.processNextTask();
|
2016-07-09 16:58:14 +00:00
|
|
|
cb(err);
|
|
|
|
});
|
2016-07-07 22:07:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-08 20:44:48 +00:00
|
|
|
// Finds a task by its UUID string.
|
|
|
|
find(uuid, cb){
|
2016-07-07 22:07:17 +00:00
|
|
|
let task = this.tasks[uuid];
|
2016-07-08 20:44:48 +00:00
|
|
|
if (!task && cb) cb(new Error(`${uuid} not found`));
|
2016-07-07 22:07:17 +00:00
|
|
|
return task;
|
2016-07-06 18:44:20 +00:00
|
|
|
}
|
2016-07-15 21:19:50 +00:00
|
|
|
|
|
|
|
// Serializes the list of tasks and saves it
|
|
|
|
// to disk
|
|
|
|
dumpTaskList(done){
|
|
|
|
var output = [];
|
|
|
|
|
|
|
|
for (let uuid in this.tasks){
|
|
|
|
output.push(this.tasks[uuid].serialize());
|
|
|
|
}
|
|
|
|
|
|
|
|
fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => {
|
|
|
|
if (err) console.log(`Could not dump tasks: ${err.message}`);
|
|
|
|
else console.log("Dumped tasks list.");
|
|
|
|
if (done !== undefined) done();
|
|
|
|
})
|
|
|
|
}
|
2016-07-06 18:44:20 +00:00
|
|
|
};
|