From ff9100bdbc160245063dc6972bddd569be5daf7d Mon Sep 17 00:00:00 2001 From: Una Thompson Date: Sat, 14 Sep 2019 19:45:37 -0700 Subject: [PATCH] Implement support for multipart uploads --- .../com/jortage/proxy/JortageBlobStore.java | 94 +++++++++++++++---- .../java/com/jortage/proxy/JortageProxy.java | 1 - 2 files changed, 78 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/jortage/proxy/JortageBlobStore.java b/src/main/java/com/jortage/proxy/JortageBlobStore.java index 539db4e..19ec202 100644 --- a/src/main/java/com/jortage/proxy/JortageBlobStore.java +++ b/src/main/java/com/jortage/proxy/JortageBlobStore.java @@ -33,6 +33,8 @@ import org.jclouds.io.Payload; import org.jclouds.io.payloads.FilePayload; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.hash.Hashing; import com.google.common.hash.HashingOutputStream; import com.google.common.io.ByteStreams; @@ -59,11 +61,15 @@ public class JortageBlobStore extends ForwardingBlobStore { } } - private String map(String container, String name) { + private String mapHash(String container, String name) { checkContainer(container); String hash = paths.get(buildKey(name)); if (hash == null) throw new IllegalArgumentException("Not found"); - return JortageProxy.hashToPath(hash); + return hash; + } + + private String map(String container, String name) { + return JortageProxy.hashToPath(mapHash(container, name)); } @Override @@ -199,37 +205,99 @@ public class JortageBlobStore extends ForwardingBlobStore { } } + @Override + public String copyBlob(String fromContainer, String fromName, String toContainer, String toName, CopyOptions options) { + // javadoc says options are ignored, so we ignore them too + checkContainer(toContainer); + String hash = mapHash(fromContainer, fromName); + paths.put(buildKey(toName), hash); + return blobMetadata(bucket, JortageProxy.hashToPath(hash)).getETag(); + } + @Override public MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata, PutOptions options) { + checkContainer(container); MutableBlobMetadata mbm = new MutableBlobMetadataImpl(blobMetadata); - mbm.setContainer(bucket); - mbm.setName(map(blobMetadata.getContainer(), blobMetadata.getName())); - return delegate().initiateMultipartUpload(bucket, mbm, options); + String name = "multitmp/"+identity+"-"+System.currentTimeMillis()+"-"+System.nanoTime(); + mbm.setName(name); + mbm.getUserMetadata().put("jortage-creator", identity); + mbm.getUserMetadata().put("jortage-originalname", blobMetadata.getName()); + paths.put("multipart:"+buildKey(blobMetadata.getName()), name); + paths.put("multipart-rev:"+name, blobMetadata.getName()); + return delegate().initiateMultipartUpload(bucket, mbm, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + } + + private MultipartUpload mask(MultipartUpload mpu) { + checkContainer(mpu.containerName()); + return MultipartUpload.create(bucket, Preconditions.checkNotNull(paths.get("multipart:"+buildKey(mpu.blobName()))), mpu.id(), mpu.blobMetadata(), new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); + } + + private MultipartUpload revmask(MultipartUpload mpu) { + checkContainer(mpu.containerName()); + return MultipartUpload.create(bucket, Preconditions.checkNotNull(paths.get("multipart-rev:"+mpu.blobName())), mpu.id(), mpu.blobMetadata(), new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ)); } @Override public void abortMultipartUpload(MultipartUpload mpu) { - delegate().abortMultipartUpload(mpu); + delegate().abortMultipartUpload(mask(mpu)); } @Override public String completeMultipartUpload(MultipartUpload mpu, List parts) { - return delegate().completeMultipartUpload(mpu, parts); + String origKey = buildKey(mpu.blobName()); + mpu = mask(mpu); + // TODO this is a bit of a hack and isn't very efficient + String etag = delegate().completeMultipartUpload(mpu, parts); + try (InputStream stream = delegate().getBlob(mpu.containerName(), mpu.blobName()).getPayload().openStream()) { + HashingOutputStream hos = new HashingOutputStream(Hashing.sha512(), ByteStreams.nullOutputStream()); + ByteStreams.copy(stream, hos); + String hash = hos.hash().toString(); + String path = JortageProxy.hashToPath(hash); + // don't fall afoul of request rate limits + Thread.sleep(500); + BlobMetadata meta = delegate().blobMetadata(mpu.containerName(), mpu.blobName()); + if (!delegate().blobExists(bucket, path)) { + Thread.sleep(500); + etag = delegate().copyBlob(mpu.containerName(), mpu.blobName(), bucket, path, CopyOptions.builder().contentMetadata(meta.getContentMetadata()).build()); + Thread.sleep(500); + delegate().setBlobAccess(bucket, path, BlobAccess.PUBLIC_READ); + } else { + Thread.sleep(500); + etag = delegate().blobMetadata(bucket, path).getETag(); + } + paths.put(buildKey(Preconditions.checkNotNull(meta.getUserMetadata().get("jortage-originalname"))), hash); + paths.remove("multipart:"+origKey); + paths.remove("multipart-rev:"+mpu.blobName()); + Thread.sleep(500); + delegate().removeBlob(mpu.containerName(), mpu.blobName()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return etag; } @Override public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) { - return delegate().uploadMultipartPart(mpu, partNumber, payload); + return delegate().uploadMultipartPart(mask(mpu), partNumber, payload); } @Override public List listMultipartUpload(MultipartUpload mpu) { - return delegate().listMultipartUpload(mpu); + return delegate().listMultipartUpload(mask(mpu)); } @Override public List listMultipartUploads(String container) { - return delegate().listMultipartUploads(bucket); + checkContainer(container); + List out = Lists.newArrayList(); + for (MultipartUpload mpu : delegate().listMultipartUploads(bucket)) { + if (Objects.equal(mpu.blobMetadata().getUserMetadata().get("jortage-creator"), identity)) { + out.add(revmask(mpu)); + } + } + return out; } @Override @@ -285,12 +353,6 @@ public class JortageBlobStore extends ForwardingBlobStore { throw new UnsupportedOperationException("Read-only BlobStore"); } - @Override - public String copyBlob(String fromContainer, String fromName, String toContainer, String toName, - CopyOptions options) { - throw new UnsupportedOperationException("Read-only BlobStore"); - } - @Override public void removeBlob(String container, String name) { throw new UnsupportedOperationException("Read-only BlobStore"); diff --git a/src/main/java/com/jortage/proxy/JortageProxy.java b/src/main/java/com/jortage/proxy/JortageProxy.java index c81b2c2..17e1a25 100644 --- a/src/main/java/com/jortage/proxy/JortageProxy.java +++ b/src/main/java/com/jortage/proxy/JortageProxy.java @@ -109,7 +109,6 @@ public class JortageProxy { S3Proxy s3Proxy = S3Proxy.builder() .awsAuthentication(AuthenticationType.AWS_V2_OR_V4, "DUMMY", "DUMMY") .endpoint(URI.create("http://localhost:23278")) - .v4MaxNonChunkedRequestSize(128*1024*1024) .build(); s3Proxy.setBlobStoreLocator(new BlobStoreLocator() {