Modify: Dockerfile for local build

Add: Webhook callback option
Modify: .gitignore for vscode
pull/6/head^2
Lee Pepper 2017-04-07 10:55:24 -06:00
rodzic 9725cba1a2
commit 93d59292dd
8 zmienionych plików z 1320 dodań i 1206 usunięć

3
.dockerignore 100644
Wyświetl plik

@ -0,0 +1,3 @@
node_modules
tests
tmp

2
.gitignore vendored
Wyświetl plik

@ -39,3 +39,5 @@ jspm_packages
# Elastic Beanstalk
.elasticbeanstalk
.vscode

Wyświetl plik

@ -30,8 +30,13 @@ RUN cd /staging/PotreeConverter && \
RUN mkdir /var/www
WORKDIR "/var/www"
RUN git clone https://github.com/OpenDroneMap/node-OpenDroneMap .
# RUN git clone https://github.com/OpenDroneMap/node-OpenDroneMap .
COPY . /var/www
RUN npm install
RUN mkdir tmp
# Fix old version of gdal2tiles.py
# RUN (cd / && patch -p0) <patches/gdal2tiles.patch

833
index.js
Wyświetl plik

@ -40,453 +40,455 @@ let odmOptions = require('./libs/odmOptions');
let Directories = require('./libs/Directories');
let winstonStream = {
write: function(message, encoding){
logger.debug(message.slice(0, -1));
write: function(message, encoding) {
logger.debug(message.slice(0, -1));
}
};
app.use(morgan('combined', { stream : winstonStream }));
app.use(bodyParser.urlencoded({extended: true}));
app.use(morgan('combined', { stream: winstonStream }));
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json());
app.use(express.static('public'));
app.use('/swagger.json', express.static('docs/swagger.json'));
let upload = multer({
storage: multer.diskStorage({
destination: (req, file, cb) => {
let dstPath = path.join("tmp", req.id);
fs.exists(dstPath, exists => {
if (!exists){
fs.mkdir(dstPath, undefined, () => {
cb(null, dstPath);
});
}else{
cb(null, dstPath);
}
});
},
filename: (req, file, cb) => {
cb(null, file.originalname);
}
})
storage: multer.diskStorage({
destination: (req, file, cb) => {
let dstPath = path.join("tmp", req.id);
fs.exists(dstPath, exists => {
if (!exists) {
fs.mkdir(dstPath, undefined, () => {
cb(null, dstPath);
});
} else {
cb(null, dstPath);
}
});
},
filename: (req, file, cb) => {
cb(null, file.originalname);
}
})
});
let taskManager;
let server;
/** @swagger
* /task/new:
* post:
* description: Creates a new task and places it at the end of the processing queue
* tags: [task]
* consumes:
* - multipart/form-data
* parameters:
* -
* name: images
* in: formData
* description: Images to process, plus an optional GPC file. If included, the GPC file should have .txt extension
* required: true
* type: file
* -
* name: name
* in: formData
* description: An optional name to be associated with the task
* required: false
* type: string
* -
* name: options
* in: formData
* description: 'Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, ...]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options'
* required: false
* type: string
* responses:
* 200:
* description: Success
* schema:
* type: object
* required: [uuid]
* properties:
* uuid:
* type: string
* description: UUID of the newly created task
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
* /task/new:
* post:
* description: Creates a new task and places it at the end of the processing queue
* tags: [task]
* consumes:
* - multipart/form-data
* parameters:
* -
* name: images
* in: formData
* description: Images to process, plus an optional GPC file. If included, the GPC file should have .txt extension
* required: true
* type: file
* -
* name: name
* in: formData
* description: An optional name to be associated with the task
* required: false
* type: string
* -
* name: options
* in: formData
* description: 'Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, ...]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options'
* required: false
* type: string
* responses:
* 200:
* description: Success
* schema:
* type: object
* required: [uuid]
* properties:
* uuid:
* type: string
* description: UUID of the newly created task
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
app.post('/task/new', addRequestId, upload.array('images'), (req, res) => {
if (!req.files || req.files.length === 0) res.json({error: "Need at least 1 file."});
else{
let srcPath = path.join("tmp", req.id);
let destPath = path.join(Directories.data, req.id);
let destImagesPath = path.join(destPath, "images");
let destGpcPath = path.join(destPath, "gpc");
if (!req.files || req.files.length === 0) res.json({ error: "Need at least 1 file." });
else {
let srcPath = path.join("tmp", req.id);
let destPath = path.join(Directories.data, req.id);
let destImagesPath = path.join(destPath, "images");
let destGpcPath = path.join(destPath, "gpc");
async.series([
cb => {
odmOptions.filterOptions(req.body.options, (err, options) => {
if (err) cb(err);
else{
req.body.options = options;
cb(null);
}
});
},
async.series([
cb => {
odmOptions.filterOptions(req.body.options, (err, options) => {
if (err) cb(err);
else {
req.body.options = options;
cb(null);
}
});
},
// Move all uploads to data/<uuid>/images dir
cb => {
fs.stat(destPath, (err, stat) => {
if (err && err.code === 'ENOENT') cb();
else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
});
},
cb => fs.mkdir(destPath, undefined, cb),
cb => fs.mkdir(destGpcPath, undefined, cb),
cb => fs.rename(srcPath, destImagesPath, cb),
cb => {
// Find any *.txt (GPC) file and move it to the data/<uuid>/gpc directory
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else{
async.eachSeries(entries, (entry, cb) => {
if (/\.txt$/gi.test(entry)){
fs.rename(path.join(destImagesPath, entry), path.join(destGpcPath, entry), cb);
}else cb();
}, cb);
}
});
},
// Move all uploads to data/<uuid>/images dir
cb => {
setTimeout(function() {
fs.stat(destPath, (err, stat) => {
if (err && err.code === 'ENOENT') cb();
else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
});
}, 300);
},
cb => fs.mkdir(destPath, undefined, cb),
cb => fs.mkdir(destGpcPath, undefined, cb),
cb => fs.rename(srcPath, destImagesPath, cb),
cb => {
// Find any *.txt (GPC) file and move it to the data/<uuid>/gpc directory
fs.readdir(destImagesPath, (err, entries) => {
if (err) cb(err);
else {
async.eachSeries(entries, (entry, cb) => {
if (/\.txt$/gi.test(entry)) {
fs.rename(path.join(destImagesPath, entry), path.join(destGpcPath, entry), cb);
} else cb();
}, cb);
}
});
},
// Create task
cb => {
new Task(req.id, req.body.name, (err, task) => {
if (err) cb(err);
else{
taskManager.addNew(task);
res.json({uuid: req.id});
cb();
}
}, req.body.options);
}
], err => {
if (err) res.json({error: err.message});
});
}
// Create task
cb => {
new Task(req.id, req.body.name, (err, task) => {
if (err) cb(err);
else {
taskManager.addNew(task);
res.json({ uuid: req.id });
cb();
}
}, req.body.options);
}
], err => {
if (err) res.json({ error: err.message });
});
}
});
let getTaskFromUuid = (req, res, next) => {
let task = taskManager.find(req.params.uuid);
if (task){
req.task = task;
next();
}else res.json({error: `${req.params.uuid} not found`});
let task = taskManager.find(req.params.uuid);
if (task) {
req.task = task;
next();
} else res.json({ error: `${req.params.uuid} not found` });
};
/** @swagger
* /task/{uuid}/info:
* get:
* description: Gets information about this task, such as name, creation date, processing time, status, command line options and number of images being processed. See schema definition for a full list.
* tags: [task]
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* responses:
* 200:
* description: Task Information
* schema:
* title: TaskInfo
* type: object
* required: [uuid, name, dateCreated, processingTime, status, options, imagesCount]
* properties:
* uuid:
* type: string
* description: UUID
* name:
* type: string
* description: Name
* dateCreated:
* type: integer
* description: Timestamp
* processingTime:
* type: integer
* description: Milliseconds that have elapsed since the task started being processed.
* status:
* type: integer
* description: Status code (10 = QUEUED, 20 = RUNNING, 30 = FAILED, 40 = COMPLETED, 50 = CANCELED)
* enum: [10, 20, 30, 40, 50]
* options:
* type: array
* description: List of options used to process this task
* items:
* type: object
* required: [name, value]
* properties:
* name:
* type: string
* description: 'Option name (example: "odm_meshing-octreeDepth")'
* value:
* type: string
* description: 'Value (example: 9)'
* imagesCount:
* type: integer
* description: Number of images
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
* /task/{uuid}/info:
* get:
* description: Gets information about this task, such as name, creation date, processing time, status, command line options and number of images being processed. See schema definition for a full list.
* tags: [task]
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* responses:
* 200:
* description: Task Information
* schema:
* title: TaskInfo
* type: object
* required: [uuid, name, dateCreated, processingTime, status, options, imagesCount]
* properties:
* uuid:
* type: string
* description: UUID
* name:
* type: string
* description: Name
* dateCreated:
* type: integer
* description: Timestamp
* processingTime:
* type: integer
* description: Milliseconds that have elapsed since the task started being processed.
* status:
* type: integer
* description: Status code (10 = QUEUED, 20 = RUNNING, 30 = FAILED, 40 = COMPLETED, 50 = CANCELED)
* enum: [10, 20, 30, 40, 50]
* options:
* type: array
* description: List of options used to process this task
* items:
* type: object
* required: [name, value]
* properties:
* name:
* type: string
* description: 'Option name (example: "odm_meshing-octreeDepth")'
* value:
* type: string
* description: 'Value (example: 9)'
* imagesCount:
* type: integer
* description: Number of images
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
app.get('/task/:uuid/info', getTaskFromUuid, (req, res) => {
res.json(req.task.getInfo());
res.json(req.task.getInfo());
});
/** @swagger
* /task/{uuid}/output:
* get:
* description: Retrieves the console output of the OpenDroneMap's process. Useful for monitoring execution and to provide updates to the user.
* tags: [task]
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* -
* name: line
* in: query
* description: Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Defaults to 0 (retrieve all console output).
* default: 0
* required: false
* type: integer
* responses:
* 200:
* description: Console Output
* schema:
* type: string
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
* /task/{uuid}/output:
* get:
* description: Retrieves the console output of the OpenDroneMap's process. Useful for monitoring execution and to provide updates to the user.
* tags: [task]
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* -
* name: line
* in: query
* description: Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Defaults to 0 (retrieve all console output).
* default: 0
* required: false
* type: integer
* responses:
* 200:
* description: Console Output
* schema:
* type: string
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
app.get('/task/:uuid/output', getTaskFromUuid, (req, res) => {
res.json(req.task.getOutput(req.query.line));
res.json(req.task.getOutput(req.query.line));
});
/** @swagger
* /task/{uuid}/download/{asset}:
* get:
* description: Retrieves an asset (the output of OpenDroneMap's processing) associated with a task
* tags: [task]
* produces: [application/zip]
* parameters:
* - name: uuid
* in: path
* type: string
* description: UUID of the task
* required: true
* - name: asset
* in: path
* type: string
* description: Type of asset to download. Use "all.zip" for zip file containing all assets.
* required: true
* enum:
* - all.zip
* - orthophoto.tif
* responses:
* 200:
* description: Asset File
* schema:
* type: file
* default:
* description: Error message
* schema:
* $ref: '#/definitions/Error'
*/
* /task/{uuid}/download/{asset}:
* get:
* description: Retrieves an asset (the output of OpenDroneMap's processing) associated with a task
* tags: [task]
* produces: [application/zip]
* parameters:
* - name: uuid
* in: path
* type: string
* description: UUID of the task
* required: true
* - name: asset
* in: path
* type: string
* description: Type of asset to download. Use "all.zip" for zip file containing all assets.
* required: true
* enum:
* - all.zip
* - orthophoto.tif
* responses:
* 200:
* description: Asset File
* schema:
* type: file
* default:
* description: Error message
* schema:
* $ref: '#/definitions/Error'
*/
app.get('/task/:uuid/download/:asset', getTaskFromUuid, (req, res) => {
let asset = req.params.asset !== undefined ? req.params.asset : "all.zip";
let filePath = req.task.getAssetsArchivePath(asset);
if (filePath){
if (fs.existsSync(filePath)){
res.setHeader('Content-Disposition', `attachment; filename=${asset}`);
res.setHeader('Content-Type', mime.lookup(asset));
res.setHeader('Content-Length', fs.statSync(filePath)["size"]);
let asset = req.params.asset !== undefined ? req.params.asset : "all.zip";
let filePath = req.task.getAssetsArchivePath(asset);
if (filePath) {
if (fs.existsSync(filePath)) {
res.setHeader('Content-Disposition', `attachment; filename=${asset}`);
res.setHeader('Content-Type', mime.lookup(asset));
res.setHeader('Content-Length', fs.statSync(filePath)["size"]);
const filestream = fs.createReadStream(filePath);
filestream.pipe(res);
}else{
res.json({error: "Asset not ready"});
}
}else{
res.json({error: "Invalid asset"});
}
const filestream = fs.createReadStream(filePath);
filestream.pipe(res);
} else {
res.json({ error: "Asset not ready" });
}
} else {
res.json({ error: "Invalid asset" });
}
});
/** @swagger
* definition:
* Error:
* type: object
* required:
* - error
* properties:
* error:
* type: string
* description: Description of the error
* Response:
* type: object
* required:
* - success
* properties:
* success:
* type: boolean
* description: true if the command succeeded, false otherwise
* error:
* type: string
* description: Error message if an error occured
*/
* definition:
* Error:
* type: object
* required:
* - error
* properties:
* error:
* type: string
* description: Description of the error
* Response:
* type: object
* required:
* - success
* properties:
* success:
* type: boolean
* description: true if the command succeeded, false otherwise
* error:
* type: string
* description: Error message if an error occured
*/
let uuidCheck = (req, res, next) => {
if (!req.body.uuid) res.json({error: "uuid param missing."});
if (!req.body.uuid) res.json({ error: "uuid param missing." });
else next();
};
let successHandler = res => {
return err => {
if (!err) res.json({success: true});
else res.json({success: false, error: err.message});
};
return err => {
if (!err) res.json({ success: true });
else res.json({ success: false, error: err.message });
};
};
/** @swagger
* /task/cancel:
* post:
* description: Cancels a task (stops its execution, or prevents it from being executed)
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
* /task/cancel:
* post:
* description: Cancels a task (stops its execution, or prevents it from being executed)
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
app.post('/task/cancel', uuidCheck, (req, res) => {
taskManager.cancel(req.body.uuid, successHandler(res));
taskManager.cancel(req.body.uuid, successHandler(res));
});
/** @swagger
* /task/remove:
* post:
* description: Removes a task and deletes all of its assets
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
* /task/remove:
* post:
* description: Removes a task and deletes all of its assets
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
app.post('/task/remove', uuidCheck, (req, res) => {
taskManager.remove(req.body.uuid, successHandler(res));
taskManager.remove(req.body.uuid, successHandler(res));
});
/** @swagger
* /task/restart:
* post:
* description: Restarts a task that was previously canceled or that had failed to process
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
* /task/restart:
* post:
* description: Restarts a task that was previously canceled or that had failed to process
* parameters:
* -
* name: uuid
* in: body
* description: UUID of the task
* required: true
* schema:
* type: string
* responses:
* 200:
* description: Command Received
* schema:
* $ref: "#/definitions/Response"
*/
app.post('/task/restart', uuidCheck, (req, res) => {
taskManager.restart(req.body.uuid, successHandler(res));
taskManager.restart(req.body.uuid, successHandler(res));
});
/** @swagger
* /options:
* get:
* description: Retrieves the command line options that can be passed to process a task
* tags: [server]
* responses:
* 200:
* description: Options
* schema:
* type: array
* items:
* title: Option
* type: object
* required: [name, type, value, domain, help]
* properties:
* name:
* type: string
* description: Command line option (exactly as it is passed to the OpenDroneMap process, minus the leading '--')
* type:
* type: string
* description: Datatype of the value of this option
* enum:
* - int
* - float
* - string
* - bool
* value:
* type: string
* description: Default value of this option
* domain:
* type: string
* description: Valid range of values (for example, "positive integer" or "float > 0.0")
* help:
* type: string
* description: Description of what this option does
*/
* /options:
* get:
* description: Retrieves the command line options that can be passed to process a task
* tags: [server]
* responses:
* 200:
* description: Options
* schema:
* type: array
* items:
* title: Option
* type: object
* required: [name, type, value, domain, help]
* properties:
* name:
* type: string
* description: Command line option (exactly as it is passed to the OpenDroneMap process, minus the leading '--')
* type:
* type: string
* description: Datatype of the value of this option
* enum:
* - int
* - float
* - string
* - bool
* value:
* type: string
* description: Default value of this option
* domain:
* type: string
* description: Valid range of values (for example, "positive integer" or "float > 0.0")
* help:
* type: string
* description: Description of what this option does
*/
app.get('/options', (req, res) => {
odmOptions.getOptions((err, options) => {
if (err) res.json({error: err.message});
else res.json(options);
});
odmOptions.getOptions((err, options) => {
if (err) res.json({ error: err.message });
else res.json(options);
});
});
/** @swagger
* /info:
* get:
* description: Retrieves information about this node
* tags: [server]
* responses:
* 200:
* description: Info
* schema:
* type: object
* required: [version, taskQueueCount]
* properties:
* version:
* type: string
* description: Current version
* taskQueueCount:
* type: integer
* description: Number of tasks currently being processed or waiting to be processed
*/
* /info:
* get:
* description: Retrieves information about this node
* tags: [server]
* responses:
* 200:
* description: Info
* schema:
* type: object
* required: [version, taskQueueCount]
* properties:
* version:
* type: string
* description: Current version
* taskQueueCount:
* type: integer
* description: Number of tasks currently being processed or waiting to be processed
*/
app.get('/info', (req, res) => {
res.json({
version: packageJson.version,
@ -495,46 +497,47 @@ app.get('/info', (req, res) => {
});
let gracefulShutdown = done => {
async.series([
cb => taskManager.dumpTaskList(cb),
cb => {
logger.info("Closing server");
server.close();
logger.info("Exiting...");
process.exit(0);
}
], done);
async.series([
cb => taskManager.dumpTaskList(cb),
cb => {
logger.info("Closing server");
server.close();
logger.info("Exiting...");
process.exit(0);
}
], done);
};
// listen for TERM signal .e.g. kill
process.on ('SIGTERM', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);
// listen for INT signal e.g. Ctrl-C
process.on ('SIGINT', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
// Startup
if (config.test) logger.info("Running in test mode");
let commands = [
cb => odmOptions.initialize(cb),
cb => { taskManager = new TaskManager(cb); },
cb => { server = app.listen(config.port, err => {
if (!err) logger.info('Server has started on port ' + String(config.port));
cb(err);
});
}
cb => odmOptions.initialize(cb),
cb => { taskManager = new TaskManager(cb); },
cb => {
server = app.listen(config.port, err => {
if (!err) logger.info('Server has started on port ' + String(config.port));
cb(err);
});
}
];
if (config.powercycle){
commands.push(cb => {
logger.info("Power cycling is set, application will shut down...");
process.exit(0);
});
if (config.powercycle) {
commands.push(cb => {
logger.info("Power cycling is set, application will shut down...");
process.exit(0);
});
}
async.series(commands, err => {
if (err){
logger.error("Error during startup: " + err.message);
process.exit(1);
}
});
if (err) {
logger.error("Error during startup: " + err.message);
process.exit(1);
}
});

Wyświetl plik

@ -33,449 +33,453 @@ let Directories = require('./Directories');
let statusCodes = require('./statusCodes');
module.exports = class Task{
constructor(uuid, name, done, options = []){
assert(uuid !== undefined, "uuid must be set");
assert(done !== undefined, "ready must be set");
module.exports = class Task {
constructor(uuid, name, done, options = []) {
assert(uuid !== undefined, "uuid must be set");
assert(done !== undefined, "ready must be set");
this.uuid = uuid;
this.name = name !== "" ? name : "Task of " + (new Date()).toISOString();
this.dateCreated = new Date().getTime();
this.processingTime = -1;
this.setStatus(statusCodes.QUEUED);
this.options = options;
this.gpcFiles = [];
this.output = [];
this.runningProcesses = [];
this.uuid = uuid;
this.name = name !== "" ? name : "Task of " + (new Date()).toISOString();
this.dateCreated = new Date().getTime();
this.processingTime = -1;
this.setStatus(statusCodes.QUEUED);
this.options = options;
this.gpcFiles = [];
this.output = [];
this.runningProcesses = [];
async.series([
// Read images info
cb => {
fs.readdir(this.getImagesFolderPath(), (err, files) => {
if (err) cb(err);
else{
this.images = files;
logger.debug(`Found ${this.images.length} images for ${this.uuid}`);
cb(null);
}
});
},
async.series([
// Read images info
cb => {
fs.readdir(this.getImagesFolderPath(), (err, files) => {
if (err) cb(err);
else {
this.images = files;
logger.debug(`Found ${this.images.length} images for ${this.uuid}`);
cb(null);
}
});
},
// Find GCP (if any)
cb => {
fs.readdir(this.getGpcFolderPath(), (err, files) => {
if (err) cb(err);
else{
files.forEach(file => {
if (/\.txt$/gi.test(file)){
this.gpcFiles.push(file);
}
});
logger.debug(`Found ${this.gpcFiles.length} GPC files (${this.gpcFiles.join(" ")}) for ${this.uuid}`);
cb(null);
}
});
}
], err => {
done(err, this);
});
}
// Find GCP (if any)
cb => {
fs.readdir(this.getGpcFolderPath(), (err, files) => {
if (err) cb(err);
else {
files.forEach(file => {
if (/\.txt$/gi.test(file)) {
this.gpcFiles.push(file);
}
});
logger.debug(`Found ${this.gpcFiles.length} GPC files (${this.gpcFiles.join(" ")}) for ${this.uuid}`);
cb(null);
}
});
}
], err => {
done(err, this);
});
}
static CreateFromSerialized(taskJson, done){
new Task(taskJson.uuid, taskJson.name, (err, task) => {
if (err) done(err);
else{
// Override default values with those
// provided in the taskJson
for (let k in taskJson){
task[k] = taskJson[k];
}
static CreateFromSerialized(taskJson, done) {
new Task(taskJson.uuid, taskJson.name, (err, task) => {
if (err) done(err);
else {
// Override default values with those
// provided in the taskJson
for (let k in taskJson) {
task[k] = taskJson[k];
}
// Tasks that were running should be put back to QUEUED state
if (task.status.code === statusCodes.RUNNING){
task.status.code = statusCodes.QUEUED;
}
done(null, task);
}
}, taskJson.options);
}
// Tasks that were running should be put back to QUEUED state
if (task.status.code === statusCodes.RUNNING) {
task.status.code = statusCodes.QUEUED;
}
done(null, task);
}
}, taskJson.options);
}
// Get path where images are stored for this task
// (relative to nodejs process CWD)
getImagesFolderPath(){
return path.join(this.getProjectFolderPath(), "images");
}
// Get path where images are stored for this task
// (relative to nodejs process CWD)
getImagesFolderPath() {
return path.join(this.getProjectFolderPath(), "images");
}
// Get path where GPC file(s) are stored
// (relative to nodejs process CWD)
getGpcFolderPath(){
return path.join(this.getProjectFolderPath(), "gpc");
}
// Get path where GPC file(s) are stored
// (relative to nodejs process CWD)
getGpcFolderPath() {
return path.join(this.getProjectFolderPath(), "gpc");
}
// Get path of project (where all images and assets folder are contained)
// (relative to nodejs process CWD)
getProjectFolderPath(){
return path.join(Directories.data, this.uuid);
}
// Get path of project (where all images and assets folder are contained)
// (relative to nodejs process CWD)
getProjectFolderPath() {
return path.join(Directories.data, this.uuid);
}
// Get the path of the archive where all assets
// outputted by this task are stored.
getAssetsArchivePath(filename){
if (filename == 'all.zip'){
// OK, do nothing
}else if (filename == 'orthophoto.tif'){
if (config.test){
if (config.testSkipOrthophotos) return false;
else filename = path.join('..', '..', 'processing_results', 'odm_orthophoto', `odm_${filename}`);
}else{
filename = path.join('odm_orthophoto', `odm_${filename}`);
}
}else{
return false; // Invalid
}
return path.join(this.getProjectFolderPath(), filename);
}
// Get the path of the archive where all assets
// outputted by this task are stored.
getAssetsArchivePath(filename) {
if (filename == 'all.zip') {
// OK, do nothing
} else if (filename == 'orthophoto.tif') {
if (config.test) {
if (config.testSkipOrthophotos) return false;
else filename = path.join('..', '..', 'processing_results', 'odm_orthophoto', `odm_${filename}`);
} else {
filename = path.join('odm_orthophoto', `odm_${filename}`);
}
} else {
return false; // Invalid
}
// Deletes files and folders related to this task
cleanup(cb){
rmdir(this.getProjectFolderPath(), cb);
}
return path.join(this.getProjectFolderPath(), filename);
}
setStatus(code, extra){
this.status = {
code: code
};
for (let k in extra){
this.status[k] = extra[k];
}
}
// Deletes files and folders related to this task
cleanup(cb) {
rmdir(this.getProjectFolderPath(), cb);
}
updateProcessingTime(resetTime){
this.processingTime = resetTime ?
-1 :
new Date().getTime() - this.dateCreated;
}
setStatus(code, extra) {
this.status = {
code: code
};
for (let k in extra) {
this.status[k] = extra[k];
}
}
startTrackingProcessingTime(){
this.updateProcessingTime();
if (!this._updateProcessingTimeInterval){
this._updateProcessingTimeInterval = setInterval(() => {
this.updateProcessingTime();
}, 1000);
}
}
updateProcessingTime(resetTime) {
this.processingTime = resetTime ?
-1 :
new Date().getTime() - this.dateCreated;
}
stopTrackingProcessingTime(resetTime){
this.updateProcessingTime(resetTime);
if (this._updateProcessingTimeInterval){
clearInterval(this._updateProcessingTimeInterval);
this._updateProcessingTimeInterval = null;
}
}
startTrackingProcessingTime() {
this.updateProcessingTime();
if (!this._updateProcessingTimeInterval) {
this._updateProcessingTimeInterval = setInterval(() => {
this.updateProcessingTime();
}, 1000);
}
}
getStatus(){
return this.status.code;
}
stopTrackingProcessingTime(resetTime) {
this.updateProcessingTime(resetTime);
if (this._updateProcessingTimeInterval) {
clearInterval(this._updateProcessingTimeInterval);
this._updateProcessingTimeInterval = null;
}
}
isCanceled(){
return this.status.code === statusCodes.CANCELED;
}
getStatus() {
return this.status.code;
}
// Cancels the current task (unless it's already canceled)
cancel(cb){
if (this.status.code !== statusCodes.CANCELED){
let wasRunning = this.status.code === statusCodes.RUNNING;
this.setStatus(statusCodes.CANCELED);
isCanceled() {
return this.status.code === statusCodes.CANCELED;
}
if (wasRunning){
this.runningProcesses.forEach(proc => {
// TODO: this does NOT guarantee that
// the process will immediately terminate.
// For eaxmple in the case of the ODM process, the process will continue running for a while
// This might need to be fixed on ODM's end.
proc.kill('SIGINT');
});
this.runningProcesses = [];
}
// Cancels the current task (unless it's already canceled)
cancel(cb) {
if (this.status.code !== statusCodes.CANCELED) {
let wasRunning = this.status.code === statusCodes.RUNNING;
this.setStatus(statusCodes.CANCELED);
this.stopTrackingProcessingTime(true);
cb(null);
}else{
cb(new Error("Task already cancelled"));
}
}
if (wasRunning) {
this.runningProcesses.forEach(proc => {
// TODO: this does NOT guarantee that
// the process will immediately terminate.
// For eaxmple in the case of the ODM process, the process will continue running for a while
// This might need to be fixed on ODM's end.
proc.kill('SIGINT');
});
this.runningProcesses = [];
}
// Starts processing the task with OpenDroneMap
// This will spawn a new process.
start(done){
const finished = err => {
this.stopTrackingProcessingTime();
done(err);
};
const postProcess = () => {
const createZipArchive = (outputFilename, files) => {
return (done) => {
this.output.push(`Compressing ${outputFilename}\n`);
this.stopTrackingProcessingTime(true);
cb(null);
} else {
cb(new Error("Task already cancelled"));
}
}
let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename));
let archive = archiver.create('zip', {});
// Starts processing the task with OpenDroneMap
// This will spawn a new process.
start(done) {
const finished = err => {
this.stopTrackingProcessingTime();
done(err);
};
archive.on('finish', () => {
// TODO: is this being fired twice?
done();
});
const postProcess = () => {
const createZipArchive = (outputFilename, files) => {
return (done) => {
this.output.push(`Compressing ${outputFilename}\n`);
archive.on('error', err => {
logger.error(`Could not archive .zip file: ${err.message}`);
done(err);
});
let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename));
let archive = archiver.create('zip', {});
archive.pipe(output);
let globs = [];
archive.on('finish', () => {
// TODO: is this being fired twice?
done();
});
// Process files and directories first
files.forEach(file => {
let sourcePath = !config.test ?
this.getProjectFolderPath() :
path.join("tests", "processing_results");
let filePath = path.join(sourcePath, file);
// Skip non-existing items
if (!fs.existsSync(filePath)) return;
archive.on('error', err => {
logger.error(`Could not archive .zip file: ${err.message}`);
done(err);
});
let isGlob = /\*/.test(file),
isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory();
archive.pipe(output);
let globs = [];
if (isDirectory){
archive.directory(filePath, file);
}else if (isGlob){
globs.push(filePath);
}else{
archive.file(filePath, {name: path.basename(file)});
}
});
// Process files and directories first
files.forEach(file => {
let sourcePath = !config.test ?
this.getProjectFolderPath() :
path.join("tests", "processing_results");
let filePath = path.join(sourcePath, file);
// Check for globs
if (globs.length !== 0){
let pending = globs.length;
// Skip non-existing items
if (!fs.existsSync(filePath)) return;
globs.forEach(pattern => {
glob(pattern, (err, files) => {
if (err) done(err);
else{
files.forEach(file => {
if (fs.lstatSync(file).isFile()){
archive.file(file, {name: path.basename(file)});
}else{
logger.debug(`Could not add ${file} from glob`);
}
});
let isGlob = /\*/.test(file),
isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory();
if (--pending === 0){
archive.finalize();
}
}
});
});
}else{
archive.finalize();
}
};
};
if (isDirectory) {
archive.directory(filePath, file);
} else if (isGlob) {
globs.push(filePath);
} else {
archive.file(filePath, { name: path.basename(file) });
}
});
const handleProcessExit = (done) => {
return (err, code, signal) => {
if (err) done(err);
else{
// Don't evaluate if we caused the process to exit via SIGINT?
if (code === 0) done();
else done(new Error(`Process exited with code ${code}`));
}
};
};
// Check for globs
if (globs.length !== 0) {
let pending = globs.length;
const handleOutput = output => {
this.output.push(output);
};
globs.forEach(pattern => {
glob(pattern, (err, files) => {
if (err) done(err);
else {
files.forEach(file => {
if (fs.lstatSync(file).isFile()) {
archive.file(file, { name: path.basename(file) });
} else {
logger.debug(`Could not add ${file} from glob`);
}
});
const generateTiles = (inputFile, outputDir) => {
return (done) => {
const inputFilePath = path.join(this.getProjectFolderPath(), inputFile);
// Not all datasets generate an orthophoto, so we skip
// tiling if the orthophoto is missing
if (fs.existsSync(inputFilePath)){
this.runningProcesses.push(processRunner.runTiler({
zoomLevels: "12-21",
inputFile: inputFilePath,
outputDir: path.join(this.getProjectFolderPath(), outputDir)
}, handleProcessExit(done), handleOutput));
}else{
handleOutput(`${inputFilePath} file not found, skipping tiles generation\n`);
done();
}
};
};
if (--pending === 0) {
archive.finalize();
}
}
});
});
} else {
archive.finalize();
}
};
};
const generatePotreeCloud = (inputFile, outputDir) => {
return (done) => {
this.runningProcesses.push(processRunner.runPotreeConverter({
inputFile: path.join(this.getProjectFolderPath(), inputFile),
outputDir: path.join(this.getProjectFolderPath(), outputDir)
}, handleProcessExit(done), handleOutput));
};
};
const handleProcessExit = (done) => {
return (err, code, signal) => {
if (err) done(err);
else {
// Don't evaluate if we caused the process to exit via SIGINT?
if (code === 0) done();
else done(new Error(`Process exited with code ${code}`));
}
};
};
const pdalTranslate = (inputPath, outputPath, filters) => {
return (done) => {
this.runningProcesses.push(processRunner.runPdalTranslate({
inputFile: inputPath,
outputFile: outputPath,
filters: filters
}, handleProcessExit(done), handleOutput));
};
};
const handleOutput = output => {
this.output.push(output);
};
// All paths are relative to the project directory (./data/<uuid>/)
let allFolders = ['odm_orthophoto', 'odm_georeferencing', 'odm_texturing', 'odm_meshing', 'orthophoto_tiles', 'potree_pointcloud'];
if (config.test && config.testSkipOrthophotos){
logger.info("Test mode will skip orthophoto generation");
const generateTiles = (inputFile, outputDir) => {
return (done) => {
const inputFilePath = path.join(this.getProjectFolderPath(), inputFile);
// Exclude these folders from the all.zip archive
['odm_orthophoto', 'orthophoto_tiles'].forEach(dir => {
allFolders.splice(allFolders.indexOf(dir), 1);
});
}
// Not all datasets generate an orthophoto, so we skip
// tiling if the orthophoto is missing
if (fs.existsSync(inputFilePath)) {
this.runningProcesses.push(processRunner.runTiler({
zoomLevels: "12-21",
inputFile: inputFilePath,
outputDir: path.join(this.getProjectFolderPath(), outputDir)
}, handleProcessExit(done), handleOutput));
} else {
handleOutput(`${inputFilePath} file not found, skipping tiles generation\n`);
done();
}
};
};
let orthophotoPath = path.join('odm_orthophoto', 'odm_orthophoto.tif'),
lasPointCloudPath = path.join('odm_georeferencing', 'odm_georeferenced_model.ply.las'),
projectFolderPath = this.getProjectFolderPath();
const generatePotreeCloud = (inputFile, outputDir) => {
return (done) => {
this.runningProcesses.push(processRunner.runPotreeConverter({
inputFile: path.join(this.getProjectFolderPath(), inputFile),
outputDir: path.join(this.getProjectFolderPath(), outputDir)
}, handleProcessExit(done), handleOutput));
};
};
let commands = [
const pdalTranslate = (inputPath, outputPath, filters) => {
return (done) => {
this.runningProcesses.push(processRunner.runPdalTranslate({
inputFile: inputPath,
outputFile: outputPath,
filters: filters
}, handleProcessExit(done), handleOutput));
};
};
// All paths are relative to the project directory (./data/<uuid>/)
let allFolders = ['odm_orthophoto', 'odm_georeferencing', 'odm_texturing', 'odm_meshing', 'orthophoto_tiles', 'potree_pointcloud'];
if (config.test && config.testSkipOrthophotos) {
logger.info("Test mode will skip orthophoto generation");
// Exclude these folders from the all.zip archive
['odm_orthophoto', 'orthophoto_tiles'].forEach(dir => {
allFolders.splice(allFolders.indexOf(dir), 1);
});
}
let orthophotoPath = path.join('odm_orthophoto', 'odm_orthophoto.tif'),
lasPointCloudPath = path.join('odm_georeferencing', 'odm_georeferenced_model.ply.las'),
projectFolderPath = this.getProjectFolderPath();
let commands = [
generateTiles(orthophotoPath, 'orthophoto_tiles'),
generatePotreeCloud(lasPointCloudPath, 'potree_pointcloud'),
createZipArchive('all.zip', allFolders)
];
];
// If point cloud file does not exist, it's likely because location (GPS/GPC) information
// was missing and the file was not generated.
let fullLasPointCloudPath = path.join(projectFolderPath, lasPointCloudPath);
if (!fs.existsSync(fullLasPointCloudPath)){
let unreferencedPointCloudPath = path.join(projectFolderPath, "opensfm", "depthmaps", "merged.ply");
if (fs.existsSync(unreferencedPointCloudPath)){
logger.info(`${lasPointCloudPath} is missing, will attempt to generate it from ${unreferencedPointCloudPath}`);
commands.unshift(pdalTranslate(unreferencedPointCloudPath, fullLasPointCloudPath, [
{
// opensfm's ply files map colors with the diffuse_ prefix
dimensions: "diffuse_red = red, diffuse_green = green, diffuse_blue = blue",
type: "filters.ferry"
}
]));
}
}
// If point cloud file does not exist, it's likely because location (GPS/GPC) information
// was missing and the file was not generated.
let fullLasPointCloudPath = path.join(projectFolderPath, lasPointCloudPath);
if (!fs.existsSync(fullLasPointCloudPath)) {
let unreferencedPointCloudPath = path.join(projectFolderPath, "opensfm", "depthmaps", "merged.ply");
if (fs.existsSync(unreferencedPointCloudPath)) {
logger.info(`${lasPointCloudPath} is missing, will attempt to generate it from ${unreferencedPointCloudPath}`);
commands.unshift(pdalTranslate(unreferencedPointCloudPath, fullLasPointCloudPath, [{
// opensfm's ply files map colors with the diffuse_ prefix
dimensions: "diffuse_red = red, diffuse_green = green, diffuse_blue = blue",
type: "filters.ferry"
}]));
}
}
async.series(commands, (err) => {
if (!err){
this.setStatus(statusCodes.COMPLETED);
finished();
}else{
this.setStatus(statusCodes.FAILED);
finished(err);
}
});
};
async.series(commands, (err) => {
if (!err) {
this.setStatus(statusCodes.COMPLETED);
finished();
} else {
this.setStatus(statusCodes.FAILED);
finished(err);
}
});
};
if (this.status.code === statusCodes.QUEUED){
this.startTrackingProcessingTime();
this.setStatus(statusCodes.RUNNING);
if (this.status.code === statusCodes.QUEUED) {
this.startTrackingProcessingTime();
this.setStatus(statusCodes.RUNNING);
// rmeove webhoook
var optionsToRun = this.options.filter(function(opt) {
if (opt.name !== 'webhook') {
return opt;
}
let runnerOptions = this.options.reduce((result, opt) => {
result[opt.name] = opt.value;
return result;
}, {});
});
runnerOptions["project-path"] = fs.realpathSync(Directories.data);
runnerOptions["pmvs-num-cores"] = os.cpus().length;
let runnerOptions = optionsToRun.reduce((result, opt) => {
result[opt.name] = opt.value;
return result;
}, {});
if (this.gpcFiles.length > 0){
runnerOptions.gcp = fs.realpathSync(path.join(this.getGpcFolderPath(), this.gpcFiles[0]));
}
runnerOptions["project-path"] = fs.realpathSync(Directories.data);
runnerOptions["pmvs-num-cores"] = os.cpus().length;
this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => {
if (err){
this.setStatus(statusCodes.FAILED, {errorMessage: `Could not start process (${err.message})`});
finished(err);
}else{
// Don't evaluate if we caused the process to exit via SIGINT?
if (this.status.code !== statusCodes.CANCELED){
if (code === 0){
postProcess();
}else{
this.setStatus(statusCodes.FAILED, {errorMessage: `Process exited with code ${code}`});
finished();
}
}else{
finished();
}
}
}, output => {
// Replace console colors
output = output.replace(/\x1b\[[0-9;]*m/g, "");
this.output.push(output);
})
);
if (this.gpcFiles.length > 0) {
runnerOptions.gcp = fs.realpathSync(path.join(this.getGpcFolderPath(), this.gpcFiles[0]));
}
return true;
}else{
return false;
}
}
this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => {
if (err) {
this.setStatus(statusCodes.FAILED, { errorMessage: `Could not start process (${err.message})` });
finished(err);
} else {
// Don't evaluate if we caused the process to exit via SIGINT?
if (this.status.code !== statusCodes.CANCELED) {
if (code === 0) {
postProcess();
} else {
this.setStatus(statusCodes.FAILED, { errorMessage: `Process exited with code ${code}` });
finished();
}
} else {
finished();
}
}
}, output => {
// Replace console colors
output = output.replace(/\x1b\[[0-9;]*m/g, "");
this.output.push(output);
}));
// Re-executes the task (by setting it's state back to QUEUED)
// Only tasks that have been canceled, completed or have failed can be restarted.
restart(cb){
if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1){
this.setStatus(statusCodes.QUEUED);
this.dateCreated = new Date().getTime();
this.output = [];
this.stopTrackingProcessingTime(true);
cb(null);
}else{
cb(new Error("Task cannot be restarted"));
}
}
return true;
} else {
return false;
}
}
// Returns the description of the task.
getInfo(){
return {
uuid: this.uuid,
name: this.name,
dateCreated: this.dateCreated,
processingTime: this.processingTime,
status: this.status,
options: this.options,
imagesCount: this.images.length
};
}
// Re-executes the task (by setting it's state back to QUEUED)
// Only tasks that have been canceled, completed or have failed can be restarted.
restart(cb) {
if ([statusCodes.CANCELED, statusCodes.FAILED, statusCodes.COMPLETED].indexOf(this.status.code) !== -1) {
this.setStatus(statusCodes.QUEUED);
this.dateCreated = new Date().getTime();
this.output = [];
this.stopTrackingProcessingTime(true);
cb(null);
} else {
cb(new Error("Task cannot be restarted"));
}
}
// Returns the output of the OpenDroneMap process
// Optionally starting from a certain line number
getOutput(startFromLine = 0){
return this.output.slice(startFromLine, this.output.length);
}
// Returns the description of the task.
getInfo() {
return {
uuid: this.uuid,
name: this.name,
dateCreated: this.dateCreated,
processingTime: this.processingTime,
status: this.status,
options: this.options,
imagesCount: this.images.length
};
}
// Returns the data necessary to serialize this
// task to restore it later.
serialize(){
return {
uuid: this.uuid,
name: this.name,
dateCreated: this.dateCreated,
status: this.status,
options: this.options
};
}
};
// Returns the output of the OpenDroneMap process
// Optionally starting from a certain line number
getOutput(startFromLine = 0) {
return this.output.slice(startFromLine, this.output.length);
}
// Returns the data necessary to serialize this
// task to restore it later.
serialize() {
return {
uuid: this.uuid,
name: this.name,
dateCreated: this.dateCreated,
status: this.status,
options: this.options
};
}
};

Wyświetl plik

@ -27,237 +27,316 @@ let statusCodes = require('./statusCodes');
let async = require('async');
let schedule = require('node-schedule');
let Directories = require('./Directories');
// webhook reqs
let request = require('request');
const TASKS_DUMP_FILE = path.join(Directories.data, "tasks.json");
const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * 60 * 24 * config.cleanupTasksAfter; // days
module.exports = class TaskManager{
constructor(done){
this.tasks = {};
this.runningQueue = [];
module.exports = class TaskManager {
constructor(done) {
this.tasks = {};
this.runningQueue = [];
async.series([
cb => this.restoreTaskListFromDump(cb),
cb => this.removeOldTasks(cb),
cb => this.removeOrphanedDirectories(cb),
cb => {
this.processNextTask();
cb();
},
cb => {
// Every hour
schedule.scheduleJob('0 * * * *', () => {
this.removeOldTasks();
this.dumpTaskList();
});
async.series([
cb => this.restoreTaskListFromDump(cb),
cb => this.removeOldTasks(cb),
cb => this.removeOrphanedDirectories(cb),
cb => {
this.processNextTask();
cb();
},
cb => {
// Every hour
schedule.scheduleJob('0 * * * *', () => {
this.removeOldTasks();
this.dumpTaskList();
});
cb();
}
], done);
}
cb();
}
], done);
}
// Removes old tasks that have either failed, are completed, or
// have been canceled.
removeOldTasks(done){
let list = [];
let now = new Date().getTime();
logger.debug("Checking for old tasks to be removed...");
// Removes old tasks that have either failed, are completed, or
// have been canceled.
removeOldTasks(done) {
let list = [];
let now = new Date().getTime();
logger.debug("Checking for old tasks to be removed...");
for (let uuid in this.tasks){
let task = this.tasks[uuid];
for (let uuid in this.tasks) {
let task = this.tasks[uuid];
if ([statusCodes.FAILED,
statusCodes.COMPLETED,
statusCodes.CANCELED].indexOf(task.status.code) !== -1 &&
now - task.dateCreated > CLEANUP_TASKS_IF_OLDER_THAN){
list.push(task.uuid);
}
}
if ([statusCodes.FAILED,
statusCodes.COMPLETED,
statusCodes.CANCELED
].indexOf(task.status.code) !== -1 &&
now - task.dateCreated > CLEANUP_TASKS_IF_OLDER_THAN) {
list.push(task.uuid);
}
}
async.eachSeries(list, (uuid, cb) => {
logger.info(`Cleaning up old task ${uuid}`);
this.remove(uuid, cb);
}, done);
}
async.eachSeries(list, (uuid, cb) => {
logger.info(`Cleaning up old task ${uuid}`);
this.remove(uuid, cb);
}, done);
}
// Removes directories that don't have a corresponding
// task associated with it (maybe as a cause of an abrupt exit)
removeOrphanedDirectories(done){
logger.info("Checking for orphaned directories to be removed...");
// Removes directories that don't have a corresponding
// task associated with it (maybe as a cause of an abrupt exit)
removeOrphanedDirectories(done) {
logger.info("Checking for orphaned directories to be removed...");
fs.readdir(Directories.data, (err, entries) => {
if (err) done(err);
else{
async.eachSeries(entries, (entry, cb) => {
let dirPath = path.join(Directories.data, entry);
if (fs.statSync(dirPath).isDirectory() &&
entry.match(/^[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+$/) &&
!this.tasks[entry]){
logger.info(`Found orphaned directory: ${entry}, removing...`);
rmdir(dirPath, cb);
}else cb();
}, done);
}
});
}
fs.readdir(Directories.data, (err, entries) => {
if (err) done(err);
else {
async.eachSeries(entries, (entry, cb) => {
let dirPath = path.join(Directories.data, entry);
if (fs.statSync(dirPath).isDirectory() &&
entry.match(/^[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+$/) &&
!this.tasks[entry]) {
logger.info(`Found orphaned directory: ${entry}, removing...`);
rmdir(dirPath, cb);
} else cb();
}, done);
}
});
}
// Load tasks that already exists (if any)
restoreTaskListFromDump(done){
fs.readFile(TASKS_DUMP_FILE, (err, data) => {
if (!err){
let tasks;
try{
tasks = JSON.parse(data.toString());
}catch(e){
done(new Error(`Could not load task list. It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}). Please manually delete the file and try again.`));
return;
}
// Load tasks that already exists (if any)
restoreTaskListFromDump(done) {
fs.readFile(TASKS_DUMP_FILE, (err, data) => {
if (!err) {
let tasks;
try {
tasks = JSON.parse(data.toString());
} catch (e) {
done(new Error(`Could not load task list. It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}). Please manually delete the file and try again.`));
return;
}
async.each(tasks, (taskJson, done) => {
Task.CreateFromSerialized(taskJson, (err, task) => {
if (err) done(err);
else{
this.tasks[task.uuid] = task;
done();
}
});
}, err => {
logger.info(`Initialized ${tasks.length} tasks`);
if (done !== undefined) done();
});
}else{
logger.info("No tasks dump found");
if (done !== undefined) done();
}
});
}
async.each(tasks, (taskJson, done) => {
Task.CreateFromSerialized(taskJson, (err, task) => {
if (err) done(err);
else {
this.tasks[task.uuid] = task;
done();
}
});
}, err => {
logger.info(`Initialized ${tasks.length} tasks`);
if (done !== undefined) done();
});
} else {
logger.info("No tasks dump found");
if (done !== undefined) done();
}
});
}
// Finds the first QUEUED task.
findNextTaskToProcess(){
for (let uuid in this.tasks){
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED){
return this.tasks[uuid];
}
}
}
// Finds the first QUEUED task.
findNextTaskToProcess() {
for (let uuid in this.tasks) {
if (this.tasks[uuid].getStatus() === statusCodes.QUEUED) {
return this.tasks[uuid];
}
}
}
// Finds the next tasks, adds them to the running queue,
// and starts the tasks (up to the limit).
processNextTask(){
if (this.runningQueue.length < config.parallelQueueProcessing){
let task = this.findNextTaskToProcess();
if (task){
this.addToRunningQueue(task);
task.start(() => {
this.removeFromRunningQueue(task);
this.processNextTask();
});
// Finds the next tasks, adds them to the running queue,
// and starts the tasks (up to the limit).
processNextTask() {
if (this.runningQueue.length < config.parallelQueueProcessing) {
let task = this.findNextTaskToProcess();
if (task) {
this.addToRunningQueue(task);
for (var i = 0; i < task.options.length; i++) {
if (task.options[i].name === "webhook") {
// call back webhook
request({
url: task.options[i].value,
method: 'POST',
headers: {
'User-Agent': 'node-OpenDroneMap',
'Content-Type': 'application/json'
},
json: task.getInfo(),
}, function(error, response, body) {
if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask();
}
}else{
// Do nothing
}
}
// ignore error handling for now
addToRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue.push(task);
}
// if (!error && response.statusCode == 200) {
removeFromRunningQueue(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue = this.runningQueue.filter(t => t !== task);
}
addNew(task){
assert(task.constructor.name === "Task", "Must be a Task object");
this.tasks[task.uuid] = task;
// }
})
this.processNextTask();
}
}
}
// Stops the execution of a task
// (without removing it from the system).
cancel(uuid, cb){
let task = this.find(uuid, cb);
if (task){
if (!task.isCanceled()){
task.cancel(err => {
this.removeFromRunningQueue(task);
this.processNextTask();
cb(err);
});
}else{
cb(null); // Nothing to be done
}
}
}
task.start(() => {
for (var i = 0; i < task.options.length; i++) {
if (task.options[i].name === "webhook") {
// call back webhook
request({
url: task.options[i].value,
method: 'POST',
headers: {
'User-Agent': 'node-OpenDroneMap',
'Content-Type': 'application/json'
},
json: task.getInfo(),
}, function(error, response, body) {
// Removes a task from the system.
// Before being removed, the task is canceled.
remove(uuid, cb){
this.cancel(uuid, err => {
if (!err){
let task = this.find(uuid, cb);
if (task){
task.cleanup(err => {
if (!err){
delete(this.tasks[uuid]);
this.processNextTask();
cb(null);
}else cb(err);
});
}else; // cb is called by find on error
}else cb(err);
});
}
// ignore error handling for now
// Restarts (puts back into QUEUED state)
// a task that is either in CANCELED or FAILED state.
restart(uuid, cb){
let task = this.find(uuid, cb);
if (task){
task.restart(err => {
if (!err) this.processNextTask();
cb(err);
});
}
}
// if (!error && response.statusCode == 200) {
// Finds a task by its UUID string.
find(uuid, cb){
let task = this.tasks[uuid];
if (!task && cb) cb(new Error(`${uuid} not found`));
return task;
}
// Serializes the list of tasks and saves it
// to disk
dumpTaskList(done){
let output = [];
// }
})
for (let uuid in this.tasks){
output.push(this.tasks[uuid].serialize());
}
}
}
fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => {
if (err) logger.error(`Could not dump tasks: ${err.message}`);
else logger.debug("Dumped tasks list.");
if (done !== undefined) done();
});
}
getQueueCount(){
this.removeFromRunningQueue(task);
this.processNextTask();
});
if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask();
}
} else {
// Do nothing
}
}
addToRunningQueue(task) {
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue.push(task);
}
removeFromRunningQueue(task) {
assert(task.constructor.name === "Task", "Must be a Task object");
this.runningQueue = this.runningQueue.filter(t => t !== task);
}
addNew(task) {
assert(task.constructor.name === "Task", "Must be a Task object");
this.tasks[task.uuid] = task;
this.processNextTask();
}
// Stops the execution of a task
// (without removing it from the system).
cancel(uuid, cb) {
let task = this.find(uuid, cb);
if (task) {
if (!task.isCanceled()) {
task.cancel(err => {
this.removeFromRunningQueue(task);
this.processNextTask();
for (var i = 0; i < task.options.length; i++) {
if (task.options[i].name === "webhook") {
// call back webhook
request({
url: task.options[i].value,
method: 'POST',
headers: {
'User-Agent': 'node-OpenDroneMap',
'Content-Type': 'application/json'
},
json: task.getInfo(),
}, function(error, response, body) {
// ignore error handling for now
// if (!error && response.statusCode == 200) {
// }
})
}
}
cb(err);
});
} else {
cb(null); // Nothing to be done
}
}
}
// Removes a task from the system.
// Before being removed, the task is canceled.
remove(uuid, cb) {
this.cancel(uuid, err => {
if (!err) {
let task = this.find(uuid, cb);
if (task) {
task.cleanup(err => {
if (!err) {
delete(this.tasks[uuid]);
this.processNextTask();
cb(null);
} else cb(err);
});
} else; // cb is called by find on error
} else cb(err);
});
}
// Restarts (puts back into QUEUED state)
// a task that is either in CANCELED or FAILED state.
restart(uuid, cb) {
let task = this.find(uuid, cb);
if (task) {
task.restart(err => {
if (!err) this.processNextTask();
cb(err);
});
}
}
// Finds a task by its UUID string.
find(uuid, cb) {
let task = this.tasks[uuid];
if (!task && cb) cb(new Error(`${uuid} not found`));
return task;
}
// Serializes the list of tasks and saves it
// to disk
dumpTaskList(done) {
let output = [];
for (let uuid in this.tasks) {
output.push(this.tasks[uuid].serialize());
}
fs.writeFile(TASKS_DUMP_FILE, JSON.stringify(output), err => {
if (err) logger.error(`Could not dump tasks: ${err.message}`);
else logger.debug("Dumped tasks list.");
if (done !== undefined) done();
});
}
getQueueCount() {
let count = 0;
for (let uuid in this.tasks){
for (let uuid in this.tasks) {
let task = this.tasks[uuid];
if ([statusCodes.QUEUED,
statusCodes.RUNNING].indexOf(task.status.code) !== -1){
statusCodes.RUNNING
].indexOf(task.status.code) !== -1) {
count++;
}
}
return count;
}
};
};

Wyświetl plik

@ -23,229 +23,246 @@ let logger = require('./logger');
let odmOptions = null;
module.exports = {
initialize: function(done){
this.getOptions(done);
},
initialize: function(done) {
this.getOptions(done);
},
getOptions: function(done){
if (odmOptions){
done(null, odmOptions);
return;
}
getOptions: function(done) {
if (odmOptions) {
done(null, odmOptions);
return;
}
odmRunner.getJsonOptions((err, json) => {
if (err) done(err);
else{
odmOptions = [];
for (let option in json){
// Not all options are useful to the end user
// (num cores can be set programmatically, so can gcpFile, etc.)
if (["-h", "--project-path", "--cmvs-maxImages", "--time",
"--zip-results", "--pmvs-num-cores",
"--start-with", "--gcp", "--end-with", "--images",
"--slam-config", "--video"].indexOf(option) !== -1) continue;
odmRunner.getJsonOptions((err, json) => {
if (err) done(err);
else {
odmOptions = [];
for (let option in json) {
// Not all options are useful to the end user
// (num cores can be set programmatically, so can gcpFile, etc.)
if (["-h", "--project-path", "--cmvs-maxImages", "--time",
"--zip-results", "--pmvs-num-cores",
"--start-with", "--gcp", "--end-with", "--images",
"--slam-config", "--video"
].indexOf(option) !== -1) continue;
let values = json[option];
let values = json[option];
let name = option.replace(/^--/, "");
let type = "";
let value = "";
let help = values.help || "";
let domain = values.metavar !== undefined ?
values.metavar.replace(/^[<>]/g, "")
.replace(/[<>]$/g, "")
.trim() :
"";
let name = option.replace(/^--/, "");
let type = "";
let value = "";
let help = values.help || "";
let domain = values.metavar !== undefined ?
values.metavar.replace(/^[<>]/g, "")
.replace(/[<>]$/g, "")
.trim() :
"";
switch((values.type || "").trim()){
case "<type 'int'>":
type = "int";
value = values['default'] !== undefined ?
parseInt(values['default']) :
0;
break;
case "<type 'float'>":
type = "float";
value = values['default'] !== undefined ?
parseFloat(values['default']) :
0.0;
break;
default:
type = "string";
value = values['default'] !== undefined ?
values['default'].trim() :
"";
}
switch ((values.type || "").trim()) {
case "<type 'int'>":
type = "int";
value = values['default'] !== undefined ?
parseInt(values['default']) :
0;
break;
case "<type 'float'>":
type = "float";
value = values['default'] !== undefined ?
parseFloat(values['default']) :
0.0;
break;
default:
type = "string";
value = values['default'] !== undefined ?
values['default'].trim() :
"";
}
if (values['default'] === "True"){
type = "bool";
value = true;
}else if (values['default'] === "False"){
type = "bool";
value = false;
}
if (values['default'] === "True") {
type = "bool";
value = true;
} else if (values['default'] === "False") {
type = "bool";
value = false;
}
// If 'choices' is specified, try to convert it to array
if (values.choices){
try{
values.choices = JSON.parse(values.choices.replace(/'/g, '"')); // Convert ' to "
}catch(e){
logger.warn(`Cannot parse choices: ${values.choices}`);
}
}
// If 'choices' is specified, try to convert it to array
if (values.choices) {
try {
values.choices = JSON.parse(values.choices.replace(/'/g, '"')); // Convert ' to "
} catch (e) {
logger.warn(`Cannot parse choices: ${values.choices}`);
}
}
if (Array.isArray(values.choices)){
type = "string"; // TODO: change to enum
domain = values.choices;
}
if (Array.isArray(values.choices)) {
type = "string"; // TODO: change to enum
domain = values.choices;
}
help = help.replace(/\%\(default\)s/g, value);
help = help.replace(/\%\(default\)s/g, value);
// In the end, all values must be converted back
// to strings (per OpenAPI spec which doesn't allow mixed types)
value = String(value);
odmOptions.push({
name, type, value, domain, help
});
}
done(null, odmOptions);
}
});
},
odmOptions.push({
name,
type,
value,
domain,
help
});
// Checks that the options (as received from the rest endpoint)
// Are valid and within proper ranges.
// The result of filtering is passed back via callback
// @param options[]
filterOptions: function(options, done){
assert(odmOptions !== null, "odmOptions is not set. Have you initialized odmOptions properly?");
try{
if (typeof options === "string") options = JSON.parse(options);
if (!Array.isArray(options)) options = [];
let result = [];
let errors = [];
let addError = function(opt, descr){
errors.push({
name: opt.name,
error: descr
});
};
}
let typeConversion = {
'float': Number.parseFloat,
'int': Number.parseInt,
'bool': function(value){
if (value === 'true') return true;
else if (value === 'false') return false;
else if (typeof value === 'boolean') return value;
else throw new Error(`Cannot convert ${value} to boolean`);
},
'string': function(value){
return value; // No conversion needed
},
'path': function(value){
return value; // No conversion needed
}
};
let domainChecks = [
{
regex: /^(positive |negative )?(integer|float)$/,
validate: function(matches, value){
if (matches[1] === 'positive ') return value >= 0;
else if (matches[1] === 'negative ') return value <= 0;
else if (matches[2] === 'integer') return Number.isInteger(value);
else if (matches[2] === 'float') return Number.isFinite(value);
}
},
{
regex: /^percent$/,
validate: function(matches, value){
return value >= 0 && value <= 100;
}
},
{
regex: /^(float): ([\-\+\.\d]+) <= x <= ([\-\+\.\d]+)$/,
validate: function(matches, value){
let [str, type, lower, upper] = matches;
lower = parseFloat(lower);
upper = parseFloat(upper);
return value >= lower && value <= upper;
}
},
{
regex: /^(float) (>=|>|<|<=) ([\-\+\.\d]+)$/,
validate: function(matches, value){
let [str, type, oper, bound] = matches;
bound = parseFloat(bound);
switch(oper){
case '>=':
return value >= bound;
case '>':
return value > bound;
case '<=':
return value <= bound;
case '<':
return value < bound;
default:
return false;
}
}
},
{
regex: /^(string|path)$/,
validate: function(){
return true; // All strings/paths are fine
}
}
// TODO: handle enum
];
odmOptions.push({
name: 'webhook',
type: 'string',
value: '',
domain: '',
help: 'On task complete, fail, etc. Call the above url with a post of the task serialized'
});
let checkDomain = function(domain, value){
let matches,
dc = domainChecks.find(dc => matches = domain.match(dc.regex));
// local (non odm) options
if (dc){
if (!dc.validate(matches, value)) throw new Error(`Invalid value ${value} (out of range)`);
}else{
throw new Error(`Domain value cannot be handled: '${domain}' : '${value}'`);
}
};
done(null, odmOptions);
}
});
},
// Scan through all possible options
for (let odmOption of odmOptions){
// Was this option selected by the user?
/*jshint loopfunc: true */
let opt = options.find(o => o.name === odmOption.name);
if (opt){
try{
// Convert to proper data type
let value = typeConversion[odmOption.type](opt.value);
// Checks that the options (as received from the rest endpoint)
// Are valid and within proper ranges.
// The result of filtering is passed back via callback
// @param options[]
filterOptions: function(options, done) {
assert(odmOptions !== null, "odmOptions is not set. Have you initialized odmOptions properly?");
// Domain check
if (odmOption.domain){
checkDomain(odmOption.domain, value);
}
try {
if (typeof options === "string") options = JSON.parse(options);
if (!Array.isArray(options)) options = [];
result.push({
name: odmOption.name,
value: value
});
}catch(e){
addError(opt, e.message);
}
}
}
let result = [];
let errors = [];
let addError = function(opt, descr) {
errors.push({
name: opt.name,
error: descr
});
};
if (errors.length > 0) done(new Error(JSON.stringify(errors)));
else done(null, result);
}catch(e){
done(e);
}
}
let typeConversion = {
'float': Number.parseFloat,
'int': Number.parseInt,
'bool': function(value) {
if (value === 'true') return true;
else if (value === 'false') return false;
else if (typeof value === 'boolean') return value;
else throw new Error(`Cannot convert ${value} to boolean`);
},
'string': function(value) {
return value; // No conversion needed
},
'path': function(value) {
return value; // No conversion needed
}
};
let domainChecks = [{
regex: /^(positive |negative )?(integer|float)$/,
validate: function(matches, value) {
if (matches[1] === 'positive ') return value >= 0;
else if (matches[1] === 'negative ') return value <= 0;
else if (matches[2] === 'integer') return Number.isInteger(value);
else if (matches[2] === 'float') return Number.isFinite(value);
}
},
{
regex: /^percent$/,
validate: function(matches, value) {
return value >= 0 && value <= 100;
}
},
{
regex: /^(float): ([\-\+\.\d]+) <= x <= ([\-\+\.\d]+)$/,
validate: function(matches, value) {
let [str, type, lower, upper] = matches;
lower = parseFloat(lower);
upper = parseFloat(upper);
return value >= lower && value <= upper;
}
},
{
regex: /^(float) (>=|>|<|<=) ([\-\+\.\d]+)$/,
validate: function(matches, value) {
let [str, type, oper, bound] = matches;
bound = parseFloat(bound);
switch (oper) {
case '>=':
return value >= bound;
case '>':
return value > bound;
case '<=':
return value <= bound;
case '<':
return value < bound;
default:
return false;
}
}
},
{
regex: /^(string|path)$/,
validate: function() {
return true; // All strings/paths are fine
}
}
// TODO: handle enum
];
let checkDomain = function(domain, value) {
let matches,
dc = domainChecks.find(dc => matches = domain.match(dc.regex));
if (dc) {
if (!dc.validate(matches, value)) throw new Error(`Invalid value ${value} (out of range)`);
} else {
throw new Error(`Domain value cannot be handled: '${domain}' : '${value}'`);
}
};
// Scan through all possible options
for (let odmOption of odmOptions) {
// Was this option selected by the user?
/*jshint loopfunc: true */
let opt = options.find(o => o.name === odmOption.name);
if (opt) {
try {
// Convert to proper data type
let value = typeConversion[odmOption.type](opt.value);
// Domain check
if (odmOption.domain) {
checkDomain(odmOption.domain, value);
}
result.push({
name: odmOption.name,
value: value
});
} catch (e) {
addError(opt, e.message);
}
}
}
if (errors.length > 0) done(new Error(JSON.stringify(errors)));
else done(null, result);
} catch (e) {
done(e);
}
}
};

Wyświetl plik

@ -31,6 +31,7 @@
"multer": "^1.1.0",
"node-schedule": "^1.1.1",
"node-uuid": "^1.4.7",
"request": "^2.81.0",
"rimraf": "^2.5.3",
"swagger-jsdoc": "^1.3.1",
"winston": "^2.2.0"