kopia lustrzana https://github.com/jortage/poolmgr
Mini Rivet refactor, implement fast upload endpoint
rodzic
4c8c3e0946
commit
f5ccfae4c0
|
@ -100,6 +100,19 @@ public class Queries {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isMapped(DataSource dataSource, HashCode hash) {
|
||||
try (Connection c = dataSource.getConnection()) {
|
||||
try (PreparedStatement ps = c.prepareStatement("SELECT 1 FROM `name_map` WHERE `hash` = ? LIMIT 1;")) {
|
||||
ps.setBytes(1, hash.asBytes());
|
||||
try (ResultSet rs = ps.executeQuery()) {
|
||||
return rs.first();
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void putFilesize(DataSource dataSource, HashCode hash, long size) {
|
||||
try (Connection c = dataSource.getConnection()) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import static com.google.common.base.Verify.verify;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -68,7 +69,7 @@ public final class RivetHandler extends AbstractHandler {
|
|||
FREEZING, COLD, WARM, HOT, SCALDING;
|
||||
}
|
||||
|
||||
private enum RetrieveResult {
|
||||
private enum RivetResult {
|
||||
/**
|
||||
* The file was downloaded and added to the pool. Worst case.
|
||||
*/
|
||||
|
@ -93,7 +94,7 @@ public final class RivetHandler extends AbstractHandler {
|
|||
private final Gson gson;
|
||||
// synchronize on a mutex when loading URLs to avoid download races that would waste bandwidth
|
||||
private final Object retrieveMutex = new Object();
|
||||
private final Map<String, Pair<RetrieveResult, Temperature>> results = Maps.newHashMap();
|
||||
private final Map<String, Pair<RivetResult, Temperature>> results = Maps.newHashMap();
|
||||
private final LoadingCache<String, HashCode> urlCache = CacheBuilder.newBuilder()
|
||||
.concurrencyLevel(1)
|
||||
.expireAfterWrite(10, TimeUnit.MINUTES)
|
||||
|
@ -134,13 +135,15 @@ public final class RivetHandler extends AbstractHandler {
|
|||
}
|
||||
OutputStream sinkOut = bss.getSink().openStream();
|
||||
HashingOutputStream hos = new HashingOutputStream(Hashing.sha512(), sinkOut);
|
||||
ByteStreams.copy(getRes.body().byteStream(), hos);
|
||||
try (InputStream in = getRes.body().byteStream()) {
|
||||
ByteStreams.copy(in, hos);
|
||||
}
|
||||
hos.close();
|
||||
HashCode hash = hos.hash();
|
||||
String hashStr = hash.toString();
|
||||
String path = JortageProxy.hashToPath(hashStr);
|
||||
if (JortageProxy.backingBlobStore.blobExists(JortageProxy.bucket, path)) {
|
||||
results.put(url, new Pair<>(RetrieveResult.PRESENT, Temperature.COLD));
|
||||
if (Queries.isMapped(JortageProxy.dataSource, hash)) {
|
||||
results.put(url, new Pair<>(RivetResult.PRESENT, Temperature.COLD));
|
||||
} else {
|
||||
Blob blob = JortageProxy.backingBlobStore.blobBuilder(path)
|
||||
.payload(bss.getSource())
|
||||
|
@ -152,7 +155,7 @@ public final class RivetHandler extends AbstractHandler {
|
|||
new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ).multipart(size > 8192));
|
||||
Queries.putPendingBackup(JortageProxy.dataSource, hash);
|
||||
Queries.putFilesize(JortageProxy.dataSource, hash, size);
|
||||
results.put(url, new Pair<>(RetrieveResult.ADDED, Temperature.FREEZING));
|
||||
results.put(url, new Pair<>(RivetResult.ADDED, Temperature.FREEZING));
|
||||
}
|
||||
return hash;
|
||||
} else {
|
||||
|
@ -179,9 +182,9 @@ public final class RivetHandler extends AbstractHandler {
|
|||
String prelude = segments.get(1)+segments.get(2);
|
||||
String hashStr = segments.get(3);
|
||||
if (hashStr.startsWith(prelude) && HEX_MATCHER.matchesAllOf(hashStr)) {
|
||||
if (JortageProxy.backingBlobStore.blobExists(JortageProxy.bucket, JortageProxy.hashToPath(hashStr))) {
|
||||
HashCode hash = HashCode.fromString(hashStr);
|
||||
results.put(originalUrl, new Pair<>(RetrieveResult.FOUND, temp));
|
||||
HashCode hash = HashCode.fromString(hashStr);
|
||||
if (Queries.isMapped(JortageProxy.dataSource, hash)) {
|
||||
results.put(originalUrl, new Pair<>(RivetResult.FOUND, temp));
|
||||
return hash;
|
||||
}
|
||||
}
|
||||
|
@ -231,167 +234,287 @@ public final class RivetHandler extends AbstractHandler {
|
|||
}
|
||||
|
||||
|
||||
private class RivetRequest {
|
||||
public final String identity;
|
||||
public final JsonObject json;
|
||||
private RivetRequest(String identity, JsonObject json) {
|
||||
this.identity = identity;
|
||||
this.json = json;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException {
|
||||
baseRequest.setHandled(true);
|
||||
if ("/retrieve".equals(target)) {
|
||||
try {
|
||||
if ("OPTIONS".equals(req.getMethod())) {
|
||||
res.setHeader("Allow", "POST");
|
||||
res.setHeader("Accept", "application/json;charset=utf-8");
|
||||
res.setStatus(204);
|
||||
res.getOutputStream().close();
|
||||
return;
|
||||
}
|
||||
if (!"POST".equals(req.getMethod())) {
|
||||
res.setHeader("Allow", "POST");
|
||||
jsonError(res, 405, "/retrieve only accepts POST");
|
||||
return;
|
||||
}
|
||||
String authHeader = req.getHeader("Rivet-Auth");
|
||||
if (authHeader == null) {
|
||||
jsonError(res, 401, "Rivet-Auth header missing");
|
||||
return;
|
||||
}
|
||||
Iterator<String> iter = RIVET_AUTH_SPLITTER.split(authHeader).iterator();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return;
|
||||
}
|
||||
String identity = iter.next();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return;
|
||||
}
|
||||
String macStr = iter.next();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return;
|
||||
}
|
||||
String dateStr = iter.next();
|
||||
verify(!iter.hasNext());
|
||||
|
||||
Instant date;
|
||||
RivetRequest rreq = authenticateAndParse(target, "POST", "application/json; charset=utf-8", true, req, res);
|
||||
if (rreq == null) return;
|
||||
if (!rreq.json.has("sourceUrl")) {
|
||||
jsonError(res, 400, "Must specify sourceUrl");
|
||||
return;
|
||||
}
|
||||
if (!rreq.json.has("destinationPath")) {
|
||||
jsonError(res, 400, "Must specify destinationPath");
|
||||
return;
|
||||
}
|
||||
String sourceUrl = rreq.json.get("sourceUrl").getAsString();
|
||||
if (!sourceUrl.startsWith("https://") && !sourceUrl.startsWith("http://")) {
|
||||
jsonError(res, 400, "sourceUrl must be http or https");
|
||||
return;
|
||||
}
|
||||
String destinationPath = rreq.json.get("destinationPath").getAsString();
|
||||
RivetResult retRes = null;
|
||||
Temperature temp = null;
|
||||
HashCode hash;
|
||||
res.sendError(102);
|
||||
synchronized (retrieveMutex) {
|
||||
try {
|
||||
date = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(dateStr));
|
||||
} catch (DateTimeParseException e) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Could not parse date)");
|
||||
return;
|
||||
}
|
||||
if (date.isBefore(Instant.now().minus(5, ChronoUnit.MINUTES))) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Too old)");
|
||||
return;
|
||||
}
|
||||
|
||||
JortageProxy.reloadConfigIfChanged();
|
||||
if (!JortageProxy.config.containsKey("users") || !JortageProxy.config.getObject("users").containsKey(identity)) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Bad access ID)");
|
||||
return;
|
||||
}
|
||||
if (req.getContentLength() == -1) {
|
||||
jsonError(res, 411, "Length required");
|
||||
return;
|
||||
}
|
||||
if (req.getContentLength() > 8192) {
|
||||
jsonError(res, 413, "Payload too large");
|
||||
return;
|
||||
}
|
||||
String contentType = req.getHeader("Content-Type");
|
||||
if (contentType == null || !"application/json;charset=utf-8".equals(contentType.replace(" ", "").toLowerCase(Locale.ROOT))) {
|
||||
res.setHeader("Accept", "application/json;charset=utf-8");
|
||||
jsonError(res, 415, "Content-Type must be application/json; charset=utf-8");
|
||||
return;
|
||||
}
|
||||
byte[] theirMac = BaseEncoding.base64().decode(macStr);
|
||||
Mac mac = assertSuccess(() -> Mac.getInstance("HmacSHA512"));
|
||||
byte[] payload = ByteStreams.toByteArray(ByteStreams.limit(req.getInputStream(), req.getContentLength()));
|
||||
req.getInputStream().close();
|
||||
String payloadStr = new String(payload, Charsets.UTF_8);
|
||||
|
||||
String key = JortageProxy.config.getObject("users").get(String.class, identity);
|
||||
assertSuccess(() -> mac.init(new SecretKeySpec(key.getBytes(Charsets.UTF_8), "RAW")));
|
||||
mac.update((identity+":"+dateStr+":"+payloadStr).getBytes(Charsets.UTF_8));
|
||||
byte[] ourMac = mac.doFinal();
|
||||
if (!MessageDigest.isEqual(theirMac, ourMac)) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Bad MAC)");
|
||||
return;
|
||||
}
|
||||
|
||||
// phew. now that all of *that* is out of the way... what is it they want?
|
||||
JsonObject json;
|
||||
try {
|
||||
json = gson.fromJson(payloadStr, JsonObject.class);
|
||||
} catch (JsonSyntaxException e) {
|
||||
jsonError(res, 400, "Syntax error in payload");
|
||||
return;
|
||||
}
|
||||
if (!json.has("sourceUrl")) {
|
||||
jsonError(res, 400, "Must specify sourceUrl");
|
||||
return;
|
||||
}
|
||||
if (!json.has("destinationPath")) {
|
||||
jsonError(res, 400, "Must specify destinationPath");
|
||||
return;
|
||||
}
|
||||
String sourceUrl = json.get("sourceUrl").getAsString();
|
||||
if (!sourceUrl.startsWith("https://") && !sourceUrl.startsWith("http://")) {
|
||||
jsonError(res, 400, "sourceUrl must be http or https");
|
||||
return;
|
||||
}
|
||||
String destinationPath = json.get("destinationPath").getAsString();
|
||||
synchronized (retrieveMutex) {
|
||||
RetrieveResult retRes = null;
|
||||
Temperature temp = null;
|
||||
HashCode hash;
|
||||
try {
|
||||
if (urlCache.getIfPresent(sourceUrl) != null) {
|
||||
retRes = RetrieveResult.CACHED;
|
||||
temp = Temperature.SCALDING;
|
||||
}
|
||||
hash = urlCache.get(sourceUrl);
|
||||
if (retRes == null || temp == null) {
|
||||
Pair<RetrieveResult, Temperature> pair = results.get(sourceUrl);
|
||||
retRes = pair.getFirst();
|
||||
temp = pair.getSecond();
|
||||
}
|
||||
} catch (ExecutionException | UncheckedExecutionException e) {
|
||||
if (e.getMessage() != null) {
|
||||
if (e.getMessage().contains("Illegal host")) {
|
||||
jsonError(res, 400, "Illegal host");
|
||||
return;
|
||||
}
|
||||
if (e.getMessage().contains("Unsuccessful response")) {
|
||||
jsonError(res, 502, "Upstream error "+(e.getMessage().substring(e.getMessage().lastIndexOf(':')+1).trim()));
|
||||
return;
|
||||
}
|
||||
if (e.getMessage().contains("connect timed out")) {
|
||||
jsonError(res, 504, "Upstream timeout");
|
||||
return;
|
||||
}
|
||||
}
|
||||
jsonExceptionError(res, e, "sourceUrl: "+sourceUrl, "identity: "+identity);
|
||||
return;
|
||||
if (urlCache.getIfPresent(sourceUrl) != null) {
|
||||
retRes = RivetResult.CACHED;
|
||||
temp = Temperature.SCALDING;
|
||||
}
|
||||
Queries.putMap(JortageProxy.dataSource, identity, destinationPath, hash);
|
||||
res.setStatus(200);
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
JsonObject obj = new JsonObject();
|
||||
JsonObject result = new JsonObject();
|
||||
result.addProperty("name", retRes.name());
|
||||
result.addProperty("temperature", temp.name());
|
||||
obj.add("result", result);
|
||||
obj.addProperty("hash", hash.toString());
|
||||
res.getOutputStream().write(obj.toString().getBytes(Charsets.UTF_8));
|
||||
res.getOutputStream().close();
|
||||
hash = urlCache.get(sourceUrl);
|
||||
if (retRes == null || temp == null) {
|
||||
Pair<RivetResult, Temperature> pair = results.get(sourceUrl);
|
||||
retRes = pair.getFirst();
|
||||
temp = pair.getSecond();
|
||||
}
|
||||
} catch (ExecutionException | UncheckedExecutionException e) {
|
||||
if (e.getMessage() != null) {
|
||||
if (e.getMessage().contains("Illegal host")) {
|
||||
jsonError(res, 400, "Illegal host");
|
||||
return;
|
||||
}
|
||||
if (e.getMessage().contains("Unsuccessful response")) {
|
||||
jsonError(res, 502, "Upstream error "+(e.getMessage().substring(e.getMessage().lastIndexOf(':')+1).trim()));
|
||||
return;
|
||||
}
|
||||
if (e.getMessage().contains("Failed to connect")) {
|
||||
jsonError(res, 502, "Upstream refused connection");
|
||||
return;
|
||||
}
|
||||
if (e.getMessage().contains("connect timed out")) {
|
||||
jsonError(res, 504, "Upstream timeout");
|
||||
return;
|
||||
}
|
||||
}
|
||||
jsonExceptionError(res, e, "sourceUrl: "+sourceUrl, "identity: "+rreq.identity);
|
||||
return;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
jsonExceptionError(res, t);
|
||||
}
|
||||
try {
|
||||
Queries.putMap(JortageProxy.dataSource, rreq.identity, destinationPath, hash);
|
||||
res.setStatus(200);
|
||||
JsonObject obj = new JsonObject();
|
||||
JsonObject result = new JsonObject();
|
||||
result.addProperty("name", retRes.name());
|
||||
result.addProperty("temperature", temp.name());
|
||||
obj.add("result", result);
|
||||
obj.addProperty("hash", hash.toString());
|
||||
sendJson(res, obj);
|
||||
} catch (Exception e) {
|
||||
jsonExceptionError(res, e, "sourceUrl: "+sourceUrl, "identity: "+rreq.identity, "hash: "+hash);
|
||||
return;
|
||||
}
|
||||
} else if (target.startsWith("/upload/")) {
|
||||
String expect = req.getHeader("Expect");
|
||||
if (expect == null || !expect.equals("100-continue")) {
|
||||
jsonError(res, 400, "Must expect continue");
|
||||
return;
|
||||
}
|
||||
String hashStr = req.getQueryString();
|
||||
if (hashStr == null || hashStr.length() != 128 || !HEX_MATCHER.matchesAllOf(hashStr)) {
|
||||
jsonError(res, 400, "Bad hash");
|
||||
return;
|
||||
}
|
||||
String path = target.substring(8);
|
||||
RivetRequest rreq = authenticateAndParse(target, "PUT", null, false, req, res);
|
||||
if (rreq == null) return;
|
||||
try {
|
||||
HashCode hash = HashCode.fromString(hashStr);
|
||||
RivetResult rres;
|
||||
Temperature temp;
|
||||
if (Queries.isMapped(JortageProxy.dataSource, hash)) {
|
||||
rres = RivetResult.FOUND;
|
||||
temp = Temperature.HOT;
|
||||
} else {
|
||||
ByteSinkSource bss = null;
|
||||
try {
|
||||
long len = req.getContentLengthLong();
|
||||
if (len == -1 || len > 8192) {
|
||||
bss = new FileByteSinkSource(File.createTempFile("jortage-proxy-", ".dat"), true);
|
||||
} else {
|
||||
bss = new MemoryByteSinkSource();
|
||||
}
|
||||
OutputStream sinkOut = bss.getSink().openStream();
|
||||
HashingOutputStream hos = new HashingOutputStream(Hashing.sha512(), sinkOut);
|
||||
// accessing the input stream sends a 100 Continue
|
||||
try (InputStream in = req.getInputStream()) {
|
||||
ByteStreams.copy(in, hos);
|
||||
}
|
||||
hos.close();
|
||||
HashCode realHash = hos.hash();
|
||||
if (!hash.equals(realHash)) {
|
||||
jsonError(res, 400, "Hash of body ("+realHash+") did not match hash in query ("+hash+")");
|
||||
return;
|
||||
}
|
||||
Blob blob = JortageProxy.backingBlobStore.blobBuilder(JortageProxy.hashToPath(hash.toString()))
|
||||
.payload(bss.getSource())
|
||||
.contentLength(bss.getSource().size())
|
||||
.contentType(req.getContentType())
|
||||
.build();
|
||||
long size = bss.getSource().size();
|
||||
JortageProxy.backingBlobStore.putBlob(JortageProxy.bucket, blob,
|
||||
new PutOptions().setBlobAccess(BlobAccess.PUBLIC_READ).multipart(size > 8192));
|
||||
Queries.putPendingBackup(JortageProxy.dataSource, hash);
|
||||
Queries.putFilesize(JortageProxy.dataSource, hash, size);
|
||||
rres = RivetResult.ADDED;
|
||||
temp = Temperature.FREEZING;
|
||||
} finally {
|
||||
if (bss != null) bss.close();
|
||||
}
|
||||
}
|
||||
Queries.putMap(JortageProxy.dataSource, rreq.identity, path, hash);
|
||||
res.setStatus(200);
|
||||
JsonObject obj = new JsonObject();
|
||||
JsonObject result = new JsonObject();
|
||||
result.addProperty("name", rres.name());
|
||||
result.addProperty("temperature", temp.name());
|
||||
obj.add("result", result);
|
||||
sendJson(res, obj);
|
||||
} catch (Exception e) {
|
||||
jsonExceptionError(res, e, "identity: "+rreq.identity, "target: "+target+(req.getQueryString() == null ? "" : "?"+req.getQueryString()));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
res.sendError(404);
|
||||
}
|
||||
}
|
||||
|
||||
private RivetRequest authenticateAndParse(String target, String method, String expectedContentType,
|
||||
boolean validateAndParseBody, HttpServletRequest req, HttpServletResponse res) throws IOException {
|
||||
if (expectedContentType != null)
|
||||
expectedContentType = expectedContentType.replace(" ", "").toLowerCase(Locale.ROOT);
|
||||
try {
|
||||
if ("OPTIONS".equals(req.getMethod())) {
|
||||
res.setHeader("Allow", method);
|
||||
res.setHeader("Accept", "application/json;charset=utf-8");
|
||||
res.setStatus(204);
|
||||
res.getOutputStream().close();
|
||||
return null;
|
||||
}
|
||||
if (!"POST".equals(req.getMethod())) {
|
||||
res.setHeader("Allow", method);
|
||||
jsonError(res, 405, "Only "+method+" is accepted");
|
||||
return null;
|
||||
}
|
||||
String authHeader = req.getHeader("Rivet-Auth");
|
||||
if (authHeader == null) {
|
||||
jsonError(res, 401, "Rivet-Auth header missing");
|
||||
return null;
|
||||
}
|
||||
Iterator<String> iter = RIVET_AUTH_SPLITTER.split(authHeader).iterator();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return null;
|
||||
}
|
||||
String identity = iter.next();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return null;
|
||||
}
|
||||
String macStr = iter.next();
|
||||
if (!iter.hasNext()) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Not enough fields)");
|
||||
return null;
|
||||
}
|
||||
String dateStr = iter.next();
|
||||
verify(!iter.hasNext());
|
||||
|
||||
Instant date;
|
||||
try {
|
||||
date = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(dateStr));
|
||||
} catch (DateTimeParseException e) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Could not parse date)");
|
||||
return null;
|
||||
}
|
||||
if (date.isBefore(Instant.now().minus(5, ChronoUnit.MINUTES))) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Too old)");
|
||||
return null;
|
||||
}
|
||||
if (date.isAfter(Instant.now().plus(2, ChronoUnit.MINUTES))) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (From future)");
|
||||
return null;
|
||||
}
|
||||
|
||||
JortageProxy.reloadConfigIfChanged();
|
||||
if (!JortageProxy.config.containsKey("users") || !JortageProxy.config.getObject("users").containsKey(identity)) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Bad access ID)");
|
||||
return null;
|
||||
}
|
||||
if (validateAndParseBody) {
|
||||
if (req.getContentLength() == -1) {
|
||||
jsonError(res, 411, "Length required");
|
||||
return null;
|
||||
}
|
||||
if (req.getContentLength() > 8192) {
|
||||
jsonError(res, 413, "Payload too large");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
if (expectedContentType != null) {
|
||||
String contentType = req.getHeader("Content-Type");
|
||||
if (contentType == null || !expectedContentType.equals(contentType.replace(" ", "").toLowerCase(Locale.ROOT))) {
|
||||
res.setHeader("Accept", expectedContentType);
|
||||
jsonError(res, 415, "Content-Type must be "+expectedContentType);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
byte[] theirMac = BaseEncoding.base64().decode(macStr);
|
||||
Mac mac = assertSuccess(() -> Mac.getInstance("HmacSHA512"));
|
||||
byte[] payload;
|
||||
if (validateAndParseBody) {
|
||||
payload = ByteStreams.toByteArray(ByteStreams.limit(req.getInputStream(), req.getContentLength()));
|
||||
req.getInputStream().close();
|
||||
} else {
|
||||
payload = new byte[0];
|
||||
}
|
||||
String payloadStr = new String(payload, Charsets.UTF_8);
|
||||
|
||||
String key = JortageProxy.config.getObject("users").get(String.class, identity);
|
||||
assertSuccess(() -> mac.init(new SecretKeySpec(key.getBytes(Charsets.UTF_8), "RAW")));
|
||||
String query;
|
||||
if (req.getQueryString() == null) {
|
||||
query = "";
|
||||
} else {
|
||||
query = "?"+req.getQueryString();
|
||||
}
|
||||
mac.update((target+query+":"+identity+":"+dateStr+":"+payloadStr).getBytes(Charsets.UTF_8));
|
||||
byte[] ourMac = mac.doFinal();
|
||||
if (!MessageDigest.isEqual(theirMac, ourMac)) {
|
||||
jsonError(res, 401, "Rivet-Auth header invalid (Bad MAC)");
|
||||
return null;
|
||||
}
|
||||
|
||||
JsonObject json;
|
||||
if (validateAndParseBody) {
|
||||
try {
|
||||
json = gson.fromJson(payloadStr, JsonObject.class);
|
||||
} catch (JsonSyntaxException e) {
|
||||
jsonError(res, 400, "Syntax error in payload");
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
json = null;
|
||||
}
|
||||
return new RivetRequest(identity, json);
|
||||
} catch (Throwable t) {
|
||||
jsonExceptionError(res, t);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void jsonExceptionError(HttpServletResponse res, Throwable t, String... extra) throws IOException {
|
||||
byte[] tokenBys = new byte[8];
|
||||
ThreadLocalRandom.current().nextBytes(tokenBys);
|
||||
|
@ -414,6 +537,11 @@ public final class RivetHandler extends AbstractHandler {
|
|||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
JsonObject obj = new JsonObject();
|
||||
obj.addProperty("error", msg);
|
||||
sendJson(res, obj);
|
||||
}
|
||||
|
||||
private void sendJson(HttpServletResponse res, JsonObject obj) throws IOException {
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.getOutputStream().write(obj.toString().getBytes(Charsets.UTF_8));
|
||||
res.getOutputStream().close();
|
||||
}
|
||||
|
|
|
@ -1,16 +1,26 @@
|
|||
package com.jortage.proxy;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.crypto.Mac;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonNull;
|
||||
import com.google.gson.JsonObject;
|
||||
|
||||
import com.google.gson.JsonPrimitive;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.io.BaseEncoding;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
|
||||
import kotlin.Pair;
|
||||
import okhttp3.MediaType;
|
||||
|
@ -19,51 +29,155 @@ import okhttp3.Request;
|
|||
import okhttp3.RequestBody;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.brotli.BrotliInterceptor;
|
||||
import okio.BufferedSink;
|
||||
import okio.Okio;
|
||||
|
||||
public class RivetTest {
|
||||
|
||||
private static final String HOST = "http://localhost:23280";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
OkHttpClient client = new OkHttpClient.Builder()
|
||||
.addInterceptor(BrotliInterceptor.INSTANCE)
|
||||
.addNetworkInterceptor((chain) -> {
|
||||
Request req = chain.request();
|
||||
System.out.println("\u001B[0m\u001B[7m "+req.method()+" \u001B[0m "+req.url());
|
||||
for (Pair<? extends String, ? extends String> pair : req.headers()) {
|
||||
System.out.println("\u001B[38;5;117m"+pair.getFirst()+": \u001B[38;5;213m"+pair.getSecond());
|
||||
}
|
||||
System.out.println("\u001B[0m");
|
||||
if (!req.body().isOneShot() && req.body().contentType().toString().startsWith("application/json")) {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
BufferedSink bs = Okio.buffer(Okio.sink(baos));
|
||||
req.body().writeTo(bs);
|
||||
bs.emit();
|
||||
JsonObject obj = new Gson().fromJson(new String(baos.toByteArray(), Charsets.UTF_8), JsonObject.class);
|
||||
prettyPrint(obj, "");
|
||||
} else {
|
||||
long len = req.body().contentLength();
|
||||
System.out.println("<"+(len == -1 ? "?" : len)+" bytes>");
|
||||
}
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
Response res = chain.proceed(req);
|
||||
if (res.isSuccessful()) {
|
||||
System.out.print("\u001B[102m\u001B[30m");
|
||||
} else if (res.isRedirect()) {
|
||||
System.out.print("\u001B[37m");
|
||||
} else if (res.code() >= 400 && res.code() <= 499) {
|
||||
System.out.print("\u001B[105m\u001B[30m");
|
||||
} else if (res.code() >= 500 && res.code() <= 599) {
|
||||
System.out.print("\u001B[101m\u001B[37m");
|
||||
} else {
|
||||
System.out.print("\u001B[46m\u001B[30m");
|
||||
}
|
||||
System.out.println(" "+res.code()+" \u001B[0m "+res.message());
|
||||
for (Pair<? extends String, ? extends String> pair : res.headers()) {
|
||||
System.out.println("\u001B[38;5;117m"+pair.getFirst()+": \u001B[38;5;213m"+pair.getSecond());
|
||||
}
|
||||
System.out.println("\u001B[0m");
|
||||
if (res.body().contentLength() != 0) {
|
||||
if (res.body().contentType() != null && res.body().contentType().toString().startsWith("application/json")) {
|
||||
JsonObject obj = new Gson().fromJson(res.body().byteString().utf8(), JsonObject.class);
|
||||
prettyPrint(obj, "");
|
||||
} else {
|
||||
ByteStreams.copy(res.body().byteStream(), System.out);
|
||||
}
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
}
|
||||
return res;
|
||||
})
|
||||
.build();
|
||||
JsonObject obj = new JsonObject();
|
||||
obj.addProperty("sourceUrl", "http://example.com/nothing.png");
|
||||
obj.addProperty("destinationPath", "test.png");
|
||||
String payload = obj.toString();
|
||||
String accessKey = "test";
|
||||
String secretKey = "test";
|
||||
obj.addProperty("sourceUrl", "https://blob.jortage.com/site/jortage_header_logo_dark.png");
|
||||
obj.addProperty("destinationPath", "jortage_logo.png");
|
||||
// doRivetRequest(client, "/retrieve", "test", "test", "application/json; charset=utf-8", ByteSource.wrap(obj.toString().getBytes(Charsets.UTF_8)), true);
|
||||
doRivetRequest(client, "/upload/fastorange.png?b28e0f25d21559880fdd027f35b2359f810bc88ae01ed1220ce85a2038ab584332402270840cb7bff3bd6e53dd7e8d2edf9078d05baf503e1f646dc74b39118a",
|
||||
"test", "test", "image/png", Files.asByteSource(new File("fastorange.png")), false);
|
||||
}
|
||||
|
||||
private static void prettyPrint(JsonElement ele, String indent) {
|
||||
if (ele instanceof JsonObject) {
|
||||
System.out.println("{");
|
||||
String origIndent = indent;
|
||||
indent = indent+" ";
|
||||
for (Map.Entry<String, JsonElement> en : ele.getAsJsonObject().entrySet()) {
|
||||
System.out.print(indent+"\u001B[38;5;117m"+en.getKey()+": \u001B[0m");
|
||||
prettyPrint(en.getValue(), indent);
|
||||
}
|
||||
System.out.println(origIndent+"}");
|
||||
} else if (ele instanceof JsonArray) {
|
||||
System.out.println("[");
|
||||
String origIndent = indent;
|
||||
indent = indent+" ";
|
||||
for (JsonElement e : ele.getAsJsonArray()) {
|
||||
System.out.print(indent);
|
||||
prettyPrint(e, indent);
|
||||
}
|
||||
System.out.println(origIndent+"]");
|
||||
} else if (ele instanceof JsonNull) {
|
||||
System.out.println("\u001B[90mnull\u001B[0m");
|
||||
} else if (ele instanceof JsonPrimitive) {
|
||||
if (((JsonPrimitive) ele).isString()) {
|
||||
System.out.println("\u001B[38;5;213m"+ele+"\u001B[0m");
|
||||
} else {
|
||||
System.out.println("\u001B[38;5;48m"+ele+"\u001B[0m");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void doRivetRequest(OkHttpClient client, String target, String accessKey, String secretKey,
|
||||
String contentType, ByteSource payload, boolean signPayload) throws IOException {
|
||||
String date = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
|
||||
Mac mac = assertSuccess(() -> Mac.getInstance("HmacSHA512"));
|
||||
byte[] payloadBytes = payload.getBytes(Charsets.UTF_8);
|
||||
byte[] payloadBytes = signPayload ? payload.read() : new byte[0];
|
||||
String payloadStr = new String(payloadBytes, Charsets.UTF_8);
|
||||
|
||||
assertSuccess(() -> mac.init(new SecretKeySpec(secretKey.getBytes(Charsets.UTF_8), "RAW")));
|
||||
mac.update((accessKey+":"+date+":"+payload).getBytes(Charsets.UTF_8));
|
||||
mac.update((target+":"+accessKey+":"+date+":"+payloadStr).getBytes(Charsets.UTF_8));
|
||||
byte[] macBys = mac.doFinal();
|
||||
String auth = accessKey+":"+BaseEncoding.base64().encode(macBys)+":"+date;
|
||||
System.out.println(auth);
|
||||
if (Integer.valueOf(4) == 4) return;
|
||||
try (Response res = client.newCall(new Request.Builder()
|
||||
.url("http://localhost:23280/retrieve")
|
||||
.post(RequestBody.create(payloadBytes, MediaType.parse("application/json; charset=utf-8")))
|
||||
.url(HOST+target)
|
||||
.post(signPayload ? RequestBody.create(payloadBytes, MediaType.parse(contentType))
|
||||
: new ByteSourceRequestBody(payload, MediaType.parse(contentType)))
|
||||
.header("Rivet-Auth", auth)
|
||||
.header("User-Agent", "Jortage Rivet Test")
|
||||
.header("Expect", signPayload ? "102-processing" : "100-continue")
|
||||
.build()).execute()) {
|
||||
Request req = res.networkResponse().request();
|
||||
System.out.println(req.method()+" "+req.url());
|
||||
for (Pair<? extends String, ? extends String> pair : req.headers()) {
|
||||
System.out.println(pair.getFirst()+": "+pair.getSecond());
|
||||
}
|
||||
System.out.println();
|
||||
System.out.println(payload);
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
System.out.println(res.code()+" "+res.message());
|
||||
for (Pair<? extends String, ? extends String> pair : res.headers()) {
|
||||
System.out.println(pair.getFirst()+": "+pair.getSecond());
|
||||
}
|
||||
System.out.println();
|
||||
ByteStreams.copy(res.body().byteStream(), System.out);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ByteSourceRequestBody extends RequestBody {
|
||||
|
||||
private final ByteSource source;
|
||||
private final MediaType type;
|
||||
|
||||
public ByteSourceRequestBody(ByteSource source, MediaType type) {
|
||||
this.source = source;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MediaType contentType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long contentLength() throws IOException {
|
||||
return source.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(BufferedSink sink) throws IOException {
|
||||
source.copyTo(sink.outputStream());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private interface ExceptableRunnable { void run() throws Exception; }
|
||||
private interface ExceptableSupplier<T> { T get() throws Exception; }
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue