Change WebsocketDrainedConstraint to DecryptionsDrainedConstraint.

fork-5.53.8
Greyson Parrelli 2020-12-08 16:51:03 -05:00
rodzic 84e9282f87
commit 9f8e31db78
7 zmienionych plików z 121 dodań i 40 usunięć

Wyświetl plik

@ -9,19 +9,19 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Constraint;
/**
* A constraint that is met once we have pulled down all messages from the websocket during initial
* load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}.
* A constraint that is met once we have pulled down and decrypted all messages from the websocket
* during initial load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}.
*/
public final class WebsocketDrainedConstraint implements Constraint {
public final class DecryptionsDrainedConstraint implements Constraint {
public static final String KEY = "WebsocketDrainedConstraint";
private WebsocketDrainedConstraint() {
private DecryptionsDrainedConstraint() {
}
@Override
public boolean isMet() {
return ApplicationDependencies.getIncomingMessageObserver().isWebsocketDrained();
return ApplicationDependencies.getIncomingMessageObserver().isDecryptionDrained();
}
@Override
@ -34,11 +34,11 @@ public final class WebsocketDrainedConstraint implements Constraint {
public void applyToJobInfo(@NonNull JobInfo.Builder jobInfoBuilder) {
}
public static final class Factory implements Constraint.Factory<WebsocketDrainedConstraint> {
public static final class Factory implements Constraint.Factory<DecryptionsDrainedConstraint> {
@Override
public WebsocketDrainedConstraint create() {
return new WebsocketDrainedConstraint();
public DecryptionsDrainedConstraint create() {
return new DecryptionsDrainedConstraint();
}
}
}

Wyświetl plik

@ -6,17 +6,17 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver;
/**
* An observer for {@link WebsocketDrainedConstraint}. Will fire when the websocket is drained
* (i.e. it has received an empty response).
* An observer for {@link DecryptionsDrainedConstraint}. Will fire when the websocket is drained and
* the relevant decryptions have finished.
*/
public class WebsocketDrainedConstraintObserver implements ConstraintObserver {
public class DecryptionsDrainedConstraintObserver implements ConstraintObserver {
private static final String REASON = WebsocketDrainedConstraintObserver.class.getSimpleName();
private static final String REASON = DecryptionsDrainedConstraintObserver.class.getSimpleName();
private volatile Notifier notifier;
public WebsocketDrainedConstraintObserver() {
ApplicationDependencies.getIncomingMessageObserver().addWebsocketDrainedListener(() -> {
public DecryptionsDrainedConstraintObserver() {
ApplicationDependencies.getIncomingMessageObserver().addDecryptionDrainedListener(() -> {
if (notifier != null) {
notifier.onConstraintMet(REASON);
}

Wyświetl plik

@ -6,7 +6,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint;
import org.thoughtcrime.securesms.recipients.RecipientId;
/**
@ -27,7 +27,7 @@ public final class GroupCallPeekJob extends BaseJob {
String queue = QUEUE + groupRecipientId.serialize();
Parameters.Builder parameters = new Parameters.Builder()
.setQueue(queue)
.addConstraint(WebsocketDrainedConstraint.KEY);
.addConstraint(DecryptionsDrainedConstraint.KEY);
jobManager.cancelAllInQueue(queue);

Wyświetl plik

@ -17,8 +17,8 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkOrCellServiceConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.migrations.PushProcessMessageQueueJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration2;
@ -97,6 +97,7 @@ public final class JobManagerFactories {
put(MultiDeviceViewOnceOpenJob.KEY, new MultiDeviceViewOnceOpenJob.Factory());
put(ProfileKeySendJob.KEY, new ProfileKeySendJob.Factory());
put(PushDecryptMessageJob.KEY, new PushDecryptMessageJob.Factory());
put(PushDecryptDrainedJob.KEY, new PushDecryptDrainedJob.Factory());
put(PushProcessMessageJob.KEY, new PushProcessMessageJob.Factory());
put(PushGroupSendJob.KEY, new PushGroupSendJob.Factory());
put(PushGroupSilentUpdateSendJob.KEY, new PushGroupSilentUpdateSendJob.Factory());
@ -182,7 +183,7 @@ public final class JobManagerFactories {
put(NetworkOrCellServiceConstraint.KEY, new NetworkOrCellServiceConstraint.Factory(application));
put(NetworkOrCellServiceConstraint.LEGACY_KEY, new NetworkOrCellServiceConstraint.Factory(application));
put(SqlCipherMigrationConstraint.KEY, new SqlCipherMigrationConstraint.Factory(application));
put(WebsocketDrainedConstraint.KEY, new WebsocketDrainedConstraint.Factory());
put(DecryptionsDrainedConstraint.KEY, new DecryptionsDrainedConstraint.Factory());
}};
}
@ -191,7 +192,7 @@ public final class JobManagerFactories {
new ChargingConstraintObserver(application),
new NetworkConstraintObserver(application),
new SqlCipherMigrationConstraintObserver(),
new WebsocketDrainedConstraintObserver());
new DecryptionsDrainedConstraintObserver());
}
public static List<JobMigration> getJobMigrations(@NonNull Application application) {

Wyświetl plik

@ -0,0 +1,63 @@
package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
/**
* A job that has the same queue as {@link PushDecryptMessageJob} that we enqueue so we can notify
* the {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver} when decryptions have
* finished. This lets us know not just when the websocket is drained, but when all the decryptions
* for the messages we pulled down from the websocket have been finished.
*/
public class PushDecryptDrainedJob extends BaseJob {
public static final String KEY = "PushDecryptDrainedJob";
private static final String TAG = Log.tag(PushDecryptDrainedJob.class);
public PushDecryptDrainedJob() {
this(new Parameters.Builder()
.setQueue(PushDecryptMessageJob.QUEUE)
.build());
}
private PushDecryptDrainedJob(@NonNull Parameters parameters) {
super(parameters);
}
@Override
public @NonNull Data serialize() {
return Data.EMPTY;
}
@Override
protected void onRun() throws Exception {
Log.i(TAG, "Decryptions are caught-up.");
ApplicationDependencies.getIncomingMessageObserver().notifyDecryptionsDrained();
}
@Override
protected boolean onShouldRetry(@NonNull Exception e) {
return false;
}
@Override
public @NonNull String getFactoryKey() {
return KEY;
}
@Override
public void onFailure() {
}
public static final class Factory implements Job.Factory<PushDecryptDrainedJob> {
@Override
public @NonNull PushDecryptDrainedJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new PushDecryptDrainedJob(parameters);
}
}
}

Wyświetl plik

@ -8,7 +8,7 @@ import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint;
/**
* Schedules a {@link RequestGroupV2InfoWorkerJob} to happen after message queues are drained.
@ -32,7 +32,7 @@ public final class RequestGroupV2InfoJob extends BaseJob {
public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId, int toRevision) {
this(new Parameters.Builder()
.setQueue("RequestGroupV2InfoSyncJob")
.addConstraint(WebsocketDrainedConstraint.KEY)
.addConstraint(DecryptionsDrainedConstraint.KEY)
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
groupId,

Wyświetl plik

@ -20,6 +20,7 @@ import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
@ -30,6 +31,7 @@ import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@ -47,16 +49,17 @@ public class IncomingMessageObserver {
private final Context context;
private final SignalServiceNetworkAccess networkAccess;
private final List<Runnable> websocketDrainedListeners;
private final List<Runnable> decryptionDrainedListeners;
private boolean appVisible;
private volatile boolean websocketDrained;
private volatile boolean networkDrained;
private volatile boolean decryptionDrained;
public IncomingMessageObserver(@NonNull Context context) {
this.context = context;
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
this.websocketDrainedListeners = new CopyOnWriteArrayList<>();
this.context = context;
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
this.decryptionDrainedListeners = new CopyOnWriteArrayList<>();
new MessageRetrievalThread().start();
@ -82,7 +85,8 @@ public class IncomingMessageObserver {
synchronized (IncomingMessageObserver.this) {
if (!NetworkConstraint.isMet(context)) {
Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state.");
websocketDrained = false;
networkDrained = false;
decryptionDrained = false;
shutdown(pipe, unidentifiedPipe);
}
IncomingMessageObserver.this.notifyAll();
@ -91,15 +95,31 @@ public class IncomingMessageObserver {
}, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
public synchronized void addWebsocketDrainedListener(@NonNull Runnable listener) {
websocketDrainedListeners.add(listener);
if (websocketDrained) {
public synchronized void addDecryptionDrainedListener(@NonNull Runnable listener) {
decryptionDrainedListeners.add(listener);
if (decryptionDrained) {
listener.run();
}
}
public boolean isWebsocketDrained() {
return websocketDrained;
public boolean isDecryptionDrained() {
return decryptionDrained;
}
public void notifyDecryptionsDrained() {
List<Runnable> listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size());
synchronized (this) {
if (networkDrained && !decryptionDrained) {
Log.i(TAG, "Decryptions newly drained.");
decryptionDrained = true;
listenersToTrigger.addAll(decryptionDrainedListeners);
}
}
for (Runnable listener : listenersToTrigger) {
listener.run();
}
}
private synchronized void onAppForegrounded() {
@ -195,13 +215,10 @@ public class IncomingMessageObserver {
}
});
if (!result.isPresent() && !websocketDrained) {
Log.i(TAG, "Websocket was newly-drained. Triggering listeners.");
websocketDrained = true;
for (Runnable listener : websocketDrainedListeners) {
listener.run();
}
if (!result.isPresent() && !networkDrained) {
Log.i(TAG, "Network was newly-drained. Enqueuing a job to listen for decryption draining.");
networkDrained = true;
ApplicationDependencies.getJobManager().add(new PushDecryptDrainedJob());
}
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");