kopia lustrzana https://github.com/ryukoposting/Signal-Android
Parallelize group sends.
rodzic
04a000a8a8
commit
d38d702adf
|
@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.util.EarlyMessageCache;
|
|||
import org.thoughtcrime.securesms.util.FeatureFlags;
|
||||
import org.thoughtcrime.securesms.util.FrameRateTracker;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
|
||||
import org.whispersystems.libsignal.util.guava.Optional;
|
||||
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
|
||||
|
@ -89,7 +90,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
|
|||
Optional.fromNullable(IncomingMessageObserver.getPipe()),
|
||||
Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()),
|
||||
Optional.of(new SecurityEventListener(context)),
|
||||
provideClientZkOperations().getProfileOperations());
|
||||
provideClientZkOperations().getProfileOperations(),
|
||||
SignalExecutors.UNBOUNDED);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,7 +17,6 @@ import org.signal.zkgroup.profiles.ProfileKeyCredentialRequestContext;
|
|||
import org.signal.zkgroup.profiles.ProfileKeyVersion;
|
||||
import org.whispersystems.libsignal.InvalidVersionException;
|
||||
import org.whispersystems.libsignal.util.Hex;
|
||||
import org.whispersystems.libsignal.util.Pair;
|
||||
import org.whispersystems.libsignal.util.guava.Optional;
|
||||
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
|
||||
|
@ -31,7 +30,10 @@ import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList;
|
|||
import org.whispersystems.signalservice.internal.push.SendMessageResponse;
|
||||
import org.whispersystems.signalservice.internal.util.JsonUtil;
|
||||
import org.whispersystems.signalservice.internal.util.Util;
|
||||
import org.whispersystems.signalservice.internal.util.concurrent.FutureTransformers;
|
||||
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
|
||||
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
|
||||
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse;
|
||||
import org.whispersystems.util.Base64;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -40,6 +42,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -155,35 +158,36 @@ public class SignalServiceMessagePipe {
|
|||
}
|
||||
}
|
||||
|
||||
public SendMessageResponse send(OutgoingPushMessageList list, Optional<UnidentifiedAccess> unidentifiedAccess) throws IOException {
|
||||
try {
|
||||
List<String> headers = new LinkedList<String>() {{
|
||||
add("content-type:application/json");
|
||||
}};
|
||||
public Future<SendMessageResponse> send(OutgoingPushMessageList list, Optional<UnidentifiedAccess> unidentifiedAccess) throws IOException {
|
||||
List<String> headers = new LinkedList<String>() {{
|
||||
add("content-type:application/json");
|
||||
}};
|
||||
|
||||
if (unidentifiedAccess.isPresent()) {
|
||||
headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey()));
|
||||
}
|
||||
|
||||
WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder()
|
||||
.setId(new SecureRandom().nextLong())
|
||||
.setVerb("PUT")
|
||||
.setPath(String.format("/v1/messages/%s", list.getDestination()))
|
||||
.addAllHeaders(headers)
|
||||
.setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes()))
|
||||
.build();
|
||||
|
||||
Pair<Integer, String> response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
|
||||
if (response.first() < 200 || response.first() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.first());
|
||||
}
|
||||
|
||||
if (Util.isEmpty(response.second())) return new SendMessageResponse(false);
|
||||
else return JsonUtil.fromJson(response.second(), SendMessageResponse.class);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
throw new IOException(e);
|
||||
if (unidentifiedAccess.isPresent()) {
|
||||
headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey()));
|
||||
}
|
||||
|
||||
WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder()
|
||||
.setId(new SecureRandom().nextLong())
|
||||
.setVerb("PUT")
|
||||
.setPath(String.format("/v1/messages/%s", list.getDestination()))
|
||||
.addAllHeaders(headers)
|
||||
.setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes()))
|
||||
.build();
|
||||
|
||||
ListenableFuture<WebsocketResponse> response = websocket.sendRequest(requestMessage);
|
||||
|
||||
return FutureTransformers.map(response, value -> {
|
||||
if (value.getStatus() < 200 || value.getStatus() >= 300) {
|
||||
throw new IOException("Non-successful response: " + value.getStatus());
|
||||
}
|
||||
|
||||
if (Util.isEmpty(value.getBody())) {
|
||||
return new SendMessageResponse(false);
|
||||
} else {
|
||||
return JsonUtil.fromJson(value.getBody(), SendMessageResponse.class);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ProfileAndCredential getProfile(SignalServiceAddress address,
|
||||
|
@ -229,13 +233,13 @@ public class SignalServiceMessagePipe {
|
|||
|
||||
WebSocketRequestMessage requestMessage = builder.build();
|
||||
|
||||
Pair<Integer, String> response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
|
||||
if (response.first() < 200 || response.first() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.first());
|
||||
if (response.getStatus() < 200 || response.getStatus() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.getStatus());
|
||||
}
|
||||
|
||||
SignalServiceProfile signalServiceProfile = JsonUtil.fromJson(response.second(), SignalServiceProfile.class);
|
||||
SignalServiceProfile signalServiceProfile = JsonUtil.fromJson(response.getBody(), SignalServiceProfile.class);
|
||||
ProfileKeyCredential profileKeyCredential = requestContext != null && signalServiceProfile.getProfileKeyCredentialResponse() != null
|
||||
? clientZkProfile.receiveProfileKeyCredential(requestContext, signalServiceProfile.getProfileKeyCredentialResponse())
|
||||
: null;
|
||||
|
@ -254,13 +258,13 @@ public class SignalServiceMessagePipe {
|
|||
.setPath("/v2/attachments/form/upload")
|
||||
.build();
|
||||
|
||||
Pair<Integer, String> response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
|
||||
if (response.first() < 200 || response.first() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.first());
|
||||
if (response.getStatus() < 200 || response.getStatus() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.getStatus());
|
||||
}
|
||||
|
||||
return JsonUtil.fromJson(response.second(), AttachmentV2UploadAttributes.class);
|
||||
return JsonUtil.fromJson(response.getBody(), AttachmentV2UploadAttributes.class);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
@ -274,13 +278,13 @@ public class SignalServiceMessagePipe {
|
|||
.setPath("/v3/attachments/form/upload")
|
||||
.build();
|
||||
|
||||
Pair<Integer, String> response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS);
|
||||
|
||||
if (response.first() < 200 || response.first() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.first());
|
||||
if (response.getStatus() < 200 || response.getStatus() >= 300) {
|
||||
throw new IOException("Non-successful response: " + response.getStatus());
|
||||
}
|
||||
|
||||
return JsonUtil.fromJson(response.second(), AttachmentV3UploadAttributes.class);
|
||||
return JsonUtil.fromJson(response.getBody(), AttachmentV3UploadAttributes.class);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
|
|
@ -88,11 +88,19 @@ import org.whispersystems.util.Base64;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.SecureRandom;
|
||||
import java.sql.Time;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -105,6 +113,8 @@ public class SignalServiceMessageSender {
|
|||
|
||||
private static final String TAG = SignalServiceMessageSender.class.getSimpleName();
|
||||
|
||||
private static final int RETRY_COUNT = 4;
|
||||
|
||||
private final PushServiceSocket socket;
|
||||
private final SignalProtocolStore store;
|
||||
private final SignalServiceAddress localAddress;
|
||||
|
@ -115,6 +125,8 @@ public class SignalServiceMessageSender {
|
|||
private final AtomicBoolean isMultiDevice;
|
||||
private final AtomicBoolean attachmentsV3;
|
||||
|
||||
private final ExecutorService executor;
|
||||
|
||||
/**
|
||||
* Construct a SignalServiceMessageSender.
|
||||
*
|
||||
|
@ -135,9 +147,10 @@ public class SignalServiceMessageSender {
|
|||
Optional<SignalServiceMessagePipe> pipe,
|
||||
Optional<SignalServiceMessagePipe> unidentifiedPipe,
|
||||
Optional<EventListener> eventListener,
|
||||
ClientZkProfileOperations clientZkProfileOperations)
|
||||
ClientZkProfileOperations clientZkProfileOperations,
|
||||
ExecutorService executor)
|
||||
{
|
||||
this(urls, new StaticCredentialsProvider(uuid, e164, password, null), store, signalAgent, isMultiDevice, attachmentsV3, pipe, unidentifiedPipe, eventListener, clientZkProfileOperations);
|
||||
this(urls, new StaticCredentialsProvider(uuid, e164, password, null), store, signalAgent, isMultiDevice, attachmentsV3, pipe, unidentifiedPipe, eventListener, clientZkProfileOperations, executor);
|
||||
}
|
||||
|
||||
public SignalServiceMessageSender(SignalServiceConfiguration urls,
|
||||
|
@ -149,7 +162,8 @@ public class SignalServiceMessageSender {
|
|||
Optional<SignalServiceMessagePipe> pipe,
|
||||
Optional<SignalServiceMessagePipe> unidentifiedPipe,
|
||||
Optional<EventListener> eventListener,
|
||||
ClientZkProfileOperations clientZkProfileOperations)
|
||||
ClientZkProfileOperations clientZkProfileOperations,
|
||||
ExecutorService executor)
|
||||
{
|
||||
this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations);
|
||||
this.store = store;
|
||||
|
@ -159,6 +173,7 @@ public class SignalServiceMessageSender {
|
|||
this.isMultiDevice = new AtomicBoolean(isMultiDevice);
|
||||
this.attachmentsV3 = new AtomicBoolean(attachmentsV3);
|
||||
this.eventListener = eventListener;
|
||||
this.executor = executor != null ? executor : Executors.newSingleThreadExecutor();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1188,28 +1203,43 @@ public class SignalServiceMessageSender {
|
|||
boolean online)
|
||||
throws IOException
|
||||
{
|
||||
List<SendMessageResult> results = new LinkedList<>();
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<Future<SendMessageResult>> futureResults = new LinkedList<>();
|
||||
Iterator<SignalServiceAddress> recipientIterator = recipients.iterator();
|
||||
Iterator<Optional<UnidentifiedAccess>> unidentifiedAccessIterator = unidentifiedAccess.iterator();
|
||||
|
||||
while (recipientIterator.hasNext()) {
|
||||
SignalServiceAddress recipient = recipientIterator.next();
|
||||
SignalServiceAddress recipient = recipientIterator.next();
|
||||
Optional<UnidentifiedAccess> access = unidentifiedAccessIterator.next();
|
||||
futureResults.add(executor.submit(() -> sendMessage(recipient, access, timestamp, content, online)));
|
||||
}
|
||||
|
||||
List<SendMessageResult> results = new ArrayList<>(futureResults.size());
|
||||
recipientIterator = recipients.iterator();
|
||||
|
||||
for (Future<SendMessageResult> futureResult : futureResults) {
|
||||
SignalServiceAddress recipient = recipientIterator.next();
|
||||
try {
|
||||
SendMessageResult result = sendMessage(recipient, unidentifiedAccessIterator.next(), timestamp, content, online);
|
||||
results.add(result);
|
||||
} catch (UntrustedIdentityException e) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.identityFailure(recipient, e.getIdentityKey()));
|
||||
} catch (UnregisteredUserException e) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.unregisteredFailure(recipient));
|
||||
} catch (PushNetworkException e) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.networkFailure(recipient));
|
||||
results.add(futureResult.get());
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof UntrustedIdentityException) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.identityFailure(recipient, ((UntrustedIdentityException) e.getCause()).getIdentityKey()));
|
||||
} else if (e.getCause() instanceof UnregisteredUserException) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.unregisteredFailure(recipient));
|
||||
} else if (e.getCause() instanceof PushNetworkException) {
|
||||
Log.w(TAG, e);
|
||||
results.add(SendMessageResult.networkFailure(recipient));
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
Log.d(TAG, "Completed send to " + recipients.size() + " recipients in " + (System.currentTimeMillis() - startTime) + " ms");
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -1220,7 +1250,9 @@ public class SignalServiceMessageSender {
|
|||
boolean online)
|
||||
throws UntrustedIdentityException, IOException
|
||||
{
|
||||
for (int i=0;i<4;i++) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
for (int i = 0; i < RETRY_COUNT; i++) {
|
||||
try {
|
||||
OutgoingPushMessageList messages = getEncryptedMessages(socket, recipient, unidentifiedAccess, timestamp, content, online);
|
||||
Optional<SignalServiceMessagePipe> pipe = this.pipe.get();
|
||||
|
@ -1228,26 +1260,27 @@ public class SignalServiceMessageSender {
|
|||
|
||||
if (pipe.isPresent() && !unidentifiedAccess.isPresent()) {
|
||||
try {
|
||||
Log.i(TAG, "[sendMessage] Transmitting over pipe...");
|
||||
SendMessageResponse response = pipe.get().send(messages, Optional.<UnidentifiedAccess>absent());
|
||||
SendMessageResponse response = pipe.get().send(messages, Optional.absent()).get(10, TimeUnit.SECONDS);
|
||||
Log.d(TAG, "[sendMessage] Completed over pipe in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)");
|
||||
return SendMessageResult.success(recipient, false, response.getNeedsSync() || isMultiDevice.get());
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
|
||||
Log.w(TAG, e);
|
||||
Log.w(TAG, "[sendMessage] Falling back to new connection...");
|
||||
Log.w(TAG, "[sendMessage] Pipe failed, falling back...");
|
||||
}
|
||||
} else if (unidentifiedPipe.isPresent() && unidentifiedAccess.isPresent()) {
|
||||
try {
|
||||
Log.i(TAG, "[sendMessage] Transmitting over unidentified pipe...");
|
||||
SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess);
|
||||
SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess).get(10, TimeUnit.SECONDS);
|
||||
Log.d(TAG, "[sendMessage] Completed over unidentified pipe in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)");
|
||||
return SendMessageResult.success(recipient, true, response.getNeedsSync() || isMultiDevice.get());
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
|
||||
Log.w(TAG, e);
|
||||
Log.w(TAG, "[sendMessage] Falling back to new connection...");
|
||||
Log.w(TAG, "[sendMessage] Unidentified pipe failed, falling back...");
|
||||
}
|
||||
}
|
||||
|
||||
Log.w(TAG, "[sendMessage] Not transmitting over pipe...");
|
||||
SendMessageResponse response = socket.sendMessage(messages, unidentifiedAccess);
|
||||
|
||||
Log.d(TAG, "[sendMessage] Completed over REST in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)");
|
||||
return SendMessageResult.success(recipient, unidentifiedAccess.isPresent(), response.getNeedsSync() || isMultiDevice.get());
|
||||
|
||||
} catch (InvalidKeyException ike) {
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package org.whispersystems.signalservice.internal.util.concurrent;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Lets you perform a simple transform on the result of a future that maps it to a different value.
|
||||
*/
|
||||
class FutureMapTransformer<Input, Output> implements ListenableFuture<Output> {
|
||||
|
||||
private final ListenableFuture<Input> future;
|
||||
private final FutureTransformers.Transformer<Input, Output> transformer;
|
||||
|
||||
FutureMapTransformer(ListenableFuture<Input> future, FutureTransformers.Transformer<Input, Output> transformer) {
|
||||
this.future = future;
|
||||
this.transformer = transformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(Listener<Output> listener) {
|
||||
future.addListener(new Listener<Input>() {
|
||||
@Override
|
||||
public void onSuccess(Input result) {
|
||||
try {
|
||||
listener.onSuccess(transformer.transform(result));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(new ExecutionException(e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(ExecutionException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean b) {
|
||||
return future.cancel(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return future.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return future.isDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Output get() throws InterruptedException, ExecutionException {
|
||||
Input input = future.get();
|
||||
try {
|
||||
return transformer.transform(input);
|
||||
} catch (Exception e) {
|
||||
throw new ExecutionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Output get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
Input input = future.get(l, timeUnit);
|
||||
try {
|
||||
return transformer.transform(input);
|
||||
} catch (Exception e) {
|
||||
throw new ExecutionException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package org.whispersystems.signalservice.internal.util.concurrent;
|
||||
|
||||
public final class FutureTransformers {
|
||||
|
||||
public static <Input, Output> ListenableFuture<Output> map(ListenableFuture<Input> future, Transformer<Input, Output> transformer) {
|
||||
return new FutureMapTransformer<>(future, transformer);
|
||||
}
|
||||
|
||||
public interface Transformer<Input, Output> {
|
||||
Output transform(Input a) throws Exception;
|
||||
}
|
||||
}
|
|
@ -12,6 +12,7 @@ import org.whispersystems.signalservice.api.util.Tls12SocketFactory;
|
|||
import org.whispersystems.signalservice.api.websocket.ConnectivityListener;
|
||||
import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager;
|
||||
import org.whispersystems.signalservice.internal.util.Util;
|
||||
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
|
||||
import org.whispersystems.signalservice.internal.util.concurrent.SettableFuture;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -52,7 +53,7 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
private static final int KEEPALIVE_TIMEOUT_SECONDS = 55;
|
||||
|
||||
private final LinkedList<WebSocketRequestMessage> incomingRequests = new LinkedList<>();
|
||||
private final Map<Long, SettableFuture<Pair<Integer, String>>> outgoingRequests = new HashMap<>();
|
||||
private final Map<Long, OutgoingRequest> outgoingRequests = new HashMap<>();
|
||||
|
||||
private final String wsUri;
|
||||
private final TrustStore trustStore;
|
||||
|
@ -94,7 +95,7 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
}
|
||||
|
||||
public synchronized void connect() {
|
||||
Log.i(TAG, "WSC connect()...");
|
||||
Log.i(TAG, "connect()");
|
||||
|
||||
if (client == null) {
|
||||
String filledUri;
|
||||
|
@ -137,7 +138,7 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
}
|
||||
|
||||
public synchronized void disconnect() {
|
||||
Log.i(TAG, "WSC disconnect()...");
|
||||
Log.i(TAG, "disconnect()");
|
||||
|
||||
if (client != null) {
|
||||
client.close(1000, "OK");
|
||||
|
@ -169,7 +170,7 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
else return incomingRequests.removeFirst();
|
||||
}
|
||||
|
||||
public synchronized Future<Pair<Integer, String>> sendRequest(WebSocketRequestMessage request) throws IOException {
|
||||
public synchronized ListenableFuture<WebsocketResponse> sendRequest(WebSocketRequestMessage request) throws IOException {
|
||||
if (client == null || !connected) throw new IOException("No connection!");
|
||||
|
||||
WebSocketMessage message = WebSocketMessage.newBuilder()
|
||||
|
@ -177,8 +178,8 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
.setRequest(request)
|
||||
.build();
|
||||
|
||||
SettableFuture<Pair<Integer, String>> future = new SettableFuture<>();
|
||||
outgoingRequests.put(request.getId(), future);
|
||||
SettableFuture<WebsocketResponse> future = new SettableFuture<>();
|
||||
outgoingRequests.put(request.getId(), new OutgoingRequest(future, System.currentTimeMillis()));
|
||||
|
||||
if (!client.send(ByteString.of(message.toByteArray()))) {
|
||||
throw new IOException("Write failed!");
|
||||
|
@ -222,7 +223,7 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
@Override
|
||||
public synchronized void onOpen(WebSocket webSocket, Response response) {
|
||||
if (client != null && keepAliveSender == null) {
|
||||
Log.i(TAG, "onConnected()");
|
||||
Log.i(TAG, "onOpen() connected");
|
||||
attempts = 0;
|
||||
connected = true;
|
||||
keepAliveSender = new KeepAliveSender();
|
||||
|
@ -234,18 +235,21 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
|
||||
@Override
|
||||
public synchronized void onMessage(WebSocket webSocket, ByteString payload) {
|
||||
Log.d(TAG, "WSC onMessage()");
|
||||
try {
|
||||
WebSocketMessage message = WebSocketMessage.parseFrom(payload.toByteArray());
|
||||
|
||||
Log.d(TAG, "Message Type: " + message.getType().getNumber());
|
||||
|
||||
if (message.getType().getNumber() == WebSocketMessage.Type.REQUEST_VALUE) {
|
||||
Log.d(TAG, "onMessage() -- incoming request");
|
||||
incomingRequests.add(message.getRequest());
|
||||
} else if (message.getType().getNumber() == WebSocketMessage.Type.RESPONSE_VALUE) {
|
||||
SettableFuture<Pair<Integer, String>> listener = outgoingRequests.get(message.getResponse().getId());
|
||||
if (listener != null) listener.set(new Pair<>(message.getResponse().getStatus(),
|
||||
new String(message.getResponse().getBody().toByteArray())));
|
||||
OutgoingRequest listener = outgoingRequests.get(message.getResponse().getId());
|
||||
if (listener != null) {
|
||||
listener.getResponseFuture().set(new WebsocketResponse(message.getResponse().getStatus(),
|
||||
new String(message.getResponse().getBody().toByteArray())));
|
||||
Log.d(TAG, "onMessage() -- response received in " + (System.currentTimeMillis() - listener.getStartTimestamp()) + " ms");
|
||||
} else {
|
||||
Log.d(TAG, "onMessage() -- response received, but no listener");
|
||||
}
|
||||
}
|
||||
|
||||
notifyAll();
|
||||
|
@ -256,14 +260,14 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
|
||||
@Override
|
||||
public synchronized void onClosed(WebSocket webSocket, int code, String reason) {
|
||||
Log.i(TAG, "onClose()...");
|
||||
Log.i(TAG, "onClose()");
|
||||
this.connected = false;
|
||||
|
||||
Iterator<Map.Entry<Long, SettableFuture<Pair<Integer, String>>>> iterator = outgoingRequests.entrySet().iterator();
|
||||
Iterator<Map.Entry<Long, OutgoingRequest>> iterator = outgoingRequests.entrySet().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Long, SettableFuture<Pair<Integer, String>>> entry = iterator.next();
|
||||
entry.getValue().setException(new IOException("Closed: " + code + ", " + reason));
|
||||
Map.Entry<Long, OutgoingRequest> entry = iterator.next();
|
||||
entry.getValue().getResponseFuture().setException(new IOException("Closed: " + code + ", " + reason));
|
||||
iterator.remove();
|
||||
}
|
||||
|
||||
|
@ -303,12 +307,12 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
|
||||
@Override
|
||||
public void onMessage(WebSocket webSocket, String text) {
|
||||
Log.d(TAG, "onMessage(text)! " + text);
|
||||
Log.d(TAG, "onMessage(text)");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onClosing(WebSocket webSocket, int code, String reason) {
|
||||
Log.i(TAG, "onClosing()!...");
|
||||
Log.i(TAG, "onClosing()");
|
||||
webSocket.close(1000, "OK");
|
||||
}
|
||||
|
||||
|
@ -350,4 +354,21 @@ public class WebSocketConnection extends WebSocketListener {
|
|||
}
|
||||
}
|
||||
|
||||
private static class OutgoingRequest {
|
||||
private final SettableFuture<WebsocketResponse> responseFuture;
|
||||
private final long startTimestamp;
|
||||
|
||||
private OutgoingRequest(SettableFuture<WebsocketResponse> future, long startTimestamp) {
|
||||
this.responseFuture = future;
|
||||
this.startTimestamp = startTimestamp;
|
||||
}
|
||||
|
||||
SettableFuture<WebsocketResponse> getResponseFuture() {
|
||||
return responseFuture;
|
||||
}
|
||||
|
||||
long getStartTimestamp() {
|
||||
return startTimestamp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package org.whispersystems.signalservice.internal.websocket;
|
||||
|
||||
public class WebsocketResponse {
|
||||
private final int status;
|
||||
private final String body;
|
||||
|
||||
WebsocketResponse(int status, String body) {
|
||||
this.status = status;
|
||||
this.body = body;
|
||||
}
|
||||
|
||||
public int getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
}
|
Ładowanie…
Reference in New Issue