diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java similarity index 61% rename from app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java rename to app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java index 968d37312..c6a95e551 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraint.java @@ -9,19 +9,19 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.Constraint; /** - * A constraint that is met once we have pulled down all messages from the websocket during initial - * load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}. + * A constraint that is met once we have pulled down and decrypted all messages from the websocket + * during initial load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}. */ -public final class WebsocketDrainedConstraint implements Constraint { +public final class DecryptionsDrainedConstraint implements Constraint { public static final String KEY = "WebsocketDrainedConstraint"; - private WebsocketDrainedConstraint() { + private DecryptionsDrainedConstraint() { } @Override public boolean isMet() { - return ApplicationDependencies.getIncomingMessageObserver().isWebsocketDrained(); + return ApplicationDependencies.getIncomingMessageObserver().isDecryptionDrained(); } @Override @@ -34,11 +34,11 @@ public final class WebsocketDrainedConstraint implements Constraint { public void applyToJobInfo(@NonNull JobInfo.Builder jobInfoBuilder) { } - public static final class Factory implements Constraint.Factory { + public static final class Factory implements Constraint.Factory { @Override - public WebsocketDrainedConstraint create() { - return new WebsocketDrainedConstraint(); + public DecryptionsDrainedConstraint create() { + return new DecryptionsDrainedConstraint(); } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraintObserver.java similarity index 50% rename from app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java rename to app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraintObserver.java index 2ca38ce4c..ebb5ef46a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/DecryptionsDrainedConstraintObserver.java @@ -6,17 +6,17 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.ConstraintObserver; /** - * An observer for {@link WebsocketDrainedConstraint}. Will fire when the websocket is drained - * (i.e. it has received an empty response). + * An observer for {@link DecryptionsDrainedConstraint}. Will fire when the websocket is drained and + * the relevant decryptions have finished. */ -public class WebsocketDrainedConstraintObserver implements ConstraintObserver { +public class DecryptionsDrainedConstraintObserver implements ConstraintObserver { - private static final String REASON = WebsocketDrainedConstraintObserver.class.getSimpleName(); + private static final String REASON = DecryptionsDrainedConstraintObserver.class.getSimpleName(); private volatile Notifier notifier; - public WebsocketDrainedConstraintObserver() { - ApplicationDependencies.getIncomingMessageObserver().addWebsocketDrainedListener(() -> { + public DecryptionsDrainedConstraintObserver() { + ApplicationDependencies.getIncomingMessageObserver().addDecryptionDrainedListener(() -> { if (notifier != null) { notifier.onConstraintMet(REASON); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/GroupCallPeekJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/GroupCallPeekJob.java index 506ac411a..7c436962f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/GroupCallPeekJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/GroupCallPeekJob.java @@ -6,7 +6,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.JobManager; -import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint; +import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint; import org.thoughtcrime.securesms.recipients.RecipientId; /** @@ -27,7 +27,7 @@ public final class GroupCallPeekJob extends BaseJob { String queue = QUEUE + groupRecipientId.serialize(); Parameters.Builder parameters = new Parameters.Builder() .setQueue(queue) - .addConstraint(WebsocketDrainedConstraint.KEY); + .addConstraint(DecryptionsDrainedConstraint.KEY); jobManager.cancelAllInQueue(queue); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index d8d2c1b30..cae2b3a41 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -17,8 +17,8 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraintObserver; import org.thoughtcrime.securesms.jobmanager.impl.NetworkOrCellServiceConstraint; import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint; import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver; -import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint; -import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraintObserver; +import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint; +import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraintObserver; import org.thoughtcrime.securesms.jobmanager.migrations.PushProcessMessageQueueJobMigration; import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration; import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration2; @@ -97,6 +97,7 @@ public final class JobManagerFactories { put(MultiDeviceViewOnceOpenJob.KEY, new MultiDeviceViewOnceOpenJob.Factory()); put(ProfileKeySendJob.KEY, new ProfileKeySendJob.Factory()); put(PushDecryptMessageJob.KEY, new PushDecryptMessageJob.Factory()); + put(PushDecryptDrainedJob.KEY, new PushDecryptDrainedJob.Factory()); put(PushProcessMessageJob.KEY, new PushProcessMessageJob.Factory()); put(PushGroupSendJob.KEY, new PushGroupSendJob.Factory()); put(PushGroupSilentUpdateSendJob.KEY, new PushGroupSilentUpdateSendJob.Factory()); @@ -182,7 +183,7 @@ public final class JobManagerFactories { put(NetworkOrCellServiceConstraint.KEY, new NetworkOrCellServiceConstraint.Factory(application)); put(NetworkOrCellServiceConstraint.LEGACY_KEY, new NetworkOrCellServiceConstraint.Factory(application)); put(SqlCipherMigrationConstraint.KEY, new SqlCipherMigrationConstraint.Factory(application)); - put(WebsocketDrainedConstraint.KEY, new WebsocketDrainedConstraint.Factory()); + put(DecryptionsDrainedConstraint.KEY, new DecryptionsDrainedConstraint.Factory()); }}; } @@ -191,7 +192,7 @@ public final class JobManagerFactories { new ChargingConstraintObserver(application), new NetworkConstraintObserver(application), new SqlCipherMigrationConstraintObserver(), - new WebsocketDrainedConstraintObserver()); + new DecryptionsDrainedConstraintObserver()); } public static List getJobMigrations(@NonNull Application application) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java new file mode 100644 index 000000000..7aad9e638 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushDecryptDrainedJob.java @@ -0,0 +1,63 @@ +package org.thoughtcrime.securesms.jobs; + +import androidx.annotation.NonNull; + +import org.signal.core.util.logging.Log; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.Data; +import org.thoughtcrime.securesms.jobmanager.Job; + +/** + * A job that has the same queue as {@link PushDecryptMessageJob} that we enqueue so we can notify + * the {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver} when decryptions have + * finished. This lets us know not just when the websocket is drained, but when all the decryptions + * for the messages we pulled down from the websocket have been finished. + */ +public class PushDecryptDrainedJob extends BaseJob { + + public static final String KEY = "PushDecryptDrainedJob"; + + private static final String TAG = Log.tag(PushDecryptDrainedJob.class); + + public PushDecryptDrainedJob() { + this(new Parameters.Builder() + .setQueue(PushDecryptMessageJob.QUEUE) + .build()); + } + + private PushDecryptDrainedJob(@NonNull Parameters parameters) { + super(parameters); + } + + @Override + public @NonNull Data serialize() { + return Data.EMPTY; + } + + @Override + protected void onRun() throws Exception { + Log.i(TAG, "Decryptions are caught-up."); + ApplicationDependencies.getIncomingMessageObserver().notifyDecryptionsDrained(); + } + + @Override + protected boolean onShouldRetry(@NonNull Exception e) { + return false; + } + + @Override + public @NonNull String getFactoryKey() { + return KEY; + } + + @Override + public void onFailure() { + } + + public static final class Factory implements Job.Factory { + @Override + public @NonNull PushDecryptDrainedJob create(@NonNull Parameters parameters, @NonNull Data data) { + return new PushDecryptDrainedJob(parameters); + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java index e434c1437..00ee2a691 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java @@ -8,7 +8,7 @@ import org.thoughtcrime.securesms.groups.GroupId; import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor; import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; -import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint; +import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint; /** * Schedules a {@link RequestGroupV2InfoWorkerJob} to happen after message queues are drained. @@ -32,7 +32,7 @@ public final class RequestGroupV2InfoJob extends BaseJob { public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId, int toRevision) { this(new Parameters.Builder() .setQueue("RequestGroupV2InfoSyncJob") - .addConstraint(WebsocketDrainedConstraint.KEY) + .addConstraint(DecryptionsDrainedConstraint.KEY) .setMaxAttempts(Parameters.UNLIMITED) .build(), groupId, diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index b156c6e3b..ca06602c7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -20,6 +20,7 @@ import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.R; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; +import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor; import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; @@ -30,6 +31,7 @@ import org.whispersystems.signalservice.api.SignalServiceMessagePipe; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -47,16 +49,17 @@ public class IncomingMessageObserver { private final Context context; private final SignalServiceNetworkAccess networkAccess; - private final List websocketDrainedListeners; + private final List decryptionDrainedListeners; private boolean appVisible; - private volatile boolean websocketDrained; + private volatile boolean networkDrained; + private volatile boolean decryptionDrained; public IncomingMessageObserver(@NonNull Context context) { - this.context = context; - this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess(); - this.websocketDrainedListeners = new CopyOnWriteArrayList<>(); + this.context = context; + this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess(); + this.decryptionDrainedListeners = new CopyOnWriteArrayList<>(); new MessageRetrievalThread().start(); @@ -82,7 +85,8 @@ public class IncomingMessageObserver { synchronized (IncomingMessageObserver.this) { if (!NetworkConstraint.isMet(context)) { Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state."); - websocketDrained = false; + networkDrained = false; + decryptionDrained = false; shutdown(pipe, unidentifiedPipe); } IncomingMessageObserver.this.notifyAll(); @@ -91,15 +95,31 @@ public class IncomingMessageObserver { }, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } - public synchronized void addWebsocketDrainedListener(@NonNull Runnable listener) { - websocketDrainedListeners.add(listener); - if (websocketDrained) { + public synchronized void addDecryptionDrainedListener(@NonNull Runnable listener) { + decryptionDrainedListeners.add(listener); + if (decryptionDrained) { listener.run(); } } - public boolean isWebsocketDrained() { - return websocketDrained; + public boolean isDecryptionDrained() { + return decryptionDrained; + } + + public void notifyDecryptionsDrained() { + List listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size()); + + synchronized (this) { + if (networkDrained && !decryptionDrained) { + Log.i(TAG, "Decryptions newly drained."); + decryptionDrained = true; + listenersToTrigger.addAll(decryptionDrainedListeners); + } + } + + for (Runnable listener : listenersToTrigger) { + listener.run(); + } } private synchronized void onAppForegrounded() { @@ -195,13 +215,10 @@ public class IncomingMessageObserver { } }); - if (!result.isPresent() && !websocketDrained) { - Log.i(TAG, "Websocket was newly-drained. Triggering listeners."); - websocketDrained = true; - - for (Runnable listener : websocketDrainedListeners) { - listener.run(); - } + if (!result.isPresent() && !networkDrained) { + Log.i(TAG, "Network was newly-drained. Enqueuing a job to listen for decryption draining."); + networkDrained = true; + ApplicationDependencies.getJobManager().add(new PushDecryptDrainedJob()); } } catch (TimeoutException e) { Log.w(TAG, "Application level read timeout...");