Improve handling of group send errors over websocket.

- Correctly parse error responses from send group message via websocket.
- Reduce logging output for mismatched/stale devices exceptions.
- Only fallback from websocket to socket if there were technical errors.

Closes #11918
fork-5.53.8
AsamK 2022-01-22 16:59:06 +01:00 zatwierdzone przez Alex Hart
rodzic 15254ee720
commit 523e21f3be
5 zmienionych plików z 53 dodań i 17 usunięć

Wyświetl plik

@ -68,6 +68,7 @@ import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException; import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException;
@ -107,6 +108,7 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Verifi
import org.whispersystems.signalservice.internal.push.StaleDevices; import org.whispersystems.signalservice.internal.push.StaleDevices;
import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException;
import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException;
import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory; import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory;
@ -1640,6 +1642,9 @@ public class SignalServiceMessageSender {
try { try {
SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, Optional.absent()).blockingGet()).getResultOrThrow(); SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, Optional.absent()).blockingGet()).getResultOrThrow();
return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
} catch (InvalidUnidentifiedAccessHeaderException | UnregisteredUserException | MismatchedDevicesException | StaleDevicesException e) {
// Non-technical failures shouldn't be retried with socket
throw e;
} catch (WebSocketUnavailableException e) { } catch (WebSocketUnavailableException e) {
Log.i(TAG, "[sendMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); Log.i(TAG, "[sendMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) { } catch (IOException e) {
@ -1650,6 +1655,9 @@ public class SignalServiceMessageSender {
try { try {
SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, unidentifiedAccess).blockingGet()).getResultOrThrow(); SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, unidentifiedAccess).blockingGet()).getResultOrThrow();
return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
} catch (InvalidUnidentifiedAccessHeaderException | UnregisteredUserException | MismatchedDevicesException | StaleDevicesException e) {
// Non-technical failures shouldn't be retried with socket
throw e;
} catch (WebSocketUnavailableException e) { } catch (WebSocketUnavailableException e) {
Log.i(TAG, "[sendMessage][" + timestamp + "] Unidentified pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); Log.i(TAG, "[sendMessage][" + timestamp + "] Unidentified pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) { } catch (IOException e) {
@ -1677,10 +1685,10 @@ public class SignalServiceMessageSender {
throw afe; throw afe;
} }
} catch (MismatchedDevicesException mde) { } catch (MismatchedDevicesException mde) {
Log.w(TAG, mde); Log.w(TAG, "[sendMessage][" + timestamp + "] Handling mismatched devices. (" + mde.getMessage() + ")");
handleMismatchedDevices(socket, recipient, mde.getMismatchedDevices()); handleMismatchedDevices(socket, recipient, mde.getMismatchedDevices());
} catch (StaleDevicesException ste) { } catch (StaleDevicesException ste) {
Log.w(TAG, ste); Log.w(TAG, "[sendMessage][" + timestamp + "] Handling stale devices. (" + ste.getMessage() + ")");
handleStaleDevices(recipient, ste.getStaleDevices()); handleStaleDevices(recipient, ste.getStaleDevices());
} }
} }
@ -1800,25 +1808,28 @@ public class SignalServiceMessageSender {
} }
try { try {
SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).blockingGet()).getResultOrThrow(); try {
return transformGroupResponseToMessageResults(targetInfo.devices, response, content); SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).blockingGet()).getResultOrThrow();
} catch (WebSocketUnavailableException e) { return transformGroupResponseToMessageResults(targetInfo.devices, response, content);
Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } catch (InvalidUnidentifiedAccessHeaderException | NotFoundException | GroupMismatchedDevicesException | GroupStaleDevicesException e) {
} catch (IOException e) { // Non-technical failures shouldn't be retried with socket
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); throw e;
} } catch (WebSocketUnavailableException e) {
Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
}
try {
SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, joinedUnidentifiedAccess, timestamp, online); SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, joinedUnidentifiedAccess, timestamp, online);
return transformGroupResponseToMessageResults(targetInfo.devices, response, content); return transformGroupResponseToMessageResults(targetInfo.devices, response, content);
} catch (GroupMismatchedDevicesException e) { } catch (GroupMismatchedDevicesException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices.", e); Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices. (" + e.getMessage() + ")");
for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) { for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) {
SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(mismatched.getUuid()), Optional.absent()); SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(mismatched.getUuid()), Optional.absent());
handleMismatchedDevices(socket, address, mismatched.getDevices()); handleMismatchedDevices(socket, address, mismatched.getDevices());
} }
} catch (GroupStaleDevicesException e) { } catch (GroupStaleDevicesException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling stale devices.", e); Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling stale devices. (" + e.getMessage() + ")");
for (GroupStaleDevices stale : e.getStaleDevices()) { for (GroupStaleDevices stale : e.getStaleDevices()) {
SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(stale.getUuid()), Optional.absent()); SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(stale.getUuid()), Optional.absent());
handleStaleDevices(address, stale.getDevices()); handleStaleDevices(address, stale.getDevices());

Wyświetl plik

@ -2,7 +2,6 @@ package org.whispersystems.signalservice.api.services;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.whispersystems.libsignal.logging.Log;
import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
@ -10,9 +9,14 @@ import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException;
import org.whispersystems.signalservice.internal.ServiceResponse; import org.whispersystems.signalservice.internal.ServiceResponse;
import org.whispersystems.signalservice.internal.ServiceResponseProcessor; import org.whispersystems.signalservice.internal.ServiceResponseProcessor;
import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices;
import org.whispersystems.signalservice.internal.push.GroupStaleDevices;
import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList;
import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse;
import org.whispersystems.signalservice.internal.push.SendMessageResponse; import org.whispersystems.signalservice.internal.push.SendMessageResponse;
import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException;
import org.whispersystems.signalservice.internal.util.JsonUtil; import org.whispersystems.signalservice.internal.util.JsonUtil;
import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.signalservice.internal.util.Util;
import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper;
@ -85,7 +89,18 @@ public class MessagingService {
.build(); .build();
return signalWebSocket.request(requestMessage) return signalWebSocket.request(requestMessage)
.map(DefaultResponseMapper.getDefault(SendGroupMessageResponse.class)::map) .map(DefaultResponseMapper.extend(SendGroupMessageResponse.class)
.withCustomError(401, (status, errorBody, getHeader) -> new InvalidUnidentifiedAccessHeaderException())
.withCustomError(404, (status, errorBody, getHeader) -> new NotFoundException("At least one unregistered user in message send."))
.withCustomError(409, (status, errorBody, getHeader) -> {
GroupMismatchedDevices[] mismatchedDevices = JsonUtil.fromJsonResponse(errorBody, GroupMismatchedDevices[].class);
return new GroupMismatchedDevicesException(mismatchedDevices);
})
.withCustomError(410, (status, errorBody, getHeader) -> {
GroupStaleDevices[] staleDevices = JsonUtil.fromJsonResponse(errorBody, GroupStaleDevices[].class);
return new GroupStaleDevicesException(staleDevices);
})
.build()::map)
.onErrorReturn(ServiceResponse::forUnknownError); .onErrorReturn(ServiceResponse::forUnknownError);
} }

Wyświetl plik

@ -66,7 +66,11 @@ public final class DefaultErrorMapper implements ErrorMapper {
@Override @Override
public Throwable parseError(int status, String body, Function<String, String> getHeader) { public Throwable parseError(int status, String body, Function<String, String> getHeader) {
if (customErrorMappers.containsKey(status)) { if (customErrorMappers.containsKey(status)) {
return customErrorMappers.get(status).parseError(status, body, getHeader); try {
return customErrorMappers.get(status).parseError(status, body, getHeader);
} catch (MalformedResponseException e) {
return e;
}
} }
switch (status) { switch (status) {

Wyświetl plik

@ -42,7 +42,12 @@ public class DefaultResponseMapper<Response> implements ResponseMapper<Response>
@Override @Override
public ServiceResponse<Response> map(int status, String body, Function<String, String> getHeader, boolean unidentified) { public ServiceResponse<Response> map(int status, String body, Function<String, String> getHeader, boolean unidentified) {
Throwable applicationError = errorMapper.parseError(status, body, getHeader); Throwable applicationError;
try {
applicationError = errorMapper.parseError(status, body, getHeader);
} catch (MalformedResponseException e) {
applicationError = e;
}
if (applicationError == null) { if (applicationError == null) {
try { try {
if (customResponseMapper != null) { if (customResponseMapper != null) {

Wyświetl plik

@ -1,6 +1,7 @@
package org.whispersystems.signalservice.internal.websocket; package org.whispersystems.signalservice.internal.websocket;
import org.whispersystems.libsignal.util.guava.Function; import org.whispersystems.libsignal.util.guava.Function;
import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException;
/** /**
* Can map an API response to an appropriate {@link Throwable}. * Can map an API response to an appropriate {@link Throwable}.
@ -9,5 +10,5 @@ import org.whispersystems.libsignal.util.guava.Function;
* {@link DefaultErrorMapper}. * {@link DefaultErrorMapper}.
*/ */
public interface ErrorMapper { public interface ErrorMapper {
Throwable parseError(int status, String body, Function<String, String> getHeader); Throwable parseError(int status, String body, Function<String, String> getHeader) throws MalformedResponseException;
} }