
199 wiersze
7.6 KiB
Czysty Zwykły widok Historia

2018-11-22 03:22:37 +00:00
/*
Node-OpenDroneMap Node.js App and REST API to access OpenDroneMap.
Copyright (C) 2016 Node-OpenDroneMap Contributors

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/
"use strict";
const async = require('async');
const AWS = require('aws-sdk');
const fs = require('fs');
2018-11-23 18:19:52 +00:00
const glob = require('glob');
const path = require('path');
const logger = require('./logger');
const config = require('../config');
2020-11-02 17:53:50 +00:00
const si = require('systeminformation');
2018-11-23 18:19:52 +00:00
let s3 = null;
2018-11-22 03:22:37 +00:00
module.exports = {
2018-11-23 18:19:52 +00:00
enabled: function(){
return s3 !== null;
initialize: function(cb){
if (config.s3Endpoint && config.s3Bucket){
2018-11-23 18:19:52 +00:00
const spacesEndpoint = new AWS.Endpoint(config.s3Endpoint);
const s3Config = {
2018-11-23 18:19:52 +00:00
endpoint: spacesEndpoint,
signatureVersion: ('v' + config.s3SignatureVersion) || 'v4',
s3ForcePathStyle: config.s3ForcePathStyle,
// If we are not using IAM roles then we need to pass access key and secret key in our config
if (config.s3AccessKey && config.s3SecretKey) {
s3Config['accessKeyId'] = config.s3AccessKey;
s3Config['secretAccessKey'] = config.s3SecretKey;
}else{"Secret Key and Access ID not passed. Using the IAM role");
s3 = new AWS.S3(s3Config);
2018-11-23 18:19:52 +00:00
// Test connection
Bucket: config.s3Bucket,
Key: 'test.txt',
Body: ''
}, err => {
if (!err){"Connected to S3");
cb(new Error("Cannot connect to S3. Check your S3 configuration: " + err.code));
}else cb();
// @param srcFolder {String} folder where to find paths (on local machine)
// @param bucket {String} S3 destination bucket
// @param dstFolder {String} prefix where to upload files on S3
// @param paths [{String}] list of paths relative to srcFolder
// @param cb {Function} callback
// @param onOutput {Function} (optional) callback when output lines are available
uploadPaths: function(srcFolder, bucket, dstFolder, paths, cb, onOutput){
if (!s3) throw new Error("S3 is not initialized");
2020-11-02 17:53:50 +00:00
const PARALLEL_UPLOADS = 4; // Upload these many files at the same time
const MAX_RETRIES = 6;
2020-11-02 17:53:50 +00:00
const MIN_PART_SIZE = 5 * 1024 * 1024;
2018-11-22 03:22:37 +00:00
2020-11-02 17:53:50 +00:00
// Get available memory, as on low-powered machines
// we might not be able to upload many large chunks at once
si.mem(memory => {
let concurrency = 10; // Upload these many parts per file at the same time
let progress = {};
2020-11-02 17:13:30 +00:00
2020-11-02 17:53:50 +00:00
let partSize = 100 * 1024 * 1024;
let memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS; // Conservative
2018-11-23 18:19:52 +00:00
2020-11-02 17:53:50 +00:00
// Try reducing concurrency first
while(memoryRequirement > memory.available && concurrency > 1){
memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
2018-11-23 18:19:52 +00:00
2020-11-02 17:53:50 +00:00
// Try reducing partSize afterwards
while(memoryRequirement > memory.available && partSize > MIN_PART_SIZE){
partSize = Math.max(MIN_PART_SIZE, Math.floor(partSize * 0.80));
memoryRequirement = partSize * concurrency * PARALLEL_UPLOADS;
2018-11-23 18:19:52 +00:00
2020-11-02 17:53:50 +00:00
const q = async.queue((file, done) => {
logger.debug(`Uploading ${file.src} --> ${file.dest}`);
const filename = path.basename(file.dest);
progress[filename] = 0;
2022-07-04 16:45:32 +00:00
const uploadOpts = {
2020-11-02 17:53:50 +00:00
Bucket: bucket,
Key: file.dest,
2022-07-04 16:45:32 +00:00
Body: fs.createReadStream(file.src)
if (config.s3ACL) uploadOpts.ACL = config.s3ACL;
s3.upload(uploadOpts, {partSize, queueSize: concurrency}, err => {
2020-11-02 17:53:50 +00:00
if (err){
const msg = `Cannot upload file to S3: ${err.code}, retrying... ${file.retries}`;
if (onOutput) onOutput(msg);
if (file.retries < MAX_RETRIES){
concurrency = Math.max(1, Math.floor(concurrency * 0.66));
progress[filename] = 0;
setTimeout(() => {
q.push(file, errHandler);
}, (2 ** file.retries) * 1000);
done(new Error(msg));
}else done();
}).on('httpUploadProgress', p => {
const perc = Math.round((p.loaded / * 100)
if (perc % 5 == 0 && progress[filename] < perc){
progress[filename] = perc;
if (onOutput) {
onOutput(`Uploading ${filename}... ${progress[filename]}%`);
if (progress[filename] == 100){
2020-11-02 17:54:32 +00:00
onOutput(`Finalizing ${filename} upload, this could take a bit...`);
2020-11-02 17:53:50 +00:00
const errHandler = err => {
if (err){
if (!cbCalled){
cbCalled = true;
let uploadList = [];
paths.forEach(p => {
const fullPath = path.join(srcFolder, p);
// Skip non-existing items
if (!fs.existsSync(fullPath)) return;
if (fs.lstatSync(fullPath).isDirectory()){
let globPaths = glob.sync(`${p}/**`, { cwd: srcFolder, nodir: true, nosort: true });
globPaths.forEach(gp => {
src: path.join(srcFolder, gp),
dest: path.join(dstFolder, gp),
retries: 0
2018-11-23 18:19:52 +00:00
2020-11-02 17:53:50 +00:00
src: fullPath,
dest: path.join(dstFolder, p),
retries: 0
2018-11-23 18:19:52 +00:00
2020-11-02 17:53:50 +00:00
let cbCalled = false;
q.drain = () => {
if (!cbCalled){
cbCalled = true;
if (onOutput) onOutput(`Uploading ${uploadList.length} files to S3...`);
q.push(uploadList, errHandler);
2018-11-23 18:19:52 +00:00
2018-11-22 03:22:37 +00:00