diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java index eef3d23..b8e42f6 100644 --- a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java +++ b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java @@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.PushbackInputStream; import java.io.UnsupportedEncodingException; import java.io.Writer; @@ -51,6 +50,9 @@ import java.util.concurrent.TimeUnit; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.xml.stream.XMLOutputFactory; @@ -1507,11 +1509,10 @@ public class S3ProxyHandler { "bytes"); } - try (InputStream is = blob.getPayload().openStream(); - OutputStream os = response.getOutputStream()) { - ByteStreams.copy(is, os); - os.flush(); - } + InputStream is = blob.getPayload().openStream(); + ServletOutputStream sos = response.getOutputStream(); + AsyncContext async = request.startAsync(request, response); + sos.setWriteListener(new StandardDataStream(async, is, sos)); } private void handleCopyBlob(HttpServletRequest request, @@ -1717,6 +1718,7 @@ public class S3ProxyHandler { builder = builder.contentMD5(contentMD5); } + // TODO: needs async like handleGetBlob eTag = blobStore.putBlob(containerName, builder.build(), options); } catch (HttpResponseException hre) { @@ -2871,4 +2873,38 @@ public class S3ProxyHandler { } return true; } + + private static final class StandardDataStream implements WriteListener { + private static final Logger logger = LoggerFactory.getLogger( + StandardDataStream.class); + + private final AsyncContext async; + private final InputStream is; + private final ServletOutputStream sos; + + private StandardDataStream(AsyncContext async, InputStream is, + ServletOutputStream sos) { + this.async = async; + this.is = is; + this.sos = sos; + } + + public void onWritePossible() throws IOException { + byte[] buffer = new byte[4096]; + while (sos.isReady()) { + int len = is.read(buffer); + if (len < 0) { + // TODO: does this close is and sos? + async.complete(); + return; + } + sos.write(buffer, 0, len); + } + } + + public void onError(Throwable t) { + logger.debug("Async Error", t); + async.complete(); + } + } }