Merge pull request #132 from pierotofy/s3

Faster S3 uploads, progress reporting
pull/135/head
Piero Toffanin 2020-11-02 12:55:21 -05:00 committed by GitHub
commit c1d87c1b25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 68 deletions
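The speed-up comes from multipart uploads with larger parts and a deeper per-file part queue, capped by available memory, while progress is surfaced through the existing onOutput callback of uploadPaths (see the hunks below). A minimal sketch of how a caller might consume that callback; the require path, the sample arguments, and the (err)-style completion callback are assumptions for illustration:

const S3 = require('./libs/S3'); // assumed module path

S3.uploadPaths('/var/www/data/some-task', 'my-bucket', 'some-task',
    ['all.zip'],
    err => {
        if (err) console.log(`Upload failed: ${err.message}`);
        else console.log('Upload done');
    },
    line => console.log(line)); // e.g. "Uploading all.zip... 35%", then "Finalizing all.zip upload, this could take a bit..."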

View file

@@ -52,7 +52,7 @@ Options:
--s3_secret_key <secret> S3 secret key, required if --s3_endpoint is set. (default: none)
--s3_signature_version <version> S3 signature version. (default: 4)
--s3_acl <canned-acl> S3 object acl. (default: public-read)
- --s3_upload_everything Upload all task results to S3. (default: upload only .zip archive and orthophoto)
+ --s3_upload_everything Upload all task results to S3. (default: upload only all.zip archive)
--max_concurrency <number> Place a cap on the max-concurrency option to use for each task. (default: no limit)
--max_runtime <number> Number of minutes (approximate) that a task is allowed to run before being forcibly canceled (timeout). (default: no limit)
Log Levels:
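For the --s3_upload_everything flag documented above, a hedged example invocation (--s3_bucket and --s3_access_key are assumed companion options not shown in this excerpt):

node index.js --s3_endpoint s3.amazonaws.com --s3_bucket my-bucket \
    --s3_access_key AKIA... --s3_secret_key ... --s3_acl public-read --s3_upload_everything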

View file

@@ -23,6 +23,7 @@ const glob = require('glob');
const path = require('path');
const logger = require('./logger');
const config = require('../config');
const si = require('systeminformation');
let s3 = null;
@@ -76,23 +77,51 @@ module.exports = {
uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){
if (!s3) throw new Error("S3 is not initialized");
- const PARALLEL_UPLOADS = 5;
+ const PARALLEL_UPLOADS = 4; // Upload these many files at the same time
const MAX_RETRIES = 6;
const MIN_PART_SIZE = 5 * 1024 * 1024;
// Get available memory, as on low-powered machines
// we might not be able to upload many large chunks at once
si.mem(memory => {
let concurrency = 10; // Upload these many parts per file at the same time
let progress = {};
let partSize = 100 * 1024 * 1024;
let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS; // Conservative
// Try reducing concurrency first
while(memoryRequirement > memory.available && concurrency > 1){
concurrency--;
memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
}
// Try reducing partSize afterwards
while(memoryRequirement > memory.available && partSize > MIN_PART_SIZE){
partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
}
const q = async.queue((file, done) => {
logger.debug(`Uploading ${file.src} --> ${file.dest}`);
const filename = path.basename(file.dest);
progress[filename] = 0;
s3.upload({
Bucket: bucket,
Key: file.dest,
Body: fs.createReadStream(file.src),
ACL: config.s3ACL
- }, {partSize: 5 * 1024 * 1024, queueSize: 1}, err => {
+ }, {partSize, queueSize: concurrency}, err => {
if (err){
logger.debug(err);
const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
if (onOutput) onOutput(msg);
if (file.retries < MAX_RETRIES){
file.retries++;
concurrency = Math.max(1, Math.floor(concurrency * 0.66));
progress[filename] = 0;
setTimeout(() => {
q.push(file, errHandler);
done();
@@ -101,6 +130,17 @@ module.exports = {
done(new Error(msg));
}
}else done();
}).on('httpUploadProgress', p => {
const perc = Math.round((p.loaded / p.total) * 100)
if (perc % 5 == 0 && progress[filename] < perc){
progress[filename] = perc;
if (onOutput) {
onOutput(`Uploading ${filename}... ${progress[filename]}%`);
if (progress[filename] == 100){
onOutput(`Finalizing ${filename} upload, this could take a bit...`);
}
}
}
});
}, PARALLEL_UPLOADS);
@@ -151,5 +191,6 @@ module.exports = {
if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
q.push(uploadList, errHandler);
});
}
};
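The sizing logic in uploadPaths above bounds worst-case buffer usage at partSize × queueSize × PARALLEL_UPLOADS, first lowering the per-file part queue and then shrinking the parts by 20% per step, down to the 5 MB S3 multipart minimum. A standalone sketch of the same arithmetic; the 256 MB figure and the function name are illustrative assumptions:

const MB = 1024 * 1024;
const PARALLEL_UPLOADS = 4;   // files uploaded at the same time
const MIN_PART_SIZE = 5 * MB; // S3 multipart lower bound

function sizeUploads(availableBytes){
    let concurrency = 10;     // parts uploaded per file at the same time (queueSize)
    let partSize = 100 * MB;
    let need = partSize * concurrency * PARALLEL_UPLOADS;

    // Try reducing concurrency first
    while (need > availableBytes && concurrency > 1){
        concurrency--;
        need = partSize * concurrency * PARALLEL_UPLOADS;
    }
    // Then shrink the part size, 20% at a time, down to the minimum
    while (need > availableBytes && partSize > MIN_PART_SIZE){
        partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
        need = partSize * concurrency * PARALLEL_UPLOADS;
    }
    return { concurrency, partSize };
}

// With 256 MB free: concurrency falls from 10 to 1, partSize shrinks 100 -> 80 -> 64 MB,
// and the worst case becomes 4 uploads x 1 part x 64 MB = 256 MB of buffers.
console.log(sizeUploads(256 * MB)); // { concurrency: 1, partSize: 67108864 }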

View file

@@ -423,9 +423,7 @@ module.exports = class Task{
if (S3.enabled()){
tasks.push((done) => {
let s3Paths;
- if (config.test){
- s3Paths = ['all.zip']; // During testing only upload all.zip
- }else if (config.s3UploadEverything){
+ if (config.s3UploadEverything){
s3Paths = ['all.zip'].concat(allPaths);
}else{
s3Paths = ['all.zip'];