Memory-aware multipart uploads

pull/132/head
Piero Toffanin 2020-11-02 12:53:50 -05:00
parent b2f7a19e1f
commit 6ae2cca1f6
3 changed files with 106 additions and 79 deletions
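
This commit sizes S3 multipart uploads against the memory actually available on the machine: before queuing uploads, it estimates the peak buffer usage as partSize * concurrency * PARALLEL_UPLOADS and, if that exceeds what si.mem() reports as available, first lowers per-file part concurrency and only then shrinks the part size. Below is a minimal standalone sketch of that sizing logic, using the constants from the diff; the computePartPlan wrapper is hypothetical, not part of the commit.

const PARALLEL_UPLOADS = 4;             // files uploaded at the same time
const MIN_PART_SIZE = 5 * 1024 * 1024;  // S3's multipart minimum (5 MB)

function computePartPlan(availableBytes){
    let concurrency = 10;                // parts per file in flight
    let partSize = 100 * 1024 * 1024;    // 100 MB parts by default
    let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS; // Conservative

    // Try reducing concurrency first
    while (memoryRequirement > availableBytes && concurrency > 1){
        concurrency--;
        memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
    }

    // Then shrink the part size in 20% steps, never below the S3 minimum
    while (memoryRequirement > availableBytes && partSize > MIN_PART_SIZE){
        partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
        memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
    }

    return { partSize, concurrency };
}

const MB = 1024 * 1024;
console.log(computePartPlan(8192 * MB)); // { partSize: 104857600, concurrency: 10 } (defaults fit: 4000 MB)
console.log(computePartPlan(2048 * MB)); // { partSize: 104857600, concurrency: 5 }  (100 * 5 * 4 = 2000 MB)
console.log(computePartPlan(256 * MB));  // { partSize: 67108864, concurrency: 1 }   (64 * 1 * 4 = 256 MB)

Reducing concurrency first is presumably the cheaper lever: shrinking parts also caps the maximum object size, since S3 allows at most 10,000 parts per upload, which is why partSize is the last resort and floors at the 5 MB minimum.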

View file

@@ -52,7 +52,7 @@ Options:
 --s3_secret_key <secret>	S3 secret key, required if --s3_endpoint is set. (default: none)
 --s3_signature_version <version>	S3 signature version. (default: 4)
 --s3_acl <canned-acl>	S3 object acl. (default: public-read)
---s3_upload_everything	Upload all task results to S3. (default: upload only .zip archive and orthophoto)
+--s3_upload_everything	Upload all task results to S3. (default: upload only all.zip archive)
 --max_concurrency <number>	Place a cap on the max-concurrency option to use for each task. (default: no limit)
 --max_runtime <number>	Number of minutes (approximate) that a task is allowed to run before being forcibly canceled (timeout). (default: no limit)
 Log Levels:

View file

@@ -23,6 +23,7 @@ const glob = require('glob');
 const path = require('path');
 const logger = require('./logger');
 const config = require('../config');
+const si = require('systeminformation');
 
 let s3 = null;
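
For context: systeminformation's si.mem() works in both callback and promise style and reports sizes in bytes; the code below keys off memory.available (memory that can actually be allocated, including reclaimable cache) rather than free. A minimal sketch:

const si = require('systeminformation');

// Callback style, as used in the hunk below; si.mem() also returns a promise
si.mem(memory => {
    // total, free, used and available are byte counts;
    // `available` estimates allocatable memory (free plus reclaimable cache/buffers)
    console.log(memory.available);
});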
@@ -76,92 +77,120 @@ module.exports = {
     uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){
         if (!s3) throw new Error("S3 is not initialized");
 
-        const PARALLEL_UPLOADS = 5; // Upload these many files at the same time
+        const PARALLEL_UPLOADS = 4; // Upload these many files at the same time
         const MAX_RETRIES = 6;
+        const MIN_PART_SIZE = 5 * 1024 * 1024;
 
-        let concurrency = 10; // Upload these many parts per file at the same time
-        let progress = 0;
+        // Get available memory, as on low-powered machines
+        // we might not be able to upload many large chunks at once
+        si.mem(memory => {
+            let concurrency = 10; // Upload these many parts per file at the same time
+            let progress = {};
 
-        const q = async.queue((file, done) => {
-            logger.debug(`Uploading ${file.src} --> ${file.dest}`);
-            s3.upload({
-                Bucket: bucket,
-                Key: file.dest,
-                Body: fs.createReadStream(file.src),
-                ACL: config.s3ACL
-            }, {partSize: 100 * 1024 * 1024, queueSize: concurrency}, err => {
-                if (err){
-                    logger.debug(err);
-                    const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
-                    if (onOutput) onOutput(msg);
-                    if (file.retries < MAX_RETRIES){
-                        file.retries++;
-                        concurrency = Math.max(1, Math.floor(concurrency * 0.66));
-                        progress = 0;
+            let partSize = 100 * 1024 * 1024;
+            let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS; // Conservative
 
-                        setTimeout(() => {
-                            q.push(file, errHandler);
-                            done();
-                        }, (2 ** file.retries) * 1000);
-                    }else{
-                        done(new Error(msg));
-                    }
-                }else done();
-            }).on('httpUploadProgress', p => {
-                const perc = Math.round((p.loaded / p.total) * 100)
-                if (perc % 5 == 0 && progress < perc){
-                    progress = perc;
-                    if (onOutput) onOutput(`Uploading ${path.basename(file.dest)}... ${progress}%`);
-                }
-            });
-        }, PARALLEL_UPLOADS);
+            // Try reducing concurrency first
+            while(memoryRequirement > memory.available && concurrency > 1){
+                concurrency--;
+                memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
+            }
 
-        const errHandler = err => {
-            if (err){
-                q.kill();
-                if (!cbCalled){
-                    cbCalled = true;
-                    cb(err);
-                }
+            // Try reducing partSize afterwards
+            while(memoryRequirement > memory.available && partSize > MIN_PART_SIZE){
+                partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
+                memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
             }
-        };
 
-        let uploadList = [];
+            const q = async.queue((file, done) => {
+                logger.debug(`Uploading ${file.src} --> ${file.dest}`);
+                const filename = path.basename(file.dest);
+                progress[filename] = 0;
 
-        paths.forEach(p => {
-            const fullPath = path.join(srcFolder, p);
+                s3.upload({
+                    Bucket: bucket,
+                    Key: file.dest,
+                    Body: fs.createReadStream(file.src),
+                    ACL: config.s3ACL
+                }, {partSize, queueSize: concurrency}, err => {
+                    if (err){
+                        logger.debug(err);
+                        const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
+                        if (onOutput) onOutput(msg);
+                        if (file.retries < MAX_RETRIES){
+                            file.retries++;
+                            concurrency = Math.max(1, Math.floor(concurrency * 0.66));
+                            progress[filename] = 0;
 
-            // Skip non-existing items
-            if (!fs.existsSync(fullPath)) return;
+                            setTimeout(() => {
+                                q.push(file, errHandler);
+                                done();
+                            }, (2 ** file.retries) * 1000);
+                        }else{
+                            done(new Error(msg));
+                        }
+                    }else done();
+                }).on('httpUploadProgress', p => {
+                    const perc = Math.round((p.loaded / p.total) * 100)
+                    if (perc % 5 == 0 && progress[filename] < perc){
+                        progress[filename] = perc;
+                        if (onOutput) {
+                            onOutput(`Uploading ${filename}... ${progress[filename]}%`);
+                            if (progress[filename] == 100){
+                                onOutput(`Merging ${filename} parts (if any), this could take a bit...`);
+                            }
+                        }
+                    }
+                });
+            }, PARALLEL_UPLOADS);
 
-            if (fs.lstatSync(fullPath).isDirectory()){
-                let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
+            const errHandler = err => {
+                if (err){
+                    q.kill();
+                    if (!cbCalled){
+                        cbCalled = true;
+                        cb(err);
+                    }
+                }
+            };
 
-                globPaths.forEach(gp => {
-                    uploadList.push({
-                        src: path.join(srcFolder, gp),
-                        dest: path.join(dstFolder, gp),
-                        retries: 0
-                    });
-                });
-            }else{
-                uploadList.push({
-                    src: fullPath,
-                    dest: path.join(dstFolder, p),
-                    retries: 0
-                });
-            }
-        });
+            let uploadList = [];
 
-        let cbCalled = false;
-        q.drain = () => {
-            if (!cbCalled){
-                cbCalled = true;
-                cb();
-            }
-        };
+            paths.forEach(p => {
+                const fullPath = path.join(srcFolder, p);
 
-        if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
-        q.push(uploadList, errHandler);
+                // Skip non-existing items
+                if (!fs.existsSync(fullPath)) return;
+
+                if (fs.lstatSync(fullPath).isDirectory()){
+                    let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
+
+                    globPaths.forEach(gp => {
+                        uploadList.push({
+                            src: path.join(srcFolder, gp),
+                            dest: path.join(dstFolder, gp),
+                            retries: 0
+                        });
+                    });
+                }else{
+                    uploadList.push({
+                        src: fullPath,
+                        dest: path.join(dstFolder, p),
+                        retries: 0
+                    });
+                }
+            });
+
+            let cbCalled = false;
+            q.drain = () => {
+                if (!cbCalled){
+                    cbCalled = true;
+                    cb();
+                }
+            };
+
+            if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
+            q.push(uploadList, errHandler);
+        });
     }
 };
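
One more worked detail from the retry path above: on failure the delay grows as (2 ** file.retries) * 1000 ms, and each failed attempt also cuts part concurrency to roughly two thirds. A quick trace under the diff's MAX_RETRIES = 6, illustrative only:

// delay = (2 ** retries) * 1000 ms; retries is incremented before scheduling
for (let retries = 1; retries <= 6; retries++){
    console.log(`retry ${retries}: ${2 ** retries} s`);
}
// retry 1: 2 s, retry 2: 4 s, ... retry 6: 64 s (about 126 s of backoff in total);
// concurrency meanwhile decays 10 -> 6 -> 3 -> 1 via Math.max(1, Math.floor(c * 0.66))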

View file

@@ -423,9 +423,7 @@ module.exports = class Task{
         if (S3.enabled()){
             tasks.push((done) => {
                 let s3Paths;
-                if (config.test){
-                    s3Paths = ['all.zip']; // During testing only upload all.zip
-                }else if (config.s3UploadEverything){
+                if (config.s3UploadEverything){
                     s3Paths = ['all.zip'].concat(allPaths);
                 }else{
                     s3Paths = ['all.zip'];