From d38d702adfde7be7727079e837f7ae2d4324609c Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 2 Jun 2020 12:53:28 -0400 Subject: [PATCH] Parallelize group sends. --- .../ApplicationDependencyProvider.java | 4 +- .../api/SignalServiceMessagePipe.java | 84 +++++++++--------- .../api/SignalServiceMessageSender.java | 85 +++++++++++++------ .../util/concurrent/FutureMapTransformer.java | 73 ++++++++++++++++ .../util/concurrent/FutureTransformers.java | 12 +++ .../websocket/WebSocketConnection.java | 59 ++++++++----- .../internal/websocket/WebsocketResponse.java | 19 +++++ 7 files changed, 250 insertions(+), 86 deletions(-) create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureMapTransformer.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureTransformers.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebsocketResponse.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 87fae5ab8..5eb124540 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.util.EarlyMessageCache; import org.thoughtcrime.securesms.util.FeatureFlags; import org.thoughtcrime.securesms.util.FrameRateTracker; import org.thoughtcrime.securesms.util.TextSecurePreferences; +import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceAccountManager; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; @@ -89,7 +90,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr Optional.fromNullable(IncomingMessageObserver.getPipe()), Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()), Optional.of(new SecurityEventListener(context)), - provideClientZkOperations().getProfileOperations()); + provideClientZkOperations().getProfileOperations(), + SignalExecutors.UNBOUNDED); } @Override diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java index 09cdac198..59e7aca6e 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java @@ -17,7 +17,6 @@ import org.signal.zkgroup.profiles.ProfileKeyCredentialRequestContext; import org.signal.zkgroup.profiles.ProfileKeyVersion; import org.whispersystems.libsignal.InvalidVersionException; import org.whispersystems.libsignal.util.Hex; -import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; @@ -31,7 +30,10 @@ import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; import org.whispersystems.signalservice.internal.push.SendMessageResponse; import org.whispersystems.signalservice.internal.util.JsonUtil; import org.whispersystems.signalservice.internal.util.Util; +import org.whispersystems.signalservice.internal.util.concurrent.FutureTransformers; +import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; +import org.whispersystems.signalservice.internal.websocket.WebsocketResponse; import org.whispersystems.util.Base64; import java.io.IOException; @@ -40,6 +42,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -155,35 +158,36 @@ public class SignalServiceMessagePipe { } } - public SendMessageResponse send(OutgoingPushMessageList list, Optional unidentifiedAccess) throws IOException { - try { - List headers = new LinkedList() {{ - add("content-type:application/json"); - }}; + public Future send(OutgoingPushMessageList list, Optional unidentifiedAccess) throws IOException { + List headers = new LinkedList() {{ + add("content-type:application/json"); + }}; - if (unidentifiedAccess.isPresent()) { - headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())); - } - - WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() - .setId(new SecureRandom().nextLong()) - .setVerb("PUT") - .setPath(String.format("/v1/messages/%s", list.getDestination())) - .addAllHeaders(headers) - .setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes())) - .build(); - - Pair response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - - if (response.first() < 200 || response.first() >= 300) { - throw new IOException("Non-successful response: " + response.first()); - } - - if (Util.isEmpty(response.second())) return new SendMessageResponse(false); - else return JsonUtil.fromJson(response.second(), SendMessageResponse.class); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new IOException(e); + if (unidentifiedAccess.isPresent()) { + headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())); } + + WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() + .setId(new SecureRandom().nextLong()) + .setVerb("PUT") + .setPath(String.format("/v1/messages/%s", list.getDestination())) + .addAllHeaders(headers) + .setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes())) + .build(); + + ListenableFuture response = websocket.sendRequest(requestMessage); + + return FutureTransformers.map(response, value -> { + if (value.getStatus() < 200 || value.getStatus() >= 300) { + throw new IOException("Non-successful response: " + value.getStatus()); + } + + if (Util.isEmpty(value.getBody())) { + return new SendMessageResponse(false); + } else { + return JsonUtil.fromJson(value.getBody(), SendMessageResponse.class); + } + }); } public ProfileAndCredential getProfile(SignalServiceAddress address, @@ -229,13 +233,13 @@ public class SignalServiceMessagePipe { WebSocketRequestMessage requestMessage = builder.build(); - Pair response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); + WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - if (response.first() < 200 || response.first() >= 300) { - throw new IOException("Non-successful response: " + response.first()); + if (response.getStatus() < 200 || response.getStatus() >= 300) { + throw new IOException("Non-successful response: " + response.getStatus()); } - SignalServiceProfile signalServiceProfile = JsonUtil.fromJson(response.second(), SignalServiceProfile.class); + SignalServiceProfile signalServiceProfile = JsonUtil.fromJson(response.getBody(), SignalServiceProfile.class); ProfileKeyCredential profileKeyCredential = requestContext != null && signalServiceProfile.getProfileKeyCredentialResponse() != null ? clientZkProfile.receiveProfileKeyCredential(requestContext, signalServiceProfile.getProfileKeyCredentialResponse()) : null; @@ -254,13 +258,13 @@ public class SignalServiceMessagePipe { .setPath("/v2/attachments/form/upload") .build(); - Pair response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); + WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - if (response.first() < 200 || response.first() >= 300) { - throw new IOException("Non-successful response: " + response.first()); + if (response.getStatus() < 200 || response.getStatus() >= 300) { + throw new IOException("Non-successful response: " + response.getStatus()); } - return JsonUtil.fromJson(response.second(), AttachmentV2UploadAttributes.class); + return JsonUtil.fromJson(response.getBody(), AttachmentV2UploadAttributes.class); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new IOException(e); } @@ -274,13 +278,13 @@ public class SignalServiceMessagePipe { .setPath("/v3/attachments/form/upload") .build(); - Pair response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); + WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - if (response.first() < 200 || response.first() >= 300) { - throw new IOException("Non-successful response: " + response.first()); + if (response.getStatus() < 200 || response.getStatus() >= 300) { + throw new IOException("Non-successful response: " + response.getStatus()); } - return JsonUtil.fromJson(response.second(), AttachmentV3UploadAttributes.class); + return JsonUtil.fromJson(response.getBody(), AttachmentV3UploadAttributes.class); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new IOException(e); } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index c805b25e8..e8f60dbd3 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -88,11 +88,19 @@ import org.whispersystems.util.Base64; import java.io.IOException; import java.io.InputStream; import java.security.SecureRandom; +import java.sql.Time; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -105,6 +113,8 @@ public class SignalServiceMessageSender { private static final String TAG = SignalServiceMessageSender.class.getSimpleName(); + private static final int RETRY_COUNT = 4; + private final PushServiceSocket socket; private final SignalProtocolStore store; private final SignalServiceAddress localAddress; @@ -115,6 +125,8 @@ public class SignalServiceMessageSender { private final AtomicBoolean isMultiDevice; private final AtomicBoolean attachmentsV3; + private final ExecutorService executor; + /** * Construct a SignalServiceMessageSender. * @@ -135,9 +147,10 @@ public class SignalServiceMessageSender { Optional pipe, Optional unidentifiedPipe, Optional eventListener, - ClientZkProfileOperations clientZkProfileOperations) + ClientZkProfileOperations clientZkProfileOperations, + ExecutorService executor) { - this(urls, new StaticCredentialsProvider(uuid, e164, password, null), store, signalAgent, isMultiDevice, attachmentsV3, pipe, unidentifiedPipe, eventListener, clientZkProfileOperations); + this(urls, new StaticCredentialsProvider(uuid, e164, password, null), store, signalAgent, isMultiDevice, attachmentsV3, pipe, unidentifiedPipe, eventListener, clientZkProfileOperations, executor); } public SignalServiceMessageSender(SignalServiceConfiguration urls, @@ -149,7 +162,8 @@ public class SignalServiceMessageSender { Optional pipe, Optional unidentifiedPipe, Optional eventListener, - ClientZkProfileOperations clientZkProfileOperations) + ClientZkProfileOperations clientZkProfileOperations, + ExecutorService executor) { this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations); this.store = store; @@ -159,6 +173,7 @@ public class SignalServiceMessageSender { this.isMultiDevice = new AtomicBoolean(isMultiDevice); this.attachmentsV3 = new AtomicBoolean(attachmentsV3); this.eventListener = eventListener; + this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); } /** @@ -1188,28 +1203,43 @@ public class SignalServiceMessageSender { boolean online) throws IOException { - List results = new LinkedList<>(); + long startTime = System.currentTimeMillis(); + List> futureResults = new LinkedList<>(); Iterator recipientIterator = recipients.iterator(); Iterator> unidentifiedAccessIterator = unidentifiedAccess.iterator(); while (recipientIterator.hasNext()) { - SignalServiceAddress recipient = recipientIterator.next(); + SignalServiceAddress recipient = recipientIterator.next(); + Optional access = unidentifiedAccessIterator.next(); + futureResults.add(executor.submit(() -> sendMessage(recipient, access, timestamp, content, online))); + } + List results = new ArrayList<>(futureResults.size()); + recipientIterator = recipients.iterator(); + + for (Future futureResult : futureResults) { + SignalServiceAddress recipient = recipientIterator.next(); try { - SendMessageResult result = sendMessage(recipient, unidentifiedAccessIterator.next(), timestamp, content, online); - results.add(result); - } catch (UntrustedIdentityException e) { - Log.w(TAG, e); - results.add(SendMessageResult.identityFailure(recipient, e.getIdentityKey())); - } catch (UnregisteredUserException e) { - Log.w(TAG, e); - results.add(SendMessageResult.unregisteredFailure(recipient)); - } catch (PushNetworkException e) { - Log.w(TAG, e); - results.add(SendMessageResult.networkFailure(recipient)); + results.add(futureResult.get()); + } catch (ExecutionException e) { + if (e.getCause() instanceof UntrustedIdentityException) { + Log.w(TAG, e); + results.add(SendMessageResult.identityFailure(recipient, ((UntrustedIdentityException) e.getCause()).getIdentityKey())); + } else if (e.getCause() instanceof UnregisteredUserException) { + Log.w(TAG, e); + results.add(SendMessageResult.unregisteredFailure(recipient)); + } else if (e.getCause() instanceof PushNetworkException) { + Log.w(TAG, e); + results.add(SendMessageResult.networkFailure(recipient)); + } else { + throw new IOException(e); + } + } catch (InterruptedException e) { + throw new IOException(e); } } + Log.d(TAG, "Completed send to " + recipients.size() + " recipients in " + (System.currentTimeMillis() - startTime) + " ms"); return results; } @@ -1220,7 +1250,9 @@ public class SignalServiceMessageSender { boolean online) throws UntrustedIdentityException, IOException { - for (int i=0;i<4;i++) { + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < RETRY_COUNT; i++) { try { OutgoingPushMessageList messages = getEncryptedMessages(socket, recipient, unidentifiedAccess, timestamp, content, online); Optional pipe = this.pipe.get(); @@ -1228,26 +1260,27 @@ public class SignalServiceMessageSender { if (pipe.isPresent() && !unidentifiedAccess.isPresent()) { try { - Log.i(TAG, "[sendMessage] Transmitting over pipe..."); - SendMessageResponse response = pipe.get().send(messages, Optional.absent()); + SendMessageResponse response = pipe.get().send(messages, Optional.absent()).get(10, TimeUnit.SECONDS); + Log.d(TAG, "[sendMessage] Completed over pipe in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)"); return SendMessageResult.success(recipient, false, response.getNeedsSync() || isMultiDevice.get()); - } catch (IOException e) { + } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { Log.w(TAG, e); - Log.w(TAG, "[sendMessage] Falling back to new connection..."); + Log.w(TAG, "[sendMessage] Pipe failed, falling back..."); } } else if (unidentifiedPipe.isPresent() && unidentifiedAccess.isPresent()) { try { - Log.i(TAG, "[sendMessage] Transmitting over unidentified pipe..."); - SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess); + SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess).get(10, TimeUnit.SECONDS); + Log.d(TAG, "[sendMessage] Completed over unidentified pipe in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)"); return SendMessageResult.success(recipient, true, response.getNeedsSync() || isMultiDevice.get()); - } catch (IOException e) { + } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { Log.w(TAG, e); - Log.w(TAG, "[sendMessage] Falling back to new connection..."); + Log.w(TAG, "[sendMessage] Unidentified pipe failed, falling back..."); } } - Log.w(TAG, "[sendMessage] Not transmitting over pipe..."); SendMessageResponse response = socket.sendMessage(messages, unidentifiedAccess); + + Log.d(TAG, "[sendMessage] Completed over REST in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)"); return SendMessageResult.success(recipient, unidentifiedAccess.isPresent(), response.getNeedsSync() || isMultiDevice.get()); } catch (InvalidKeyException ike) { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureMapTransformer.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureMapTransformer.java new file mode 100644 index 000000000..cc2e926e3 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureMapTransformer.java @@ -0,0 +1,73 @@ +package org.whispersystems.signalservice.internal.util.concurrent; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Lets you perform a simple transform on the result of a future that maps it to a different value. + */ +class FutureMapTransformer implements ListenableFuture { + + private final ListenableFuture future; + private final FutureTransformers.Transformer transformer; + + FutureMapTransformer(ListenableFuture future, FutureTransformers.Transformer transformer) { + this.future = future; + this.transformer = transformer; + } + + @Override + public void addListener(Listener listener) { + future.addListener(new Listener() { + @Override + public void onSuccess(Input result) { + try { + listener.onSuccess(transformer.transform(result)); + } catch (Exception e) { + listener.onFailure(new ExecutionException(e)); + } + } + + @Override + public void onFailure(ExecutionException e) { + listener.onFailure(e); + } + }); + } + + @Override + public boolean cancel(boolean b) { + return future.cancel(b); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public Output get() throws InterruptedException, ExecutionException { + Input input = future.get(); + try { + return transformer.transform(input); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public Output get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + Input input = future.get(l, timeUnit); + try { + return transformer.transform(input); + } catch (Exception e) { + throw new ExecutionException(e); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureTransformers.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureTransformers.java new file mode 100644 index 000000000..56ea40e71 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/concurrent/FutureTransformers.java @@ -0,0 +1,12 @@ +package org.whispersystems.signalservice.internal.util.concurrent; + +public final class FutureTransformers { + + public static ListenableFuture map(ListenableFuture future, Transformer transformer) { + return new FutureMapTransformer<>(future, transformer); + } + + public interface Transformer { + Output transform(Input a) throws Exception; + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java index 20acec4ee..206760de8 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java @@ -12,6 +12,7 @@ import org.whispersystems.signalservice.api.util.Tls12SocketFactory; import org.whispersystems.signalservice.api.websocket.ConnectivityListener; import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager; import org.whispersystems.signalservice.internal.util.Util; +import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; import org.whispersystems.signalservice.internal.util.concurrent.SettableFuture; import java.io.IOException; @@ -52,7 +53,7 @@ public class WebSocketConnection extends WebSocketListener { private static final int KEEPALIVE_TIMEOUT_SECONDS = 55; private final LinkedList incomingRequests = new LinkedList<>(); - private final Map>> outgoingRequests = new HashMap<>(); + private final Map outgoingRequests = new HashMap<>(); private final String wsUri; private final TrustStore trustStore; @@ -94,7 +95,7 @@ public class WebSocketConnection extends WebSocketListener { } public synchronized void connect() { - Log.i(TAG, "WSC connect()..."); + Log.i(TAG, "connect()"); if (client == null) { String filledUri; @@ -137,7 +138,7 @@ public class WebSocketConnection extends WebSocketListener { } public synchronized void disconnect() { - Log.i(TAG, "WSC disconnect()..."); + Log.i(TAG, "disconnect()"); if (client != null) { client.close(1000, "OK"); @@ -169,7 +170,7 @@ public class WebSocketConnection extends WebSocketListener { else return incomingRequests.removeFirst(); } - public synchronized Future> sendRequest(WebSocketRequestMessage request) throws IOException { + public synchronized ListenableFuture sendRequest(WebSocketRequestMessage request) throws IOException { if (client == null || !connected) throw new IOException("No connection!"); WebSocketMessage message = WebSocketMessage.newBuilder() @@ -177,8 +178,8 @@ public class WebSocketConnection extends WebSocketListener { .setRequest(request) .build(); - SettableFuture> future = new SettableFuture<>(); - outgoingRequests.put(request.getId(), future); + SettableFuture future = new SettableFuture<>(); + outgoingRequests.put(request.getId(), new OutgoingRequest(future, System.currentTimeMillis())); if (!client.send(ByteString.of(message.toByteArray()))) { throw new IOException("Write failed!"); @@ -222,7 +223,7 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onOpen(WebSocket webSocket, Response response) { if (client != null && keepAliveSender == null) { - Log.i(TAG, "onConnected()"); + Log.i(TAG, "onOpen() connected"); attempts = 0; connected = true; keepAliveSender = new KeepAliveSender(); @@ -234,18 +235,21 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onMessage(WebSocket webSocket, ByteString payload) { - Log.d(TAG, "WSC onMessage()"); try { WebSocketMessage message = WebSocketMessage.parseFrom(payload.toByteArray()); - Log.d(TAG, "Message Type: " + message.getType().getNumber()); - if (message.getType().getNumber() == WebSocketMessage.Type.REQUEST_VALUE) { + Log.d(TAG, "onMessage() -- incoming request"); incomingRequests.add(message.getRequest()); } else if (message.getType().getNumber() == WebSocketMessage.Type.RESPONSE_VALUE) { - SettableFuture> listener = outgoingRequests.get(message.getResponse().getId()); - if (listener != null) listener.set(new Pair<>(message.getResponse().getStatus(), - new String(message.getResponse().getBody().toByteArray()))); + OutgoingRequest listener = outgoingRequests.get(message.getResponse().getId()); + if (listener != null) { + listener.getResponseFuture().set(new WebsocketResponse(message.getResponse().getStatus(), + new String(message.getResponse().getBody().toByteArray()))); + Log.d(TAG, "onMessage() -- response received in " + (System.currentTimeMillis() - listener.getStartTimestamp()) + " ms"); + } else { + Log.d(TAG, "onMessage() -- response received, but no listener"); + } } notifyAll(); @@ -256,14 +260,14 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onClosed(WebSocket webSocket, int code, String reason) { - Log.i(TAG, "onClose()..."); + Log.i(TAG, "onClose()"); this.connected = false; - Iterator>>> iterator = outgoingRequests.entrySet().iterator(); + Iterator> iterator = outgoingRequests.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry>> entry = iterator.next(); - entry.getValue().setException(new IOException("Closed: " + code + ", " + reason)); + Map.Entry entry = iterator.next(); + entry.getValue().getResponseFuture().setException(new IOException("Closed: " + code + ", " + reason)); iterator.remove(); } @@ -303,12 +307,12 @@ public class WebSocketConnection extends WebSocketListener { @Override public void onMessage(WebSocket webSocket, String text) { - Log.d(TAG, "onMessage(text)! " + text); + Log.d(TAG, "onMessage(text)"); } @Override public synchronized void onClosing(WebSocket webSocket, int code, String reason) { - Log.i(TAG, "onClosing()!..."); + Log.i(TAG, "onClosing()"); webSocket.close(1000, "OK"); } @@ -350,4 +354,21 @@ public class WebSocketConnection extends WebSocketListener { } } + private static class OutgoingRequest { + private final SettableFuture responseFuture; + private final long startTimestamp; + + private OutgoingRequest(SettableFuture future, long startTimestamp) { + this.responseFuture = future; + this.startTimestamp = startTimestamp; + } + + SettableFuture getResponseFuture() { + return responseFuture; + } + + long getStartTimestamp() { + return startTimestamp; + } + } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebsocketResponse.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebsocketResponse.java new file mode 100644 index 000000000..76b403c28 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebsocketResponse.java @@ -0,0 +1,19 @@ +package org.whispersystems.signalservice.internal.websocket; + +public class WebsocketResponse { + private final int status; + private final String body; + + WebsocketResponse(int status, String body) { + this.status = status; + this.body = body; + } + + public int getStatus() { + return status; + } + + public String getBody() { + return body; + } +}