Task processing, UI improvements, OpenDroneMap spawning (still need to fix things)

pull/1/head
Piero Toffanin 2016-07-08 15:44:48 -05:00
rodzic c0e239d709
commit 7a4bee53b9
11 zmienionych plików z 249 dodań i 42 usunięć

Wyświetl plik

@ -4,8 +4,9 @@ MAINTAINER Piero Toffanin <pt@masseranolabs.com>
EXPOSE 3000
USER root
RUN curl --silent --location https://deb.nodesource.com/setup_5.x | sudo bash -
RUN curl --silent --location https://deb.nodesource.com/setup_6.x | sudo bash -
RUN apt-get install -y nodejs
RUN npm install -g nodemon
RUN mkdir /var/www
RUN chown odm:odm /var/www

Wyświetl plik

@ -1,16 +1,20 @@
"use strict";
let fs = require('fs');
let async = require('async');
let express = require('express');
let app = express();
let addRequestId = require('./libs/express-request-id')();
let addRequestId = require('./libs/expressRequestId')();
let multer = require('multer');
let bodyParser = require('body-parser');
let morgan = require('morgan');
let taskManager = new (require('./libs/taskManager'))();
let Task = require('./libs/Task');
app.use(morgan('tiny'));
app.use(bodyParser.urlencoded({extended: true}));
app.use(bodyParser.json());
app.use(express.static('public'));
@ -34,34 +38,54 @@ let upload = multer({
}
})
});
app.post('/newTask', addRequestId, upload.array('images'), (req, res) => {
if (req.files.length === 0) res.json({error: "Need at least 1 file."});
else{
console.log(`Received ${req.files.length} files`);
// Move to data
fs.rename(`tmp/${req.id}`, `data/${req.id}`, err => {
if (!err){
new Task(req.id, req.body.name, (err, task) => {
if (err) res.json({error: err.message});
else{
taskManager.addNew(task);
res.json({uuid: req.id, success: true});
async.series([
cb => {
fs.stat(`data/${req.id}`, (err, stat) => {
if (err && err.code === 'ENOENT') cb();
else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
});
},
cb => { fs.mkdir(`data/${req.id}`, undefined, cb); },
cb => {
fs.rename(`tmp/${req.id}`, `data/${req.id}/images`, err => {
if (!err){
new Task(req.id, req.body.name, (err, task) => {
if (err) cb(err);
else{
taskManager.addNew(task);
res.json({uuid: req.id, success: true});
cb();
}
});
}else{
cb(new Error("Could not move images folder."))
}
});
}else{
res.json({error: "Could not move images folder."});
}
], err => {
if (err) res.json({error: err.message})
});
}
});
app.get('/taskInfo/:uuid', (req, res) => {
let getTaskFromUuid = (req, res, next) => {
let task = taskManager.find(req.params.uuid);
if (task){
res.json(task.getInfo());
req.task = task;
next();
}else res.json({error: `${req.params.uuid} not found`});
}
app.get('/taskInfo/:uuid', getTaskFromUuid, (req, res) => {
res.json(req.task.getInfo());
});
app.get('/taskOutput/:uuid', getTaskFromUuid, (req, res) => {
res.json(req.task.getOutput());
});
let uuidCheck = (req, res, next) => {
@ -89,5 +113,5 @@ app.post('/restartTask', uuidCheck, (req, res) => {
});
app.listen(3000, () => {
console.log('Example app listening on port 3000!');
console.log('Server has started on port 3000');
});

Wyświetl plik

@ -1,14 +1,9 @@
"use strict";
let assert = require('assert');
let fs = require('fs');
let odmRunner = require('./odmRunner');
let statusCodes = {
QUEUED: 10,
RUNNING: 20,
FAILED: 30,
COMPLETED: 40,
CANCELED: 50
};
let statusCodes = require('./statusCodes');
module.exports = class Task{
constructor(uuid, name, readyCb){
@ -22,9 +17,11 @@ module.exports = class Task{
code: statusCodes.QUEUED
};
this.options = {};
this.output = [];
this.runnerProcess = null;
// Read images info
fs.readdir(`data/${this.uuid}`, (err, files) => {
fs.readdir(this.getImagesFolderPath(), (err, files) => {
if (err) readyCb(err);
else{
this.images = files;
@ -33,9 +30,29 @@ module.exports = class Task{
});
}
// Get path where images are stored for this task
// (relative to nodejs process CWD)
getImagesFolderPath(){
return `data/${this.uuid}/images`;
}
setStatus(code, extra){
this.status = {
code: code
};
for (var k in extra){
this.status[k] = extra[k];
}
}
getStatus(){
return this.status.code;
}
// Cancels the current task (unless it's already canceled)
cancel(cb){
if (this.status.code !== statusCodes.CANCELED){
this.status.code = statusCodes.CANCELED;
this.setStatus(statusCodes.CANCELED);
console.log("Requested to cancel " + this.name);
// TODO
@ -45,9 +62,39 @@ module.exports = class Task{
}
}
// Starts processing the task with OpenDroneMap
// This will spawn a new process.
start(done){
if (this.status.code === statusCodes.QUEUED){
this.setStatus(statusCodes.RUNNING);
this.runnerProcess = odmRunner.run({
projectPath: `${__dirname}/../${this.getImagesFolderPath()}`
}, (err, code, signal) => {
if (err){
this.setStatus(statusCodes.FAILED, {errorMessage: `Could not start process (${err.message})`});
}else{
if (code === 0){
this.setStatus(statusCodes.COMPLETED);
}else{
this.setStatus(statusCodes.FAILED, {errorMessage: `Process exited with code ${code}`});
}
}
done();
}, output => {
this.output.push(output);
});
return true;
}else{
return false;
}
}
// Re-executes the task (by setting it's state back to QUEUED)
// Only tasks that have been canceled or have failed can be restarted.
restart(cb){
if (this.status.code === statusCodes.CANCELED){
this.status.code = statusCodes.QUEUED;
if (this.status.code === statusCodes.CANCELED || this.status.code === statusCodes.FAILED){
this.setStatus(statusCodes.QUEUED);
console.log("Requested to restart " + this.name);
// TODO
@ -58,6 +105,7 @@ module.exports = class Task{
}
}
// Returns the description of the task.
getInfo(){
return {
uuid: this.uuid,
@ -68,4 +116,11 @@ module.exports = class Task{
imagesCount: this.images.length
}
}
// Returns the output of the OpenDroneMap process
// Optionally starting from a certain line number
getOutput(startFromLine = 0){
let lineNum = Math.min(this.output.length, startFromLine);
return this.output.slice(lineNum, this.output.length);
}
};

Wyświetl plik

@ -1,24 +1,80 @@
"use strict";
let assert = require('assert');
let Task = require('./Task');
let statusCodes = require('./statusCodes');
let PARALLEL_QUEUE_PROCESS_LIMIT = 1;
module.exports = class TaskManager{
constructor(){
this.tasks = {};
this.runningQueue = [];
}
// Finds the first QUEUED task.
findNextTaskToProcess(){
for (let uuid in this.tasks){
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){
return this.tasks[uuid];
}
}
}
// Finds the next tasks, adds them to the running queue,
// and starts the tasks (up to the limit).
processNextTask(){
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT){
let task = this.findNextTaskToProcess();
if (task){
this.addToRunningQueue(task);
task.start(() => {
this.removeFromRunningQueue(task);
this.processNextTask();
});
if (this.runningQueue.length < PARALLEL_QUEUE_PROCESS_LIMIT) this.processNextTask();
}
}else{
// Do nothing
}
}
addToRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue.push(task);
}
removeFromRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue = this.runningQueue.filter(t => {
return t !== task;
});
console.log("New queue length: " + this.runningQueue.length);
}
addNew(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.tasks[task.uuid] = task;
this.processNextTask();
}
// Stops the execution of a task
// (without removing it from the system).
cancel(uuid, cb){
let task;
if (task = this.find(uuid, cb)){
task.cancel(cb);
task.cancel(err => {
this.removeFromRunningQueue(task);
cb(err);
});
}
}
// Removes a task from the system.
// Before being removed, the task is canceled.
remove(uuid, cb){
this.cancel(uuid, err => {
if (!err){
@ -29,6 +85,8 @@ module.exports = class TaskManager{
});
}
// Restarts (puts back into QUEUED state)
// a task that is either in CANCELED or FAILED state.
restart(uuid, cb){
let task;
if (task = this.find(uuid, cb)){
@ -36,9 +94,10 @@ module.exports = class TaskManager{
}
}
find(uuid, errCb){
// Finds a task by its UUID string.
find(uuid, cb){
let task = this.tasks[uuid];
if (!task && errCb) errCb(new Error(`${uuid} not found`));
if (!task && cb) cb(new Error(`${uuid} not found`));
return task;
}
};

31
libs/odmRunner.js 100644
Wyświetl plik

@ -0,0 +1,31 @@
"use strict";
let spawn = require('child_process').spawn;
const ODM_PATH = "/code";
module.exports = {
run: function(options = {
projectPath: "/images"
}, done, outputReceived){
// Launch
let childProcess = spawn("python", [`${ODM_PATH}/run.py`,
"--project-path", options.projectPath
], {cwd: ODM_PATH});
childProcess
.on('exit', (code, signal) => {
done(null, code, signal);
})
.on('error', done);
childProcess.stdout.on('data', chunk => {
outputReceived(chunk.toString());
});
childProcess.stderr.on('data', chunk => {
outputReceived(chunk.toString());
});
return childProcess;
}
};

Wyświetl plik

@ -0,0 +1,8 @@
"use strict";
module.exports = {
QUEUED: 10,
RUNNING: 20,
FAILED: 30,
COMPLETED: 40,
CANCELED: 50
};

Wyświetl plik

@ -20,7 +20,10 @@
},
"homepage": "https://github.com/pierotofy/node-OpenDroneMap#readme",
"dependencies": {
"async": "^2.0.0-rc.6",
"body-parser": "^1.15.2",
"express": "^4.14.0",
"morgan": "^1.7.0",
"multer": "^1.1.0",
"node-uuid": "^1.4.7"
},

Wyświetl plik

@ -16,24 +16,36 @@
}
.spinning{
-webkit-animation: spin 3s infinite linear;
-moz-animation: spin 3s infinite linear;
animation: spin 3s infinite linear;
}
@-moz-keyframes spin {
from { -moz-transform: rotate(0deg); }
to { -moz-transform: rotate(360deg); }
}
@-webkit-keyframes spin {
from { -webkit-transform: rotate(0deg); }
to { -webkit-transform: rotate(360deg); }
}
@keyframes spin {
from {transform:rotate(0deg);}
to {transform:rotate(360deg);}
}
.pulsePositive {
animation: pulsatePositive 2s ease-out;
animation-iteration-count: 1;
background-color: #fff;
}
@keyframes pulsatePositive {
0% {background-color: #fff;}
50% {background-color: lightgreen;}
100% {background-color: #fff;}
}
.pulseNegative {
animation: pulsateNegative 2s ease-out;
animation-iteration-count: 1;
background-color: #fff;
}
@keyframes pulsateNegative {
0% {background-color: #fff;}
50% {background-color: lightred;}
100% {background-color: #fff;}
}
.task .actionButtons{
text-align: right;
}

Wyświetl plik

@ -58,7 +58,7 @@
<h2>Current Tasks (<span data-bind="text: tasks().length"></span>)</h2>
<p data-bind="visible: tasks().length === 0">No running tasks.</p>
<div data-bind="foreach: tasks">
<div class="task">
<div class="task" data-bind="css: {pulsePositive: info().status && info().status.code === 40, pulseNegative: info().status && info().status.code === 30}">
<p data-bind="visible: loading()">Retrieving <span data-bind="text: uuid"></span> ... <span class="glyphicon glyphicon-refresh spinning"></span></p>
<div data-bind="visible: !loading() && !info().error">
<div class="taskItem"><strong>Name:</strong> <span data-bind="text: info().name"></span></div>

Wyświetl plik

@ -25,6 +25,8 @@ $(function(){
};
function Task(uuid){
var self = this;
this.uuid = uuid;
this.loading = ko.observable(true);
this.info = ko.observable({});
@ -72,6 +74,9 @@ $(function(){
}, this);
this.refreshInfo();
this.refreshInterval = setInterval(function(){
self.refreshInfo();
}, 2000);
}
Task.prototype.refreshInfo = function(){
var self = this;
@ -96,6 +101,11 @@ $(function(){
}else{
self.info({error: json.error});
}
if (self.refreshInterval){
clearInterval(self.refreshInterval);
self.refreshInterval = null;
}
})
.fail(function(){
self.info({error: url + " is unreachable."});
@ -106,6 +116,10 @@ $(function(){
return function(){
var self = this;
// TODO: maybe there's a better way
// to handle refreshInfo here...
$.post(url, {
uuid: this.uuid
})