Stream B2 writes

Previously S3Proxy buffered B2 writes on disk to create a repeatable
payload for jclouds, which calculated the SHA-1 checksum. The B2
service no longer requires a checksum, so we can remove the buffering.
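For context, the pattern being removed looks roughly like the sketch below. This is an illustration rather than code from the commit: the B2PayloadSketch class, its method names, and the fileThreshold parameter (standing in for the removed B2_PUT_BLOB_BUFFER_SIZE constant) are hypothetical, while the Guava and jclouds calls are the ones visible in the diff.

import java.io.IOException;
import java.io.InputStream;

import com.google.common.io.ByteStreams;
import com.google.common.io.FileBackedOutputStream;

import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;

final class B2PayloadSketch {
    // Old path (removed): spool the request body to memory, or to disk
    // past fileThreshold, so jclouds can read it twice -- once to compute
    // the SHA-1 that B2 formerly required and once to send the bytes.
    static MultipartPart uploadBuffered(BlobStore blobStore,
            MultipartUpload mpu, int partNumber, InputStream is,
            long contentLength, int fileThreshold) throws IOException {
        FileBackedOutputStream fbos =
                new FileBackedOutputStream(fileThreshold);
        try {
            ByteStreams.copy(is, fbos);
            fbos.close();
            Payload payload =
                    Payloads.newByteSourcePayload(fbos.asByteSource());
            payload.getContentMetadata().setContentLength(contentLength);
            return blobStore.uploadMultipartPart(mpu, partNumber, payload);
        } finally {
            fbos.reset();  // deletes the temp file, if one was created
        }
    }

    // New path: hand the socket stream straight to jclouds. The payload
    // can only be read once, so the length must be set explicitly.
    static MultipartPart uploadStreaming(BlobStore blobStore,
            MultipartUpload mpu, int partNumber, InputStream is,
            long contentLength) {
        Payload payload = Payloads.newInputStreamPayload(is);
        payload.getContentMetadata().setContentLength(contentLength);
        return blobStore.uploadMultipartPart(mpu, partNumber, payload);
    }
}

Streaming trades the repeatable payload for one fewer copy of the data and no temp-file bookkeeping, which is safe now that the B2 service no longer demands a checksum before the upload.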
pull/210/head
Andrew Gaul 2017-04-07 16:46:03 -07:00
parent 65e65bc214
commit 437ae2068d
1 changed file with 11 additions and 65 deletions


@@ -75,7 +75,6 @@ import com.google.common.hash.HashingInputStream;
 import com.google.common.io.BaseEncoding;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
-import com.google.common.io.FileBackedOutputStream;
 import com.google.common.net.HostAndPort;
 import com.google.common.net.HttpHeaders;
 import com.google.common.net.PercentEscaper;
@@ -215,8 +214,6 @@ public class S3ProxyHandler {
     );
     private static final PercentEscaper AWS_URL_PARAMETER_ESCAPER =
             new PercentEscaper("-_.~", false);
-    // TODO: configurable fileThreshold
-    private static final int B2_PUT_BLOB_BUFFER_SIZE = 1024 * 1024;

     private final boolean anonymousIdentity;
     private final AuthenticationType authenticationType;
@@ -1698,23 +1695,12 @@ public class S3ProxyHandler {
             options.multipart(true);
         }
-        FileBackedOutputStream fbos = null;
         String eTag;
         try {
-            BlobBuilder.PayloadBlobBuilder builder;
-            if (blobStoreType.equals("b2")) {
-                // B2 requires a repeatable payload to calculate the SHA1 hash
-                fbos = new FileBackedOutputStream(B2_PUT_BLOB_BUFFER_SIZE);
-                ByteStreams.copy(is, fbos);
-                fbos.close();
-                builder = blobStore.blobBuilder(blobName)
-                        .payload(fbos.asByteSource());
-            } else {
-                builder = blobStore.blobBuilder(blobName)
-                        .payload(is);
-            }
-            builder.contentLength(contentLength);
+            BlobBuilder.PayloadBlobBuilder builder = blobStore
+                    .blobBuilder(blobName)
+                    .payload(is)
+                    .contentLength(contentLength);
             addContentMetdataFromHttpRequest(builder, request);
             if (contentMD5 != null) {
@@ -1739,10 +1725,6 @@
                 break;
             }
             return;
-        } finally {
-            if (fbos != null) {
-                fbos.reset();
-            }
         }

         response.addHeader(HttpHeaders.ETAG, maybeQuoteETag(eTag));
@@ -2210,7 +2192,6 @@ public class S3ProxyHandler {
                 blobMetadata.getContentMetadata().getContentLength();
         String blobStoreType = getBlobStoreType(blobStore);
-        FileBackedOutputStream fbos = null;

         try (InputStream is = blob.getPayload().openStream()) {
             if (blobStoreType.equals("azureblob")) {
                 // Azure has a maximum part size of 4 MB while S3 has a minimum
@@ -2235,29 +2216,13 @@ public class S3ProxyHandler {
                 eTag = BaseEncoding.base16().lowerCase().encode(
                         his.hash().asBytes());
             } else {
-                Payload payload;
-                if (blobStoreType.equals("b2")) {
-                    // B2 requires a repeatable payload to calculate the SHA1
-                    // hash
-                    fbos = new FileBackedOutputStream(B2_PUT_BLOB_BUFFER_SIZE);
-                    ByteStreams.copy(is, fbos);
-                    fbos.close();
-                    payload = Payloads.newByteSourcePayload(
-                            fbos.asByteSource());
-                } else {
-                    payload = Payloads.newInputStreamPayload(is);
-                }
+                Payload payload = Payloads.newInputStreamPayload(is);
                 payload.getContentMetadata().setContentLength(contentLength);
                 MultipartPart part = blobStore.uploadMultipartPart(mpu,
                         partNumber, payload);
                 eTag = part.partETag();
             }
-        } finally {
-            if (fbos != null) {
-                fbos.reset();
-            }
         }

         try (Writer writer = response.getWriter()) {
@@ -2380,32 +2345,13 @@ public class S3ProxyHandler {
                     his.hash().asBytes())));
         } else {
             MultipartPart part;
-            Payload payload;
-            FileBackedOutputStream fbos = null;
-            try {
-                String blobStoreType = getBlobStoreType(blobStore);
-                if (blobStoreType.equals("b2")) {
-                    // B2 requires a repeatable payload to calculate the SHA1
-                    // hash
-                    fbos = new FileBackedOutputStream(B2_PUT_BLOB_BUFFER_SIZE);
-                    ByteStreams.copy(is, fbos);
-                    fbos.close();
-                    payload = Payloads.newByteSourcePayload(
-                            fbos.asByteSource());
-                } else {
-                    payload = Payloads.newInputStreamPayload(is);
-                }
-                payload.getContentMetadata().setContentLength(contentLength);
-                if (contentMD5 != null) {
-                    payload.getContentMetadata().setContentMD5(contentMD5);
-                }
-                part = blobStore.uploadMultipartPart(mpu, partNumber, payload);
-            } finally {
-                if (fbos != null) {
-                    fbos.reset();
-                }
-            }
+            Payload payload = Payloads.newInputStreamPayload(is);
+            payload.getContentMetadata().setContentLength(contentLength);
+            if (contentMD5 != null) {
+                payload.getContentMetadata().setContentMD5(contentMD5);
+            }
+            part = blobStore.uploadMultipartPart(mpu, partNumber, payload);
             if (part.partETag() != null) {
                 response.addHeader(HttpHeaders.ETAG,