OpenDroneMap-NodeODM/libs/TaskManager.js

116 wiersze
2.6 KiB
JavaScript

"use strict";
let assert = require('assert');
let Task = require('./Task');
let statusCodes = require('./statusCodes');
let PARALLEL_QUEUE_PROCESS_LIMIT = 2;
module.exports = class TaskManager{
constructor(){
this.tasks = {};
this.runningQueue = [];
}
// Finds the first QUEUED task.
findNextTaskToProcess(){
for (let uuid in this.tasks){
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){
return this.tasks[uuid];
}
}
}
// Finds the next tasks, adds them to the running queue,
// and starts the tasks (up to the limit).
processNextTask(){
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT){
let task = this.findNextTaskToProcess();
if (task){
this.addToRunningQueue(task);
task.start(() => {
this.removeFromRunningQueue(task);
this.processNextTask();
});
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT) this.processNextTask();
}
}else{
// Do nothing
}
}
addToRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue.push(task);
}
removeFromRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue = this.runningQueue.filter(t => {
return t !== task;
});
}
addNew(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.tasks[task.uuid] = task;
this.processNextTask();
}
// Stops the execution of a task
// (without removing it from the system).
cancel(uuid, cb){
let task;
if (task = this.find(uuid, cb)){
if (!task.isCanceled()){
task.cancel(err => {
this.removeFromRunningQueue(task);
this.processNextTask();
cb(err);
});
}else{
cb(null); // Nothing to be done
}
}
}
// Removes a task from the system.
// Before being removed, the task is canceled.
remove(uuid, cb){
this.cancel(uuid, err => {
if (!err){
let task;
if (task = this.find(uuid, cb)){
task.cleanup(err => {
if (!err){
delete(this.tasks[uuid]);
this.processNextTask();
cb(null);
}else cb(err);
});
}else; // cb is called by find on error
}else cb(err);
});
}
// Restarts (puts back into QUEUED state)
// a task that is either in CANCELED or FAILED state.
restart(uuid, cb){
let task;
if (task = this.find(uuid, cb)){
task.restart(err => {
if (!err) this.processNextTask();
cb(err);
});
}
}
// Finds a task by its UUID string.
find(uuid, cb){
let task = this.tasks[uuid];
if (!task && cb) cb(new Error(`${uuid} not found`));
return task;
}
};