From 1b2cb2637f19a76ddd9e15e0954c1a5e8dc116a7 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Mon, 6 Mar 2023 13:40:44 -0500 Subject: [PATCH] Perform decryptions inline. --- ...SignalInstrumentationApplicationContext.kt | 1 + .../MessageProcessingPerformanceTest.kt | 46 +- .../securesms/testing/AliceClient.kt | 12 +- .../securesms/testing/BobClient.kt | 12 +- .../securesms/testing/FakeClientHelpers.kt | 30 +- .../dependencies/ApplicationDependencies.java | 15 - .../ApplicationDependencyProvider.java | 6 - .../impl/DecryptionsDrainedConstraint.java | 2 +- .../securesms/jobs/JobManagerFactories.java | 2 + .../securesms/jobs/PushDecryptDrainedJob.java | 5 +- .../securesms/jobs/PushDecryptMessageJob.kt | 1 + .../messages/IncomingMessageObserver.java | 388 ------------- .../messages/IncomingMessageObserver.kt | 509 ++++++++++++++++++ .../messages/IncomingMessageProcessor.java | 112 ---- .../migrations/ApplicationMigrations.java | 7 +- .../DecryptionsDrainedMigrationJob.kt | 36 ++ .../MockApplicationDependencyProvider.java | 6 - .../signalservice/api/SignalWebSocket.java | 39 +- 18 files changed, 614 insertions(+), 615 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/migrations/DecryptionsDrainedMigrationJob.kt diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/SignalInstrumentationApplicationContext.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/SignalInstrumentationApplicationContext.kt index c979bd073..77dad2703 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/SignalInstrumentationApplicationContext.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/SignalInstrumentationApplicationContext.kt @@ -22,6 +22,7 @@ class SignalInstrumentationApplicationContext : ApplicationContext() { override fun initializeAppDependencies() { val default = ApplicationDependencyProvider(this) ApplicationDependencies.init(this, InstrumentationApplicationDependencyProvider(this, default)) + ApplicationDependencies.getDeadlockDetector().start() } override fun initializeLogging() { diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/messages/MessageProcessingPerformanceTest.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/MessageProcessingPerformanceTest.kt index 3e133eb0c..bbc885c74 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/messages/MessageProcessingPerformanceTest.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/MessageProcessingPerformanceTest.kt @@ -22,7 +22,7 @@ import org.thoughtcrime.securesms.testing.Entry import org.thoughtcrime.securesms.testing.FakeClientHelpers import org.thoughtcrime.securesms.testing.SignalActivityRule import org.thoughtcrime.securesms.testing.awaitFor -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope +import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -86,16 +86,17 @@ class MessageProcessingPerformanceTest { .inMemoryLogger .getLockForUntil(TimingMessageContentProcessor.endTagPredicate(firstPreKeyMessageTimestamp)) - Thread { aliceClient.process(encryptedEnvelope) }.start() + Thread { aliceClient.process(encryptedEnvelope, System.currentTimeMillis()) }.start() aliceProcessFirstMessageLatch.awaitFor(15.seconds) // Send message from Alice to Bob - bobClient.decrypt(aliceClient.encrypt(System.currentTimeMillis(), bob)) + val aliceNow = System.currentTimeMillis() + bobClient.decrypt(aliceClient.encrypt(aliceNow, bob), aliceNow) // Build N messages from Bob to Alice val messageCount = 100 - val envelopes = ArrayList(messageCount) + val envelopes = ArrayList(messageCount) var now = System.currentTimeMillis() for (i in 0..messageCount) { envelopes += bobClient.encrypt(now) @@ -114,7 +115,7 @@ class MessageProcessingPerformanceTest { Thread { for (envelope in envelopes) { Log.i(TIMING_TAG, "Retrieved envelope! ${envelope.timestamp}") - aliceClient.process(envelope) + aliceClient.process(envelope, envelope.timestamp) } }.start() @@ -125,41 +126,6 @@ class MessageProcessingPerformanceTest { // Process logs for timing data val entries = harness.inMemoryLogger.entries() - // Calculate decrypt jobs - var skipFirst = true - var decryptJobCount = 0L - var decryptJobDuration = 0L - var decryptJobSinceSubmission = 0L - var firstDuration = 0L - var firstSinceSubmission = 0L - entries.filter { it.tag == "JobRunner" } - .forEach { - val match = jobFinishRegex.matchEntire(it.message!!) - - if (match != null) { - val job = match.groupValues[1] - if (job == "PushDecryptMessageJob") { - if (skipFirst) { - skipFirst = false - } else { - val duration = match.groupValues[2].toLong() - val sinceSubmission = match.groupValues[3].toLong() - decryptJobCount++ - decryptJobDuration += duration - decryptJobSinceSubmission += sinceSubmission - - if (decryptJobCount == 1L) { - firstDuration = duration - firstSinceSubmission = sinceSubmission - } - } - } - } - } - - android.util.Log.w(TAG, "Push Decrypt Job: First runtime ${firstDuration}ms First since submission: ${firstSinceSubmission}ms") - android.util.Log.w(TAG, "Push Decrypt Job: Average runtime: ${decryptJobDuration.toFloat() / decryptJobCount.toFloat()}ms Average since submission: ${decryptJobSinceSubmission.toFloat() / decryptJobCount.toFloat()}ms") - // Calculate MessageContentProcessor val takeLast: List = entries.filter { it.tag == TimingMessageContentProcessor.TAG }.drop(2) diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt index c296eb1c1..dfe4bb6df 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt @@ -6,10 +6,10 @@ import org.thoughtcrime.securesms.crypto.ProfileKeyUtil import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.recipients.Recipient -import org.thoughtcrime.securesms.testing.FakeClientHelpers.toSignalServiceEnvelope -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope +import org.thoughtcrime.securesms.testing.FakeClientHelpers.toEnvelope import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.push.SignalServiceAddress +import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope /** * Welcome to Alice's Client. @@ -28,17 +28,17 @@ class AliceClient(val serviceId: ServiceId, val e164: String, val trustRoot: ECK expires = 31337 ) - fun process(envelope: SignalServiceEnvelope) { - ApplicationDependencies.getIncomingMessageProcessor().acquire().use { processor -> processor.processEnvelope(envelope) } + fun process(envelope: Envelope, serverDeliveredTimestamp: Long) { + ApplicationDependencies.getIncomingMessageObserver().processEnvelope(envelope, serverDeliveredTimestamp) } - fun encrypt(now: Long, destination: Recipient): SignalServiceEnvelope { + fun encrypt(now: Long, destination: Recipient): Envelope { return ApplicationDependencies.getSignalServiceMessageSender().getEncryptedMessage( SignalServiceAddress(destination.requireServiceId(), destination.requireE164()), FakeClientHelpers.getTargetUnidentifiedAccess(ProfileKeyUtil.getSelfProfileKey(), ProfileKey(destination.profileKey), aliceSenderCertificate), 1, FakeClientHelpers.encryptedTextMessage(now), false - ).toSignalServiceEnvelope(now, destination.requireServiceId()) + ).toEnvelope(now, destination.requireServiceId()) } } diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/BobClient.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/BobClient.kt index 9a98cc467..fe6935474 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/BobClient.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/BobClient.kt @@ -21,16 +21,16 @@ import org.thoughtcrime.securesms.database.OneTimePreKeyTable import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.SignedPreKeyTable import org.thoughtcrime.securesms.keyvalue.SignalStore -import org.thoughtcrime.securesms.testing.FakeClientHelpers.toSignalServiceEnvelope +import org.thoughtcrime.securesms.testing.FakeClientHelpers.toEnvelope import org.whispersystems.signalservice.api.SignalServiceAccountDataStore import org.whispersystems.signalservice.api.SignalSessionLock import org.whispersystems.signalservice.api.crypto.SignalServiceCipher import org.whispersystems.signalservice.api.crypto.SignalSessionBuilder import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope import org.whispersystems.signalservice.api.push.DistributionId import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.push.SignalServiceAddress +import org.whispersystems.signalservice.internal.push.SignalServiceProtos import java.util.Optional import java.util.UUID import java.util.concurrent.locks.ReentrantLock @@ -59,7 +59,7 @@ class BobClient(val serviceId: ServiceId, val e164: String, val identityKeyPair: } /** Inspired by SignalServiceMessageSender#getEncryptedMessage */ - fun encrypt(now: Long): SignalServiceEnvelope { + fun encrypt(now: Long): SignalServiceProtos.Envelope { val envelopeContent = FakeClientHelpers.encryptedTextMessage(now) val cipher = SignalServiceCipher(serviceAddress, 1, aciStore, sessionLock, null) @@ -70,12 +70,12 @@ class BobClient(val serviceId: ServiceId, val e164: String, val identityKeyPair: } return cipher.encrypt(getAliceProtocolAddress(), getAliceUnidentifiedAccess(), envelopeContent) - .toSignalServiceEnvelope(envelopeContent.content.get().dataMessage.timestamp, getAliceServiceId()) + .toEnvelope(envelopeContent.content.get().dataMessage.timestamp, getAliceServiceId()) } - fun decrypt(envelope: SignalServiceEnvelope) { + fun decrypt(envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long) { val cipher = SignalServiceCipher(serviceAddress, 1, aciStore, sessionLock, UnidentifiedAccessUtil.getCertificateValidator()) - cipher.decrypt(envelope) + cipher.decrypt(envelope, serverDeliveredTimestamp) } private fun getAliceServiceId(): ServiceId { diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/FakeClientHelpers.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/FakeClientHelpers.kt index 4405f3e4b..6a04e1f50 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/FakeClientHelpers.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/FakeClientHelpers.kt @@ -9,14 +9,15 @@ import org.signal.libsignal.protocol.ecc.Curve import org.signal.libsignal.protocol.ecc.ECKeyPair import org.signal.libsignal.protocol.ecc.ECPublicKey import org.signal.libsignal.zkgroup.profiles.ProfileKey +import org.thoughtcrime.securesms.database.model.toProtoByteString import org.whispersystems.signalservice.api.crypto.ContentHint import org.whispersystems.signalservice.api.crypto.EnvelopeContent import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.internal.push.OutgoingPushMessage import org.whispersystems.signalservice.internal.push.SignalServiceProtos +import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope import org.whispersystems.util.Base64 import java.util.Optional import java.util.UUID @@ -62,20 +63,17 @@ object FakeClientHelpers { return EnvelopeContent.encrypted(content.build(), ContentHint.RESENDABLE, Optional.empty()) } - fun OutgoingPushMessage.toSignalServiceEnvelope(timestamp: Long, destination: ServiceId): SignalServiceEnvelope { - return SignalServiceEnvelope( - this.type, - Optional.empty(), - 1, - timestamp, - Base64.decode(this.content), - timestamp + 1, - timestamp + 2, - UUID.randomUUID().toString(), - destination.toString(), - true, - false, - null - ) + fun OutgoingPushMessage.toEnvelope(timestamp: Long, destination: ServiceId): Envelope { + return Envelope.newBuilder() + .setType(Envelope.Type.valueOf(this.type)) + .setSourceDevice(1) + .setTimestamp(timestamp) + .setServerTimestamp(timestamp + 1) + .setDestinationUuid(destination.toString()) + .setServerGuid(UUID.randomUUID().toString()) + .setContent(Base64.decode(this.content).toProtoByteString()) + .setUrgent(true) + .setStory(false) + .build() } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 2cf2f1a89..48b29efd4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -24,7 +24,6 @@ import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; -import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.payments.Payments; @@ -97,7 +96,6 @@ public class ApplicationDependencies { private static volatile SignalServiceMessageSender messageSender; private static volatile SignalServiceMessageReceiver messageReceiver; private static volatile IncomingMessageObserver incomingMessageObserver; - private static volatile IncomingMessageProcessor incomingMessageProcessor; private static volatile BackgroundMessageRetriever backgroundMessageRetriever; private static volatile LiveRecipientCache recipientCache; private static volatile JobManager jobManager; @@ -274,18 +272,6 @@ public class ApplicationDependencies { return provider.provideSignalServiceNetworkAccess(); } - public static @NonNull IncomingMessageProcessor getIncomingMessageProcessor() { - if (incomingMessageProcessor == null) { - synchronized (LOCK) { - if (incomingMessageProcessor == null) { - incomingMessageProcessor = provider.provideIncomingMessageProcessor(); - } - } - } - - return incomingMessageProcessor; - } - public static @NonNull BackgroundMessageRetriever getBackgroundMessageRetriever() { if (backgroundMessageRetriever == null) { synchronized (LOCK) { @@ -693,7 +679,6 @@ public class ApplicationDependencies { @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull SignalServiceConfiguration signalServiceConfiguration); @NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver(@NonNull SignalServiceConfiguration signalServiceConfiguration); @NonNull SignalServiceNetworkAccess provideSignalServiceNetworkAccess(); - @NonNull IncomingMessageProcessor provideIncomingMessageProcessor(); @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever(); @NonNull LiveRecipientCache provideRecipientCache(); @NonNull JobManager provideJobManager(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 1d151fa86..9df96ec8a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -47,7 +47,6 @@ import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; -import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier; @@ -157,11 +156,6 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr return new SignalServiceNetworkAccess(context); } - @Override - public @NonNull IncomingMessageProcessor provideIncomingMessageProcessor() { - return new IncomingMessageProcessor(context); - } - @Override public @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever() { return new BackgroundMessageRetriever(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java index c6a95e551..54231054e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java @@ -21,7 +21,7 @@ public final class DecryptionsDrainedConstraint implements Constraint { @Override public boolean isMet() { - return ApplicationDependencies.getIncomingMessageObserver().isDecryptionDrained(); + return ApplicationDependencies.getIncomingMessageObserver().getDecryptionDrained(); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 1e7724e13..50ec8f8d5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -41,6 +41,7 @@ import org.thoughtcrime.securesms.migrations.BlobStorageLocationMigrationJob; import org.thoughtcrime.securesms.migrations.CachedAttachmentsMigrationJob; import org.thoughtcrime.securesms.migrations.ClearGlideCacheMigrationJob; import org.thoughtcrime.securesms.migrations.DatabaseMigrationJob; +import org.thoughtcrime.securesms.migrations.DecryptionsDrainedMigrationJob; import org.thoughtcrime.securesms.migrations.DeleteDeprecatedLogsMigrationJob; import org.thoughtcrime.securesms.migrations.DirectoryRefreshMigrationJob; import org.thoughtcrime.securesms.migrations.EmojiDownloadMigrationJob; @@ -215,6 +216,7 @@ public final class JobManagerFactories { put(CachedAttachmentsMigrationJob.KEY, new CachedAttachmentsMigrationJob.Factory()); put(ClearGlideCacheMigrationJob.KEY, new ClearGlideCacheMigrationJob.Factory()); put(DatabaseMigrationJob.KEY, new DatabaseMigrationJob.Factory()); + put(DecryptionsDrainedMigrationJob.KEY, new DecryptionsDrainedMigrationJob.Factory()); put(DeleteDeprecatedLogsMigrationJob.KEY, new DeleteDeprecatedLogsMigrationJob.Factory()); put(DirectoryRefreshMigrationJob.KEY, new DirectoryRefreshMigrationJob.Factory()); put(EmojiDownloadMigrationJob.KEY, new EmojiDownloadMigrationJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java index 7aad9e638..61d69b410 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java @@ -9,9 +9,8 @@ import org.thoughtcrime.securesms.jobmanager.Job; /** * A job that has the same queue as {@link PushDecryptMessageJob} that we enqueue so we can notify - * the {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver} when decryptions have - * finished. This lets us know not just when the websocket is drained, but when all the decryptions - * for the messages we pulled down from the websocket have been finished. + * the {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver} when the decryption job + * queue is empty. */ public class PushDecryptDrainedJob extends BaseJob { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptMessageJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptMessageJob.kt index b732d5d59..b9d2826bc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptMessageJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptMessageJob.kt @@ -47,6 +47,7 @@ class PushDecryptMessageJob private constructor( private const val KEY_ENVELOPE = "envelope" } + @Deprecated("No more jobs of this type should be enqueued. Decryptions now happen as things come off of the websocket.") @JvmOverloads constructor(envelope: SignalServiceEnvelope, smsMessageId: Long = -1) : this( Parameters.Builder() diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java deleted file mode 100644 index 218801006..000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ /dev/null @@ -1,388 +0,0 @@ -package org.thoughtcrime.securesms.messages; - -import android.app.Application; -import android.app.Service; -import android.content.BroadcastReceiver; -import android.content.Context; -import android.content.Intent; -import android.content.IntentFilter; -import android.net.ConnectivityManager; -import android.os.IBinder; - -import androidx.annotation.NonNull; -import androidx.annotation.Nullable; -import androidx.core.app.NotificationCompat; - -import org.signal.core.util.ThreadUtil; -import org.signal.core.util.concurrent.SignalExecutors; -import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.R; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil; -import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; -import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil; -import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob; -import org.thoughtcrime.securesms.jobs.UnableToStartException; -import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor; -import org.thoughtcrime.securesms.notifications.NotificationChannels; -import org.thoughtcrime.securesms.util.AppForegroundObserver; -import org.thoughtcrime.securesms.util.Util; -import org.whispersystems.signalservice.api.SignalWebSocket; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; -import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import io.reactivex.rxjava3.disposables.Disposable; - -/** - * The application-level manager of our websocket connection. - *

- * This class is responsible for opening/closing the websocket based on the app's state and observing new inbound messages received on the websocket. - */ -public class IncomingMessageObserver { - - private static final String TAG = Log.tag(IncomingMessageObserver.class); - - public static final int FOREGROUND_ID = 313399; - private static final long REQUEST_TIMEOUT_MINUTES = 1; - private static final long OLD_REQUEST_WINDOW_MS = TimeUnit.MINUTES.toMillis(5); - private static final long MAX_BACKGROUND_TIME = TimeUnit.MINUTES.toMillis(5); - - private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); - - private final Application context; - private final List decryptionDrainedListeners; - private final BroadcastReceiver connectionReceiver; - private final Map keepAliveTokens; - - private boolean appVisible; - private long lastInteractionTime; - - private volatile boolean networkDrained; - private volatile boolean decryptionDrained; - private volatile boolean terminated; - - - public IncomingMessageObserver(@NonNull Application context) { - if (INSTANCE_COUNT.incrementAndGet() != 1) { - throw new AssertionError("Multiple observers!"); - } - - this.context = context; - this.decryptionDrainedListeners = new CopyOnWriteArrayList<>(); - this.keepAliveTokens = new HashMap<>(); - this.lastInteractionTime = System.currentTimeMillis(); - - new MessageRetrievalThread().start(); - - if (!SignalStore.account().isFcmEnabled() || SignalStore.internalValues().isWebsocketModeForced()) { - try { - ForegroundServiceUtil.startWhenCapable(context, new Intent(context, ForegroundService.class)); - } catch (UnableToStartException e) { - Log.w(TAG, "Unable to start foreground service for websocket!", e); - } - } - - ApplicationDependencies.getAppForegroundObserver().addListener(new AppForegroundObserver.Listener() { - @Override - public void onForeground() { - onAppForegrounded(); - } - - @Override - public void onBackground() { - onAppBackgrounded(); - } - }); - - connectionReceiver = new BroadcastReceiver() { - @Override - public void onReceive(Context context, Intent intent) { - synchronized (IncomingMessageObserver.this) { - if (!NetworkConstraint.isMet(context)) { - Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state."); - networkDrained = false; - decryptionDrained = false; - disconnect(); - } - IncomingMessageObserver.this.notifyAll(); - } - } - }; - - context.registerReceiver(connectionReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); - } - - public synchronized void notifyRegistrationChanged() { - notifyAll(); - } - - public synchronized void addDecryptionDrainedListener(@NonNull Runnable listener) { - decryptionDrainedListeners.add(listener); - if (decryptionDrained) { - listener.run(); - } - } - - public synchronized void removeDecryptionDrainedListener(@NonNull Runnable listener) { - decryptionDrainedListeners.remove(listener); - } - - public boolean isDecryptionDrained() { - return decryptionDrained; - } - - /** - * @return True if the websocket is active, otherwise false. - */ - public boolean isActive() { - return isConnectionNecessary(); - } - - public void notifyDecryptionsDrained() { - List listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size()); - - synchronized (this) { - if (networkDrained && !decryptionDrained) { - Log.i(TAG, "Decryptions newly drained."); - decryptionDrained = true; - listenersToTrigger.addAll(decryptionDrainedListeners); - } - } - - for (Runnable listener : listenersToTrigger) { - listener.run(); - } - } - - private synchronized void onAppForegrounded() { - appVisible = true; - context.startService(new Intent(context, BackgroundService.class)); - notifyAll(); - } - - private synchronized void onAppBackgrounded() { - appVisible = false; - lastInteractionTime = System.currentTimeMillis(); - notifyAll(); - } - - private synchronized boolean isConnectionNecessary() { - boolean registered = SignalStore.account().isRegistered(); - boolean fcmEnabled = SignalStore.account().isFcmEnabled(); - boolean hasNetwork = NetworkConstraint.isMet(context); - boolean hasProxy = SignalStore.proxy().isProxyEnabled(); - boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced(); - long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS; - long timeIdle = appVisible ? 0 : System.currentTimeMillis() - lastInteractionTime; - - boolean removedRequests = keepAliveTokens.entrySet().removeIf(e -> e.getValue() < oldRequest); - if (removedRequests) { - Log.d(TAG, "Removed old keep web socket open requests."); - } - - String lastInteractionString = appVisible ? "N/A" : timeIdle + " ms (" + (timeIdle < MAX_BACKGROUND_TIME ? "within limit" : "over limit") + ")"; - - boolean conclusion = registered && - (appVisible || timeIdle < MAX_BACKGROUND_TIME || !fcmEnabled || Util.hasItems(keepAliveTokens)) && - hasNetwork; - - String needsConnectionString = conclusion ? "Needs Connection" : "Does Not Need Connection"; - - Log.d(TAG, String.format(Locale.US, "[" + needsConnectionString + "] Network: %s, Foreground: %s, Time Since Last Interaction: %s, FCM: %s, Stay open requests: [%s], Registered: %s, Proxy: %s, Force websocket: %s", - hasNetwork, appVisible, lastInteractionString, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), registered, hasProxy, forceWebsocket)); - - return conclusion; - } - - private synchronized void waitForConnectionNecessary() { - try { - while (!isConnectionNecessary()) wait(); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } - - public void terminateAsync() { - INSTANCE_COUNT.decrementAndGet(); - - context.unregisterReceiver(connectionReceiver); - - SignalExecutors.BOUNDED.execute(() -> { - Log.w(TAG, "Beginning termination."); - terminated = true; - disconnect(); - }); - } - - private void disconnect() { - ApplicationDependencies.getSignalWebSocket().disconnect(); - } - - public synchronized void registerKeepAliveToken(String key) { - keepAliveTokens.put(key, System.currentTimeMillis()); - lastInteractionTime = System.currentTimeMillis(); - notifyAll(); - } - - public synchronized void removeKeepAliveToken(String key) { - keepAliveTokens.remove(key); - lastInteractionTime = System.currentTimeMillis(); - notifyAll(); - } - - private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler { - - MessageRetrievalThread() { - super("MessageRetrievalService"); - Log.i(TAG, "Initializing! (" + this.hashCode() + ")"); - setUncaughtExceptionHandler(this); - } - - @Override - public void run() { - int attempts = 0; - - while (!terminated) { - Log.i(TAG, "Waiting for websocket state change...."); - if (attempts > 1) { - long backoff = BackoffUtil.exponentialBackoff(attempts, TimeUnit.SECONDS.toMillis(30)); - Log.w(TAG, "Too many failed connection attempts, attempts: " + attempts + " backing off: " + backoff); - ThreadUtil.sleep(backoff); - } - waitForConnectionNecessary(); - - Log.i(TAG, "Making websocket connection...."); - SignalWebSocket signalWebSocket = ApplicationDependencies.getSignalWebSocket(); - - Disposable webSocketDisposable = signalWebSocket.getWebSocketState().subscribe(state -> { - Log.d(TAG, "WebSocket State: " + state); - - // Any state change at all means that we are not drained - networkDrained = false; - decryptionDrained = false; - }); - - signalWebSocket.connect(); - - try { - while (isConnectionNecessary()) { - try { - Log.d(TAG, "Reading message..."); - Optional result = signalWebSocket.readOrEmpty(TimeUnit.MINUTES.toMillis(REQUEST_TIMEOUT_MINUTES), envelope -> { - Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp()); - try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - processor.processEnvelope(envelope); - } - }); - attempts = 0; - - if (!result.isPresent() && !networkDrained) { - Log.i(TAG, "Network was newly-drained. Enqueuing a job to listen for decryption draining."); - networkDrained = true; - ApplicationDependencies.getJobManager().add(new PushDecryptDrainedJob()); - } else if (!result.isPresent()) { - Log.w(TAG, "Got tombstone, but we thought the network was already drained!"); - } - } catch (WebSocketUnavailableException e) { - Log.i(TAG, "Pipe unexpectedly unavailable, connecting"); - signalWebSocket.connect(); - } catch (TimeoutException e) { - Log.w(TAG, "Application level read timeout..."); - attempts = 0; - } - } - - if (!appVisible) { - BackgroundService.stop(context); - } - } catch (Throwable e) { - attempts++; - Log.w(TAG, e); - } finally { - Log.w(TAG, "Shutting down pipe..."); - disconnect(); - webSocketDisposable.dispose(); - } - - Log.i(TAG, "Looping..."); - } - - Log.w(TAG, "Terminated! (" + this.hashCode() + ")"); - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - Log.w(TAG, "*** Uncaught exception!"); - Log.w(TAG, e); - } - } - - public static class ForegroundService extends Service { - - @Override - public @Nullable IBinder onBind(Intent intent) { - return null; - } - - @Override - public int onStartCommand(Intent intent, int flags, int startId) { - super.onStartCommand(intent, flags, startId); - - NotificationCompat.Builder builder = new NotificationCompat.Builder(getApplicationContext(), NotificationChannels.getInstance().BACKGROUND); - builder.setContentTitle(getApplicationContext().getString(R.string.MessageRetrievalService_signal)); - builder.setContentText(getApplicationContext().getString(R.string.MessageRetrievalService_background_connection_enabled)); - builder.setPriority(NotificationCompat.PRIORITY_MIN); - builder.setWhen(0); - builder.setSmallIcon(R.drawable.ic_signal_background_connection); - startForeground(FOREGROUND_ID, builder.build()); - - return Service.START_STICKY; - } - } - - /** - * A service that exists just to encourage the system to keep our process alive a little longer. - */ - public static class BackgroundService extends Service { - - public static void start(Context context) { - try { - context.startService(new Intent(context, BackgroundService.class)); - } catch (Exception e) { - Log.w(TAG, "Failed to start background service.", e); - } - } - - public static void stop(Context context) { - context.stopService(new Intent(context, BackgroundService.class)); - } - - @Override - public @Nullable IBinder onBind(Intent intent) { - return null; - } - - @Override - public int onStartCommand(Intent intent, int flags, int startId) { - Log.d(TAG, "Background service started."); - return START_STICKY; - } - - @Override - public void onDestroy() { - Log.d(TAG, "Background service destroyed."); - } - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt new file mode 100644 index 000000000..b1357deca --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -0,0 +1,509 @@ +package org.thoughtcrime.securesms.messages + +import android.annotation.SuppressLint +import android.app.Application +import android.app.Service +import android.content.BroadcastReceiver +import android.content.Context +import android.content.Intent +import android.content.IntentFilter +import android.net.ConnectivityManager +import android.os.IBinder +import androidx.annotation.VisibleForTesting +import androidx.core.app.NotificationCompat +import org.signal.core.util.ThreadUtil +import org.signal.core.util.concurrent.SignalExecutors +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.R +import org.thoughtcrime.securesms.database.MessageTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JobTracker +import org.thoughtcrime.securesms.jobmanager.JobTracker.JobListener +import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil.startWhenCapable +import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob +import org.thoughtcrime.securesms.jobs.PushProcessMessageJob +import org.thoughtcrime.securesms.jobs.UnableToStartException +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.notifications.NotificationChannels +import org.thoughtcrime.securesms.recipients.RecipientId +import org.thoughtcrime.securesms.util.AppForegroundObserver +import org.thoughtcrime.securesms.util.Util +import org.whispersystems.signalservice.api.messages.SignalServiceContent +import org.whispersystems.signalservice.api.messages.SignalServiceMetadata +import org.whispersystems.signalservice.api.push.ServiceId +import org.whispersystems.signalservice.api.push.SignalServiceAddress +import org.whispersystems.signalservice.api.util.UuidUtil +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException +import org.whispersystems.signalservice.internal.push.SignalServiceProtos +import org.whispersystems.signalservice.internal.serialize.SignalServiceAddressProtobufSerializer +import org.whispersystems.signalservice.internal.serialize.SignalServiceMetadataProtobufSerializer +import org.whispersystems.signalservice.internal.serialize.protos.SignalServiceContentProto +import java.util.* +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * The application-level manager of our websocket connection. + * + * + * This class is responsible for opening/closing the websocket based on the app's state and observing new inbound messages received on the websocket. + */ +class IncomingMessageObserver(private val context: Application) { + + companion object { + private val TAG = Log.tag(IncomingMessageObserver::class.java) + private val WEBSOCKET_READ_TIMEOUT = TimeUnit.MINUTES.toMillis(1) + private val KEEP_ALIVE_TOKEN_MAX_AGE = TimeUnit.MINUTES.toMillis(5) + private val MAX_BACKGROUND_TIME = TimeUnit.MINUTES.toMillis(5) + private val INSTANCE_COUNT = AtomicInteger(0) + + const val FOREGROUND_ID = 313399 + } + + private val decryptionDrainedListeners: MutableList = CopyOnWriteArrayList() + private val keepAliveTokens: MutableMap = mutableMapOf() + private val connectionReceiver: BroadcastReceiver + + private val lock: ReentrantLock = ReentrantLock() + private val condition: Condition = lock.newCondition() + + private var appVisible = false + private var lastInteractionTime: Long = System.currentTimeMillis() + + @Volatile + private var terminated = false + + @Volatile + var decryptionDrained = false + private set + + init { + if (INSTANCE_COUNT.incrementAndGet() != 1) { + throw AssertionError("Multiple observers!") + } + + MessageRetrievalThread().start() + + if (!SignalStore.account().fcmEnabled || SignalStore.internalValues().isWebsocketModeForced) { + try { + startWhenCapable(context, Intent(context, ForegroundService::class.java)) + } catch (e: UnableToStartException) { + Log.w(TAG, "Unable to start foreground service for websocket!", e) + } + } + + ApplicationDependencies.getAppForegroundObserver().addListener(object : AppForegroundObserver.Listener { + override fun onForeground() { + onAppForegrounded() + } + + override fun onBackground() { + onAppBackgrounded() + } + }) + + connectionReceiver = object : BroadcastReceiver() { + override fun onReceive(context: Context, intent: Intent) { + lock.withLock { + if (!NetworkConstraint.isMet(context)) { + Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state.") + decryptionDrained = false + disconnect() + } + condition.signalAll() + } + } + } + + context.registerReceiver(connectionReceiver, IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)) + } + + fun notifyRegistrationChanged() { + lock.withLock { + condition.signalAll() + } + } + + fun addDecryptionDrainedListener(listener: Runnable) { + decryptionDrainedListeners.add(listener) + if (decryptionDrained) { + listener.run() + } + } + + fun removeDecryptionDrainedListener(listener: Runnable) { + decryptionDrainedListeners.remove(listener) + } + + fun notifyDecryptionsDrained() { + if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) { + Log.i(TAG, "Queue was empty when notified. Signaling change.") + lock.withLock { + condition.signalAll() + } + } else { + Log.i(TAG, "Queue still had items when notified. Registering listener to signal change.") + ApplicationDependencies.getJobManager().addListener( + { it.parameters.queue == PushDecryptMessageJob.QUEUE }, + DecryptionDrainedQueueListener() + ) + } + } + + private fun onAppForegrounded() { + lock.withLock { + appVisible = true + context.startService(Intent(context, BackgroundService::class.java)) + condition.signalAll() + } + } + + private fun onAppBackgrounded() { + lock.withLock { + appVisible = false + lastInteractionTime = System.currentTimeMillis() + condition.signalAll() + } + } + + private fun isConnectionNecessary(): Boolean { + lock.withLock { + val registered = SignalStore.account().isRegistered + val fcmEnabled = SignalStore.account().fcmEnabled + val hasNetwork = NetworkConstraint.isMet(context) + val hasProxy = SignalStore.proxy().isProxyEnabled + val forceWebsocket = SignalStore.internalValues().isWebsocketModeForced + val keepAliveCutoffTime = System.currentTimeMillis() - KEEP_ALIVE_TOKEN_MAX_AGE + val timeIdle = if (appVisible) 0 else System.currentTimeMillis() - lastInteractionTime + val removedRequests = keepAliveTokens.entries.removeIf { (_, createTime) -> createTime < keepAliveCutoffTime } + val decryptQueueEmpty = ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE) + + if (removedRequests) { + Log.d(TAG, "Removed old keep web socket open requests.") + } + + val lastInteractionString = if (appVisible) "N/A" else timeIdle.toString() + " ms (" + (if (timeIdle < MAX_BACKGROUND_TIME) "within limit" else "over limit") + ")" + val conclusion = registered && + (appVisible || timeIdle < MAX_BACKGROUND_TIME || !fcmEnabled || Util.hasItems(keepAliveTokens)) && + hasNetwork && + decryptQueueEmpty + + val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection" + + Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisible, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, Stay open requests: [${keepAliveTokens.entries}], Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket, Decrypt Queue Empty: $decryptQueueEmpty") + return conclusion + } + } + + private fun waitForConnectionNecessary() { + lock.withLock { + try { + while (!isConnectionNecessary()) { + condition.await() + } + } catch (e: InterruptedException) { + throw AssertionError(e) + } + } + } + + fun terminateAsync() { + INSTANCE_COUNT.decrementAndGet() + context.unregisterReceiver(connectionReceiver) + + SignalExecutors.BOUNDED.execute { + Log.w(TAG, "Beginning termination.") + terminated = true + disconnect() + } + } + + private fun disconnect() { + ApplicationDependencies.getSignalWebSocket().disconnect() + } + + fun registerKeepAliveToken(key: String) { + lock.withLock { + keepAliveTokens[key] = System.currentTimeMillis() + lastInteractionTime = System.currentTimeMillis() + condition.signalAll() + } + } + + fun removeKeepAliveToken(key: String) { + lock.withLock { + keepAliveTokens.remove(key) + lastInteractionTime = System.currentTimeMillis() + condition.signalAll() + } + } + + @VisibleForTesting + fun processEnvelope(envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long) { + when (envelope.type.number) { + SignalServiceProtos.Envelope.Type.RECEIPT_VALUE -> processReceipt(envelope) + SignalServiceProtos.Envelope.Type.PREKEY_BUNDLE_VALUE, + SignalServiceProtos.Envelope.Type.CIPHERTEXT_VALUE, + SignalServiceProtos.Envelope.Type.UNIDENTIFIED_SENDER_VALUE, + SignalServiceProtos.Envelope.Type.PLAINTEXT_CONTENT_VALUE -> processMessage(envelope, serverDeliveredTimestamp) + else -> Log.w(TAG, "Received envelope of unknown type: " + envelope.type) + } + } + + private fun processMessage(envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long) { + val result = MessageDecryptor.decrypt(context, envelope, serverDeliveredTimestamp) + + when (result) { + is MessageDecryptor.Result.Success -> { + ApplicationDependencies.getJobManager().add( + PushProcessMessageJob( + result.toMessageState(), + result.toSignalServiceContent(), + null, + -1, + result.envelope.timestamp + ) + ) + } + + is MessageDecryptor.Result.Error -> { + ApplicationDependencies.getJobManager().add( + PushProcessMessageJob( + result.toMessageState(), + null, + result.errorMetadata.toExceptionMetadata(), + -1, + result.envelope.timestamp + ) + ) + } + + is MessageDecryptor.Result.Ignore -> { + // No action needed + } + + else -> { + throw AssertionError("Unexpected result! ${result.javaClass.simpleName}") + } + } + + result.followUpOperations.forEach { it.run() } + } + + private fun processReceipt(envelope: SignalServiceProtos.Envelope) { + if (!UuidUtil.isUuid(envelope.sourceUuid)) { + Log.w(TAG, "Invalid envelope source UUID!") + return + } + + val senderId = RecipientId.from(ServiceId.parseOrThrow(envelope.sourceUuid)) + + Log.i(TAG, "Received server receipt. Sender: $senderId, Device: ${envelope.sourceDevice}, Timestamp: ${envelope.timestamp}") + SignalDatabase.messages.incrementDeliveryReceiptCount(MessageTable.SyncMessageId(senderId, envelope.timestamp), System.currentTimeMillis()) + SignalDatabase.messageLog.deleteEntryForRecipient(envelope.timestamp, senderId, envelope.sourceDevice) + } + + private fun MessageDecryptor.Result.toMessageState(): MessageContentProcessor.MessageState { + return when (this) { + is MessageDecryptor.Result.DecryptionError -> MessageContentProcessor.MessageState.DECRYPTION_ERROR + is MessageDecryptor.Result.Ignore -> MessageContentProcessor.MessageState.NOOP + is MessageDecryptor.Result.InvalidVersion -> MessageContentProcessor.MessageState.INVALID_VERSION + is MessageDecryptor.Result.LegacyMessage -> MessageContentProcessor.MessageState.LEGACY_MESSAGE + is MessageDecryptor.Result.Success -> MessageContentProcessor.MessageState.DECRYPTED_OK + is MessageDecryptor.Result.UnsupportedDataMessage -> MessageContentProcessor.MessageState.UNSUPPORTED_DATA_MESSAGE + } + } + + private fun MessageDecryptor.Result.Success.toSignalServiceContent(): SignalServiceContent { + val localAddress = SignalServiceAddress(this.metadata.destinationServiceId, Optional.ofNullable(SignalStore.account().e164)) + val metadata = SignalServiceMetadata( + SignalServiceAddress(this.metadata.sourceServiceId, Optional.ofNullable(this.metadata.sourceE164)), + this.metadata.sourceDeviceId, + this.envelope.timestamp, + this.envelope.serverTimestamp, + this.serverDeliveredTimestamp, + this.metadata.sealedSender, + this.envelope.serverGuid, + Optional.ofNullable(this.metadata.groupId), + this.metadata.destinationServiceId.toString() + ) + + val contentProto = SignalServiceContentProto.newBuilder() + .setLocalAddress(SignalServiceAddressProtobufSerializer.toProtobuf(localAddress)) + .setMetadata(SignalServiceMetadataProtobufSerializer.toProtobuf(metadata)) + .setContent(content) + .build() + + return SignalServiceContent.createFromProto(contentProto)!! + } + + private fun MessageDecryptor.ErrorMetadata.toExceptionMetadata(): MessageContentProcessor.ExceptionMetadata { + return MessageContentProcessor.ExceptionMetadata( + this.sender, + this.senderDevice, + this.groupId + ) + } + + private inner class MessageRetrievalThread : Thread("MessageRetrievalService"), Thread.UncaughtExceptionHandler { + + init { + Log.i(TAG, "Initializing! (" + this.hashCode() + ")") + uncaughtExceptionHandler = this + } + + override fun run() { + var attempts = 0 + + while (!terminated) { + Log.i(TAG, "Waiting for websocket state change....") + if (attempts > 1) { + val backoff = BackoffUtil.exponentialBackoff(attempts, TimeUnit.SECONDS.toMillis(30)) + Log.w(TAG, "Too many failed connection attempts, attempts: $attempts backing off: $backoff") + ThreadUtil.sleep(backoff) + } + + waitForConnectionNecessary() + Log.i(TAG, "Making websocket connection....") + + val signalWebSocket = ApplicationDependencies.getSignalWebSocket() + val webSocketDisposable = signalWebSocket.webSocketState.subscribe { state: WebSocketConnectionState -> + Log.d(TAG, "WebSocket State: $state") + + // Any state change at all means that we are not drained + decryptionDrained = false + } + + signalWebSocket.connect() + try { + while (isConnectionNecessary()) { + try { + Log.d(TAG, "Reading message...") + + val hasMore = signalWebSocket.readMessage(WEBSOCKET_READ_TIMEOUT) { envelope, serverDeliveredTimestamp -> + Log.i(TAG, "Retrieved envelope! " + envelope.timestamp) + processEnvelope(envelope, serverDeliveredTimestamp) + true + } + + attempts = 0 + + if (!hasMore && !decryptionDrained) { + Log.i(TAG, "Decryptions newly-drained.") + decryptionDrained = true + + for (listener in decryptionDrainedListeners.toList()) { + listener.run() + } + } else if (!hasMore) { + Log.w(TAG, "Got tombstone, but we thought the network was already drained!") + } + } catch (e: WebSocketUnavailableException) { + Log.i(TAG, "Pipe unexpectedly unavailable, connecting") + signalWebSocket.connect() + } catch (e: TimeoutException) { + Log.w(TAG, "Application level read timeout...") + attempts = 0 + } + } + + if (!appVisible) { + BackgroundService.stop(context) + } + } catch (e: Throwable) { + attempts++ + Log.w(TAG, e) + } finally { + Log.w(TAG, "Shutting down pipe...") + disconnect() + webSocketDisposable.dispose() + } + Log.i(TAG, "Looping...") + } + Log.w(TAG, "Terminated! (" + this.hashCode() + ")") + } + + override fun uncaughtException(t: Thread, e: Throwable) { + Log.w(TAG, "Uncaught exception in message thread!", e) + } + } + + private inner class DecryptionDrainedQueueListener : JobListener { + @SuppressLint("WrongThread") + override fun onStateChanged(job: Job, jobState: JobTracker.JobState) { + if (jobState.isComplete) { + if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) { + Log.i(TAG, "Queue is now empty. Signaling change.") + lock.withLock { + condition.signalAll() + } + ApplicationDependencies.getJobManager().removeListener(this) + } else { + Log.i(TAG, "Item finished in queue, but it's still not empty. Waiting to signal change.") + } + } + } + } + + class ForegroundService : Service() { + override fun onBind(intent: Intent): IBinder? { + return null + } + + override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int { + super.onStartCommand(intent, flags, startId) + + val notification = NotificationCompat.Builder(applicationContext, NotificationChannels.getInstance().BACKGROUND) + .setContentTitle(applicationContext.getString(R.string.MessageRetrievalService_signal)) + .setContentText(applicationContext.getString(R.string.MessageRetrievalService_background_connection_enabled)) + .setPriority(NotificationCompat.PRIORITY_MIN) + .setWhen(0) + .setSmallIcon(R.drawable.ic_signal_background_connection) + .build() + + startForeground(FOREGROUND_ID, notification) + + return START_STICKY + } + } + + /** + * A service that exists just to encourage the system to keep our process alive a little longer. + */ + class BackgroundService : Service() { + override fun onBind(intent: Intent): IBinder? = null + + override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int { + Log.d(TAG, "Background service started.") + return START_STICKY + } + + override fun onDestroy() { + Log.d(TAG, "Background service destroyed.") + } + + companion object { + fun start(context: Context) { + try { + context.startService(Intent(context, BackgroundService::class.java)) + } catch (e: Exception) { + Log.w(TAG, "Failed to start background service.", e) + } + } + + fun stop(context: Context) { + context.stopService(Intent(context, BackgroundService::class.java)) + } + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java deleted file mode 100644 index 07e447c49..000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.thoughtcrime.securesms.messages; - -import android.app.Application; -import android.content.Context; - -import androidx.annotation.NonNull; -import androidx.annotation.Nullable; - -import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.database.GroupTable; -import org.thoughtcrime.securesms.database.MessageTable.SyncMessageId; -import org.thoughtcrime.securesms.database.SignalDatabase; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.groups.GroupId; -import org.thoughtcrime.securesms.jobmanager.Job; -import org.thoughtcrime.securesms.jobmanager.JobManager; -import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob; -import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; -import org.thoughtcrime.securesms.recipients.Recipient; -import org.thoughtcrime.securesms.recipients.RecipientId; -import org.thoughtcrime.securesms.util.GroupUtil; -import org.signal.core.util.SetUtil; -import org.thoughtcrime.securesms.util.TextSecurePreferences; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; -import org.whispersystems.signalservice.api.messages.SignalServiceGroupV2; - -import java.io.Closeable; -import java.util.concurrent.locks.ReentrantLock; - -/** - * The central entry point for all envelopes that have been retrieved. Envelopes must be processed - * here to guarantee proper ordering. - */ -public class IncomingMessageProcessor { - - private static final String TAG = Log.tag(IncomingMessageProcessor.class); - - private final Application context; - private final ReentrantLock lock; - - public IncomingMessageProcessor(@NonNull Application context) { - this.context = context; - this.lock = new ReentrantLock(); - } - - /** - * @return An instance of a Processor that will allow you to process messages in a thread safe - * way. Must be closed. - */ - public Processor acquire() { - lock.lock(); - return new Processor(context); - } - - private void release() { - lock.unlock(); - } - - public class Processor implements Closeable { - - private final Context context; - private final JobManager jobManager; - - private Processor(@NonNull Context context) { - this.context = context; - this.jobManager = ApplicationDependencies.getJobManager(); - } - - /** - * @return The id of the {@link PushDecryptMessageJob} that was scheduled to process the message, if - * one was created. Otherwise null. - */ - public @Nullable String processEnvelope(@NonNull SignalServiceEnvelope envelope) { - if (envelope.hasSourceUuid()) { - Recipient.externalPush(envelope.getSourceAddress()); - } - - if (envelope.isReceipt()) { - processReceipt(envelope); - return null; - } else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender() || envelope.isPlaintextContent()) { - return processMessage(envelope); - } else { - Log.w(TAG, "Received envelope of unknown type: " + envelope.getType()); - return null; - } - } - - private @Nullable String processMessage(@NonNull SignalServiceEnvelope envelope) { - return processMessageDeferred(envelope); - } - - private @Nullable String processMessageDeferred(@NonNull SignalServiceEnvelope envelope) { - Job job = new PushDecryptMessageJob(envelope); - jobManager.add(job); - return job.getId(); - } - - private void processReceipt(@NonNull SignalServiceEnvelope envelope) { - Recipient sender = Recipient.externalPush(envelope.getSourceAddress()); - Log.i(TAG, "Received server receipt. Sender: " + sender.getId() + ", Device: " + envelope.getSourceDevice() + ", Timestamp: " + envelope.getTimestamp()); - - SignalDatabase.messages().incrementDeliveryReceiptCount(new SyncMessageId(sender.getId(), envelope.getTimestamp()), System.currentTimeMillis()); - SignalDatabase.messageLog().deleteEntryForRecipient(envelope.getTimestamp(), sender.getId(), envelope.getSourceDevice()); - } - - @Override - public void close() { - release(); - } - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java b/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java index 5d66ab17a..8475d57f6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java +++ b/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java @@ -122,9 +122,10 @@ public class ApplicationMigrations { static final int GLIDE_CACHE_CLEAR = 77; static final int SYSTEM_NAME_RESYNC = 78; static final int RECOVERY_PASSWORD_SYNC = 79; + static final int DECRYPTIONS_DRAINED = 80; } - public static final int CURRENT_VERSION = 79; + public static final int CURRENT_VERSION = 80; /** * This *must* be called after the {@link JobManager} has been instantiated, but *before* the call @@ -542,6 +543,10 @@ public class ApplicationMigrations { jobs.put(Version.RECOVERY_PASSWORD_SYNC, new AttributesMigrationJob()); } + if (lastSeenVersion < Version.DECRYPTIONS_DRAINED) { + jobs.put(Version.DECRYPTIONS_DRAINED, new DecryptionsDrainedMigrationJob()); + } + return jobs; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/migrations/DecryptionsDrainedMigrationJob.kt b/app/src/main/java/org/thoughtcrime/securesms/migrations/DecryptionsDrainedMigrationJob.kt new file mode 100644 index 000000000..e06b5b208 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/migrations/DecryptionsDrainedMigrationJob.kt @@ -0,0 +1,36 @@ +package org.thoughtcrime.securesms.migrations + +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies +import org.thoughtcrime.securesms.jobmanager.Data +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob + +/** + * Kicks off a job to notify the [org.thoughtcrime.securesms.messages.IncomingMessageObserver] when the decryption queue is empty. + */ +internal class DecryptionsDrainedMigrationJob( + parameters: Parameters = Parameters.Builder().build() +) : MigrationJob(parameters) { + + companion object { + val TAG = Log.tag(DecryptionsDrainedMigrationJob::class.java) + const val KEY = "DecryptionsDrainedMigrationJob" + } + + override fun getFactoryKey(): String = KEY + + override fun isUiBlocking(): Boolean = false + + override fun performMigration() { + ApplicationDependencies.getJobManager().add(PushDecryptDrainedJob()) + } + + override fun shouldRetry(e: Exception): Boolean = false + + class Factory : Job.Factory { + override fun create(parameters: Parameters, data: Data): DecryptionsDrainedMigrationJob { + return DecryptionsDrainedMigrationJob(parameters) + } + } +} diff --git a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.java b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.java index 2bbe7dd43..3464dee8e 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.java +++ b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.java @@ -15,7 +15,6 @@ import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; -import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.payments.Payments; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; @@ -77,11 +76,6 @@ public class MockApplicationDependencyProvider implements ApplicationDependencie return null; } - @Override - public @NonNull IncomingMessageProcessor provideIncomingMessageProcessor() { - return null; - } - @Override public @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever() { return null; diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java index a113ac36b..1e778c66c 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java @@ -2,10 +2,10 @@ package org.whispersystems.signalservice.api; import org.signal.libsignal.protocol.logging.Log; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.api.websocket.WebSocketFactory; import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; +import org.whispersystems.signalservice.internal.push.SignalServiceProtos; import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketRequestMessage; import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketResponseMessage; @@ -15,6 +15,7 @@ import org.whispersystems.util.Base64; import java.io.IOException; import java.util.Optional; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; @@ -218,30 +219,33 @@ public final class SignalWebSocket { /** *

- * A blocking call that reads a message off the pipe. When this call returns, the message has been - * acknowledged and will not be retransmitted. This will return {@link Optional#empty()} when an - * empty response is hit, which indicates the WebSocket is empty. + * A blocking call that reads a message off the pipe. When this call returns, if the callback indicates the + * message was successfully processed, then the message will be ack'ed on the serve and will not be retransmitted. + *

+ * This will return true if there are more messages to be read from the websocket, or false if the websocket is empty. *

* You can specify a {@link MessageReceivedCallback} that will be called before the received message is acknowledged. * This allows you to write the received message to durable storage before acknowledging receipt of it to the * server. *

- * Important: The empty response will only be hit once for each connection. That means if you get - * an empty response and call readOrEmpty() again on the same instance, you will not get an empty - * response, and instead will block until you get an actual message. This will, however, reset if - * connection breaks (if, for instance, you lose and regain network). + * Important: This will only return `false` once for each connection. That means if you get false call readMessage() + * again on the same instance, you will not get an immediate `false` return value, and instead will block until + * you get an actual message. This will, however, reset if connection breaks (if, for instance, you lose and regain network). * * @param timeout The timeout to wait for. * @param callback A callback that will be called before the message receipt is acknowledged to the server. * @return The message read (same as the message sent through the callback). */ @SuppressWarnings("DuplicateThrows") - public Optional readOrEmpty(long timeout, MessageReceivedCallback callback) + public boolean readMessage(long timeout, MessageReceivedCallback callback) throws TimeoutException, WebSocketUnavailableException, IOException { while (true) { WebSocketRequestMessage request = getWebSocket().readRequest(timeout); WebSocketResponseMessage response = createWebSocketResponse(request); + + AtomicBoolean successfullyProcessed = new AtomicBoolean(false); + try { if (isSignalServiceEnvelope(request)) { Optional timestampHeader = findHeader(request); @@ -255,15 +259,18 @@ public final class SignalWebSocket { } } - SignalServiceEnvelope envelope = new SignalServiceEnvelope(request.getBody().toByteArray(), timestamp); + SignalServiceProtos.Envelope envelope = SignalServiceProtos.Envelope.parseFrom(request.getBody().toByteArray()); - callback.onMessage(envelope); - return Optional.of(envelope); + successfullyProcessed.set(callback.onMessage(envelope, timestamp)); + + return true; } else if (isSocketEmptyRequest(request)) { - return Optional.empty(); + return false; } } finally { - getWebSocket().sendResponse(response); + if (successfullyProcessed.get()) { + getWebSocket().sendResponse(response); + } } } } @@ -314,6 +321,8 @@ public final class SignalWebSocket { * received. */ public interface MessageReceivedCallback { - void onMessage(SignalServiceEnvelope envelope); + + /** True if you successfully processed the message, otherwise false. **/ + boolean onMessage(SignalServiceProtos.Envelope envelope, long serverDeliveredTimestamp); } }