From 168b51b2e8d9bf53db0442b0eb038d59507b492c Mon Sep 17 00:00:00 2001 From: Una Thompson Date: Sun, 14 Jun 2020 02:33:43 -0700 Subject: [PATCH] Refactor, implement Rivet, make s3proxy a submodule to avoid shading --- .gitignore | 1 + .gitmodules | 3 + build.gradle | 33 +- config.jkson => config.jkson.example | 0 s3proxy | 1 + .../com/jortage/proxy/ByteSinkSource.java | 12 + .../com/jortage/proxy/FileByteSinkSource.java | 36 ++ .../com/jortage/proxy/JortageBlobStore.java | 88 +-- .../java/com/jortage/proxy/JortageProxy.java | 334 ++++++------ .../jortage/proxy/MemoryByteSinkSource.java | 62 +++ .../java/com/jortage/proxy/OuterHandler.java | 35 ++ src/main/java/com/jortage/proxy/Queries.java | 26 +- .../java/com/jortage/proxy/RedirHandler.java | 68 +++ .../java/com/jortage/proxy/RivetHandler.java | 507 ++++++++++++++++++ .../java/com/jortage/proxy/RivetTest.java | 85 +++ src/s3proxy/java | 1 + 16 files changed, 1084 insertions(+), 208 deletions(-) create mode 100644 .gitmodules rename config.jkson => config.jkson.example (100%) create mode 160000 s3proxy create mode 100644 src/main/java/com/jortage/proxy/ByteSinkSource.java create mode 100644 src/main/java/com/jortage/proxy/FileByteSinkSource.java create mode 100644 src/main/java/com/jortage/proxy/MemoryByteSinkSource.java create mode 100644 src/main/java/com/jortage/proxy/OuterHandler.java create mode 100644 src/main/java/com/jortage/proxy/RedirHandler.java create mode 100644 src/main/java/com/jortage/proxy/RivetHandler.java create mode 100644 src/main/java/com/jortage/proxy/RivetTest.java create mode 120000 src/s3proxy/java diff --git a/.gitignore b/.gitignore index ce03eaf..b02998b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ hs_err_pid*.log .idea/ data.mv /backups +/config.jkson diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..20d1c81 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "s3proxy"] + path = s3proxy + url = https://github.com/gaul/s3proxy.git diff --git a/build.gradle b/build.gradle index 534ec6d..d46bd09 100644 --- a/build.gradle +++ b/build.gradle @@ -7,12 +7,43 @@ repositories { mavenCentral() } +sourceSets { + main { + java { + srcDirs = ['src/main/java', 'src/s3proxy/java'] + } + } +} + dependencies { implementation 'blue.endless:jankson:1.1.2' implementation 'org.mariadb.jdbc:mariadb-java-client:2.4.4' - implementation 'org.gaul:s3proxy:1.6.1' + implementation 'com.squareup.okhttp3:okhttp:4.7.2' + implementation 'com.squareup.okhttp3:okhttp-brotli:4.7.2' + + implementation 'org.apache.jclouds:jclouds-blobstore:2.2.1' + implementation 'org.apache.jclouds.provider:aws-s3:2.2.1' + implementation 'org.apache.jclouds.api:filesystem:2.2.1' + implementation 'org.apache.jclouds.driver:jclouds-slf4j:2.2.1' + + implementation 'org.eclipse.jetty:jetty-server:9.4.24.v20191120' + + implementation 'org.slf4j:slf4j-api:1.7.9' + implementation 'org.slf4j:slf4j-nop:1.7.9' + + implementation 'com.google.code.findbugs:jsr305:3.0.2' + implementation 'com.google.code.findbugs:findbugs-annotations:3.0.1' + + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.11.0' + implementation 'com.fasterxml.woodstox:woodstox-core:6.2.1' + + implementation 'commons-fileupload:commons-fileupload:1.4' } +// I am *not* pulling in five different dependencies for a couple classes we don't use +file('s3proxy/src/main/java/org/gaul/s3proxy/junit/S3ProxyRule.java').delete(); +file('s3proxy/src/main/java/org/gaul/s3proxy/Main.java').delete(); + project.configurations.implementation.setCanBeResolved(true) task capsule(type: FatCapsule) { diff --git a/config.jkson b/config.jkson.example similarity index 100% rename from config.jkson rename to config.jkson.example diff --git a/s3proxy b/s3proxy new file mode 160000 index 0000000..e638111 --- /dev/null +++ b/s3proxy @@ -0,0 +1 @@ +Subproject commit e638111d05b5e7193d2ae8dd1d0e7a63110d878b diff --git a/src/main/java/com/jortage/proxy/ByteSinkSource.java b/src/main/java/com/jortage/proxy/ByteSinkSource.java new file mode 100644 index 0000000..7796c31 --- /dev/null +++ b/src/main/java/com/jortage/proxy/ByteSinkSource.java @@ -0,0 +1,12 @@ +package com.jortage.proxy; + +import java.io.Closeable; +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; + +public interface ByteSinkSource extends Closeable { + ByteSink getSink(); + ByteSource getSource(); + @Override + void close(); +} diff --git a/src/main/java/com/jortage/proxy/FileByteSinkSource.java b/src/main/java/com/jortage/proxy/FileByteSinkSource.java new file mode 100644 index 0000000..f7178f8 --- /dev/null +++ b/src/main/java/com/jortage/proxy/FileByteSinkSource.java @@ -0,0 +1,36 @@ +package com.jortage.proxy; + +import java.io.File; + +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; +import com.google.common.io.Files; + +public class FileByteSinkSource implements ByteSinkSource { + + private final File file; + private final boolean deleteOnClose; + + public FileByteSinkSource(File file, boolean deleteOnClose) { + this.file = file; + this.deleteOnClose = deleteOnClose; + } + + @Override + public ByteSink getSink() { + return Files.asByteSink(file); + } + + @Override + public ByteSource getSource() { + return Files.asByteSource(file); + } + + @Override + public void close() { + if (deleteOnClose) { + file.delete(); + } + } + +} diff --git a/src/main/java/com/jortage/proxy/JortageBlobStore.java b/src/main/java/com/jortage/proxy/JortageBlobStore.java index 4e9efd3..4ffa390 100644 --- a/src/main/java/com/jortage/proxy/JortageBlobStore.java +++ b/src/main/java/com/jortage/proxy/JortageBlobStore.java @@ -149,22 +149,6 @@ public class JortageBlobStore extends ForwardingBlobStore { return BlobAccess.PUBLIC_READ; } - @Override - public PageSet list() { - throw new UnsupportedOperationException(); - } - - @Override - public PageSet list(String container) { - throw new UnsupportedOperationException(); - } - - @Override - public PageSet list(String container, - ListContainerOptions options) { - throw new UnsupportedOperationException(); - } - @Override public ContainerAccess getContainerAccess(String container) { checkContainer(container); @@ -317,23 +301,25 @@ public class JortageBlobStore extends ForwardingBlobStore { HashCode hash = hos.hash(); String hashStr = hash.toString(); String path = JortageProxy.hashToPath(hashStr); - // don't fall afoul of request rate limits - Thread.sleep(500); + // we're about to do a bunch of stuff at once + // sleep so we don't fall afoul of request rate limits + // (causes intermittent 429s on at least DigitalOcean) + Thread.sleep(250); BlobMetadata meta = delegate().blobMetadata(mpu.containerName(), mpu.blobName()); if (!delegate().blobExists(bucket, path)) { - Thread.sleep(500); + Thread.sleep(250); etag = delegate().copyBlob(mpu.containerName(), mpu.blobName(), bucket, path, CopyOptions.builder().contentMetadata(meta.getContentMetadata()).build()); - Thread.sleep(500); + Thread.sleep(250); delegate().setBlobAccess(bucket, path, BlobAccess.PUBLIC_READ); Queries.putPendingBackup(dataSource, hash); } else { - Thread.sleep(500); + Thread.sleep(250); etag = delegate().blobMetadata(bucket, path).getETag(); } Queries.putMap(dataSource, identity, Preconditions.checkNotNull(meta.getUserMetadata().get("jortage-originalname")), hash); Queries.putFilesize(dataSource, hash, counter.getCount()); Queries.removeMultipart(dataSource, mpu.blobName()); - Thread.sleep(500); + Thread.sleep(250); delegate().removeBlob(mpu.containerName(), mpu.blobName()); } catch (IOException e) { throw new UncheckedIOException(e); @@ -387,11 +373,14 @@ public class JortageBlobStore extends ForwardingBlobStore { return; } HashCode hc = Queries.getMap(dataSource, identity, name); - if (Queries.deleteMap(dataSource, identity, name)) { - int rc = Queries.getReferenceCount(dataSource, hc); + if (Queries.removeMap(dataSource, identity, name)) { + int rc = Queries.getMapCount(dataSource, hc); if (rc == 0) { String hashString = hc.toString(); - delegate().removeBlob(bucket, JortageProxy.hashToPath(hashString)); + String path = JortageProxy.hashToPath(hashString); + delegate().removeBlob(bucket, path); + Queries.removeFilesize(dataSource, hc); + Queries.removePendingBackup(dataSource, hc); } } } @@ -426,46 +415,65 @@ public class JortageBlobStore extends ForwardingBlobStore { return identity.equals(container); } + private static final String NO_DIR_MSG = "Directories are an illusion"; + private static final String NO_BULK_MSG = "Bulk operations are not implemented by Jortage for safety and speed"; + private static final String NO_PRIVATE_MSG = "Jortage does not support private objects"; + @Override - public void setContainerAccess(String container, ContainerAccess - containerAccess) { - throw new UnsupportedOperationException("Read-only BlobStore"); + public void setContainerAccess(String container, ContainerAccess containerAccess) { + if (containerAccess != ContainerAccess.PUBLIC_READ) + throw new UnsupportedOperationException(NO_PRIVATE_MSG); + } + + @Override + public void setBlobAccess(String container, String name, BlobAccess access) { + if (access != BlobAccess.PUBLIC_READ) + throw new UnsupportedOperationException(NO_PRIVATE_MSG); } @Override public void clearContainer(String container) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_BULK_MSG); } @Override public void clearContainer(String container, ListContainerOptions options) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_BULK_MSG); } @Override public void deleteContainer(String container) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_BULK_MSG); } @Override public boolean deleteContainerIfEmpty(String container) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_BULK_MSG); } + @Override + public PageSet list() { + throw new UnsupportedOperationException(NO_BULK_MSG); + } + + @Override + public PageSet list(String container) { + throw new UnsupportedOperationException(NO_BULK_MSG); + } + + @Override + public PageSet list(String container, ListContainerOptions options) { + throw new UnsupportedOperationException(NO_BULK_MSG); + } + @Override public void createDirectory(String container, String directory) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_DIR_MSG); } @Override public void deleteDirectory(String container, String directory) { - throw new UnsupportedOperationException("Read-only BlobStore"); - } - - @Override - public void setBlobAccess(String container, String name, - BlobAccess access) { - throw new UnsupportedOperationException("Read-only BlobStore"); + throw new UnsupportedOperationException(NO_DIR_MSG); } } \ No newline at end of file diff --git a/src/main/java/com/jortage/proxy/JortageProxy.java b/src/main/java/com/jortage/proxy/JortageProxy.java index 54359b3..267b81b 100644 --- a/src/main/java/com/jortage/proxy/JortageProxy.java +++ b/src/main/java/com/jortage/proxy/JortageProxy.java @@ -1,29 +1,21 @@ package com.jortage.proxy; import java.io.File; -import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; import java.util.Properties; -import java.util.Map.Entry; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import sun.misc.Signal; -import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.gaul.s3proxy.AuthenticationType; -import org.gaul.s3proxy.BlobStoreLocator; import org.gaul.s3proxy.S3Proxy; import org.jclouds.ContextBuilder; import org.jclouds.blobstore.BlobStore; @@ -35,13 +27,11 @@ import org.jclouds.filesystem.reference.FilesystemConstants; import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; import org.mariadb.jdbc.MariaDbPoolDataSource; -import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.escape.Escaper; import com.google.common.hash.HashCode; -import com.google.common.io.ByteStreams; import com.google.common.net.UrlEscapers; import blue.endless.jankson.Jankson; @@ -50,162 +40,147 @@ import blue.endless.jankson.JsonPrimitive; public class JortageProxy { - private static final Splitter SPLITTER = Splitter.on('/').limit(2).omitEmptyStrings(); - private static final File configFile = new File("config.jkson"); - private static JsonObject config; - private static long configFileLastLoaded; - private static BlobStore backingBlobStore; - private static BlobStore backingBackupBlobStore; - private static String bucket; - private static String backupBucket; - private static String publicHost; - private static MariaDbPoolDataSource dataSource; + public static JsonObject config; + public static long configFileLastLoaded; + public static BlobStore backingBlobStore; + public static BlobStore backingBackupBlobStore; + public static String bucket; + public static String backupBucket; + public static String publicHost; + public static MariaDbPoolDataSource dataSource; private static boolean backingUp = false; @SuppressWarnings("restriction") public static void main(String[] args) throws Exception { - reloadConfig(); - - S3Proxy s3Proxy = S3Proxy.builder() - .awsAuthentication(AuthenticationType.AWS_V2_OR_V4, "DUMMY", "DUMMY") - .endpoint(URI.create("http://localhost:23278")) - .jettyMaxThreads(24) - .v4MaxNonChunkedRequestSize(128L*1024L*1024L) - .build(); - - Properties dumpsProps = new Properties(); - dumpsProps.setProperty(FilesystemConstants.PROPERTY_BASEDIR, "dumps"); - BlobStore dumpsStore = ContextBuilder.newBuilder("filesystem") - .overrides(dumpsProps) - .build(BlobStoreContext.class) - .getBlobStore(); - - s3Proxy.setBlobStoreLocator(new BlobStoreLocator() { - - @Override - public Entry locateBlobStore(String identity, String container, String blob) { + try { + Stopwatch initSw = Stopwatch.createStarted(); + reloadConfig(); + + System.err.print("Starting S3 server... "); + System.err.flush(); + S3Proxy s3Proxy = S3Proxy.builder() + .awsAuthentication(AuthenticationType.AWS_V2_OR_V4, "DUMMY", "DUMMY") + .endpoint(URI.create("http://localhost:23278")) + .jettyMaxThreads(24) + .v4MaxNonChunkedRequestSize(128L*1024L*1024L) + .build(); + + // excuse me, this is mine now + Field serverField = S3Proxy.class.getDeclaredField("server"); + serverField.setAccessible(true); + Server s3ProxyServer = (Server) serverField.get(s3Proxy); + s3ProxyServer.setHandler(new OuterHandler(s3ProxyServer.getHandler())); + QueuedThreadPool pool = (QueuedThreadPool)s3ProxyServer.getThreadPool(); + pool.setName("Jetty-Common"); + + Properties dumpsProps = new Properties(); + dumpsProps.setProperty(FilesystemConstants.PROPERTY_BASEDIR, "dumps"); + BlobStore dumpsStore = ContextBuilder.newBuilder("filesystem") + .overrides(dumpsProps) + .build(BlobStoreContext.class) + .getBlobStore(); + + s3Proxy.setBlobStoreLocator((identity, container, blob) -> { reloadConfigIfChanged(); if (config.containsKey("users") && config.getObject("users").containsKey(identity)) { - return Maps.immutableEntry(((JsonPrimitive)config.getObject("users").get(identity)).asString(), new JortageBlobStore(backingBlobStore, dumpsStore, bucket, identity, dataSource)); + return Maps.immutableEntry(((JsonPrimitive)config.getObject("users").get(identity)).asString(), + new JortageBlobStore(backingBlobStore, dumpsStore, bucket, identity, dataSource)); } else { throw new RuntimeException("Access denied"); } - } - }); - - s3Proxy.start(); - System.err.println("S3 listening on localhost:23278"); - - QueuedThreadPool pool = new QueuedThreadPool(24); - pool.setName("Redir-Jetty"); - Server redir = new Server(pool); - ServerConnector conn = new ServerConnector(redir); - conn.setHost("localhost"); - conn.setPort(23279); - redir.addConnector(conn); - redir.setHandler(new AbstractHandler() { - - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); - if ("/".equals(target) || "/index.html".equals(target) || "".equals(target)) { - response.setHeader("Location", "https://jortage.com"); - response.setStatus(301); - return; - } - List split = SPLITTER.splitToList(target); - if (split.size() != 2) { - response.sendError(400); - return; - } else { - String identity = split.get(0); - String name = split.get(1); - if (name.startsWith("backups/dumps") || name.startsWith("/backups/dumps")) { - Blob b = dumpsStore.getBlob(identity, name); - if (b != null) { - response.setHeader("Cache-Control", "private, no-cache"); - response.setHeader("Content-Type", b.getMetadata().getContentMetadata().getContentType()); - if (b.getMetadata().getContentMetadata().getContentLength() != null) { - response.setHeader("Content-Length", b.getMetadata().getContentMetadata().getContentLength().toString()); - } - response.setStatus(200); - ByteStreams.copy(b.getPayload().openStream(), response.getOutputStream()); - } else { - response.sendError(404); - } + }); + + s3Proxy.start(); + System.err.println("ready on http://localhost:23278"); + + System.err.print("Starting redirector server... "); + System.err.flush(); + Server redir = new Server(pool); + ServerConnector redirConn = new ServerConnector(redir); + redirConn.setHost("localhost"); + redirConn.setPort(23279); + redir.addConnector(redirConn); + redir.setHandler(new OuterHandler(new RedirHandler(dumpsStore))); + redir.start(); + System.err.println("ready on http://localhost:23279"); + + System.err.print("Starting Rivet server... "); + System.err.flush(); + Server rivet = new Server(pool); + ServerConnector rivetConn = new ServerConnector(rivet); + rivetConn.setHost("localhost"); + rivetConn.setPort(23280); + rivet.addConnector(rivetConn); + rivet.setHandler(new OuterHandler(new RivetHandler())); + rivet.start(); + System.err.println("ready on http://localhost:23280"); + + System.err.print("Registering SIGALRM handler for backups... "); + System.err.flush(); + try { + Signal.handle(new Signal("ALRM"), (sig) -> { + reloadConfigIfChanged(); + if (backingUp) { + System.err.println("Ignoring SIGALRM, backup already in progress"); return; } - try { - String hash = Queries.getMap(dataSource, identity, name).toString(); - BlobAccess ba = backingBlobStore.getBlobAccess(bucket, hashToPath(hash)); - if (ba != BlobAccess.PUBLIC_READ) { - backingBlobStore.setBlobAccess(bucket, hashToPath(hash), BlobAccess.PUBLIC_READ); - } - response.setHeader("Cache-Control", "public"); - response.setHeader("Location", publicHost+"/"+hashToPath(hash)); - response.setStatus(301); - } catch (IllegalArgumentException e) { - response.sendError(404); + if (backupBucket == null) { + System.err.println("Ignoring SIGALRM, nowhere to backup to"); + return; } - } - } - }); - redir.start(); - System.err.println("Redirector listening on localhost:23279"); - Signal.handle(new Signal("ALRM"), (sig) -> { - reloadConfigIfChanged(); - if (backingUp) { - System.err.println("Ignoring SIGALRM, backup already in progress"); - return; - } - if (backupBucket == null) { - System.err.println("Ignoring SIGALRM, nowhere to backup to"); - return; - } - new Thread(() -> { - int count = 0; - Stopwatch sw = Stopwatch.createStarted(); - try (Connection c = dataSource.getConnection()) { - backingUp = true; - try (PreparedStatement delete = c.prepareStatement("DELETE FROM `pending_backup` WHERE `hash` = ?;")) { - try (PreparedStatement ps = c.prepareStatement("SELECT `hash` FROM `pending_backup`;")) { - try (ResultSet rs = ps.executeQuery()) { - while (rs.next()) { - byte[] bys = rs.getBytes("hash"); - String path = hashToPath(HashCode.fromBytes(bys).toString()); - Blob src = backingBlobStore.getBlob(bucket, path); - if (src == null) { - Blob actualSrc = backingBackupBlobStore.getBlob(backupBucket, path); - if (actualSrc == null) { - System.err.println("Can't find blob "+path+" in source or destination?"); - continue; - } else { - System.err.println("Copying "+path+" from \"backup\" to current - this is a little odd"); - backingBlobStore.putBlob(bucket, actualSrc, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + new Thread(() -> { + int count = 0; + Stopwatch sw = Stopwatch.createStarted(); + try (Connection c = dataSource.getConnection()) { + backingUp = true; + try (PreparedStatement delete = c.prepareStatement("DELETE FROM `pending_backup` WHERE `hash` = ?;")) { + try (PreparedStatement ps = c.prepareStatement("SELECT `hash` FROM `pending_backup`;")) { + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + byte[] bys = rs.getBytes("hash"); + String path = hashToPath(HashCode.fromBytes(bys).toString()); + Blob src = backingBlobStore.getBlob(bucket, path); + if (src == null) { + Blob actualSrc = backingBackupBlobStore.getBlob(backupBucket, path); + if (actualSrc == null) { + System.err.println("Can't find blob "+path+" in source or destination?"); + continue; + } else { + System.err.println("Copying "+path+" from \"backup\" to current - this is a little odd"); + backingBlobStore.putBlob(bucket, actualSrc, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + } + } else { + backingBackupBlobStore.putBlob(backupBucket, src, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + } + delete.setBytes(1, bys); + delete.executeUpdate(); + count++; } - } else { - backingBackupBlobStore.putBlob(backupBucket, src, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + System.err.println("Backup of "+count+" item"+s(count)+" successful in "+sw); } - delete.setBytes(1, bys); - delete.executeUpdate(); - count++; } - System.err.println("Backup of "+count+" item"+s(count)+" successful in "+sw); } + } catch (Exception e) { + e.printStackTrace(); + System.err.println("Backup failed after "+count+" item"+s(count)+" in "+sw); + } finally { + backingUp = false; } - } - } catch (Exception e) { - e.printStackTrace(); - System.err.println("Backup failed after "+count+" item"+s(count)+" in "+sw); - } finally { - backingUp = false; - } - }, "Backup thread").start(); - }); + }, "Backup thread").start(); + }); + System.err.println("done"); + } catch (Exception e) { + System.err.println("failed"); + } + System.err.println("This proxy has Super Denim Powers. (Done in "+initSw+")"); + } catch (Throwable t) { + System.err.println(" failed"); + t.printStackTrace(); + } } - private static void reloadConfigIfChanged() { + public static void reloadConfigIfChanged() { if (System.currentTimeMillis()-configFileLastLoaded > 500 && configFile.lastModified() > configFileLastLoaded) reloadConfig(); } @@ -214,17 +189,28 @@ public class JortageProxy { } private static void reloadConfig() { + boolean reloading = config != null; try { - config = Jankson.builder().build().load(configFile); - configFileLastLoaded = System.currentTimeMillis(); - bucket = ((JsonPrimitive)config.getObject("backend").get("bucket")).asString(); - publicHost = ((JsonPrimitive)config.getObject("backend").get("publicHost")).asString(); - backingBlobStore = createBlobStore("backend"); - if (config.containsKey("backupBackend")) { - backupBucket = ((JsonPrimitive)config.getObject("backupBackend").get("bucket")).asString(); - backingBackupBlobStore = createBlobStore("backupBackend"); + String prelude = "\r"+(reloading ? "Reloading" : "Loading")+" config: "; + System.err.print(prelude+"Parsing..."); + System.err.flush(); + JsonObject configTmp = Jankson.builder().build().load(configFile); + long configFileLastLoadedTmp = System.currentTimeMillis(); + String bucketTmp = ((JsonPrimitive)configTmp.getObject("backend").get("bucket")).asString(); + String publicHostTmp = ((JsonPrimitive)configTmp.getObject("backend").get("publicHost")).asString(); + System.err.print(prelude+"Constructing blob stores..."); + System.err.flush(); + BlobStore backingBlobStoreTmp = createBlobStore(configTmp.getObject("backend")); + String backupBucketTmp; + BlobStore backingBackupBlobStoreTmp; + if (configTmp.containsKey("backupBackend")) { + backupBucketTmp = ((JsonPrimitive)configTmp.getObject("backupBackend").get("bucket")).asString(); + backingBackupBlobStoreTmp = createBlobStore(configTmp.getObject("backupBackend")); + } else { + backupBucketTmp = null; + backingBackupBlobStoreTmp = null; } - JsonObject sql = config.getObject("mysql"); + JsonObject sql = configTmp.getObject("mysql"); String sqlHost = ((JsonPrimitive)sql.get("host")).asString(); int sqlPort = ((Number)((JsonPrimitive)sql.get("port")).getValue()).intValue(); String sqlDb = ((JsonPrimitive)sql.get("database")).asString(); @@ -232,11 +218,12 @@ public class JortageProxy { String sqlPass = ((JsonPrimitive)sql.get("pass")).asString(); Escaper pesc = UrlEscapers.urlPathSegmentEscaper(); Escaper esc = UrlEscapers.urlFormParameterEscaper(); - if (dataSource != null) { - dataSource.close(); - } - dataSource = new MariaDbPoolDataSource("jdbc:mariadb://"+pesc.escape(sqlHost)+":"+sqlPort+"/"+pesc.escape(sqlDb)+"?user="+esc.escape(sqlUser)+"&password="+esc.escape(sqlPass)+"&autoReconnect=true"); - try (Connection c = dataSource.getConnection()) { + System.err.print(prelude+"Connecting to MariaDB... "); + System.err.flush(); + MariaDbPoolDataSource dataSourceTmp = + new MariaDbPoolDataSource("jdbc:mariadb://"+pesc.escape(sqlHost)+":"+sqlPort+"/"+pesc.escape(sqlDb) + +"?user="+esc.escape(sqlUser)+"&password="+esc.escape(sqlPass)+"&autoReconnect=true"); + try (Connection c = dataSourceTmp.getConnection()) { execOneshot(c, "CREATE TABLE IF NOT EXISTS `name_map` (\n" + " `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n" + " `identity` VARCHAR(255) NOT NULL,\n" + @@ -265,15 +252,32 @@ public class JortageProxy { " PRIMARY KEY (`hash`)\n" + ") ROW_FORMAT=COMPRESSED;"); } - System.err.println("Config file reloaded."); + System.err.println("\r"+(reloading ? "Reloading" : "Loading")+" config... done"); + MariaDbPoolDataSource oldDataSource = dataSource; + config = configTmp; + configFileLastLoaded = configFileLastLoadedTmp; + bucket = bucketTmp; + publicHost = publicHostTmp; + backingBlobStore = backingBlobStoreTmp; + backupBucket = backupBucketTmp; + backingBackupBlobStore = backingBackupBlobStoreTmp; + dataSource = dataSourceTmp; + if (oldDataSource != null) { + oldDataSource.close(); + } } catch (Exception e) { + System.err.println(" failed"); e.printStackTrace(); - System.err.println("Failed to reload config. Behavior unchanged."); + if (reloading) { + System.err.println("Failed to reload config. Behavior unchanged."); + } else { + System.err.println("Failed to load config. Exit"); + System.exit(2); + } } } - private static BlobStore createBlobStore(String string) { - JsonObject obj = config.getObject(string); + private static BlobStore createBlobStore(JsonObject obj) { return ContextBuilder.newBuilder("s3") .credentials(((JsonPrimitive)obj.get("accessKeyId")).asString(), ((JsonPrimitive)obj.get("secretAccessKey")).asString()) .modules(ImmutableList.of(new SLF4JLoggingModule())) diff --git a/src/main/java/com/jortage/proxy/MemoryByteSinkSource.java b/src/main/java/com/jortage/proxy/MemoryByteSinkSource.java new file mode 100644 index 0000000..9d978d9 --- /dev/null +++ b/src/main/java/com/jortage/proxy/MemoryByteSinkSource.java @@ -0,0 +1,62 @@ +package com.jortage.proxy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; + +public class MemoryByteSinkSource implements ByteSinkSource { + + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + public MemoryByteSinkSource() {} + public MemoryByteSinkSource(byte[] bys) { + this(bys, 0, bys.length); + } + public MemoryByteSinkSource(byte[] bys, int ofs, int len) { + baos.write(bys, ofs, len); + } + + @Override + public ByteSink getSink() { + return new ByteSink() { + @Override + public OutputStream openStream() throws IOException { + baos.reset(); + return baos; + } + }; + } + + @Override + public ByteSource getSource() { + return new ByteSource() { + @Override + public InputStream openStream() throws IOException { + return new ByteArrayInputStream(baos.toByteArray()); + } + @Override + public InputStream openBufferedStream() throws IOException { + return openStream(); + } + @Override + public byte[] read() throws IOException { + return baos.toByteArray(); + } + @Override + public long size() throws IOException { + return baos.size(); + } + }; + } + + @Override + public void close() { + baos.reset(); + } + +} diff --git a/src/main/java/com/jortage/proxy/OuterHandler.java b/src/main/java/com/jortage/proxy/OuterHandler.java new file mode 100644 index 0000000..aaadc4c --- /dev/null +++ b/src/main/java/com/jortage/proxy/OuterHandler.java @@ -0,0 +1,35 @@ +package com.jortage.proxy; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.HandlerWrapper; +import org.eclipse.jetty.util.Jetty; + +public class OuterHandler extends HandlerWrapper { + + public OuterHandler(Handler delegate) { + setHandler(delegate); + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException { + res.setHeader("Server", "jortage-proxy"); + res.setHeader("Powered-By", "Jetty/"+Jetty.VERSION); + res.setHeader("Clacks-Overhead", "GNU Natalie Nguyen, Shiina Mota"); + res.setHeader("Jeans-Teleshorted", Integer.toString((int)(Math.random()*200000)+70)); + if (target.isEmpty() || target.equals("/") || target.equals("/index.html")) { + baseRequest.setHandled(true); + res.setHeader("Location", "https://jortage.com"); + res.setStatus(301); + return; + } + super.handle(target, baseRequest, req, res); + } + +} diff --git a/src/main/java/com/jortage/proxy/Queries.java b/src/main/java/com/jortage/proxy/Queries.java index 43308d4..a0638ba 100644 --- a/src/main/java/com/jortage/proxy/Queries.java +++ b/src/main/java/com/jortage/proxy/Queries.java @@ -71,7 +71,7 @@ public class Queries { } } - public static boolean deleteMap(DataSource dataSource, String identity, String name) { + public static boolean removeMap(DataSource dataSource, String identity, String name) { name = toSFN(name); try (Connection c = dataSource.getConnection()) { try (PreparedStatement ps = c.prepareStatement("DELETE FROM `name_map` WHERE `identity` = ? AND `name` = ?;")) { @@ -84,7 +84,7 @@ public class Queries { } } - public static int getReferenceCount(DataSource dataSource, HashCode hash) { + public static int getMapCount(DataSource dataSource, HashCode hash) { try (Connection c = dataSource.getConnection()) { try (PreparedStatement ps = c.prepareStatement("SELECT COUNT(1) AS count FROM `name_map` WHERE `hash` = ?;")) { ps.setBytes(1, hash.asBytes()); @@ -112,6 +112,17 @@ public class Queries { throw new RuntimeException(e); } } + + public static void removeFilesize(DataSource dataSource, HashCode hash) { + try (Connection c = dataSource.getConnection()) { + try (PreparedStatement ps = c.prepareStatement("DELETE FROM `filesizes` WHERE `hash` = ?;")) { + ps.setBytes(1, hash.asBytes()); + ps.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } public static void putPendingBackup(DataSource dataSource, HashCode hash) { try (Connection c = dataSource.getConnection()) { @@ -123,6 +134,17 @@ public class Queries { throw new RuntimeException(e); } } + + public static void removePendingBackup(DataSource dataSource, HashCode hash) { + try (Connection c = dataSource.getConnection()) { + try (PreparedStatement ps = c.prepareStatement("DELETE FROM `pending_backup` WHERE `hash` = ?;")) { + ps.setBytes(1, hash.asBytes()); + ps.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } public static void putMultipart(DataSource dataSource, String identity, String name, String tempfile) { name = toSFN(name); diff --git a/src/main/java/com/jortage/proxy/RedirHandler.java b/src/main/java/com/jortage/proxy/RedirHandler.java new file mode 100644 index 0000000..2ece755 --- /dev/null +++ b/src/main/java/com/jortage/proxy/RedirHandler.java @@ -0,0 +1,68 @@ +package com.jortage.proxy; + +import java.io.IOException; +import java.util.List; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobAccess; +import com.google.common.base.Splitter; +import com.google.common.io.ByteStreams; + +public final class RedirHandler extends AbstractHandler { + private static final Splitter REDIR_SPLITTER = Splitter.on('/').limit(2).omitEmptyStrings(); + + private final BlobStore dumpsStore; + + public RedirHandler(BlobStore dumpsStore) { + this.dumpsStore = dumpsStore; + } + + + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + List split = REDIR_SPLITTER.splitToList(target); + if (split.size() != 2) { + response.sendError(400); + return; + } else { + String identity = split.get(0); + String name = split.get(1); + if (name.startsWith("backups/dumps") || name.startsWith("/backups/dumps")) { + Blob b = dumpsStore.getBlob(identity, name); + if (b != null) { + response.setHeader("Cache-Control", "private, no-cache"); + response.setHeader("Content-Type", b.getMetadata().getContentMetadata().getContentType()); + if (b.getMetadata().getContentMetadata().getContentLength() != null) { + response.setHeader("Content-Length", b.getMetadata().getContentMetadata().getContentLength().toString()); + } + response.setStatus(200); + ByteStreams.copy(b.getPayload().openStream(), response.getOutputStream()); + } else { + response.sendError(404); + } + return; + } + JortageProxy.reloadConfigIfChanged(); + try { + String hash = Queries.getMap(JortageProxy.dataSource, identity, name).toString(); + BlobAccess ba = JortageProxy.backingBlobStore.getBlobAccess(JortageProxy.bucket, JortageProxy.hashToPath(hash)); + if (ba != BlobAccess.PUBLIC_READ) { + JortageProxy.backingBlobStore.setBlobAccess(JortageProxy.bucket, JortageProxy.hashToPath(hash), BlobAccess.PUBLIC_READ); + } + response.setHeader("Cache-Control", "public"); + response.setHeader("Location", JortageProxy.publicHost+"/"+JortageProxy.hashToPath(hash)); + response.setStatus(301); + } catch (IllegalArgumentException e) { + response.sendError(404); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/jortage/proxy/RivetHandler.java b/src/main/java/com/jortage/proxy/RivetHandler.java new file mode 100644 index 0000000..66024ec --- /dev/null +++ b/src/main/java/com/jortage/proxy/RivetHandler.java @@ -0,0 +1,507 @@ +package com.jortage.proxy; + +import static com.google.common.base.Verify.verify; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.MessageDigest; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import kotlin.Pair; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobAccess; +import org.jclouds.blobstore.options.PutOptions; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; + +import com.google.common.base.CharMatcher; +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.hash.HashingOutputStream; +import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import okhttp3.HttpUrl; +import okhttp3.Interceptor; +import okhttp3.Interceptor.Chain; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.brotli.BrotliInterceptor; + +public final class RivetHandler extends AbstractHandler { + private static final Splitter RIVET_AUTH_SPLITTER = Splitter.on(':').limit(3); + private static final CharMatcher HEX_MATCHER = CharMatcher.anyOf("0123456789abcdef"); + + private static final String UA = "Jortage Rivet (+https://jortage.com/rivet.html)"; + + private enum Temperature { + FREEZING, COLD, WARM, HOT, SCALDING; + } + + private enum RetrieveResult { + /** + * The file was downloaded and added to the pool. Worst case. + */ + ADDED, + /** + * The file was downloaded, and after hashing was found to be present in the pool already; + * the data was thrown away. + */ + PRESENT, + /** + * The file was requested, and a blob redirect was found, so it short-circuited and avoided + * a download. + */ + FOUND, + /** + * Someone else requested the exact same url within the past 10 minutes, so no requests + * were made at all. Best case. + */ + CACHED, + } + + private final Gson gson; + // synchronize on a mutex when loading URLs to avoid download races that would waste bandwidth + private final Object retrieveMutex = new Object(); + private final Map> results = Maps.newHashMap(); + private final LoadingCache urlCache = CacheBuilder.newBuilder() + .concurrencyLevel(1) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener((n) -> { + synchronized (retrieveMutex) { + results.remove(n.getKey()); + } + }) + .build(new CacheLoader() { + @Override + public HashCode load(String url) throws Exception { + ByteSinkSource bss = null; + HttpUrl parsedUrl = HttpUrl.Companion.parse(url); + checkIllegalUrl(null, parsedUrl); + HashCode shortCircuit = checkShortCircuit(url, parsedUrl, Temperature.HOT); + if (shortCircuit != null) return shortCircuit; + try (Response headRes = client.newCall(new Request.Builder() + .addHeader("User-Agent", UA) + .url(parsedUrl) + .head() + .build()).execute()) { + if (headRes.isSuccessful()) { + shortCircuit = checkShortCircuit(url, headRes.request().url(), Temperature.WARM); + if (shortCircuit != null) return shortCircuit; + shortCircuit = checkShortCircuit(url, headRes.networkResponse().request().url(), Temperature.WARM); + if (shortCircuit != null) return shortCircuit; + try (Response getRes = client.newCall(new Request.Builder() + .addHeader("User-Agent", UA) + .url(headRes.request().url()) + .get() + .build()).execute()) { + if (getRes.isSuccessful()) { + long len = getRes.body().contentLength(); + if (len == -1 || len > 8192) { + bss = new FileByteSinkSource(File.createTempFile("jortage-proxy-", ".dat"), true); + } else { + bss = new MemoryByteSinkSource(); + } + OutputStream sinkOut = bss.getSink().openStream(); + HashingOutputStream hos = new HashingOutputStream(Hashing.sha512(), sinkOut); + ByteStreams.copy(getRes.body().byteStream(), hos); + hos.close(); + HashCode hash = hos.hash(); + String hashStr = hash.toString(); + String path = JortageProxy.hashToPath(hashStr); + if (JortageProxy.backingBlobStore.blobExists(JortageProxy.bucket, path)) { + results.put(url, new Pair<>(RetrieveResult.PRESENT, Temperature.COLD)); + } else { + Blob blob = JortageProxy.backingBlobStore.blobBuilder(path) + .payload(bss.getSource()) + .contentLength(bss.getSource().size()) + .contentType(getRes.body().contentType().toString()) + .build(); + long size = bss.getSource().size(); + JortageProxy.backingBlobStore.putBlob(JortageProxy.bucket, blob, + new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ).multipart(size > 8192)); + Queries.putPendingBackup(JortageProxy.dataSource, hash); + Queries.putFilesize(JortageProxy.dataSource, hash, size); + results.put(url, new Pair<>(RetrieveResult.ADDED, Temperature.FREEZING)); + } + return hash; + } else { + throw new IOException("Unsuccessful response code to GET: "+getRes.code()); + } + } + } else { + throw new IOException("Unsuccessful response code to HEAD: "+headRes.code()); + } + } finally { + if (bss != null) bss.close(); + } + } + + private HashCode checkShortCircuit(String originalUrl, HttpUrl url, Temperature temp) { + String publicHost = JortageProxy.config.getObject("backend").get(String.class, "publicHost").replaceFirst("^https?://", ""); + String fullHost = url.host(); + if (url.port() != (url.scheme().equals("https") ? 443 : 80)) { + fullHost = fullHost+":"+url.port(); + } + if (fullHost.equals(publicHost)) { + List segments = url.pathSegments(); + if (segments.size() == 4 && segments.get(0).equals("blobs")) { + String prelude = segments.get(1)+segments.get(2); + String hashStr = segments.get(3); + if (hashStr.startsWith(prelude) && HEX_MATCHER.matchesAllOf(hashStr)) { + if (JortageProxy.backingBlobStore.blobExists(JortageProxy.bucket, JortageProxy.hashToPath(hashStr))) { + HashCode hash = HashCode.fromString(hashStr); + results.put(originalUrl, new Pair<>(RetrieveResult.FOUND, temp)); + return hash; + } + } + } + } + return null; + } + }); + + private OkHttpClient client; + + public RivetHandler() { + this.gson = new Gson(); + Interceptor urlChecker = (chain) -> { + Request req = chain.request(); + checkIllegalUrl(chain, req.url()); + Response resp = chain.proceed(req); + if (resp.isRedirect() && resp.header("Location") != null) { + String location = resp.header("Location"); + HttpUrl url = HttpUrl.Companion.parse(location); + checkIllegalUrl(chain, url); + } + return resp; + }; + this.client = new OkHttpClient.Builder() + .addInterceptor(BrotliInterceptor.INSTANCE) + .addInterceptor(urlChecker) + .addNetworkInterceptor(urlChecker) + .connectTimeout(8, TimeUnit.SECONDS) + .build(); + } + + + private void checkIllegalUrl(Chain chain, HttpUrl url) throws UnknownHostException, IOException { + if (url.port() <= 0 || url.port() > 65535 || illegalPorts.contains(url.port())) { + if (chain != null) chain.call().cancel(); + throw new IOException("Illegal host: Illegal port "+url.port()); + } + String host = url.host(); + for (InetAddress inet : client.dns().lookup(host)) { + if (inet.isAnyLocalAddress() || inet.isLinkLocalAddress() || inet.isLoopbackAddress() + || inet.isMulticastAddress() || inet.isSiteLocalAddress()) { + if (chain != null) chain.call().cancel(); + throw new IOException("Illegal host: Illegal address "+inet.getHostAddress()+" ("+host+")"); + } + } + } + + + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException { + baseRequest.setHandled(true); + if ("/retrieve".equals(target)) { + try { + if ("OPTIONS".equals(req.getMethod())) { + res.setHeader("Allow", "POST"); + res.setHeader("Accept", "application/json;charset=utf-8"); + res.setStatus(204); + res.getOutputStream().close(); + return; + } + if (!"POST".equals(req.getMethod())) { + res.setHeader("Allow", "POST"); + jsonError(res, 405, "/retrieve only accepts POST"); + return; + } + String authHeader = req.getHeader("Rivet-Auth"); + if (authHeader == null) { + jsonError(res, 401, "Rivet-Auth header missing"); + return; + } + Iterator iter = RIVET_AUTH_SPLITTER.split(authHeader).iterator(); + if (!iter.hasNext()) { + jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)"); + return; + } + String identity = iter.next(); + if (!iter.hasNext()) { + jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)"); + return; + } + String macStr = iter.next(); + if (!iter.hasNext()) { + jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)"); + return; + } + String dateStr = iter.next(); + verify(!iter.hasNext()); + + Instant date; + try { + date = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(dateStr)); + } catch (DateTimeParseException e) { + jsonError(res, 401, "Rivet-Auth header invalid (Could not parse date)"); + return; + } + if (date.isBefore(Instant.now().minus(5, ChronoUnit.MINUTES))) { + jsonError(res, 401, "Rivet-Auth header invalid (Too old)"); + return; + } + + JortageProxy.reloadConfigIfChanged(); + if (!JortageProxy.config.containsKey("users") || !JortageProxy.config.getObject("users").containsKey(identity)) { + jsonError(res, 401, "Rivet-Auth header invalid (Bad access ID)"); + return; + } + if (req.getContentLength() == -1) { + jsonError(res, 411, "Length required"); + return; + } + if (req.getContentLength() > 8192) { + jsonError(res, 413, "Payload too large"); + return; + } + String contentType = req.getHeader("Content-Type"); + if (contentType == null || !"application/json;charset=utf-8".equals(contentType.replace(" ", "").toLowerCase(Locale.ROOT))) { + res.setHeader("Accept", "application/json;charset=utf-8"); + jsonError(res, 415, "Content-Type must be application/json; charset=utf-8"); + return; + } + byte[] theirMac = BaseEncoding.base64().decode(macStr); + Mac mac = assertSuccess(() -> Mac.getInstance("HmacSHA512")); + byte[] payload = ByteStreams.toByteArray(ByteStreams.limit(req.getInputStream(), req.getContentLength())); + req.getInputStream().close(); + String payloadStr = new String(payload, Charsets.UTF_8); + + String key = JortageProxy.config.getObject("users").get(String.class, identity); + assertSuccess(() -> mac.init(new SecretKeySpec(key.getBytes(Charsets.UTF_8), "RAW"))); + mac.update((identity+":"+dateStr+":"+payloadStr).getBytes(Charsets.UTF_8)); + byte[] ourMac = mac.doFinal(); + if (!MessageDigest.isEqual(theirMac, ourMac)) { + jsonError(res, 401, "Rivet-Auth header invalid (Bad MAC)"); + return; + } + + // phew. now that all of *that* is out of the way... what is it they want? + JsonObject json; + try { + json = gson.fromJson(payloadStr, JsonObject.class); + } catch (JsonSyntaxException e) { + jsonError(res, 400, "Syntax error in payload"); + return; + } + if (!json.has("sourceUrl")) { + jsonError(res, 400, "Must specify sourceUrl"); + return; + } + if (!json.has("destinationPath")) { + jsonError(res, 400, "Must specify destinationPath"); + return; + } + String sourceUrl = json.get("sourceUrl").getAsString(); + if (!sourceUrl.startsWith("https://") && !sourceUrl.startsWith("http://")) { + jsonError(res, 400, "sourceUrl must be http or https"); + return; + } + String destinationPath = json.get("destinationPath").getAsString(); + synchronized (retrieveMutex) { + RetrieveResult retRes = null; + Temperature temp = null; + HashCode hash; + try { + if (urlCache.getIfPresent(sourceUrl) != null) { + retRes = RetrieveResult.CACHED; + temp = Temperature.SCALDING; + } + hash = urlCache.get(sourceUrl); + if (retRes == null || temp == null) { + Pair pair = results.get(sourceUrl); + retRes = pair.getFirst(); + temp = pair.getSecond(); + } + } catch (ExecutionException | UncheckedExecutionException e) { + if (e.getMessage() != null) { + if (e.getMessage().contains("Illegal host")) { + jsonError(res, 400, "Illegal host"); + return; + } + if (e.getMessage().contains("Unsuccessful response")) { + jsonError(res, 502, "Upstream error "+(e.getMessage().substring(e.getMessage().lastIndexOf(':')+1).trim())); + return; + } + if (e.getMessage().contains("connect timed out")) { + jsonError(res, 504, "Upstream timeout"); + return; + } + } + jsonExceptionError(res, e, "sourceUrl: "+sourceUrl, "identity: "+identity); + return; + } + Queries.putMap(JortageProxy.dataSource, identity, destinationPath, hash); + res.setStatus(200); + res.setHeader("Content-Type", "application/json; charset=utf-8"); + JsonObject obj = new JsonObject(); + JsonObject result = new JsonObject(); + result.addProperty("name", retRes.name()); + result.addProperty("temperature", temp.name()); + obj.add("result", result); + obj.addProperty("hash", hash.toString()); + res.getOutputStream().write(obj.toString().getBytes(Charsets.UTF_8)); + res.getOutputStream().close(); + } + } catch (Throwable t) { + jsonExceptionError(res, t); + } + } else { + res.sendError(404); + } + } + + private void jsonExceptionError(HttpServletResponse res, Throwable t, String... extra) throws IOException { + byte[] tokenBys = new byte[8]; + ThreadLocalRandom.current().nextBytes(tokenBys); + String token = BaseEncoding.base16().lowerCase().encode(tokenBys); + System.err.println("== BEGIN "+token+" =="); + t.printStackTrace(); + if (extra.length > 0) { + System.err.println("Extra information:"); + for (String s : extra) { + System.err.println(s); + } + } + System.err.println("== END "+token+" =="); + jsonError(res, 500, "Internal error "+token); + } + + + private void jsonError(HttpServletResponse res, int code, String msg) throws IOException { + res.setStatus(code); + res.setHeader("Content-Type", "application/json; charset=utf-8"); + JsonObject obj = new JsonObject(); + obj.addProperty("error", msg); + res.getOutputStream().write(obj.toString().getBytes(Charsets.UTF_8)); + res.getOutputStream().close(); + } + + private interface ExceptableRunnable { void run() throws Exception; } + private interface ExceptableSupplier { T get() throws Exception; } + + private static void assertSuccess(ExceptableRunnable r) { + try { + r.run(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + private static T assertSuccess(ExceptableSupplier s) { + try { + return s.get(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + // https://chromium.googlesource.com/chromium/chromium/+/master/net/base/net_util.cc#92 + private static final ImmutableSet illegalPorts = ImmutableSet.of( + 1, // tcpmux + 7, // echo + 9, // discard + 11, // systat + 13, // daytime + 15, // netstat + 17, // qotd + 19, // chargen + 20, // ftp data + 21, // ftp access + 22, // ssh + 23, // telnet + 25, // smtp + 37, // time + 42, // name + 43, // nicname + 53, // domain + 77, // priv-rjs + 79, // finger + 87, // ttylink + 95, // supdup + 101, // hostriame + 102, // iso-tsap + 103, // gppitnp + 104, // acr-nema + 109, // pop2 + 110, // pop3 + 111, // sunrpc + 113, // auth + 115, // sftp + 117, // uucp-path + 119, // nntp + 123, // NTP + 135, // loc-srv /epmap + 139, // netbios + 143, // imap2 + 179, // BGP + 389, // ldap + 465, // smtp+ssl + 512, // print / exec + 513, // login + 514, // shell + 515, // printer + 526, // tempo + 530, // courier + 531, // chat + 532, // netnews + 540, // uucp + 556, // remotefs + 563, // nntp+ssl + 587, // stmp? + 601, // ?? + 636, // ldap+ssl + 993, // ldap+ssl + 995, // pop3+ssl + 2049, // nfs + 3659, // apple-sasl / PasswordServer + 4045, // lockd + 6000, // X11 + 6665, // Alternate IRC [Apple addition] + 6666, // Alternate IRC [Apple addition] + 6667, // Standard IRC [Apple addition] + 6668, // Alternate IRC [Apple addition] + 6669 // Alternate IRC [Apple addition] + ); + +} \ No newline at end of file diff --git a/src/main/java/com/jortage/proxy/RivetTest.java b/src/main/java/com/jortage/proxy/RivetTest.java new file mode 100644 index 0000000..91d58ec --- /dev/null +++ b/src/main/java/com/jortage/proxy/RivetTest.java @@ -0,0 +1,85 @@ +package com.jortage.proxy; + +import java.time.Instant; +import java.time.format.DateTimeFormatter; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import com.google.gson.JsonObject; + +import com.google.common.base.Charsets; +import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteStreams; + +import kotlin.Pair; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.brotli.BrotliInterceptor; + +public class RivetTest { + + public static void main(String[] args) throws Exception { + OkHttpClient client = new OkHttpClient.Builder() + .addInterceptor(BrotliInterceptor.INSTANCE) + .build(); + JsonObject obj = new JsonObject(); + obj.addProperty("sourceUrl", "http://example.com/nothing.png"); + obj.addProperty("destinationPath", "test.png"); + String payload = obj.toString(); + String accessKey = "test"; + String secretKey = "test"; + String date = DateTimeFormatter.ISO_INSTANT.format(Instant.now()); + Mac mac = assertSuccess(() -> Mac.getInstance("HmacSHA512")); + byte[] payloadBytes = payload.getBytes(Charsets.UTF_8); + + assertSuccess(() -> mac.init(new SecretKeySpec(secretKey.getBytes(Charsets.UTF_8), "RAW"))); + mac.update((accessKey+":"+date+":"+payload).getBytes(Charsets.UTF_8)); + byte[] macBys = mac.doFinal(); + String auth = accessKey+":"+BaseEncoding.base64().encode(macBys)+":"+date; + try (Response res = client.newCall(new Request.Builder() + .url("http://localhost:23280/retrieve") + .post(RequestBody.create(payloadBytes, MediaType.parse("application/json; charset=utf-8"))) + .header("Rivet-Auth", auth) + .header("User-Agent", "Jortage Rivet Test") + .build()).execute()) { + Request req = res.networkResponse().request(); + System.out.println(req.method()+" "+req.url()); + for (Pair pair : req.headers()) { + System.out.println(pair.getFirst()+": "+pair.getSecond()); + } + System.out.println(); + System.out.println(payload); + System.out.println(); + System.out.println(); + System.out.println(res.code()+" "+res.message()); + for (Pair pair : res.headers()) { + System.out.println(pair.getFirst()+": "+pair.getSecond()); + } + System.out.println(); + ByteStreams.copy(res.body().byteStream(), System.out); + } + } + + private interface ExceptableRunnable { void run() throws Exception; } + private interface ExceptableSupplier { T get() throws Exception; } + + private static void assertSuccess(ExceptableRunnable r) { + try { + r.run(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + private static T assertSuccess(ExceptableSupplier s) { + try { + return s.get(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + +} diff --git a/src/s3proxy/java b/src/s3proxy/java new file mode 120000 index 0000000..f8839d6 --- /dev/null +++ b/src/s3proxy/java @@ -0,0 +1 @@ +../../s3proxy/src/main/java/ \ No newline at end of file