Emulate multipart copy

Fixes #56.
pull/78/head
Andrew Gaul 2015-07-25 13:25:19 -07:00
rodzic 81c3fe9fc7
commit 8e5fecaa91
2 zmienionych plików z 133 dodań i 17 usunięć

Wyświetl plik

@ -462,8 +462,13 @@ final class S3ProxyHandler extends AbstractHandler {
handleContainerCreate(request, response, blobStore, path[1]);
return;
} else if (uploadId != null) {
handleUploadPart(request, response, blobStore, path[1],
path[2], uploadId);
if (request.getHeader("x-amz-copy-source") != null) {
handleCopyPart(request, response, blobStore, path[1],
path[2], uploadId);
} else {
handleUploadPart(request, response, blobStore, path[1],
path[2], uploadId);
}
return;
} else if (request.getHeader("x-amz-copy-source") != null) {
handleCopyBlob(request, response, blobStore, path[1], path[2]);
@ -1510,16 +1515,135 @@ final class S3ProxyHandler extends AbstractHandler {
}
}
private void handleCopyPart(HttpServletRequest request,
HttpServletResponse response, BlobStore blobStore,
String containerName, String blobName, String uploadId)
throws IOException, S3Exception {
// TODO: duplicated from handlePutBlob
String copySourceHeader = request.getHeader("x-amz-copy-source");
copySourceHeader = URLDecoder.decode(copySourceHeader, "UTF-8");
if (copySourceHeader.startsWith("/")) {
// Some clients like boto do not include the leading slash
copySourceHeader = copySourceHeader.substring(1);
}
String[] path = copySourceHeader.split("/", 2);
if (path.length != 2) {
throw new S3Exception(S3ErrorCode.INVALID_REQUEST);
}
String sourceContainerName = path[0];
String sourceBlobName = path[1];
GetOptions options = new GetOptions();
String range = request.getHeader("x-amz-copy-source-range");
if (range != null && range.startsWith("bytes=") &&
// ignore multiple ranges
range.indexOf(',') == -1) {
range = range.substring("bytes=".length());
String[] ranges = range.split("-", 2);
if (ranges[0].isEmpty()) {
options.tail(Long.parseLong(ranges[1]));
} else if (ranges[1].isEmpty()) {
options.startAt(Long.parseLong(ranges[0]));
} else {
options.range(Long.parseLong(ranges[0]),
Long.parseLong(ranges[1]));
}
}
String partNumberString = request.getParameter("partNumber");
if (partNumberString == null) {
throw new S3Exception(S3ErrorCode.INVALID_ARGUMENT);
}
int partNumber;
try {
partNumber = Integer.parseInt(partNumberString);
} catch (NumberFormatException nfe) {
throw new S3Exception(S3ErrorCode.INVALID_ARGUMENT,
"Part number must be an integer between 1 and 10000" +
", inclusive", nfe, ImmutableMap.of(
"ArgumentName", "partNumber",
"ArgumentValue", partNumberString));
}
if (partNumber < 1 || partNumber > 10_000) {
throw new S3Exception(S3ErrorCode.INVALID_ARGUMENT,
"Part number must be an integer between 1 and 10000" +
", inclusive", (Throwable) null, ImmutableMap.of(
"ArgumentName", "partNumber",
"ArgumentValue", partNumberString));
}
// TODO: how to reconstruct original mpu?
MultipartUpload mpu = MultipartUpload.create(containerName,
blobName, uploadId, createFakeBlobMetadata(blobStore));
Blob blob = blobStore.getBlob(sourceContainerName, sourceBlobName,
options);
if (blob == null) {
throw new S3Exception(S3ErrorCode.NO_SUCH_KEY);
}
BlobMetadata blobMetadata = blob.getMetadata();
long contentLength =
blobMetadata.getContentMetadata().getContentLength();
HashCode contentMD5 =
blobMetadata.getContentMetadata().getContentMD5AsHashCode();
String eTag;
try (InputStream is = blob.getPayload().openStream()) {
if (getBlobStoreType(blobStore).equals("azureblob")) {
// Azure has a maximum part size of 4 MB while S3 has a minimum
// part size of 5 MB and a maximum of 5 GB. Split a single S3
// part multiple Azure parts.
long azureMaximumMultipartPartSize = 4 * 1024 * 1024;
HashingInputStream his = new HashingInputStream(Hashing.md5(),
is);
for (int offset = 0, subPartNumber = 0; offset < contentLength;
offset += azureMaximumMultipartPartSize,
++subPartNumber) {
Payload payload = Payloads.newInputStreamPayload(
ByteStreams.limit(his,
azureMaximumMultipartPartSize));
payload.getContentMetadata().setContentLength(
Math.min(azureMaximumMultipartPartSize,
contentLength - offset));
blobStore.uploadMultipartPart(mpu,
10_000 * partNumber + subPartNumber, payload);
}
eTag = BaseEncoding.base16().lowerCase().encode(
his.hash().asBytes());
} else {
Payload payload = Payloads.newInputStreamPayload(is);
payload.getContentMetadata().setContentLength(contentLength);
MultipartPart part = blobStore.uploadMultipartPart(mpu,
partNumber, payload);
eTag = part.partETag();
}
}
try (Writer writer = response.getWriter()) {
XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
writer);
xml.writeStartDocument();
xml.writeStartElement("CopyObjectResult");
xml.writeDefaultNamespace(AWS_XMLNS);
writeSimpleElement(xml, "LastModified",
blobStore.getContext().utils().date()
.iso8601DateFormat(blobMetadata.getLastModified()));
writeSimpleElement(xml, "ETag", maybeQuoteETag(eTag));
xml.writeEndElement();
xml.flush();
} catch (XMLStreamException xse) {
throw new IOException(xse);
}
}
private void handleUploadPart(HttpServletRequest request,
HttpServletResponse response, BlobStore blobStore,
String containerName, String blobName, String uploadId)
throws IOException, S3Exception {
// TODO: implement multipart copy
if (request.getHeader("x-amz-copy-source") != null ||
request.getHeader("x-amz-copy-source-range") != null) {
throw new S3Exception(S3ErrorCode.NOT_IMPLEMENTED);
}
// TODO: duplicated from handlePutBlob
String contentLengthString = null;
String contentMD5String = null;

Wyświetl plik

@ -189,15 +189,7 @@ public final class S3AwsSdkTest {
.withFirstByte(0L)
.withLastByte(BYTE_SOURCE.size() - 1)
.withPartNumber(1);
// TODO: S3Proxy lacks support for multipart copy
CopyPartResult copyPartResult = null;
try {
copyPartResult = client.copyPart(copyRequest);
Fail.failBecauseExceptionWasNotThrown(AmazonS3Exception.class);
} catch (AmazonS3Exception e) {
assertThat(e.getErrorCode()).isEqualTo("NotImplemented");
return;
}
CopyPartResult copyPartResult = client.copyPart(copyRequest);
List<PartETag> partETags = new ArrayList<>();
partETags.add(copyPartResult.getPartETag());