Fix deadlock with web socket health monitor.

fork-5.53.8
Cody Henthorne 2021-12-01 13:14:05 -05:00 zatwierdzone przez Greyson Parrelli
rodzic 4ba4df706e
commit 59ad8bf76a
1 zmienionych plików z 64 dodań i 51 usunięć

Wyświetl plik

@ -5,6 +5,7 @@ import android.app.Application;
import androidx.annotation.NonNull; import androidx.annotation.NonNull;
import org.greenrobot.eventbus.EventBus; import org.greenrobot.eventbus.EventBus;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.logging.Log; import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.events.ReminderUpdateEvent; import org.thoughtcrime.securesms.events.ReminderUpdateEvent;
@ -17,6 +18,8 @@ import org.whispersystems.signalservice.api.websocket.HealthMonitor;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
@ -35,11 +38,13 @@ public final class SignalWebSocketHealthMonitor implements HealthMonitor {
private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS); private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS);
private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3; private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3;
private final Executor executor = ThreadUtil.trace(Executors.newSingleThreadExecutor());
private final Application context; private final Application context;
private SignalWebSocket signalWebSocket; private SignalWebSocket signalWebSocket;
private final SleepTimer sleepTimer; private final SleepTimer sleepTimer;
private volatile KeepAliveSender keepAliveSender; private KeepAliveSender keepAliveSender;
private final HealthState identified = new HealthState(); private final HealthState identified = new HealthState();
private final HealthState unidentified = new HealthState(); private final HealthState unidentified = new HealthState();
@ -50,72 +55,80 @@ public final class SignalWebSocketHealthMonitor implements HealthMonitor {
} }
public void monitor(@NonNull SignalWebSocket signalWebSocket) { public void monitor(@NonNull SignalWebSocket signalWebSocket) {
Preconditions.checkNotNull(signalWebSocket); executor.execute(() -> {
Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once"); Preconditions.checkNotNull(signalWebSocket);
Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once");
this.signalWebSocket = signalWebSocket; this.signalWebSocket = signalWebSocket;
//noinspection ResultOfMethodCallIgnored //noinspection ResultOfMethodCallIgnored
signalWebSocket.getWebSocketState() signalWebSocket.getWebSocketState()
.subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())
.distinctUntilChanged() .distinctUntilChanged()
.subscribe(s -> onStateChange(s, identified)); .subscribe(s -> onStateChange(s, identified));
//noinspection ResultOfMethodCallIgnored //noinspection ResultOfMethodCallIgnored
signalWebSocket.getUnidentifiedWebSocketState() signalWebSocket.getUnidentifiedWebSocketState()
.subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())
.distinctUntilChanged() .distinctUntilChanged()
.subscribe(s -> onStateChange(s, unidentified)); .subscribe(s -> onStateChange(s, unidentified));
});
} }
private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) { private void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) {
switch (connectionState) { executor.execute(() -> {
case CONNECTED: switch (connectionState) {
TextSecurePreferences.setUnauthorizedReceived(context, false); case CONNECTED:
break; TextSecurePreferences.setUnauthorizedReceived(context, false);
case AUTHENTICATION_FAILED: break;
TextSecurePreferences.setUnauthorizedReceived(context, true); case AUTHENTICATION_FAILED:
EventBus.getDefault().post(new ReminderUpdateEvent()); TextSecurePreferences.setUnauthorizedReceived(context, true);
break; EventBus.getDefault().post(new ReminderUpdateEvent());
case FAILED: break;
if (SignalStore.proxy().isProxyEnabled()) { case FAILED:
Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam."); if (SignalStore.proxy().isProxyEnabled()) {
ApplicationDependencies.closeConnections(); Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam.");
} ApplicationDependencies.closeConnections();
break; }
} break;
}
healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED; healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
if (keepAliveSender == null && isKeepAliveNecessary()) { if (keepAliveSender == null && isKeepAliveNecessary()) {
keepAliveSender = new KeepAliveSender(); keepAliveSender = new KeepAliveSender();
keepAliveSender.start(); keepAliveSender.start();
} else if (keepAliveSender != null && !isKeepAliveNecessary()) { } else if (keepAliveSender != null && !isKeepAliveNecessary()) {
keepAliveSender.shutdown(); keepAliveSender.shutdown();
keepAliveSender = null; keepAliveSender = null;
} }
});
} }
@Override @Override
public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) { public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
if (isIdentifiedWebSocket) { executor.execute(() -> {
identified.lastKeepAliveReceived = System.currentTimeMillis(); if (isIdentifiedWebSocket) {
} else { identified.lastKeepAliveReceived = System.currentTimeMillis();
unidentified.lastKeepAliveReceived = System.currentTimeMillis(); } else {
} unidentified.lastKeepAliveReceived = System.currentTimeMillis();
}
});
} }
@Override @Override
public void onMessageError(int status, boolean isIdentifiedWebSocket) { public void onMessageError(int status, boolean isIdentifiedWebSocket) {
if (status == 409) { executor.execute(() -> {
HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified); if (status == 409) {
if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) { HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified);
Log.w(TAG, "Received too many mismatch device errors, forcing new websockets."); if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) {
signalWebSocket.forceNewWebSockets(); Log.w(TAG, "Received too many mismatch device errors, forcing new websockets.");
signalWebSocket.forceNewWebSockets();
}
} }
} });
} }
private boolean isKeepAliveNecessary() { private boolean isKeepAliveNecessary() {