Async response proof of concept

This streams data during Get Object requests.  References #219.
async
Andrew Gaul 2017-05-05 13:49:17 -07:00
rodzic d3c77384f1
commit 0b76a5c76d
1 zmienionych plików z 42 dodań i 6 usunięć

Wyświetl plik

@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
@ -51,6 +50,9 @@ import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.xml.stream.XMLOutputFactory;
@ -1507,11 +1509,10 @@ public class S3ProxyHandler {
"bytes");
}
try (InputStream is = blob.getPayload().openStream();
OutputStream os = response.getOutputStream()) {
ByteStreams.copy(is, os);
os.flush();
}
InputStream is = blob.getPayload().openStream();
ServletOutputStream sos = response.getOutputStream();
AsyncContext async = request.startAsync(request, response);
sos.setWriteListener(new StandardDataStream(async, is, sos));
}
private void handleCopyBlob(HttpServletRequest request,
@ -1717,6 +1718,7 @@ public class S3ProxyHandler {
builder = builder.contentMD5(contentMD5);
}
// TODO: needs async like handleGetBlob
eTag = blobStore.putBlob(containerName, builder.build(),
options);
} catch (HttpResponseException hre) {
@ -2871,4 +2873,38 @@ public class S3ProxyHandler {
}
return true;
}
private static final class StandardDataStream implements WriteListener {
private static final Logger logger = LoggerFactory.getLogger(
StandardDataStream.class);
private final AsyncContext async;
private final InputStream is;
private final ServletOutputStream sos;
private StandardDataStream(AsyncContext async, InputStream is,
ServletOutputStream sos) {
this.async = async;
this.is = is;
this.sos = sos;
}
public void onWritePossible() throws IOException {
byte[] buffer = new byte[4096];
while (sos.isReady()) {
int len = is.read(buffer);
if (len < 0) {
// TODO: does this close is and sos?
async.complete();
return;
}
sos.write(buffer, 0, len);
}
}
public void onError(Throwable t) {
logger.debug("Async Error", t);
async.complete();
}
}
}