Add backups to second S3 provider, don't upload Mastodon archives

trunk
Una Thompson 2019-11-11 22:26:51 -08:00
rodzic a6d5c896bf
commit ecfa6aba32
4 zmienionych plików z 223 dodań i 25 usunięć

Wyświetl plik

@ -6,6 +6,12 @@
bucket: "mybucket"
publicHost: "https://sfo2.digitaloceanspaces.com/mybucket"
}
backupBackend: {
endpoint: "https://s3.us-east-2.wasabisys.com"
accessKeyId: "ACCESS_KEY_ID"
secretAccessKey: "SECRET_ACCESS_KEY"
bucket: "mybucket-backup"
}
mysql: {
host: "localhost"
port: 3306

Wyświetl plik

@ -5,7 +5,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.sql.DataSource;
@ -30,6 +32,8 @@ import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.blobstore.util.ForwardingBlobStore;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationScope;
import org.jclouds.domain.internal.LocationImpl;
import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload;
@ -43,12 +47,15 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
public class JortageBlobStore extends ForwardingBlobStore {
private final BlobStore dumpsStore;
private final String identity;
private final String bucket;
private final DataSource dataSource;
public JortageBlobStore(BlobStore blobStore, String bucket, String identity, DataSource dataSource) {
public JortageBlobStore(BlobStore blobStore, BlobStore dumpsStore, String bucket, String identity, DataSource dataSource) {
super(blobStore);
this.dumpsStore = dumpsStore;
dumpsStore.createContainerInLocation(null, identity);
this.bucket = bucket;
this.identity = identity;
this.dataSource = dataSource;
@ -65,6 +72,10 @@ public class JortageBlobStore extends ForwardingBlobStore {
return JortageProxy.hashToPath(Queries.getMap(dataSource, container, name).toString());
}
/**
 * Returns true when the blob name refers to a backup dump (stored in the local
 * dumps store) rather than a deduplicated media object. Accepts the name with
 * or without a single leading slash.
 */
private boolean isDump(String name) {
    String normalized = name.startsWith("/") ? name.substring(1) : name;
    return normalized.startsWith("backups/dumps");
}
@Override
public BlobStoreContext getContext() {
return delegate().getContext();
@ -72,41 +83,69 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public BlobBuilder blobBuilder(String name) {
    // Dump blobs are built against the local dumps store; everything else
    // goes through the deduplicating backing store.
    return isDump(name) ? dumpsStore.blobBuilder(name) : delegate().blobBuilder(name);
}
@Override
public Blob getBlob(String container, String name) {
    if (!isDump(name)) {
        // Media blobs are deduplicated: resolve the per-user name to the
        // shared hash path in the backing bucket.
        return delegate().getBlob(bucket, getMapPath(container, name));
    }
    checkContainer(container);
    return dumpsStore.getBlob(container, name);
}
@Override
public Blob getBlob(String container, String name, GetOptions getOptions) {
    if (!isDump(name)) {
        // Non-dump blobs resolve through the name→hash mapping.
        return delegate().getBlob(bucket, getMapPath(container, name), getOptions);
    }
    checkContainer(container);
    return dumpsStore.getBlob(container, name, getOptions);
}
@Override
public void downloadBlob(String container, String name, File destination) {
    if (!isDump(name)) {
        delegate().downloadBlob(bucket, getMapPath(container, name), destination);
        return;
    }
    // Dumps live in the caller's own container in the local dumps store.
    checkContainer(container);
    dumpsStore.downloadBlob(container, name, destination);
}
@Override
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
    if (!isDump(name)) {
        delegate().downloadBlob(bucket, getMapPath(container, name), destination, executor);
        return;
    }
    checkContainer(container);
    dumpsStore.downloadBlob(container, name, destination, executor);
}
@Override
public InputStream streamBlob(String container, String name) {
    if (!isDump(name)) {
        return delegate().streamBlob(bucket, getMapPath(container, name));
    }
    checkContainer(container);
    return dumpsStore.streamBlob(container, name);
}
@Override
public InputStream streamBlob(String container, String name, ExecutorService executor) {
    if (!isDump(name)) {
        return delegate().streamBlob(bucket, getMapPath(container, name), executor);
    }
    checkContainer(container);
    return dumpsStore.streamBlob(container, name, executor);
}
// Per-blob ACLs are not tracked: after verifying the caller owns the
// container, every blob is reported as publicly readable.
// NOTE(review): this also reports PUBLIC_READ for dump blobs, which the
// rest of this class otherwise treats as private — confirm intended.
@Override
public BlobAccess getBlobAccess(String container, String name) {
checkContainer(container);
return BlobAccess.PUBLIC_READ;
}
@ -128,16 +167,25 @@ public class JortageBlobStore extends ForwardingBlobStore {
// Container ACLs are fixed: any container the caller is allowed to see
// (i.e. their own identity) is reported as publicly readable.
@Override
public ContainerAccess getContainerAccess(String container) {
checkContainer(container);
return ContainerAccess.PUBLIC_READ;
}
@Override
public boolean blobExists(String container, String name) {
    if (!isDump(name)) {
        // Existence of a media blob means its name→hash mapping resolves
        // to an object in the backing bucket.
        return delegate().blobExists(bucket, getMapPath(container, name));
    }
    checkContainer(container);
    return dumpsStore.blobExists(container, name);
}
@Override
public BlobMetadata blobMetadata(String container, String name) {
    if (!isDump(name)) {
        return delegate().blobMetadata(bucket, getMapPath(container, name));
    }
    checkContainer(container);
    return dumpsStore.blobMetadata(container, name);
}
@ -164,6 +212,9 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public String putBlob(String container, Blob blob) {
checkContainer(container);
if (isDump(blob.getMetadata().getName())) {
return dumpsStore.putBlob(container, blob);
}
File tempFile = null;
try {
File f = File.createTempFile("jortage-proxy-", ".dat");
@ -189,6 +240,7 @@ public class JortageBlobStore extends ForwardingBlobStore {
.userMetadata(blob.getMetadata().getUserMetadata())
.build();
String etag = delegate().putBlob(bucket, blob2, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ).multipart());
Queries.putPendingBackup(dataSource, hash);
Queries.putMap(dataSource, identity, blob.getMetadata().getName(), hash);
Queries.putFilesize(dataSource, hash, f.length());
return etag;
@ -202,9 +254,13 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName, CopyOptions options) {
// javadoc says options are ignored, so we ignore them too
checkContainer(fromContainer);
checkContainer(toContainer);
if (isDump(fromName)) {
if (!isDump(toName)) throw new UnsupportedOperationException();
return dumpsStore.copyBlob(fromContainer, fromName, toContainer, toName, options);
}
// javadoc says options are ignored, so we ignore them too
HashCode hash = Queries.getMap(dataSource, identity, fromName);
Queries.putMap(dataSource, identity, toName, hash);
return blobMetadata(bucket, JortageProxy.hashToPath(hash.toString())).getETag();
@ -213,6 +269,9 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata, PutOptions options) {
checkContainer(container);
if (isDump(blobMetadata.getName())) {
return dumpsStore.initiateMultipartUpload(container, blobMetadata, options);
}
MutableBlobMetadata mbm = new MutableBlobMetadataImpl(blobMetadata);
String tempfile = "multitmp/"+identity+"-"+System.currentTimeMillis()+"-"+System.nanoTime();
mbm.setName(tempfile);
@ -234,11 +293,20 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public void abortMultipartUpload(MultipartUpload mpu) {
    if (!isDump(mpu.blobName())) {
        // Media multipart uploads run under a masked (temp-file) identity.
        delegate().abortMultipartUpload(mask(mpu));
        return;
    }
    checkContainer(mpu.containerName());
    dumpsStore.abortMultipartUpload(mpu);
}
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
if (isDump(mpu.blobName())) {
checkContainer(mpu.containerName());
return dumpsStore.completeMultipartUpload(mpu, parts);
}
mpu = mask(mpu);
// TODO this is a bit of a hack and isn't very efficient
String etag = delegate().completeMultipartUpload(mpu, parts);
@ -257,6 +325,7 @@ public class JortageBlobStore extends ForwardingBlobStore {
etag = delegate().copyBlob(mpu.containerName(), mpu.blobName(), bucket, path, CopyOptions.builder().contentMetadata(meta.getContentMetadata()).build());
Thread.sleep(500);
delegate().setBlobAccess(bucket, path, BlobAccess.PUBLIC_READ);
Queries.putPendingBackup(dataSource, hash);
} else {
Thread.sleep(500);
etag = delegate().blobMetadata(bucket, path).getETag();
@ -276,11 +345,19 @@ public class JortageBlobStore extends ForwardingBlobStore {
@Override
public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) {
    if (!isDump(mpu.blobName())) {
        return delegate().uploadMultipartPart(mask(mpu), partNumber, payload);
    }
    checkContainer(mpu.containerName());
    return dumpsStore.uploadMultipartPart(mpu, partNumber, payload);
}
@Override
public List<MultipartPart> listMultipartUpload(MultipartUpload mpu) {
    if (!isDump(mpu.blobName())) {
        return delegate().listMultipartUpload(mask(mpu));
    }
    checkContainer(mpu.containerName());
    return dumpsStore.listMultipartUpload(mpu);
}
@ -293,6 +370,7 @@ public class JortageBlobStore extends ForwardingBlobStore {
out.add(revmask(mpu));
}
}
out.addAll(dumpsStore.listMultipartUploads(container));
return out;
}
@ -302,15 +380,43 @@ public class JortageBlobStore extends ForwardingBlobStore {
}
@Override
public boolean createContainerInLocation(Location location,
String container) {
public void removeBlob(String container, String name) {
    // Only dump blobs may be deleted; the deduplicated media store is read-only.
    if (!isDump(name)) {
        throw new UnsupportedOperationException("Read-only BlobStore");
    }
    checkContainer(container);
    dumpsStore.removeBlob(container, name);
}
// Deletes each named blob via removeBlob, so non-dump names fail with
// UnsupportedOperationException.
// NOTE(review): the batch is not atomic — a non-dump name partway through
// aborts the loop after earlier dumps have already been removed.
@Override
public void removeBlobs(String container, Iterable<String> iterable) {
for (String s : iterable) {
removeBlob(container, s);
}
}
@Override
public Set<? extends Location> listAssignableLocations() {
    // Advertise a single synthetic provider-scoped location ("jort");
    // actual placement is fixed by the backing store.
    Location only = new LocationImpl(LocationScope.PROVIDER, "jort", "jort", null, Collections.emptySet(), Collections.emptyMap());
    return Collections.singleton(only);
}
// Containers are implicit (one per identity); creation just verifies the
// caller owns the requested container and reports success.
@Override
public boolean createContainerInLocation(Location location, String container) {
checkContainer(container);
return true;
}
@Override
public boolean createContainerInLocation(Location location,
String container, CreateContainerOptions createContainerOptions) {
throw new UnsupportedOperationException("Read-only BlobStore");
checkContainer(container);
return true;
}
// The only container visible to a caller is their own identity.
@Override
public boolean containerExists(String container) {
return identity.equals(container);
}
@Override
@ -349,16 +455,6 @@ public class JortageBlobStore extends ForwardingBlobStore {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
public void removeBlob(String container, String name) {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
public void removeBlobs(String container, Iterable<String> iterable) {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
public void setBlobAccess(String container, String name,
BlobAccess access) {

Wyświetl plik

@ -4,6 +4,8 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
@ -13,6 +15,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import sun.misc.Signal;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -24,13 +28,18 @@ import org.gaul.s3proxy.S3Proxy;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.filesystem.reference.FilesystemConstants;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.mariadb.jdbc.MariaDbPoolDataSource;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.hash.HashCode;
import com.google.common.io.ByteStreams;
import com.google.common.net.UrlEscapers;
import blue.endless.jankson.Jankson;
@ -45,10 +54,14 @@ public class JortageProxy {
private static JsonObject config;
private static long configFileLastLoaded;
private static BlobStore backingBlobStore;
private static BlobStore backingBackupBlobStore;
private static String bucket;
private static String backupBucket;
private static String publicHost;
private static MariaDbPoolDataSource dataSource;
private static boolean backingUp = false;
@SuppressWarnings("restriction")
public static void main(String[] args) throws Exception {
reloadConfig();
@ -59,13 +72,20 @@ public class JortageProxy {
.v4MaxNonChunkedRequestSize(128L*1024L*1024L)
.build();
Properties dumpsProps = new Properties();
dumpsProps.setProperty(FilesystemConstants.PROPERTY_BASEDIR, "dumps");
BlobStore dumpsStore = ContextBuilder.newBuilder("filesystem")
.overrides(dumpsProps)
.build(BlobStoreContext.class)
.getBlobStore();
s3Proxy.setBlobStoreLocator(new BlobStoreLocator() {
@Override
public Entry<String, BlobStore> locateBlobStore(String identity, String container, String blob) {
if (System.currentTimeMillis()-configFileLastLoaded > 500 && configFile.lastModified() > configFileLastLoaded) reloadConfig();
reloadConfigIfChanged();
if (config.containsKey("users") && config.getObject("users").containsKey(identity)) {
return Maps.immutableEntry(((JsonPrimitive)config.getObject("users").get(identity)).asString(), new JortageBlobStore(backingBlobStore, bucket, identity, dataSource));
return Maps.immutableEntry(((JsonPrimitive)config.getObject("users").get(identity)).asString(), new JortageBlobStore(backingBlobStore, dumpsStore, bucket, identity, dataSource));
} else {
throw new RuntimeException("Access denied");
}
@ -99,8 +119,21 @@ public class JortageProxy {
} else {
String identity = split.get(0);
String name = split.get(1);
if (name.startsWith("backups/dumps") || name.startsWith("/backups/dumps")) {
Blob b = dumpsStore.getBlob(identity, name);
if (b != null) {
response.setHeader("Cache-Control", "private, no-cache");
response.setHeader("Content-Type", b.getMetadata().getContentMetadata().getContentType());
response.setStatus(200);
ByteStreams.copy(b.getPayload().openStream(), response.getOutputStream());
} else {
response.sendError(404);
}
return;
}
try {
String hash = Queries.getMap(dataSource, identity, name).toString();
response.setHeader("Cache-Control", "public");
response.setHeader("Location", publicHost+"/"+hashToPath(hash));
response.setStatus(301);
} catch (IllegalArgumentException e) {
@ -111,6 +144,49 @@ public class JortageProxy {
});
redir.start();
System.err.println("Redirector listening on localhost:23279");
Signal.handle(new Signal("ALRM"), (sig) -> {
reloadConfigIfChanged();
if (backingUp) {
System.err.println("Ignoring SIGALRM, backup already in progress");
return;
}
new Thread(() -> {
int count = 0;
Stopwatch sw = Stopwatch.createStarted();
try (Connection c = dataSource.getConnection()) {
backingUp = true;
try (PreparedStatement delete = c.prepareStatement("DELETE FROM `pending_backup` WHERE `hash` = ?;")) {
try (PreparedStatement ps = c.prepareStatement("SELECT `hash` FROM `pending_backup`;")) {
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
byte[] bys = rs.getBytes("hash");
String path = hashToPath(HashCode.fromBytes(bys).toString());
Blob src = backingBlobStore.getBlob(bucket, path);
backingBackupBlobStore.putBlob(backupBucket, src);
delete.setBytes(1, bys);
delete.executeUpdate();
count++;
}
System.err.println("Backup of "+count+" item"+s(count)+" successful in "+sw);
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("Backup failed after "+count+" item"+s(count)+" in "+sw);
} finally {
backingUp = false;
}
}, "Backup thread").start();
});
}
/**
 * Reloads the config when the file on disk is newer than the last load,
 * debounced to at most one reload per 500ms.
 */
private static void reloadConfigIfChanged() {
    long lastLoaded = configFileLastLoaded;
    if (System.currentTimeMillis() - lastLoaded > 500 && configFile.lastModified() > lastLoaded) {
        reloadConfig();
    }
}
/** Pluralization helper: empty string for a count of exactly one, "s" otherwise. */
private static String s(int i) {
    if (i == 1) {
        return "";
    }
    return "s";
}
private static void reloadConfig() {
@ -118,15 +194,10 @@ public class JortageProxy {
config = Jankson.builder().build().load(configFile);
configFileLastLoaded = System.currentTimeMillis();
bucket = ((JsonPrimitive)config.getObject("backend").get("bucket")).asString();
backupBucket = ((JsonPrimitive)config.getObject("backupBackend").get("bucket")).asString();
publicHost = ((JsonPrimitive)config.getObject("backend").get("publicHost")).asString();
Properties props = new Properties();
backingBlobStore = ContextBuilder.newBuilder("s3")
.credentials(((JsonPrimitive)config.getObject("backend").get("accessKeyId")).asString(), ((JsonPrimitive)config.getObject("backend").get("secretAccessKey")).asString())
.modules(ImmutableList.of(new SLF4JLoggingModule()))
.endpoint(((JsonPrimitive)config.getObject("backend").get("endpoint")).asString())
.overrides(props)
.build(BlobStoreContext.class)
.getBlobStore();
backingBlobStore = createBlobStore("backend");
backingBackupBlobStore = createBlobStore("backupBackend");
JsonObject sql = config.getObject("mysql");
String sqlHost = ((JsonPrimitive)sql.get("host")).asString();
int sqlPort = ((Number)((JsonPrimitive)sql.get("port")).getValue()).intValue();
@ -163,6 +234,10 @@ public class JortageProxy {
" `size` BIGINT UNSIGNED NOT NULL,\n" +
" PRIMARY KEY (`hash`)\n" +
") ROW_FORMAT=COMPRESSED;");
execOneshot(c, "CREATE TABLE IF NOT EXISTS `pending_backup` (\n" +
" `hash` BINARY(64) NOT NULL,\n" +
" PRIMARY KEY (`hash`)\n" +
") ROW_FORMAT=COMPRESSED;");
}
System.err.println("Config file reloaded.");
} catch (Exception e) {
@ -171,6 +246,16 @@ public class JortageProxy {
}
}
/**
 * Builds an S3-backed jclouds BlobStore from the named config section,
 * which must supply accessKeyId, secretAccessKey and endpoint.
 */
private static BlobStore createBlobStore(String string) {
    JsonObject obj = config.getObject(string);
    String accessKeyId = ((JsonPrimitive)obj.get("accessKeyId")).asString();
    String secretAccessKey = ((JsonPrimitive)obj.get("secretAccessKey")).asString();
    String endpoint = ((JsonPrimitive)obj.get("endpoint")).asString();
    return ContextBuilder.newBuilder("s3")
            .credentials(accessKeyId, secretAccessKey)
            .modules(ImmutableList.of(new SLF4JLoggingModule()))
            .endpoint(endpoint)
            .build(BlobStoreContext.class)
            .getBlobStore();
}
private static void execOneshot(Connection c, String sql) throws SQLException {
try (Statement s = c.createStatement()) {
s.execute(sql);

Wyświetl plik

@ -55,6 +55,17 @@ public class Queries {
}
}
/**
 * Records the given content hash as needing replication to the backup
 * backend. Idempotent: INSERT IGNORE makes re-queuing a known hash a no-op.
 *
 * @throws RuntimeException wrapping any SQLException from the database
 */
public static void putPendingBackup(DataSource dataSource, HashCode hash) {
    try (Connection c = dataSource.getConnection();
            PreparedStatement ps = c.prepareStatement("INSERT IGNORE INTO `pending_backup` (`hash`) VALUES (?);")) {
        ps.setBytes(1, hash.asBytes());
        ps.executeUpdate();
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
public static void putMultipart(DataSource dataSource, String identity, String name, String tempfile) {
try (Connection c = dataSource.getConnection()) {
try (PreparedStatement ps = c.prepareStatement("INSERT INTO `multipart_uploads` (`identity`, `name`, `tempfile`) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `tempfile` = ?;")) {