S3 upload support

pull/51/head
Piero Toffanin 2018-11-23 13:19:52 -05:00
rodzic 3a16750bb6
commit 2e9e7e9f42
6 zmienionych plików z 146 dodań i 42 usunięć

Wyświetl plik

@ -41,8 +41,10 @@ Options:
--max_images <number> Specify the maximum number of images that this processing node supports. (default: unlimited)
--callback <url> Specify a callback URL to be invoked when a task completes processing (default: none)
--s3_endpoint <url> Specify a S3 endpoint (for example, nyc3.digitaloceanspaces.com) to upload completed task results to. (default: do not upload to S3)
--s3_bucket <bucket> Specify a S3 bucket name where to upload completed task results to. (default: none)
--s3_access_key <key> S3 access key, required if --s3_endpoint is set. (default: none)
--s3_secret_key <secret> S3 secret key, required if --s3_endpoint is set. (default: none)
--s3_signature_version <version> S3 signature version. (default: 4)
Log Levels:
error | debug | info | verbose | debug | silly
`);
@ -93,8 +95,9 @@ config.powercycle = argv.powercycle || fromConfigFile("powercycle", false);
config.token = argv.token || fromConfigFile("token", "");
config.maxImages = parseInt(argv.max_images || fromConfigFile("maxImages", "")) || null;
config.callback = argv.callback || fromConfigFile("callback", "");
config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", "")
config.s3Endpoint = argv.s3_endpoint || fromConfigFile("s3Endpoint", "");
config.s3Bucket = argv.s3_bucket || fromConfigFile("s3Bucket", "");
config.s3AccessKey = argv.s3_access_key || fromConfigFile("s3AccessKey", process.env.AWS_ACCESS_KEY_ID || "")
config.s3SecretKey = argv.s3_secret_key || fromConfigFile("s3SecretKey", process.env.AWS_SECRET_ACCESS_KEY || "")
config.s3SignatureVersion = argv.s3_signature_version || fromConfigFile("s3SignatureVersion", "4")
module.exports = config;

Wyświetl plik

@ -42,6 +42,7 @@ let Directories = require('./libs/Directories');
let unzip = require('node-unzip-2');
let si = require('systeminformation');
let mv = require('mv');
let S3 = require('./libs/S3');
let auth = require('./libs/auth/factory').fromConfig(config);
const authCheck = auth.getMiddleware();
@ -61,7 +62,8 @@ let download = function(uri, filename, callback) {
let winstonStream = {
write: function(message, encoding) {
logger.debug(message.slice(0, -1));
// Uncomment to get express requests debug output
// logger.debug(message.slice(0, -1));
}
};
app.use(morgan('combined', { stream: winstonStream }));
@ -713,6 +715,7 @@ if (config.test) logger.info("Running in test mode");
let commands = [
cb => odmInfo.initialize(cb),
cb => auth.initialize(cb),
cb => S3.initialize(cb),
cb => { taskManager = new TaskManager(cb); },
cb => {
server = app.listen(config.port, err => {

Wyświetl plik

@ -19,27 +19,110 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
const async = require('async');
const AWS = require('aws-sdk');
const fs = require('fs');
const s3 = new AWS.S3({});
const glob = require('glob');
const path = require('path');
const logger = require('./logger');
const config = require('../config');
let s3 = null;
module.exports = {
uploadToS3: function(srcFolder, endpoint, credentials, cb){
enabled: function(){
return s3 !== null;
},
initialize: function(cb){
if (config.s3Endpoint && config.s3Bucket && config.s3AccessKey && config.s3SecretKey){
const spacesEndpoint = new AWS.Endpoint(config.s3Endpoint);
s3 = new AWS.S3({
endpoint: spacesEndpoint,
signatureVersion: ('v' + config.s3SignatureVersion) || 'v4',
accessKeyId: config.s3AccessKey,
secretAccessKey: config.s3SecretKey
});
// Test connection
s3.putObject({
Bucket: config.s3Bucket,
Key: 'test.txt',
Body: ''
}, err => {
if (!err){
logger.info("Connected to S3");
cb();
}else{
cb(new Error("Cannot connect to S3. Check your S3 configuration: " + err.code));
}
});
}else cb();
},
// @param srcFolder {String} folder where to find paths (on local machine)
// @param bucket {String} S3 destination bucket
// @param dstFolder {String} prefix where to upload files on S3
// @param paths [{String}] list of paths relative to srcFolder
// @param cb {Function} callback
// @param onOutput {Function} (optional) callback when output lines are available
uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){
if (!s3) throw new Error("S3 is not initialized");
const PARALLEL_UPLOADS = 5;
const q = async.queue((task, callback) => {
const q = async.queue((file, done) => {
logger.debug(`Uploading ${file.src} --> ${file.dest}`);
s3.upload({
Bucket: 'xxx',
Key: task.dest,
Body: fs.createReadStream(task.src)
}, callback);
Bucket: bucket,
Key: file.dest,
Body: fs.createReadStream(file.src),
ACL: 'public-read'
}, err => {
if (err){
logger.debug(err);
const msg = "Cannot upload file to S3: " + err.code;
if (onOutput) onOutput(msg)
done(new Error(msg));
}else done();
});
}, PARALLEL_UPLOADS);
q.drain = function() {
console.log('all items have been processed');
};
q.push([
{ src: 'image1.png', dest: 'images/image1.png' },
{ src: 'image2.png', dest: 'images/image2.png' },
]);
let uploadList = [];
paths.forEach(p => {
const fullPath = path.join(srcFolder, p);
// Skip non-existing items
if (!fs.existsSync(fullPath)) return;
if (fs.lstatSync(fullPath).isDirectory()){
let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
globPaths.forEach(gp => {
uploadList.push({
src: path.join(srcFolder, gp),
dest: path.join(dstFolder, gp)
});
});
}else{
uploadList.push({
src: fullPath,
dest: path.join(dstFolder, p)
});
}
});
let cbCalled = false;
q.drain = () => {
if (!cbCalled) cb();
cbCalled = true;
};
if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
q.push(uploadList, err => {
if (err){
q.kill();
if (!cbCalled) cb(err);
cbCalled = true;
}
});
}
};

Wyświetl plik

@ -17,21 +17,22 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
"use strict";
let config = require('../config');
let async = require('async');
let assert = require('assert');
let logger = require('./logger');
let fs = require('fs');
let glob = require("glob");
let path = require('path');
let rmdir = require('rimraf');
let odmRunner = require('./odmRunner');
let processRunner = require('./processRunner');
let archiver = require('archiver');
let Directories = require('./Directories');
let kill = require('tree-kill');
const config = require('../config');
const async = require('async');
const assert = require('assert');
const logger = require('./logger');
const fs = require('fs');
const glob = require("glob");
const path = require('path');
const rmdir = require('rimraf');
const odmRunner = require('./odmRunner');
const processRunner = require('./processRunner');
const archiver = require('archiver');
const Directories = require('./Directories');
const kill = require('tree-kill');
const S3 = require('./S3');
let statusCodes = require('./statusCodes');
const statusCodes = require('./statusCodes');
module.exports = class Task{
constructor(uuid, name, done, options = [], webhook = null){
@ -215,7 +216,11 @@ module.exports = class Task{
const finished = err => {
this.stopTrackingProcessingTime();
done(err);
};
};
const sourcePath = !config.test ?
this.getProjectFolderPath() :
path.join("tests", "processing_results");
const postProcess = () => {
const createZipArchive = (outputFilename, files) => {
@ -240,9 +245,6 @@ module.exports = class Task{
// Process files and directories first
files.forEach(file => {
let sourcePath = !config.test ?
this.getProjectFolderPath() :
path.join("tests", "processing_results");
let filePath = path.join(sourcePath, file);
// Skip non-existing items
@ -329,12 +331,25 @@ module.exports = class Task{
allPaths.splice(allPaths.indexOf(p), 1);
});
}
}
async.series([
}
let tasks = [
runPostProcessingScript(),
createZipArchive('all.zip', allPaths)
], (err) => {
];
// Upload to S3 all paths + all.zip file (if config says so)
if (S3.enabled()){
tasks.push((done) => {
S3.uploadPaths(sourcePath, config.s3Bucket, this.uuid, ['all.zip'].concat(allPaths),
err => {
if (!err) this.output.push("Done uploading to S3!");
done(err);
}, output => this.output.push(output));
});
}
async.series(tasks, (err) => {
if (!err){
this.setStatus(statusCodes.COMPLETED);
finished();

Wyświetl plik

@ -21,7 +21,7 @@
"homepage": "https://github.com/pierotofy/node-OpenDroneMap#readme",
"dependencies": {
"archiver": "^1.0.0",
"async": "^2.0.0-rc.6",
"async": "^2.6.1",
"aws-sdk": "^2.360.0",
"body-parser": "^1.18.3",
"express": "^4.16.3",

Plik binarny nie jest wyświetlany.