Implement support for multipart uploads

trunk
Una Thompson 2019-09-14 19:45:37 -07:00
rodzic 089f21eccf
commit ff9100bdbc
2 zmienionych plików z 78 dodań i 17 usunięć

Wyświetl plik

@ -33,6 +33,8 @@ import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.hash.HashingOutputStream;
import com.google.common.io.ByteStreams;
@ -59,11 +61,15 @@ public class JortageBlobStore extends ForwardingBlobStore {
}
}
private String map(String container, String name) {
private String mapHash(String container, String name) {
checkContainer(container);
String hash = paths.get(buildKey(name));
if (hash == null) throw new IllegalArgumentException("Not found");
return JortageProxy.hashToPath(hash);
return hash;
}
private String map(String container, String name) {
return JortageProxy.hashToPath(mapHash(container, name));
}
@Override
@ -199,37 +205,99 @@ public class JortageBlobStore extends ForwardingBlobStore {
}
}
@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName, CopyOptions options) {
// javadoc says options are ignored, so we ignore them too
checkContainer(toContainer);
String hash = mapHash(fromContainer, fromName);
paths.put(buildKey(toName), hash);
return blobMetadata(bucket, JortageProxy.hashToPath(hash)).getETag();
}
@Override
public MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata, PutOptions options) {
checkContainer(container);
MutableBlobMetadata mbm = new MutableBlobMetadataImpl(blobMetadata);
mbm.setContainer(bucket);
mbm.setName(map(blobMetadata.getContainer(), blobMetadata.getName()));
return delegate().initiateMultipartUpload(bucket, mbm, options);
String name = "multitmp/"+identity+"-"+System.currentTimeMillis()+"-"+System.nanoTime();
mbm.setName(name);
mbm.getUserMetadata().put("jortage-creator", identity);
mbm.getUserMetadata().put("jortage-originalname", blobMetadata.getName());
paths.put("multipart:"+buildKey(blobMetadata.getName()), name);
paths.put("multipart-rev:"+name, blobMetadata.getName());
return delegate().initiateMultipartUpload(bucket, mbm, new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ));
}
private MultipartUpload mask(MultipartUpload mpu) {
checkContainer(mpu.containerName());
return MultipartUpload.create(bucket, Preconditions.checkNotNull(paths.get("multipart:"+buildKey(mpu.blobName()))), mpu.id(), mpu.blobMetadata(), new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ));
}
private MultipartUpload revmask(MultipartUpload mpu) {
checkContainer(mpu.containerName());
return MultipartUpload.create(bucket, Preconditions.checkNotNull(paths.get("multipart-rev:"+mpu.blobName())), mpu.id(), mpu.blobMetadata(), new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ));
}
@Override
public void abortMultipartUpload(MultipartUpload mpu) {
delegate().abortMultipartUpload(mpu);
delegate().abortMultipartUpload(mask(mpu));
}
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
return delegate().completeMultipartUpload(mpu, parts);
String origKey = buildKey(mpu.blobName());
mpu = mask(mpu);
// TODO this is a bit of a hack and isn't very efficient
String etag = delegate().completeMultipartUpload(mpu, parts);
try (InputStream stream = delegate().getBlob(mpu.containerName(), mpu.blobName()).getPayload().openStream()) {
HashingOutputStream hos = new HashingOutputStream(Hashing.sha512(), ByteStreams.nullOutputStream());
ByteStreams.copy(stream, hos);
String hash = hos.hash().toString();
String path = JortageProxy.hashToPath(hash);
// don't fall afoul of request rate limits
Thread.sleep(500);
BlobMetadata meta = delegate().blobMetadata(mpu.containerName(), mpu.blobName());
if (!delegate().blobExists(bucket, path)) {
Thread.sleep(500);
etag = delegate().copyBlob(mpu.containerName(), mpu.blobName(), bucket, path, CopyOptions.builder().contentMetadata(meta.getContentMetadata()).build());
Thread.sleep(500);
delegate().setBlobAccess(bucket, path, BlobAccess.PUBLIC_READ);
} else {
Thread.sleep(500);
etag = delegate().blobMetadata(bucket, path).getETag();
}
paths.put(buildKey(Preconditions.checkNotNull(meta.getUserMetadata().get("jortage-originalname"))), hash);
paths.remove("multipart:"+origKey);
paths.remove("multipart-rev:"+mpu.blobName());
Thread.sleep(500);
delegate().removeBlob(mpu.containerName(), mpu.blobName());
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return etag;
}
@Override
public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) {
return delegate().uploadMultipartPart(mpu, partNumber, payload);
return delegate().uploadMultipartPart(mask(mpu), partNumber, payload);
}
@Override
public List<MultipartPart> listMultipartUpload(MultipartUpload mpu) {
return delegate().listMultipartUpload(mpu);
return delegate().listMultipartUpload(mask(mpu));
}
@Override
public List<MultipartUpload> listMultipartUploads(String container) {
return delegate().listMultipartUploads(bucket);
checkContainer(container);
List<MultipartUpload> out = Lists.newArrayList();
for (MultipartUpload mpu : delegate().listMultipartUploads(bucket)) {
if (Objects.equal(mpu.blobMetadata().getUserMetadata().get("jortage-creator"), identity)) {
out.add(revmask(mpu));
}
}
return out;
}
@Override
@ -285,12 +353,6 @@ public class JortageBlobStore extends ForwardingBlobStore {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName,
CopyOptions options) {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
public void removeBlob(String container, String name) {
throw new UnsupportedOperationException("Read-only BlobStore");

Wyświetl plik

@ -109,7 +109,6 @@ public class JortageProxy {
S3Proxy s3Proxy = S3Proxy.builder()
.awsAuthentication(AuthenticationType.AWS_V2_OR_V4, "DUMMY", "DUMMY")
.endpoint(URI.create("http://localhost:23278"))
.v4MaxNonChunkedRequestSize(128*1024*1024)
.build();
s3Proxy.setBlobStoreLocator(new BlobStoreLocator() {