Emulate multipart upload with single-part uploads

This approach requires three times as many operations as the optimal
approach.  Implementing this correctly requires exposing the
underlying multipart operations in jclouds.  Most s3-tests pass but
test_list_multipart_upload still fails.

References #2.
pull/26/head
Andrew Gaul 2014-12-21 16:59:02 -08:00
rodzic 11c93a43a8
commit b0b1f4e9fa
3 zmienionych plików z 473 dodań i 4 usunięć

Wyświetl plik

@ -37,6 +37,10 @@ enum S3ErrorCode {
"Your previous request to create the named bucket" +
" succeeded and you already own it."),
BUCKET_NOT_EMPTY(HttpServletResponse.SC_CONFLICT, "Conflict"),
ENTITY_TOO_SMALL(HttpServletResponse.SC_BAD_REQUEST,
"Your proposed upload is smaller than the minimum allowed object" +
" size. Each part must be at least 5 MB in size, except the last" +
" part."),
INVALID_ACCESS_KEY_ID(HttpServletResponse.SC_FORBIDDEN, "Forbidden"),
INVALID_ARGUMENT(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"),
INVALID_BUCKET_NAME(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"),
@ -52,6 +56,7 @@ enum S3ErrorCode {
"Length Required"),
NO_SUCH_BUCKET(HttpServletResponse.SC_NOT_FOUND, "Not Found"),
NO_SUCH_KEY(HttpServletResponse.SC_NOT_FOUND, "Not Found"),
NO_SUCH_UPLOAD(HttpServletResponse.SC_NOT_FOUND, "Not Found"),
REQUEST_TIME_TOO_SKEWED(HttpServletResponse.SC_FORBIDDEN, "Forbidden"),
REQUEST_TIMEOUT(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"),
SIGNATURE_DOES_NOT_MATCH(HttpServletResponse.SC_FORBIDDEN, "Forbidden");

Wyświetl plik

@ -33,10 +33,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
@ -61,7 +63,9 @@ import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.hash.HashingInputStream;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.net.HostAndPort;
import com.google.common.net.HttpHeaders;
@ -102,7 +106,16 @@ final class S3ProxyHandler extends AbstractHandler {
"75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a";
private static final String FAKE_OWNER_DISPLAY_NAME =
"CustomersName@amazon.com";
private static final String FAKE_INITIATOR_ID =
"arn:aws:iam::111122223333:" +
"user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx";
private static final String FAKE_INITIATOR_DISPLAY_NAME =
"umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx";
private static final String FAKE_REQUEST_ID = "4442587FB7D0A2F9";
private static final String FAKE_UPLOAD_ID =
"EXAMPLEJZ6e0YupT2h66iePQCc9IEbYbDUy4RTpMeoSMLPRp8Z5o1u8feSRo" +
"npvnWsKKG35tI2LB9VDPiCgTy.Gq2VxQLYjrue4Nq.NBdqI-";
private static final long MINIMUM_MULTIPART_PART_SIZE = 5 * 1024 * 1024;
private static final Pattern VALID_BUCKET_PATTERN =
Pattern.compile("[a-zA-Z0-9._-]+");
private static final Set<String> SIGNED_SUBRESOURCES = ImmutableSet.of(
@ -119,8 +132,11 @@ final class S3ProxyHandler extends AbstractHandler {
"location",
"marker",
"max-keys",
"partNumber",
"prefix",
"Signature"
"Signature",
"uploadId",
"uploads"
);
private static final Set<String> CANNED_ACLS = ImmutableSet.of(
"private",
@ -312,12 +328,18 @@ final class S3ProxyHandler extends AbstractHandler {
for (int i = 0; i < path.length; i++) {
path[i] = URLDecoder.decode(path[i], "UTF-8");
}
String uploadId = request.getParameter("uploadId");
switch (method) {
case "DELETE":
if (path.length <= 2 || path[2].isEmpty()) {
handleContainerDelete(response, path[1]);
baseRequest.setHandled(true);
return;
} else if (uploadId != null) {
handleAbortMultipartUpload(request, response, path[1], path[2],
uploadId);
baseRequest.setHandled(true);
return;
} else {
handleBlobRemove(response, path[1], path[2]);
baseRequest.setHandled(true);
@ -337,6 +359,10 @@ final class S3ProxyHandler extends AbstractHandler {
handleContainerLocation(response, path[1]);
baseRequest.setHandled(true);
return;
} else if ("".equals(request.getParameter("uploads"))) {
handleListMultipartUploads(response, uploadId);
baseRequest.setHandled(true);
return;
}
handleBlobList(request, response, path[1]);
baseRequest.setHandled(true);
@ -346,6 +372,11 @@ final class S3ProxyHandler extends AbstractHandler {
handleGetBlobAcl(response, path[1], path[2]);
baseRequest.setHandled(true);
return;
} else if (uploadId != null) {
handleListParts(request, response, path[1], path[2],
uploadId);
baseRequest.setHandled(true);
return;
}
handleGetBlob(request, response, path[1], path[2]);
baseRequest.setHandled(true);
@ -366,6 +397,16 @@ final class S3ProxyHandler extends AbstractHandler {
handleMultiBlobRemove(request, response, path[1]);
baseRequest.setHandled(true);
return;
} else if ("".equals(request.getParameter("uploads"))) {
handleInitiateMultipartUpload(request, response, path[1],
path[2]);
baseRequest.setHandled(true);
return;
} else if (uploadId != null) {
handleCompleteMultipartUpload(request, response, path[1],
path[2], uploadId);
baseRequest.setHandled(true);
return;
}
break;
case "PUT":
@ -378,6 +419,11 @@ final class S3ProxyHandler extends AbstractHandler {
handleContainerCreate(request, response, path[1]);
baseRequest.setHandled(true);
return;
} else if (uploadId != null) {
handleUploadPart(request, response, path[1], path[2],
uploadId);
baseRequest.setHandled(true);
return;
} else if (request.getHeader("x-amz-copy-source") != null) {
handleCopyBlob(request, response, path[1], path[2]);
baseRequest.setHandled(true);
@ -685,6 +731,12 @@ final class S3ProxyHandler extends AbstractHandler {
}
}
// Handles GET /{bucket}?uploads (list in-progress multipart uploads).
// Emulated uploads are stored as stub blobs named after the upload id,
// so a real implementation would enumerate those stubs; not implemented
// yet, so every request gets 501.
private void handleListMultipartUploads(HttpServletResponse response,
        String uploadId) throws IOException {
    // TODO: list all blobs starting with uploadId
    response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
}
private void handleContainerExists(HttpServletResponse response,
String containerName) throws IOException {
if (!blobStore.containerExists(containerName)) {
@ -1249,6 +1301,345 @@ final class S3ProxyHandler extends AbstractHandler {
}
}
/**
 * Handles POST /{bucket}/{key}?uploads by emulating
 * InitiateMultipartUpload: creates a zero-length stub blob whose name
 * doubles as the upload id and carries the content metadata that the
 * completed blob will eventually receive, then echoes the standard
 * InitiateMultipartUploadResult XML document.
 */
private void handleInitiateMultipartUpload(HttpServletRequest request,
        HttpServletResponse response, String containerName,
        String blobName) throws IOException {
    // random suffix keeps concurrent uploads to the same key distinct
    String uploadId = FAKE_UPLOAD_ID + UUID.randomUUID().toString();

    ByteSource emptyPayload = ByteSource.empty();
    BlobBuilder.PayloadBlobBuilder stubBuilder = blobStore
            .blobBuilder(uploadId)
            .payload(emptyPayload);
    addContentMetdataFromHttpRequest(stubBuilder, request);
    stubBuilder.contentLength(emptyPayload.size());
    blobStore.putBlob(containerName, stubBuilder.build());

    try (Writer writer = response.getWriter()) {
        XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
                writer);
        xml.writeStartDocument();
        xml.writeStartElement("InitiateMultipartUploadResult");
        xml.writeDefaultNamespace(AWS_XMLNS);

        xml.writeStartElement("Bucket");
        xml.writeCharacters(containerName);
        xml.writeEndElement();

        xml.writeStartElement("Key");
        xml.writeCharacters(blobName);
        xml.writeEndElement();

        xml.writeStartElement("UploadId");
        xml.writeCharacters(uploadId);
        xml.writeEndElement();

        xml.writeEndElement();
        xml.flush();
    } catch (XMLStreamException xse) {
        throw new IOException(xse);
    }
}
/**
 * Handles POST /{bucket}/{key}?uploadId=... by emulating
 * CompleteMultipartUpload: concatenates the single-part blobs named
 * uploadId + "." + partNumber into the final blob, removes the part
 * blobs and the upload's stub blob, and emits the
 * CompleteMultipartUploadResult XML document.
 */
private void handleCompleteMultipartUpload(HttpServletRequest request,
        HttpServletResponse response, String containerName,
        String blobName, String uploadId) throws IOException {
    try (InputStream is = request.getInputStream();
            Writer writer = response.getWriter()) {
        // The stub blob created by initiate carries the content
        // metadata destined for the final blob.  Its absence means the
        // upload id is unknown or already aborted/completed; fail the
        // same way abort does instead of NPEing below.
        BlobMetadata blobMetadata = blobStore.blobMetadata(
                containerName, uploadId);
        if (blobMetadata == null) {
            sendSimpleErrorResponse(response, S3ErrorCode.NO_SUCH_UPLOAD);
            return;
        }

        Collection<String> partNames = new ArrayList<>();
        long totalContentLength = 0;
        for (Iterator<String> it = parseSimpleXmlElements(is,
                "PartNumber").iterator(); it.hasNext();) {
            String partName = uploadId + "." + it.next();
            partNames.add(partName);
            BlobMetadata metadata = blobStore.blobMetadata(containerName,
                    partName);
            long contentLength =
                    metadata.getContentMetadata().getContentLength();
            // every part except the last must meet the S3 minimum size
            if (contentLength < MINIMUM_MULTIPART_PART_SIZE &&
                    it.hasNext()) {
                sendSimpleErrorResponse(response,
                        S3ErrorCode.ENTITY_TOO_SMALL);
                return;
            }
            totalContentLength += contentLength;
        }

        ContentMetadata contentMetadata =
                blobMetadata.getContentMetadata();
        BlobBuilder.PayloadBlobBuilder builder = blobStore
                .blobBuilder(blobName)
                .userMetadata(blobMetadata.getUserMetadata())
                .payload(new MultiBlobByteSource(blobStore, containerName,
                        partNames))
                .contentDisposition(
                        contentMetadata.getContentDisposition())
                .contentEncoding(contentMetadata.getContentEncoding())
                .contentLanguage(contentMetadata.getContentLanguage())
                .contentLength(totalContentLength)
                .expires(contentMetadata.getExpires());
        String contentType = contentMetadata.getContentType();
        if (contentType != null) {
            builder.contentType(contentType);
        }
        // TODO: will the client time out here?
        String eTag = blobStore.putBlob(containerName, builder.build(),
                new PutOptions().multipart(true));
        blobStore.removeBlobs(containerName, partNames);
        // also remove the zero-length stub blob named after the upload
        // id; previously it leaked and polluted later bucket listings
        // (abort already removes it)
        blobStore.removeBlob(containerName, uploadId);

        XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
                writer);
        xml.writeStartDocument();
        xml.writeStartElement("CompleteMultipartUploadResult");
        xml.writeDefaultNamespace(AWS_XMLNS);
        xml.writeStartElement("Location");
        // TODO: bogus value
        xml.writeCharacters("http://Example-Bucket.s3.amazonaws.com/" +
                blobName);
        xml.writeEndElement();
        xml.writeStartElement("Bucket");
        xml.writeCharacters(containerName);
        xml.writeEndElement();
        xml.writeStartElement("Key");
        xml.writeCharacters(blobName);
        xml.writeEndElement();
        if (eTag != null) {
            xml.writeStartElement("ETag");
            // GCS returns base64 ETags; convert to the lowercase hex
            // form S3 clients expect
            if (blobStoreType.equals("google-cloud-storage")) {
                eTag = BaseEncoding.base16().lowerCase().encode(
                        BaseEncoding.base64().decode(eTag));
            }
            xml.writeCharacters("\"" + eTag + "\"");
            xml.writeEndElement();
        }
        xml.writeEndElement();
        xml.flush();
    } catch (XMLStreamException xse) {
        throw new IOException(xse);
    }
}
/**
 * Handles DELETE /{bucket}/{key}?uploadId=... by emulating
 * AbortMultipartUpload: removes all part blobs belonging to the upload
 * and the upload's stub blob, then returns 204 No Content.
 */
private void handleAbortMultipartUpload(HttpServletRequest request,
        HttpServletResponse response, String containerName,
        String blobName, String uploadId) throws IOException {
    if (!blobStore.blobExists(containerName, uploadId)) {
        sendSimpleErrorResponse(response, S3ErrorCode.NO_SUCH_UPLOAD);
        return;
    }
    // part blobs are named uploadId + "." + partNumber, so they sort
    // immediately after the stub blob in the container listing
    // TODO: parts past the first listing page are not removed
    PageSet<? extends StorageMetadata> pageSet = blobStore.list(
            containerName,
            new ListContainerOptions().afterMarker(uploadId));
    for (StorageMetadata sm : pageSet) {
        String partName = sm.getName();
        if (!partName.startsWith(uploadId + ".")) {
            break;
        }
        blobStore.removeBlob(containerName, partName);
    }
    blobStore.removeBlob(containerName, uploadId);
    // 204 is a success status; sendError would render an error page
    // and may emit a body, which is invalid for 204 No Content
    response.setStatus(HttpServletResponse.SC_NO_CONTENT);
}
// Handles GET /{bucket}/{key}?uploadId=... by emulating ListParts:
// enumerates the part blobs named uploadId + "." + partNumber and
// emits a ListPartsResult XML document.  Initiator/Owner/StorageClass
// values are canned placeholders, and pagination is not implemented.
private void handleListParts(HttpServletRequest request,
        HttpServletResponse response, String containerName,
        String blobName, String uploadId) throws IOException {
    try (Writer writer = response.getWriter()) {
        XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
                writer);
        xml.writeStartDocument();
        xml.writeStartElement("ListPartsResult");
        xml.writeDefaultNamespace(AWS_XMLNS);
        xml.writeStartElement("Bucket");
        xml.writeCharacters(containerName);
        xml.writeEndElement();
        xml.writeStartElement("Key");
        xml.writeCharacters(blobName);
        xml.writeEndElement();
        xml.writeStartElement("UploadId");
        xml.writeCharacters(uploadId);
        xml.writeEndElement();
        // TODO: bogus values
        xml.writeStartElement("Initiator");
        xml.writeStartElement("ID");
        xml.writeCharacters(FAKE_INITIATOR_ID);
        xml.writeEndElement();
        xml.writeStartElement("DisplayName");
        xml.writeCharacters(FAKE_INITIATOR_DISPLAY_NAME);
        xml.writeEndElement();
        xml.writeEndElement();
        xml.writeStartElement("Owner");
        xml.writeStartElement("ID");
        xml.writeCharacters(FAKE_OWNER_ID);
        xml.writeEndElement();
        xml.writeStartElement("DisplayName");
        xml.writeCharacters(FAKE_OWNER_DISPLAY_NAME);
        xml.writeEndElement();
        xml.writeEndElement();
        xml.writeStartElement("StorageClass");
        xml.writeCharacters("STANDARD");
        xml.writeEndElement();
        // TODO: pagination
/*
        xml.writeStartElement("PartNumberMarker");
        xml.writeCharacters("1");
        xml.writeEndElement();
        xml.writeStartElement("NextPartNumberMarker");
        xml.writeCharacters("3");
        xml.writeEndElement();
        xml.writeStartElement("MaxParts");
        xml.writeCharacters("2");
        xml.writeEndElement();
        xml.writeStartElement("IsTruncated");
        xml.writeCharacters("true");
        xml.writeEndElement();
*/
        // part blobs sort immediately after the upload-id stub blob,
        // so list from just past the stub and stop at the first name
        // outside the uploadId + "." prefix
        PageSet<? extends StorageMetadata> pageSet = blobStore.list(
                containerName,
                new ListContainerOptions().afterMarker(uploadId));
        for (StorageMetadata sm : pageSet) {
            String partName = sm.getName();
            if (!partName.startsWith(uploadId + ".")) {
                break;
            }
            BlobMetadata metadata = blobStore.blobMetadata(containerName,
                    partName);
            xml.writeStartElement("Part");
            // the part number is the blob-name suffix after the dot
            xml.writeStartElement("PartNumber");
            xml.writeCharacters(partName.substring(
                    (uploadId + ".").length()));
            xml.writeEndElement();
            Date lastModified = sm.getLastModified();
            if (lastModified != null) {
                xml.writeStartElement("LastModified");
                xml.writeCharacters(blobStore.getContext().utils().date()
                        .iso8601DateFormat(lastModified));
                xml.writeEndElement();
            }
            String eTag = sm.getETag();
            if (eTag != null) {
                xml.writeStartElement("ETag");
                // GCS returns base64 ETags; convert to lowercase hex
                if (blobStoreType.equals("google-cloud-storage")) {
                    eTag = BaseEncoding.base16().lowerCase().encode(
                            BaseEncoding.base64().decode(eTag));
                }
                xml.writeCharacters("\"" + eTag + "\"");
                xml.writeEndElement();
            }
            xml.writeStartElement("Size");
            xml.writeCharacters(String.valueOf(
                    metadata.getContentMetadata().getContentLength()));
            xml.writeEndElement();
            xml.writeEndElement();
        }
        xml.writeEndElement();
        xml.flush();
    } catch (XMLStreamException xse) {
        throw new IOException(xse);
    }
}
/**
 * Handles PUT /{bucket}/{key}?uploadId=...&amp;partNumber=N by
 * emulating UploadPart: stores the part as a standalone blob named
 * uploadId + "." + partNumber and returns the part's MD5 as its ETag.
 */
private void handleUploadPart(HttpServletRequest request,
        HttpServletResponse response, String containerName,
        String blobName, String uploadId) throws IOException {
    // TODO: duplicated from handlePutBlob
    String contentLengthString = null;
    String contentMD5String = null;
    for (String headerName : Collections.list(request.getHeaderNames())) {
        String headerValue = Strings.nullToEmpty(request.getHeader(
                headerName));
        if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) {
            contentLengthString = headerValue;
        } else if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_MD5)) {
            contentMD5String = headerValue;
        }
    }

    // Content-MD5, when present, must be valid base64 and exactly
    // MD5-sized
    HashCode contentMD5 = null;
    if (contentMD5String != null) {
        try {
            contentMD5 = HashCode.fromBytes(
                    BaseEncoding.base64().decode(contentMD5String));
        } catch (IllegalArgumentException iae) {
            sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST);
            return;
        }
        if (contentMD5.bits() != Hashing.md5().bits()) {
            sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST);
            return;
        }
    }

    if (contentLengthString == null) {
        sendSimpleErrorResponse(response,
                S3ErrorCode.MISSING_CONTENT_LENGTH);
        return;
    }
    long contentLength;
    try {
        contentLength = Long.parseLong(contentLengthString);
    } catch (NumberFormatException nfe) {
        sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT);
        return;
    }
    if (contentLength < 0) {
        sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT);
        return;
    }

    // reject a missing, non-numeric, or out-of-range part number
    // instead of embedding the raw query string in the part blob name;
    // S3 allows part numbers 1-10000
    String partNumber = request.getParameter("partNumber");
    try {
        int partNumberInt = Integer.parseInt(partNumber);
        if (partNumberInt < 1 || partNumberInt > 10000) {
            sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT);
            return;
        }
    } catch (NumberFormatException nfe) {
        // Integer.parseInt(null) also lands here
        sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT);
        return;
    }

    try (HashingInputStream his = new HashingInputStream(Hashing.md5(),
            request.getInputStream())) {
        BlobBuilder.PayloadBlobBuilder builder = blobStore
                .blobBuilder(uploadId + "." + partNumber)
                .payload(his);
        addContentMetdataFromHttpRequest(builder, request);
        if (contentMD5 != null) {
            builder = builder.contentMD5(contentMD5);
        }
        blobStore.putBlob(containerName, builder.build());
        // recalculate ETag since some object stores like Azure return
        // non-hash
        byte[] hashCode = his.hash().asBytes();
        response.addHeader(HttpHeaders.ETAG, "\"" +
                BaseEncoding.base16().lowerCase().encode(hashCode) + "\"");
    }
}
private static void addMetadataToResponse(HttpServletResponse response,
BlobMetadata metadata) {
ContentMetadata contentMetadata =
@ -1474,4 +1865,70 @@ final class S3ProxyHandler extends AbstractHandler {
builder.expires(new Date(expires));
}
}
/**
 * A {@code ByteSource} that concatenates several blobs from one
 * container, used to assemble an emulated multipart upload's parts
 * into the final blob's payload.  Each call to {@link #openStream}
 * yields a fresh stream over the same sequence of blob names.
 */
static final class MultiBlobByteSource extends ByteSource {
    private final BlobStore blobStore;
    private final String containerName;
    private final Collection<String> blobNames;

    MultiBlobByteSource(BlobStore store, String container,
            Collection<String> parts) {
        this.blobStore = checkNotNull(store);
        this.containerName = checkNotNull(container);
        this.blobNames = checkNotNull(parts);
    }

    @Override
    public InputStream openStream() throws IOException {
        return new MultiBlobInputStream(blobStore, containerName,
                blobNames);
    }
}
/**
 * An {@code InputStream} that reads several blobs from one container
 * back-to-back, presenting them as a single concatenated stream.
 * Not thread-safe.
 */
static final class MultiBlobInputStream extends InputStream {
    private final BlobStore blobStore;
    private final String containerName;
    private final Iterator<String> blobNames;
    // stream over the current blob; replaced as the iterator advances
    private InputStream is;

    MultiBlobInputStream(BlobStore blobStore, String containerName,
            Collection<String> blobNames) throws IOException {
        this.blobStore = checkNotNull(blobStore);
        this.containerName = checkNotNull(containerName);
        this.blobNames = blobNames.iterator();
        // NOTE: an empty collection throws NoSuchElementException here,
        // matching prior behavior; callers always supply >= 1 part
        advance();
    }

    @Override
    public int read() throws IOException {
        // loop rather than retry once so zero-length blobs in the
        // middle of the sequence do not terminate the stream early
        while (true) {
            int ch = is.read();
            if (ch != -1) {
                return ch;
            }
            if (!blobNames.hasNext()) {
                return -1;
            }
            advance();
        }
    }

    @Override
    public int read(byte[] array, int offset, int length)
            throws IOException {
        while (true) {
            int count = is.read(array, offset, length);
            if (count != -1) {
                return count;
            }
            if (!blobNames.hasNext()) {
                return -1;
            }
            advance();
        }
    }

    @Override
    public void close() throws IOException {
        // release the currently-open blob stream; previously it was
        // never closed
        if (is != null) {
            is.close();
        }
        super.close();
    }

    /** Closes the current blob stream, if any, and opens the next. */
    private void advance() throws IOException {
        if (is != null) {
            is.close();
        }
        Blob blob = blobStore.getBlob(containerName, blobNames.next());
        is = blob.getPayload().openStream();
    }
}
}

Wyświetl plik

@ -52,6 +52,7 @@ import org.jclouds.io.Payload;
import org.jclouds.io.payloads.ByteSourcePayload;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.rest.HttpClient;
import org.jclouds.s3.S3Client;
import org.jclouds.util.Throwables2;
import org.junit.After;
import org.junit.Before;
@ -376,7 +377,7 @@ public final class S3ProxyTest {
}
@Test
public void testUnknownParameter() throws Exception {
public void testMultipartUpload() throws Exception {
String blobName = "blob";
int minMultipartSize = 32 * 1024 * 1024 + 1;
ByteSource byteSource = ByteSource.wrap(new byte[minMultipartSize]);
@ -384,9 +385,15 @@ public final class S3ProxyTest {
.payload(byteSource)
.contentLength(byteSource.size())
.build();
PutOptions options = new PutOptions().multipart(true);
s3BlobStore.putBlob(containerName, blob,
new PutOptions().multipart(true));
}
@Test
public void testUnknownParameter() throws Exception {
S3Client s3Client = s3Context.unwrapApi(S3Client.class);
try {
s3BlobStore.putBlob(containerName, blob, options);
s3Client.disableBucketLogging(containerName);
fail("Expected HttpResponseException");
} catch (RuntimeException re) {
// TODO: why does jclouds wrap this in a