Chunked API draft working

pull/70/head
Piero Toffanin 2019-01-31 13:33:50 -05:00
rodzic 9fb1097e87
commit 5c78f8c506
6 zmienionych plików z 372 dodań i 42 usunięć

Wyświetl plik

@ -33,6 +33,7 @@ Options:
-d, --deamonize Set process to run as a daemon
--parallel_queue_processing <number> Number of simultaneous processing tasks (default: 2)
--cleanup_tasks_after <number> Number of minutes that elapse before deleting finished and canceled tasks (default: 2880)
--cleanup_uploads_after <number> Number of minutes that elapse before deleting unfinished uploads. Set this value to the maximum time you expect a dataset to be uploaded. (default: 2880)
--test Enable test mode. In test mode, no commands are sent to OpenDroneMap. This can be useful during development or testing (default: false)
--test_skip_orthophotos If test mode is enabled, skip orthophoto results when generating assets. (default: false)
--test_skip_dems If test mode is enabled, skip dems results when generating assets. (default: false)
@ -90,6 +91,7 @@ config.port = parseInt(argv.port || argv.p || fromConfigFile("port", process.env
config.deamon = argv.deamonize || argv.d || fromConfigFile("daemon", false);
config.parallelQueueProcessing = argv.parallel_queue_processing || fromConfigFile("parallelQueueProcessing", 2);
config.cleanupTasksAfter = parseInt(argv.cleanup_tasks_after || fromConfigFile("cleanupTasksAfter", 2880));
config.cleanupUploadsAfter = parseInt(argv.cleanup_uploads_after || fromConfigFile("cleanupUploadsAfter", 2880));
config.test = argv.test || fromConfigFile("test", false);
config.testSkipOrthophotos = argv.test_skip_orthophotos || fromConfigFile("testSkipOrthophotos", false);
config.testSkipDems = argv.test_skip_dems || fromConfigFile("testSkipDems", false);

Wyświetl plik

@ -8,7 +8,7 @@ REST API to access ODM
=== Version information
[%hardbreaks]
_Version_ : 1.3.1
_Version_ : 1.4.0
=== Contact information
@ -281,7 +281,7 @@ _required_|UUID of the task|string|
=== POST /task/new
==== Description
Creates a new task and places it at the end of the processing queue
Creates a new task and places it at the end of the processing queue. For uploading really large tasks, see /task/new/init instead.
==== Parameters
@ -301,6 +301,8 @@ _optional_|An optional name to be associated with the task|string|
_optional_|Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, …]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options|string|
|*FormData*|*skipPostProcessing* +
_optional_|When set, skips generation of map tiles, derivative assets, point cloud tiles.|boolean|
|*FormData*|*webhook* +
_optional_|Optional URL to call when processing has ended (either successfully or unsuccessfully).|string|
|*FormData*|*zipurl* +
_optional_|URL of the zip file containing the images to process, plus an optional GCP file. If included, the GCP file should have .txt extension|string|
|===
@ -336,6 +338,143 @@ _required_|UUID of the newly created task|string
* task
[[_task_new_commit_uuid_post]]
=== POST /task/new/commit/{uuid}
==== Description
Creates a new task for which images have been uploaded via /task/new/upload.
==== Parameters
[options="header", cols=".^2,.^3,.^9,.^4,.^2"]
|===
|Type|Name|Description|Schema|Default
|*Path*|*uuid* +
_required_|UUID of the task|string|
|*Query*|*token* +
_optional_|Token required for authentication (when authentication is required).|string|
|===
==== Responses
[options="header", cols=".^2,.^14,.^4"]
|===
|HTTP Code|Description|Schema
|*200*|Success|<<_task_new_commit_uuid_post_response_200,Response 200>>
|*default*|Error|<<_error,Error>>
|===
[[_task_new_commit_uuid_post_response_200]]
*Response 200*
[options="header", cols=".^3,.^11,.^4"]
|===
|Name|Description|Schema
|*uuid* +
_required_|UUID of the newly created task|string
|===
==== Tags
* task
[[_task_new_init_post]]
=== POST /task/new/init
==== Description
Initialize the upload of a new task. If successful, a user can start uploading files via /task/new/upload. The task will not start until /task/new/commit is called.
==== Parameters
[options="header", cols=".^2,.^3,.^9,.^4,.^2"]
|===
|Type|Name|Description|Schema|Default
|*Header*|*set-uuid* +
_optional_|An optional UUID string that will be used as UUID for this task instead of generating a random one.|string|
|*Query*|*token* +
_optional_|Token required for authentication (when authentication is required).|string|
|*FormData*|*name* +
_optional_|An optional name to be associated with the task|string|
|*FormData*|*options* +
_optional_|Serialized JSON string of the options to use for processing, as an array of the format: [{name: option1, value: value1}, {name: option2, value: value2}, …]. For example, [{"name":"cmvs-maxImages","value":"500"},{"name":"time","value":true}]. For a list of all options, call /options|string|
|*FormData*|*skipPostProcessing* +
_optional_|When set, skips generation of map tiles, derivative assets, point cloud tiles.|boolean|
|*FormData*|*webhook* +
_optional_|Optional URL to call when processing has ended (either successfully or unsuccessfully).|string|
|===
==== Responses
[options="header", cols=".^2,.^14,.^4"]
|===
|HTTP Code|Description|Schema
|*200*|Success|<<_task_new_init_post_response_200,Response 200>>
|*default*|Error|<<_error,Error>>
|===
[[_task_new_init_post_response_200]]
*Response 200*
[options="header", cols=".^3,.^11,.^4"]
|===
|Name|Description|Schema
|*uuid* +
_required_|UUID of the newly created task|string
|===
==== Tags
* task
[[_task_new_upload_uuid_post]]
=== POST /task/new/upload/{uuid}
==== Description
Adds one or more files to the task created via /task/new/init. It does not start the task. To start the task, call /task/new/commit.
==== Parameters
[options="header", cols=".^2,.^3,.^9,.^4,.^2"]
|===
|Type|Name|Description|Schema|Default
|*Path*|*uuid* +
_required_|UUID of the task|string|
|*Query*|*token* +
_optional_|Token required for authentication (when authentication is required).|string|
|*FormData*|*images* +
_required_|Images to process, plus an optional GCP file. If included, the GCP file should have .txt extension|file|
|===
==== Responses
[options="header", cols=".^2,.^14,.^4"]
|===
|HTTP Code|Description|Schema
|*200*|File Received|<<_response,Response>>
|*default*|Error|<<_error,Error>>
|===
==== Consumes
* `multipart/form-data`
==== Tags
* task
[[_task_remove_post]]
=== POST /task/remove

File diff suppressed because one or more lines are too long

101
index.js
Wyświetl plik

@ -29,6 +29,7 @@ const express = require('express');
const app = express();
const bodyParser = require('body-parser');
const multer = require('multer');
const TaskManager = require('./libs/TaskManager');
const odmInfo = require('./libs/odmInfo');
@ -39,21 +40,10 @@ const auth = require('./libs/auth/factory').fromConfig(config);
const authCheck = auth.getMiddleware();
const taskNew = require('./libs/taskNew');
// zip files
let request = require('request');
// Downloads the resource at uri to filename on disk, then invokes callback.
// A HEAD request is issued first; only its error is propagated.
// NOTE(review): errors emitted while streaming the GET body or writing to
// disk are not handled here — the callback may never fire on stream
// failure. TODO confirm callers tolerate this (see taskNew.js copy).
let download = function(uri, filename, callback) {
request.head(uri, function(err, res, body) {
if (err) callback(err);
else{
// 'close' fires when the write stream ends (success or premature end)
request(uri).pipe(fs.createWriteStream(filename)).on('close', callback);
}
});
};
app.use(express.static('public'));
app.use('/swagger.json', express.static('docs/swagger.json'));
const formDataParser = multer().none();
const urlEncodedBodyParser = bodyParser.urlencoded({extended: false});
let taskManager;
@ -65,6 +55,7 @@ let server;
* description: Initialize the upload of a new task. If successful, a user can start uploading files via /task/new/upload. The task will not start until /task/new/commit is called.
* tags: [task]
* parameters:
* -
* name: name
* in: formData
* description: An optional name to be associated with the task
@ -115,21 +106,85 @@ let server;
* schema:
* $ref: '#/definitions/Error'
*/
app.post('/task/new/init', authCheck, taskNew.assignUUID, (req, res) => {
});
// Chunked upload, step 1: initialize a new task so files can then be sent
// via /task/new/upload/:uuid (see swagger block above).
app.post('/task/new/init', authCheck, taskNew.assignUUID, formDataParser, taskNew.handleInit);
app.post('/task/new/upload/:uuid', authCheck, (req, res) => {
});
/** @swagger
* /task/new/upload/{uuid}:
* post:
* description: Adds one or more files to the task created via /task/new/init. It does not start the task. To start the task, call /task/new/commit.
* tags: [task]
* consumes:
* - multipart/form-data
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* -
* name: images
* in: formData
* description: Images to process, plus an optional GCP file. If included, the GCP file should have .txt extension
* required: true
* type: file
* -
* name: token
* in: query
* description: 'Token required for authentication (when authentication is required).'
* required: false
* type: string
* responses:
* 200:
* description: File Received
* schema:
* $ref: "#/definitions/Response"
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
// Chunked upload, step 2: receive one or more image files for a task
// previously created via /task/new/init.
app.post('/task/new/upload/:uuid', authCheck, taskNew.getUUID, taskNew.uploadImages, taskNew.handleUpload);
app.post('/task/new/commit/:uuid', authCheck, (req, res) => {
});
/** @swagger
* /task/new/commit/{uuid}:
* post:
* description: Creates a new task for which images have been uploaded via /task/new/upload.
* tags: [task]
* parameters:
* -
* name: uuid
* in: path
* description: UUID of the task
* required: true
* type: string
* -
* name: token
* in: query
* description: 'Token required for authentication (when authentication is required).'
* required: false
* type: string
* responses:
* 200:
* description: Success
* schema:
* type: object
* required: [uuid]
* properties:
* uuid:
* type: string
* description: UUID of the newly created task
* default:
* description: Error
* schema:
* $ref: '#/definitions/Error'
*/
// Chunked upload, step 3: commit the uploaded files and start the task.
app.post('/task/new/commit/:uuid', authCheck, taskNew.getUUID, taskNew.handleCommit, taskNew.createTask);
/** @swagger
* /task/new:
* post:
* description: Creates a new task and places it at the end of the processing queue
* description: Creates a new task and places it at the end of the processing queue. For uploading really large tasks, see /task/new/init instead.
* tags: [task]
* consumes:
* - multipart/form-data
@ -198,10 +253,12 @@ app.post('/task/new/commit/:uuid', authCheck, (req, res) => {
* $ref: '#/definitions/Error'
*/
app.post('/task/new', authCheck, taskNew.assignUUID, taskNew.uploadImages, (req, res, next) => {
console.log(req.body);
req.body = req.body || {};
if ((!req.files || req.files.length === 0) && !req.body.zipurl) req.error = "Need at least 1 file or a zip file url.";
else if (config.maxImages && req.files && req.files.length > config.maxImages) req.error = `${req.files.length} images uploaded, but this node can only process up to ${config.maxImages}.`;
next();
}, taskNew.handleTaskNew);
}, taskNew.createTask);
let getTaskFromUuid = (req, res, next) => {
let task = taskManager.find(req.params.uuid);

Wyświetl plik

@ -30,6 +30,7 @@ const Directories = require('./Directories');
const TASKS_DUMP_FILE = path.join(Directories.data, "tasks.json");
const CLEANUP_TASKS_IF_OLDER_THAN = 1000 * 60 * config.cleanupTasksAfter; // minutes
const CLEANUP_STALE_UPLOADS_AFTER = 1000 * 60 * config.cleanupUploadsAfter; // minutes
let taskManager;
@ -42,6 +43,7 @@ class TaskManager{
cb => this.restoreTaskListFromDump(cb),
cb => this.removeOldTasks(cb),
cb => this.removeOrphanedDirectories(cb),
cb => this.removeStaleUploads(cb),
cb => {
this.processNextTask();
cb();
@ -51,6 +53,7 @@ class TaskManager{
schedule.scheduleJob('0 * * * *', () => {
this.removeOldTasks();
this.dumpTaskList();
this.removeStaleUploads();
});
cb();
@ -84,7 +87,6 @@ class TaskManager{
// Removes directories that don't have a corresponding
// task associated with it (maybe as a cause of an abrupt exit)
// TODO: do not delete /task/new/init directories!!!
removeOrphanedDirectories(done){
logger.info("Checking for orphaned directories to be removed...");
@ -104,6 +106,30 @@ class TaskManager{
});
}
removeStaleUploads(done){
logger.info("Checking for stale uploads...");
fs.readdir("tmp", (err, entries) => {
if (err) done(err);
else{
const now = new Date();
async.eachSeries(entries, (entry, cb) => {
let dirPath = path.join("tmp", entry);
if (entry.match(/^[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+\-[\w\d]+$/)){
fs.stat(dirPath, (err, stats) => {
if (err) cb(err);
else{
if (stats.isDirectory() && stats.ctime.getTime() + CLEANUP_STALE_UPLOADS_AFTER < now.getTime()){
logger.info(`Found stale upload directory: ${entry}, removing...`);
rmdir(dirPath, cb);
}else cb();
}
});
}else cb();
}, done);
}
});
}
// Load tasks that already exists (if any)
restoreTaskListFromDump(done){
fs.readFile(TASKS_DUMP_FILE, (err, data) => {

Wyświetl plik

@ -27,6 +27,25 @@ const Directories = require('./Directories');
const unzip = require('node-unzip-2');
const mv = require('mv');
const Task = require('./Task');
const async = require('async');
const odmInfo = require('./odmInfo');
const request = require('request');
// Downloads the resource at uri to filename on disk, then invokes
// callback(err). A HEAD request is issued first to surface connection
// errors early. Fix: the original only listened for 'close', so errors
// emitted by the GET stream or the write stream either crashed the
// process (unhandled 'error' event) or left the callback hanging.
const download = function(uri, filename, callback) {
    request.head(uri, function(err, res, body) {
        if (err) callback(err);
        else{
            // Guard so callback fires exactly once regardless of which
            // event (error or close) arrives first.
            let called = false;
            const finish = e => {
                if (!called){
                    called = true;
                    callback(e);
                }
            };
            request(uri)
                .on('error', finish) // network failure mid-transfer
                .pipe(fs.createWriteStream(filename))
                .on('error', finish) // disk write failure
                .on('close', () => finish());
        }
    });
};
// Recursively deletes dir if it exists and is a directory; otherwise
// forwards the stat error (null when dir exists but is not a directory)
// to cb. cb defaults to a no-op for fire-and-forget cleanup.
const removeDirectory = function(dir, cb = () => {}){
    fs.stat(dir, (err, stats) => {
        if (err || !stats.isDirectory()) cb(err);
        else rmdir(dir, cb); // ignore errors, don't wait
    });
};
const upload = multer({
storage: multer.diskStorage({
@ -43,7 +62,9 @@ const upload = multer({
});
},
filename: (req, file, cb) => {
cb(null, file.originalname);
let filename = file.originalname;
if (filename === "body.json") filename = "_body.json";
cb(null, filename);
}
})
});
@ -68,30 +89,115 @@ module.exports = {
}
},
uploadImages: upload.array("images"),
getUUID: (req, res, next) => {
req.id = req.params.uuid;
if (!req.id) res.json({error: `Invalid uuid (not set)`});
setupFiles: (req, res, next) => {
// populate req.id (here or somewhere else)
// populate req.files from directory
// populate req.body from metadata file
const srcPath = path.join("tmp", req.id);
const bodyFile = path.join(srcPath, "body.json");
fs.access(bodyFile, fs.F_OK, err => {
if (err) res.json({error: `Invalid uuid (not found)`});
else next();
});
},
handleTaskNew: (req, res) => {
// TODO: consider doing the file moving in the background
// and return a response more quickly instead of a long timeout.
req.setTimeout(1000 * 60 * 20);
uploadImages: upload.array("images"),
let srcPath = path.join("tmp", req.id);
handleUpload: (req, res) => {
// IMPROVEMENT: check files count limits ahead of handleTaskNew
if (req.files && req.files.length > 0){
res.json({success: true});
}else{
res.json({error: "Need at least 1 file."});
}
},
handleCommit: (req, res, next) => {
const srcPath = path.join("tmp", req.id);
const bodyFile = path.join(srcPath, "body.json");
async.series([
cb => {
fs.readFile(bodyFile, 'utf8', (err, data) => {
if (err) cb(err);
else{
try{
const body = JSON.parse(data);
fs.unlink(bodyFile, err => {
if (err) cb(err);
else cb(null, body);
});
}catch(e){
cb("Malformed body.json");
}
}
});
},
cb => fs.readdir(srcPath, cb),
], (err, [ body, files ]) => {
if (err) res.json({error: err.message});
else{
req.body = body;
req.files = files;
next();
}
});
},
handleInit: (req, res) => {
console.log(req.body);
req.body = req.body || {};
const srcPath = path.join("tmp", req.id);
const bodyFile = path.join(srcPath, "body.json");
// Print error message and cleanup
const die = (error) => {
res.json({error});
removeDirectory(srcPath);
};
// Check if tmp/ directory needs to be cleaned
if (fs.stat(srcPath, (err, stats) => {
if (!err && stats.isDirectory()) rmdir(srcPath, () => {}); // ignore errors, don't wait
}));
async.series([
cb => {
// Check for problems before file uploads
if (req.body && req.body.options){
odmInfo.filterOptions(req.body.options, err => {
if (err) cb(err);
else cb();
});
}else cb();
},
cb => {
fs.stat(srcPath, (err, stat) => {
if (err && err.code === 'ENOENT') cb();
else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
});
},
cb => fs.mkdir(srcPath, undefined, cb),
cb => {
fs.writeFile(bodyFile, JSON.stringify(req.body), {encoding: 'utf8'}, cb);
},
cb => {
res.json({uuid: req.id});
cb();
}
], err => {
if (err) die(err.message);
});
},
createTask: (req, res) => {
// IMPROVEMENT: consider doing the file moving in the background
// and return a response more quickly instead of a long timeout.
req.setTimeout(1000 * 60 * 20);
const srcPath = path.join("tmp", req.id);
// Print error message and cleanup
const die = (error) => {
res.json({error});
removeDirectory(srcPath);
};
if (req.error !== undefined){
@ -202,7 +308,7 @@ module.exports = {
res.json({ uuid: req.id });
cb();
}
}, req.body.options,
}, req.body.options,
req.body.webhook,
req.body.skipPostProcessing === 'true');
}