diff --git a/config.js b/config.js
index 8979bea..aac1dfb 100644
--- a/config.js
+++ b/config.js
@@ -52,7 +52,7 @@ Options:
   --s3_secret_key    S3 secret key, required if --s3_endpoint is set. (default: none)
   --s3_signature_version    S3 signature version. (default: 4)
   --s3_acl    S3 object acl. (default: public-read)
-  --s3_upload_everything    Upload all task results to S3. (default: upload only .zip archive and orthophoto)
+  --s3_upload_everything    Upload all task results to S3. (default: upload only all.zip archive)
   --max_concurrency    Place a cap on the max-concurrency option to use for each task. (default: no limit)
   --max_runtime    Number of minutes (approximate) that a task is allowed to run before being forcibly canceled (timeout). (default: no limit)
 Log Levels:
diff --git a/libs/S3.js b/libs/S3.js
index b1e4ffe..5672e53 100644
--- a/libs/S3.js
+++ b/libs/S3.js
@@ -23,6 +23,7 @@ const glob = require('glob');
 const path = require('path');
 const logger = require('./logger');
 const config = require('../config');
+const si = require('systeminformation');
 
 let s3 = null;
 
@@ -76,80 +77,120 @@ module.exports = {
     uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){
         if (!s3) throw new Error("S3 is not initialized");
 
-        const PARALLEL_UPLOADS = 5;
+        const PARALLEL_UPLOADS = 4; // Upload these many files at the same time
         const MAX_RETRIES = 6;
+        const MIN_PART_SIZE = 5 * 1024 * 1024;
 
-        const q = async.queue((file, done) => {
-            logger.debug(`Uploading ${file.src} --> ${file.dest}`);
-            s3.upload({
-                Bucket: bucket,
-                Key: file.dest,
-                Body: fs.createReadStream(file.src),
-                ACL: config.s3ACL
-            }, {partSize: 5 * 1024 * 1024, queueSize: 1}, err => {
-                if (err){
-                    logger.debug(err);
-                    const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
-                    if (onOutput) onOutput(msg);
-                    if (file.retries < MAX_RETRIES){
-                        file.retries++;
-                        setTimeout(() => {
-                            q.push(file, errHandler);
-                            done();
-                        }, (2 ** file.retries) * 1000);
-                    }else{
-                        done(new Error(msg));
-                    }
-                }else done();
-            });
-        }, PARALLEL_UPLOADS);
+        // Get available memory, as on low-powered machines
+        // we might not be able to upload many large chunks at once
+        si.mem(memory => {
+            let concurrency = 10; // Upload these many parts per file at the same time
+            let progress = {};
 
-        const errHandler = err => {
-            if (err){
-                q.kill();
-                if (!cbCalled){
-                    cbCalled = true;
-                    cb(err);
-                }
+            let partSize = 100 * 1024 * 1024;
+            let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS; // Conservative
+
+            // Try reducing concurrency first
+            while(memoryRequirement > memory.available && concurrency > 1){
+                concurrency--;
+                memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
             }
-        };
 
-        let uploadList = [];
+            // Try reducing partSize afterwards
+            while(memoryRequirement > memory.available && partSize > MIN_PART_SIZE){
+                partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
+                memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
+            }
 
-        paths.forEach(p => {
-            const fullPath = path.join(srcFolder, p);
-
-            // Skip non-existing items
-            if (!fs.existsSync(fullPath)) return;
+            const q = async.queue((file, done) => {
+                logger.debug(`Uploading ${file.src} --> ${file.dest}`);
+                const filename = path.basename(file.dest);
+                progress[filename] = 0;
 
-            if (fs.lstatSync(fullPath).isDirectory()){
-                let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
-
-                globPaths.forEach(gp => {
+                s3.upload({
+                    Bucket: bucket,
+                    Key: file.dest,
+                    Body: fs.createReadStream(file.src),
+                    ACL: config.s3ACL
+                }, {partSize, queueSize: concurrency}, err => {
+                    if (err){
+                        logger.debug(err);
+                        const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
+                        if (onOutput) onOutput(msg);
+                        if (file.retries < MAX_RETRIES){
+                            file.retries++;
+                            concurrency = Math.max(1, Math.floor(concurrency * 0.66));
+                            progress[filename] = 0;
+
+                            setTimeout(() => {
+                                q.push(file, errHandler);
+                                done();
+                            }, (2 ** file.retries) * 1000);
+                        }else{
+                            done(new Error(msg));
+                        }
+                    }else done();
+                }).on('httpUploadProgress', p => {
+                    const perc = Math.round((p.loaded / p.total) * 100)
+                    if (perc % 5 == 0 && progress[filename] < perc){
+                        progress[filename] = perc;
+                        if (onOutput) {
+                            onOutput(`Uploading ${filename}... ${progress[filename]}%`);
+                            if (progress[filename] == 100){
+                                onOutput(`Finalizing ${filename} upload, this could take a bit...`);
+                            }
+                        }
+                    }
+                });
+            }, PARALLEL_UPLOADS);
+
+            const errHandler = err => {
+                if (err){
+                    q.kill();
+                    if (!cbCalled){
+                        cbCalled = true;
+                        cb(err);
+                    }
+                }
+            };
+
+            let uploadList = [];
+
+            paths.forEach(p => {
+                const fullPath = path.join(srcFolder, p);
+
+                // Skip non-existing items
+                if (!fs.existsSync(fullPath)) return;
+
+                if (fs.lstatSync(fullPath).isDirectory()){
+                    let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
+
+                    globPaths.forEach(gp => {
+                        uploadList.push({
+                            src: path.join(srcFolder, gp),
+                            dest: path.join(dstFolder, gp),
+                            retries: 0
+                        });
+                    });
+                }else{
                     uploadList.push({
-                        src: path.join(srcFolder, gp),
-                        dest: path.join(dstFolder, gp),
+                        src: fullPath,
+                        dest: path.join(dstFolder, p),
                         retries: 0
                     });
-                });
-            }else{
-                uploadList.push({
-                    src: fullPath,
-                    dest: path.join(dstFolder, p),
-                    retries: 0
-                });
-            }
+                }
+            });
+
+            let cbCalled = false;
+            q.drain = () => {
+                if (!cbCalled){
+                    cbCalled = true;
+                    cb();
+                }
+            };
+
+            if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
+            q.push(uploadList, errHandler);
         });
-
-        let cbCalled = false;
-        q.drain = () => {
-            if (!cbCalled){
-                cbCalled = true;
-                cb();
-            }
-        };
-
-        if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
-        q.push(uploadList, errHandler);
     }
 };
diff --git a/libs/Task.js b/libs/Task.js
index 78b1eb1..b9a3c95 100644
--- a/libs/Task.js
+++ b/libs/Task.js
@@ -423,9 +423,7 @@ module.exports = class Task{
         if (S3.enabled()){
             tasks.push((done) => {
                 let s3Paths;
-                if (config.test){
-                    s3Paths = ['all.zip']; // During testing only upload all.zip
-                }else if (config.s3UploadEverything){
+                if (config.s3UploadEverything){
                     s3Paths = ['all.zip'].concat(allPaths);
                 }else{
                     s3Paths = ['all.zip'];
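
Note (reviewer sketch, not part of the patch): the memory-aware tuning added to libs/S3.js above can be read in isolation as the JavaScript below. It reduces per-file part concurrency first and then the part size until the estimated buffer memory (partSize * concurrency * PARALLEL_UPLOADS) fits into the available memory reported by systeminformation. The tuneUpload() helper name and the 2 GiB example value are illustrative assumptions, not taken from the patch.

    // Standalone sketch of the adaptive upload tuning; constants mirror the patch defaults.
    const MIN_PART_SIZE = 5 * 1024 * 1024;      // S3 multipart minimum part size
    const PARALLEL_UPLOADS = 4;                 // files uploaded at the same time

    // tuneUpload() is a hypothetical helper used only for illustration.
    function tuneUpload(availableMemory){
        let concurrency = 10;                   // parts uploaded per file at the same time
        let partSize = 100 * 1024 * 1024;
        let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;

        // Reduce per-file part concurrency first
        while (memoryRequirement > availableMemory && concurrency > 1){
            concurrency--;
            memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
        }

        // Then shrink the part size, never below the S3 minimum
        while (memoryRequirement > availableMemory && partSize > MIN_PART_SIZE){
            partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
            memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
        }

        return { concurrency, partSize };
    }

    // Example with an assumed ~2 GiB of available memory: concurrency drops 10 -> 5
    // while partSize stays at 100 MiB, since 100 MiB * 5 * 4 = 2000 MiB fits under 2048 MiB.
    console.log(tuneUpload(2 * 1024 * 1024 * 1024));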