diff --git a/config.js b/config.js index c1d8742..3ecb96b 100644 --- a/config.js +++ b/config.js @@ -41,8 +41,10 @@ Options: --max_images Specify the maximum number of images that this processing node supports. (default: unlimited) --callback Specify a callback URL to be invoked when a task completes processing (default: none) --s3_endpoint Specify a S3 endpoint (for example, nyc3.digitaloceanspaces.com) to upload completed task results to. (default: do not upload to S3) + --s3_bucket Specify a S3 bucket name where to upload completed task results to. (default: none) --s3_access_key S3 access key, required if --s3_endpoint is set. (default: none) --s3_secret_key S3 secret key, required if --s3_endpoint is set. (default: none) + --s3_signature_version S3 signature version. (default: 4) Log Levels: error | debug | info | verbose | debug | silly `); @@ -93,8 +95,9 @@ config.powercycle = argv.powercycle || fromConfigFile("powercycle", false); config.token = argv.token || fromConfigFile("token", ""); config.maxImages = parseInt(argv.max_images || fromConfigFile("maxImages", "")) || null; config.callback = argv.callback || fromConfigFile("callback", ""); -config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", "") +config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", ""); +config.s3Bucket = argv.s3_bucket || fromConfigFile("s3Bucket", ""); config.s3AccessKey = argv.s3_access_key || fromConfigFile("s3AccessKey", process.env.AWS_ACCESS_KEY_ID || "") config.s3SecretKey = argv.s3_secret_key || fromConfigFile("s3SecretKey", process.env.AWS_SECRET_ACCESS_KEY || "") - +config.s3SignatureVersion = argv.s3_signature_version || fromConfigFile("s3SignatureVersion", "4") module.exports = config; diff --git a/index.js b/index.js index aa4b299..8889b31 100644 --- a/index.js +++ b/index.js @@ -42,6 +42,7 @@ let Directories = require('./libs/Directories'); let unzip = require('node-unzip-2'); let si = require('systeminformation'); let mv = require('mv'); +let S3 = require('./libs/S3'); let auth = require('./libs/auth/factory').fromConfig(config); const authCheck = auth.getMiddleware(); @@ -61,7 +62,8 @@ let download = function(uri, filename, callback) { let winstonStream = { write: function(message, encoding) { - logger.debug(message.slice(0, -1)); + // Uncomment to get express requests debug output + // logger.debug(message.slice(0, -1)); } }; app.use(morgan('combined', { stream: winstonStream })); @@ -713,6 +715,7 @@ if (config.test) logger.info("Running in test mode"); let commands = [ cb => odmInfo.initialize(cb), cb => auth.initialize(cb), + cb => S3.initialize(cb), cb => { taskManager = new TaskManager(cb); }, cb => { server = app.listen(config.port, err => { diff --git a/libs/S3.js b/libs/S3.js index 2826707..a753675 100644 --- a/libs/S3.js +++ b/libs/S3.js @@ -19,27 +19,110 @@ along with this program. If not, see . const async = require('async'); const AWS = require('aws-sdk'); const fs = require('fs'); -const s3 = new AWS.S3({}); +const glob = require('glob'); +const path = require('path'); +const logger = require('./logger'); +const config = require('../config'); + +let s3 = null; module.exports = { - uploadToS3: function(srcFolder, endpoint, credentials, cb){ + enabled: function(){ + return s3 !== null; + }, + + initialize: function(cb){ + if (config.s3Endpoint && config.s3Bucket && config.s3AccessKey && config.s3SecretKey){ + const spacesEndpoint = new AWS.Endpoint(config.s3Endpoint); + s3 = new AWS.S3({ + endpoint: spacesEndpoint, + signatureVersion: ('v' + config.s3SignatureVersion) || 'v4', + accessKeyId: config.s3AccessKey, + secretAccessKey: config.s3SecretKey + }); + + // Test connection + s3.putObject({ + Bucket: config.s3Bucket, + Key: 'test.txt', + Body: '' + }, err => { + if (!err){ + logger.info("Connected to S3"); + cb(); + }else{ + cb(new Error("Cannot connect to S3. Check your S3 configuration: " + err.code)); + } + }); + }else cb(); + }, + + // @param srcFolder {String} folder where to find paths (on local machine) + // @param bucket {String} S3 destination bucket + // @param dstFolder {String} prefix where to upload files on S3 + // @param paths [{String}] list of paths relative to srcFolder + // @param cb {Function} callback + // @param onOutput {Function} (optional) callback when output lines are available + uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){ + if (!s3) throw new Error("S3 is not initialized"); + const PARALLEL_UPLOADS = 5; - const q = async.queue((task, callback) => { + const q = async.queue((file, done) => { + logger.debug(`Uploading ${file.src} --> ${file.dest}`); s3.upload({ - Bucket: 'xxx', - Key: task.dest, - Body: fs.createReadStream(task.src) - }, callback); + Bucket: bucket, + Key: file.dest, + Body: fs.createReadStream(file.src), + ACL: 'public-read' + }, err => { + if (err){ + logger.debug(err); + const msg = "Cannot upload file to S3: " + err.code; + if (onOutput) onOutput(msg) + done(new Error(msg)); + }else done(); + }); }, PARALLEL_UPLOADS); - - q.drain = function() { - console.log('all items have been processed'); - }; - - q.push([ - { src: 'image1.png', dest: 'images/image1.png' }, - { src: 'image2.png', dest: 'images/image2.png' }, - ]); + + let uploadList = []; + + paths.forEach(p => { + const fullPath = path.join(srcFolder, p); + + // Skip non-existing items + if (!fs.existsSync(fullPath)) return; + + if (fs.lstatSync(fullPath).isDirectory()){ + let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true }); + + globPaths.forEach(gp => { + uploadList.push({ + src: path.join(srcFolder, gp), + dest: path.join(dstFolder, gp) + }); + }); + }else{ + uploadList.push({ + src: fullPath, + dest: path.join(dstFolder, p) + }); + } + }); + + let cbCalled = false; + q.drain = () => { + if (!cbCalled) cb(); + cbCalled = true; + }; + + if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`); + q.push(uploadList, err => { + if (err){ + q.kill(); + if (!cbCalled) cb(err); + cbCalled = true; + } + }); } }; diff --git a/libs/Task.js b/libs/Task.js index c394cae..9cb84eb 100644 --- a/libs/Task.js +++ b/libs/Task.js @@ -17,21 +17,22 @@ along with this program. If not, see . */ "use strict"; -let config = require('../config'); -let async = require('async'); -let assert = require('assert'); -let logger = require('./logger'); -let fs = require('fs'); -let glob = require("glob"); -let path = require('path'); -let rmdir = require('rimraf'); -let odmRunner = require('./odmRunner'); -let processRunner = require('./processRunner'); -let archiver = require('archiver'); -let Directories = require('./Directories'); -let kill = require('tree-kill'); +const config = require('../config'); +const async = require('async'); +const assert = require('assert'); +const logger = require('./logger'); +const fs = require('fs'); +const glob = require("glob"); +const path = require('path'); +const rmdir = require('rimraf'); +const odmRunner = require('./odmRunner'); +const processRunner = require('./processRunner'); +const archiver = require('archiver'); +const Directories = require('./Directories'); +const kill = require('tree-kill'); +const S3 = require('./S3'); -let statusCodes = require('./statusCodes'); +const statusCodes = require('./statusCodes'); module.exports = class Task{ constructor(uuid, name, done, options = [], webhook = null){ @@ -215,7 +216,11 @@ module.exports = class Task{ const finished = err => { this.stopTrackingProcessingTime(); done(err); - }; + }; + + const sourcePath = !config.test ? + this.getProjectFolderPath() : + path.join("tests", "processing_results"); const postProcess = () => { const createZipArchive = (outputFilename, files) => { @@ -240,9 +245,6 @@ module.exports = class Task{ // Process files and directories first files.forEach(file => { - let sourcePath = !config.test ? - this.getProjectFolderPath() : - path.join("tests", "processing_results"); let filePath = path.join(sourcePath, file); // Skip non-existing items @@ -329,12 +331,25 @@ module.exports = class Task{ allPaths.splice(allPaths.indexOf(p), 1); }); } - } - - async.series([ + } + + let tasks = [ runPostProcessingScript(), createZipArchive('all.zip', allPaths) - ], (err) => { + ]; + + // Upload to S3 all paths + all.zip file (if config says so) + if (S3.enabled()){ + tasks.push((done) => { + S3.uploadPaths(sourcePath, config.s3Bucket, this.uuid, ['all.zip'].concat(allPaths), + err => { + if (!err) this.output.push("Done uploading to S3!"); + done(err); + }, output => this.output.push(output)); + }); + } + + async.series(tasks, (err) => { if (!err){ this.setStatus(statusCodes.COMPLETED); finished(); diff --git a/package.json b/package.json index b0d79a9..2a134a9 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "homepage": "https://github.com/pierotofy/node-OpenDroneMap#readme", "dependencies": { "archiver": "^1.0.0", - "async": "^2.0.0-rc.6", + "async": "^2.6.1", "aws-sdk": "^2.360.0", "body-parser": "^1.18.3", "express": "^4.16.3", diff --git a/tests/processing_results/all.zip b/tests/processing_results/all.zip new file mode 100644 index 0000000..ed2f92f Binary files /dev/null and b/tests/processing_results/all.zip differ