From 6b3938d616a260202913dc068f8bdcf8261a49eb Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Fri, 15 Jul 2016 16:19:50 -0500 Subject: [PATCH] Task persistence after shutdown --- index.js | 38 ++++++++++++++++++++++++++---- libs/Task.js | 39 +++++++++++++++++++++++++++---- libs/TaskManager.js | 57 +++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 123 insertions(+), 11 deletions(-) diff --git a/index.js b/index.js index 4388d6e..33d9b9f 100644 --- a/index.js +++ b/index.js @@ -10,8 +10,7 @@ let addRequestId = require('./libs/expressRequestId')(); let multer = require('multer'); let bodyParser = require('body-parser'); let morgan = require('morgan'); - -let taskManager = new (require('./libs/taskManager'))(); +let TaskManager = require('./libs/taskManager'); let Task = require('./libs/Task'); app.use(morgan('tiny')); @@ -112,6 +111,35 @@ app.post('/task/restart', uuidCheck, (req, res) => { taskManager.restart(req.body.uuid, successHandler(res)); }); -app.listen(3000, () => { - console.log('Server has started on port 3000'); -}); \ No newline at end of file +let gracefulShutdown = done => { + async.series([ + cb => { taskManager.dumpTaskList(cb) }, + cb => { + console.log("Closing server"); + server.close(); + console.log("Exiting..."); + process.exit(0); + } + ], done); +}; + +// listen for TERM signal .e.g. kill +process.on ('SIGTERM', gracefulShutdown); + +// listen for INT signal e.g. Ctrl-C +process.on ('SIGINT', gracefulShutdown); + +// Startup +let taskManager; +let server; + +async.series([ + cb => { taskManager = new TaskManager(cb); }, + cb => { server = app.listen(3000, err => { + if (!err) console.log('Server has started on port 3000'); + cb(err); + }); + } +], err => { + if (err) console.log("Error during startup: " + err.message); +}); diff --git a/libs/Task.js b/libs/Task.js index a0544f9..074b4e2 100644 --- a/libs/Task.js +++ b/libs/Task.js @@ -7,9 +7,9 @@ let odmRunner = require('./odmRunner'); let statusCodes = require('./statusCodes'); module.exports = class Task{ - constructor(uuid, name, readyCb){ + constructor(uuid, name, done){ assert(uuid !== undefined, "uuid must be set"); - assert(readyCb !== undefined, "ready must be set"); + assert(done !== undefined, "ready must be set"); this.uuid = uuid; this.name = name != "" ? name : "Task of " + (new Date()).toISOString(); @@ -22,14 +22,33 @@ module.exports = class Task{ // Read images info fs.readdir(this.getImagesFolderPath(), (err, files) => { - if (err) readyCb(err); + if (err) done(err); else{ this.images = files; - readyCb(null, this); + done(null, this); } }); } + static CreateFromSerialized(taskJson, done){ + new Task(taskJson.uuid, taskJson.name, (err, task) => { + if (err) done(err); + else{ + // Override default values with those + // provided in the taskJson + for (let k in taskJson){ + task[k] = taskJson[k]; + } + + // Tasks that were running should be put back to QUEUED state + if (task.status.code === statusCodes.RUNNING){ + task.status.code = statusCodes.QUEUED; + } + done(null, task); + } + }) + } + // Get path where images are stored for this task // (relative to nodejs process CWD) getImagesFolderPath(){ @@ -176,4 +195,16 @@ module.exports = class Task{ getOutput(startFromLine = 0){ return this.output.slice(startFromLine, this.output.length); } + + // Returns the data necessary to serialize this + // task to restore it later. + serialize(){ + return { + uuid: this.uuid, + name: this.name, + dateCreated: this.dateCreated, + status: this.status, + options: this.options + } + } }; \ No newline at end of file diff --git a/libs/TaskManager.js b/libs/TaskManager.js index 02a0493..26ac2fa 100644 --- a/libs/TaskManager.js +++ b/libs/TaskManager.js @@ -1,14 +1,51 @@ "use strict"; let assert = require('assert'); +let fs = require('fs'); let Task = require('./Task'); let statusCodes = require('./statusCodes'); +let async = require('async'); -let PARALLEL_QUEUE_PROCESS_LIMIT = 2; +const PARALLEL_QUEUE_PROCESS_LIMIT = 2; +const TASKS_DUMP_FILE = "data/tasks.json"; module.exports = class TaskManager{ - constructor(){ + constructor(done){ this.tasks = {}; this.runningQueue = []; + + async.series([ + cb => { this.restoreTaskListFromDump(cb); }, + cb => { + this.processNextTask(); + cb(); + } + ], done); + + } + + // Load tasks that already exists (if any) + restoreTaskListFromDump(done){ + fs.readFile(TASKS_DUMP_FILE, (err, data) => { + if (!err){ + let tasks = JSON.parse(data.toString()); + + async.each(tasks, (taskJson, done) => { + Task.CreateFromSerialized(taskJson, (err, task) => { + if (err) done(err); + else{ + this.tasks[task.uuid] = task; + done(); + } + }); + }, err => { + console.log(`Initialized ${tasks.length} tasks`); + if (done !== undefined) done(); + }); + }else{ + console.log("No tasks dump found"); + if (done !== undefined) done(); + } + }); } // Finds the first QUEUED task. @@ -113,4 +150,20 @@ module.exports = class TaskManager{ if (!task && cb) cb(new Error(`${uuid} not found`)); return task; } + + // Serializes the list of tasks and saves it + // to disk + dumpTaskList(done){ + var output = []; + + for (let uuid in this.tasks){ + output.push(this.tasks[uuid].serialize()); + } + + fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => { + if (err) console.log(`Could not dump tasks: ${err.message}`); + else console.log("Dumped tasks list."); + if (done !== undefined) done(); + }) + } }; \ No newline at end of file