kopia lustrzana https://github.com/ryukoposting/Signal-Android
Use the websocket for FCM fetches.
rodzic
4f31dc36ba
commit
33828439fb
|
@ -699,6 +699,7 @@
|
|||
<service android:enabled="true" android:name=".service.webrtc.WebRtcCallService" android:foregroundServiceType="camera|microphone"/>
|
||||
<service android:enabled="true" android:exported="false" android:name=".service.KeyCachingService"/>
|
||||
<service android:enabled="true" android:name=".messages.IncomingMessageObserver$ForegroundService"/>
|
||||
<service android:enabled="true" android:name=".messages.IncomingMessageObserver$BackgroundService"/>
|
||||
<service android:name=".service.webrtc.AndroidCallConnectionService"
|
||||
android:permission="android.permission.BIND_TELECOM_CONNECTION_SERVICE"
|
||||
android:exported="true">
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.signal.core.util.logging.Log
|
|||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil
|
||||
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob
|
||||
import org.thoughtcrime.securesms.messages.RestStrategy
|
||||
import org.thoughtcrime.securesms.messages.WebSocketStrategy
|
||||
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor
|
||||
|
||||
/**
|
||||
|
@ -97,7 +97,7 @@ object FcmFetchManager {
|
|||
|
||||
@JvmStatic
|
||||
fun retrieveMessages(context: Context) {
|
||||
val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, RestStrategy(), RestStrategy())
|
||||
val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy())
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.")
|
||||
|
|
|
@ -13,7 +13,7 @@ import org.signal.core.util.concurrent.SignalExecutors;
|
|||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.RestStrategy;
|
||||
import org.thoughtcrime.securesms.messages.WebSocketStrategy;
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
|
||||
|
@ -49,7 +49,7 @@ public class FcmJobService extends JobService {
|
|||
SignalExecutors.UNBOUNDED.execute(() -> {
|
||||
Context context = getApplicationContext();
|
||||
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
|
||||
boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy());
|
||||
boolean success = retriever.retrieveMessages(context, new WebSocketStrategy());
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.");
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.thoughtcrime.securesms.jobmanager.Data;
|
|||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.RestStrategy;
|
||||
import org.thoughtcrime.securesms.messages.WebSocketStrategy;
|
||||
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -60,7 +60,7 @@ public final class PushNotificationReceiveJob extends BaseJob {
|
|||
@Override
|
||||
public void onRun() throws IOException {
|
||||
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
|
||||
boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new RestStrategy());
|
||||
boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new WebSocketStrategy());
|
||||
|
||||
if (result) {
|
||||
Log.i(TAG, "Successfully pulled messages.");
|
||||
|
|
|
@ -34,8 +34,6 @@ public class BackgroundMessageRetriever {
|
|||
|
||||
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
|
||||
|
||||
private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
|
||||
|
||||
public static final long DO_NOT_SHOW_IN_FOREGROUND = DelayedNotificationController.DO_NOT_SHOW;
|
||||
|
||||
/**
|
||||
|
@ -109,7 +107,7 @@ public class BackgroundMessageRetriever {
|
|||
|
||||
Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime));
|
||||
|
||||
if (strategy.execute(NORMAL_TIMEOUT)) {
|
||||
if (strategy.execute()) {
|
||||
Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime));
|
||||
success = true;
|
||||
break;
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.thoughtcrime.securesms.jobs.UnableToStartException;
|
|||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor;
|
||||
import org.thoughtcrime.securesms.notifications.NotificationChannels;
|
||||
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
|
||||
import org.thoughtcrime.securesms.util.AppForegroundObserver;
|
||||
import org.thoughtcrime.securesms.util.Util;
|
||||
import org.whispersystems.signalservice.api.SignalWebSocket;
|
||||
|
@ -36,6 +35,7 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -52,33 +52,35 @@ public class IncomingMessageObserver {
|
|||
|
||||
private static final String TAG = Log.tag(IncomingMessageObserver.class);
|
||||
|
||||
public static final int FOREGROUND_ID = 313399;
|
||||
public static final int FOREGROUND_ID = 313399;
|
||||
private static final long REQUEST_TIMEOUT_MINUTES = 1;
|
||||
private static final long OLD_REQUEST_WINDOW_MS = TimeUnit.MINUTES.toMillis(5);
|
||||
private static final long MAX_BACKGROUND_TIME = TimeUnit.MINUTES.toMillis(5);
|
||||
|
||||
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
|
||||
|
||||
private final Application context;
|
||||
private final SignalServiceNetworkAccess networkAccess;
|
||||
private final List<Runnable> decryptionDrainedListeners;
|
||||
private final BroadcastReceiver connectionReceiver;
|
||||
private final Map<String, Long> keepAliveTokens;
|
||||
|
||||
private boolean appVisible;
|
||||
private long lastInteractionTime;
|
||||
|
||||
private volatile boolean networkDrained;
|
||||
private volatile boolean decryptionDrained;
|
||||
private volatile boolean terminated;
|
||||
|
||||
|
||||
public IncomingMessageObserver(@NonNull Application context) {
|
||||
if (INSTANCE_COUNT.incrementAndGet() != 1) {
|
||||
throw new AssertionError("Multiple observers!");
|
||||
}
|
||||
|
||||
this.context = context;
|
||||
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
|
||||
this.decryptionDrainedListeners = new CopyOnWriteArrayList<>();
|
||||
this.keepAliveTokens = new HashMap<>();
|
||||
this.lastInteractionTime = System.currentTimeMillis();
|
||||
|
||||
new MessageRetrievalThread().start();
|
||||
|
||||
|
@ -139,6 +141,13 @@ public class IncomingMessageObserver {
|
|||
return decryptionDrained;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if the websocket is active, otherwise false.
|
||||
*/
|
||||
public boolean isActive() {
|
||||
return isConnectionNecessary();
|
||||
}
|
||||
|
||||
public void notifyDecryptionsDrained() {
|
||||
List<Runnable> listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size());
|
||||
|
||||
|
@ -157,33 +166,42 @@ public class IncomingMessageObserver {
|
|||
|
||||
private synchronized void onAppForegrounded() {
|
||||
appVisible = true;
|
||||
context.startService(new Intent(context, BackgroundService.class));
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
private synchronized void onAppBackgrounded() {
|
||||
appVisible = false;
|
||||
appVisible = false;
|
||||
lastInteractionTime = System.currentTimeMillis();
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
private synchronized boolean isConnectionNecessary() {
|
||||
boolean registered = SignalStore.account().isRegistered();
|
||||
boolean fcmEnabled = SignalStore.account().isFcmEnabled();
|
||||
boolean hasNetwork = NetworkConstraint.isMet(context);
|
||||
boolean hasProxy = SignalStore.proxy().isProxyEnabled();
|
||||
boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced();
|
||||
long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS;
|
||||
boolean registered = SignalStore.account().isRegistered();
|
||||
boolean fcmEnabled = SignalStore.account().isFcmEnabled();
|
||||
boolean hasNetwork = NetworkConstraint.isMet(context);
|
||||
boolean hasProxy = SignalStore.proxy().isProxyEnabled();
|
||||
boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced();
|
||||
long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS;
|
||||
long timeIdle = appVisible ? 0 : System.currentTimeMillis() - lastInteractionTime;
|
||||
|
||||
boolean removedRequests = keepAliveTokens.entrySet().removeIf(e -> e.getValue() < oldRequest);
|
||||
if (removedRequests) {
|
||||
Log.d(TAG, "Removed old keep web socket open requests.");
|
||||
}
|
||||
|
||||
Log.d(TAG, String.format("Network: %s, Foreground: %s, FCM: %s, Stay open requests: [%s], Censored: %s, Registered: %s, Proxy: %s, Force websocket: %s",
|
||||
hasNetwork, appVisible, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), networkAccess.isCensored(), registered, hasProxy, forceWebsocket));
|
||||
String lastInteractionString = appVisible ? "N/A" : timeIdle + " ms (" + (timeIdle < MAX_BACKGROUND_TIME ? "within limit" : "over limit") + ")";
|
||||
|
||||
return registered &&
|
||||
(appVisible || !fcmEnabled || forceWebsocket || Util.hasItems(keepAliveTokens)) &&
|
||||
hasNetwork;
|
||||
boolean conclusion = registered &&
|
||||
(appVisible || timeIdle < MAX_BACKGROUND_TIME || !fcmEnabled || Util.hasItems(keepAliveTokens)) &&
|
||||
hasNetwork;
|
||||
|
||||
String needsConnectionString = conclusion ? "Needs Connection" : "Does Not Need Connection";
|
||||
|
||||
Log.d(TAG, String.format(Locale.US, "[" + needsConnectionString + "] Network: %s, Foreground: %s, Time Since Last Interaction: %s, FCM: %s, Stay open requests: [%s], Registered: %s, Proxy: %s, Force websocket: %s",
|
||||
hasNetwork, appVisible, lastInteractionString, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), registered, hasProxy, forceWebsocket));
|
||||
|
||||
return conclusion;
|
||||
}
|
||||
|
||||
private synchronized void waitForConnectionNecessary() {
|
||||
|
@ -212,11 +230,13 @@ public class IncomingMessageObserver {
|
|||
|
||||
public synchronized void registerKeepAliveToken(String key) {
|
||||
keepAliveTokens.put(key, System.currentTimeMillis());
|
||||
lastInteractionTime = System.currentTimeMillis();
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized void removeKeepAliveToken(String key) {
|
||||
keepAliveTokens.remove(key);
|
||||
lastInteractionTime = System.currentTimeMillis();
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
|
@ -270,6 +290,10 @@ public class IncomingMessageObserver {
|
|||
attempts = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (!appVisible) {
|
||||
BackgroundService.stop(context);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
attempts++;
|
||||
Log.w(TAG, e);
|
||||
|
@ -313,4 +337,38 @@ public class IncomingMessageObserver {
|
|||
return Service.START_STICKY;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A service that exists just to encourage the system to keep our process alive a little longer.
|
||||
*/
|
||||
public static class BackgroundService extends Service {
|
||||
|
||||
public static void start(Context context) {
|
||||
try {
|
||||
context.startService(new Intent(context, BackgroundService.class));
|
||||
} catch (Exception e) {
|
||||
Log.w(TAG, "Failed to start background service.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void stop(Context context) {
|
||||
context.stopService(new Intent(context, BackgroundService.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable IBinder onBind(Intent intent) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int onStartCommand(Intent intent, int flags, int startId) {
|
||||
Log.d(TAG, "Background service started.");
|
||||
return START_STICKY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDestroy() {
|
||||
Log.d(TAG, "Background service destroyed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,53 +20,14 @@ import java.util.Set;
|
|||
*/
|
||||
public abstract class MessageRetrievalStrategy {
|
||||
|
||||
private volatile boolean canceled;
|
||||
|
||||
/**
|
||||
* Fetches and processes any pending messages. This method should block until the messages are
|
||||
* actually stored and processed -- not just retrieved.
|
||||
*
|
||||
* @param timeout Hint for how long this will run. The strategy will also be canceled after the
|
||||
* timeout ends, but having the timeout available may be useful for setting things
|
||||
* like socket timeouts.
|
||||
*
|
||||
* @return True if everything was successful up until cancelation, false otherwise.
|
||||
*/
|
||||
@WorkerThread
|
||||
abstract boolean execute(long timeout);
|
||||
|
||||
/**
|
||||
* Marks the strategy as canceled. It is the responsibility of the implementation of
|
||||
* {@link #execute(long)} to check {@link #isCanceled()} to know if execution should stop.
|
||||
*/
|
||||
void cancel() {
|
||||
this.canceled = true;
|
||||
}
|
||||
|
||||
protected boolean isCanceled() {
|
||||
return canceled;
|
||||
}
|
||||
|
||||
protected static void blockUntilQueueDrained(@NonNull String tag, @NonNull String queue, long timeoutMs) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
final JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
final MarkerJob markerJob = new MarkerJob(queue);
|
||||
|
||||
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
|
||||
|
||||
if (!jobState.isPresent()) {
|
||||
Log.w(tag, "Timed out waiting for " + queue + " job(s) to finish!");
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
|
||||
Log.d(tag, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
|
||||
}
|
||||
|
||||
protected static String timeSuffix(long startTime) {
|
||||
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
|
||||
}
|
||||
abstract boolean execute();
|
||||
|
||||
protected static class QueueFindingJobListener implements JobTracker.JobListener {
|
||||
private final Set<String> queues = new HashSet<>();
|
||||
|
|
|
@ -1,127 +0,0 @@
|
|||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker;
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.stories.Stories;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
|
||||
import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Retrieves messages over the REST endpoint.
|
||||
*/
|
||||
public class RestStrategy extends MessageRetrievalStrategy {
|
||||
|
||||
private static final String TAG = Log.tag(RestStrategy.class);
|
||||
|
||||
@WorkerThread
|
||||
@Override
|
||||
public boolean execute(long timeout) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
QueueFindingJobListener queueListener = new QueueFindingJobListener();
|
||||
|
||||
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
|
||||
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
|
||||
|
||||
int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout);
|
||||
|
||||
if (jobCount == 0) {
|
||||
Log.d(TAG, "No PushDecryptMessageJobs were enqueued.");
|
||||
return true;
|
||||
} else {
|
||||
Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued.");
|
||||
}
|
||||
|
||||
long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
|
||||
Set<String> processQueues = queueListener.getQueues();
|
||||
|
||||
Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
|
||||
|
||||
if (timeRemainingMs > 0) {
|
||||
Iterator<String> iter = processQueues.iterator();
|
||||
|
||||
while (iter.hasNext() && timeRemainingMs > 0) {
|
||||
timeRemainingMs = blockUntilQueueDrained(iter.next(), timeRemainingMs);
|
||||
}
|
||||
|
||||
if (timeRemainingMs <= 0) {
|
||||
Log.w(TAG, "Ran out of time while waiting for queues to drain.");
|
||||
}
|
||||
} else {
|
||||
Log.w(TAG, "Ran out of time before we could even wait on individual queues!");
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e);
|
||||
ApplicationDependencies.resetSignalServiceMessageReceiver();
|
||||
if (e instanceof AuthorizationFailedException && SignalStore.account().isRegistered() && SignalStore.account().getAci() != null) {
|
||||
TextSecurePreferences.setUnauthorizedReceived(ApplicationDependencies.getApplication(), true);
|
||||
}
|
||||
return false;
|
||||
} finally {
|
||||
jobManager.removeListener(queueListener);
|
||||
}
|
||||
}
|
||||
|
||||
private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
|
||||
AtomicInteger jobCount = new AtomicInteger(0);
|
||||
|
||||
receiver.setSoTimeoutMillis(timeout);
|
||||
|
||||
receiver.retrieveMessages(Stories.isFeatureEnabled(), envelope -> {
|
||||
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
|
||||
String jobId = processor.processEnvelope(envelope);
|
||||
|
||||
if (jobId != null) {
|
||||
jobCount.incrementAndGet();
|
||||
}
|
||||
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
|
||||
});
|
||||
|
||||
return jobCount.get();
|
||||
}
|
||||
|
||||
private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
final JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
final MarkerJob markerJob = new MarkerJob(queue);
|
||||
|
||||
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
|
||||
|
||||
if (!jobState.isPresent()) {
|
||||
Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
|
||||
Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
|
||||
return timeoutMs - duration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull String toString() {
|
||||
return Log.tag(RestStrategy.class);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import org.signal.core.util.Stopwatch;
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker;
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Retrieves messages over the websocket.
|
||||
*/
|
||||
public class WebSocketStrategy extends MessageRetrievalStrategy {
|
||||
|
||||
private static final String TAG = Log.tag(WebSocketStrategy.class);
|
||||
|
||||
private static final String KEEP_ALIVE_TOKEN = "WebsocketStrategy";
|
||||
private static final long QUEUE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
|
||||
|
||||
@WorkerThread
|
||||
@Override
|
||||
public boolean execute() {
|
||||
Stopwatch stopwatch = new Stopwatch("websocket-strategy");
|
||||
IncomingMessageObserver observer = ApplicationDependencies.getIncomingMessageObserver();
|
||||
|
||||
observer.registerKeepAliveToken(KEEP_ALIVE_TOKEN);
|
||||
try {
|
||||
JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
QueueFindingJobListener queueListener = new QueueFindingJobListener();
|
||||
|
||||
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
|
||||
|
||||
blockUntilWebsocketDrained(observer);
|
||||
stopwatch.split("decryptions-drained");
|
||||
|
||||
Set<String> processQueues = queueListener.getQueues();
|
||||
Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
|
||||
|
||||
for (String queue : processQueues) {
|
||||
blockUntilJobQueueDrained(queue, QUEUE_TIMEOUT);
|
||||
}
|
||||
|
||||
stopwatch.split("process-drained");
|
||||
stopwatch.stop(TAG);
|
||||
|
||||
return true;
|
||||
} finally {
|
||||
ApplicationDependencies.getIncomingMessageObserver().removeKeepAliveToken(KEEP_ALIVE_TOKEN);
|
||||
}
|
||||
}
|
||||
|
||||
private static void blockUntilWebsocketDrained(IncomingMessageObserver observer) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
observer.addDecryptionDrainedListener(new Runnable() {
|
||||
@Override public void run() {
|
||||
latch.countDown();
|
||||
observer.removeDecryptionDrainedListener(this);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
if (!latch.await(1, TimeUnit.MINUTES)) {
|
||||
Log.w(TAG, "Hit timeout while waiting for decryptions to drain!");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Log.w(TAG, "Interrupted!", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void blockUntilJobQueueDrained(@NonNull String queue, long timeoutMs) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
final JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
final MarkerJob markerJob = new MarkerJob(queue);
|
||||
|
||||
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
|
||||
|
||||
if (!jobState.isPresent()) {
|
||||
Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
|
||||
Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull String toString() {
|
||||
return Log.tag(WebSocketStrategy.class);
|
||||
}
|
||||
}
|
|
@ -1,90 +0,0 @@
|
|||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
|
||||
import org.whispersystems.signalservice.api.SignalWebSocket;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
class WebsocketStrategy extends MessageRetrievalStrategy {
|
||||
|
||||
private static final String TAG = Log.tag(WebsocketStrategy.class);
|
||||
|
||||
private final SignalWebSocket signalWebSocket;
|
||||
private final JobManager jobManager;
|
||||
|
||||
public WebsocketStrategy() {
|
||||
this.signalWebSocket = ApplicationDependencies.getSignalWebSocket();
|
||||
this.jobManager = ApplicationDependencies.getJobManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean execute(long timeout) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
Set<String> processJobQueues = drainWebsocket(timeout, startTime);
|
||||
Iterator<String> queueIterator = processJobQueues.iterator();
|
||||
long timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
|
||||
|
||||
while (!isCanceled() && queueIterator.hasNext() && timeRemaining > 0) {
|
||||
String queue = queueIterator.next();
|
||||
|
||||
blockUntilQueueDrained(TAG, queue, timeRemaining);
|
||||
|
||||
timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
Log.w(TAG, "Encountered an exception while draining the websocket.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private @NonNull Set<String> drainWebsocket(long timeout, long startTime) throws IOException {
|
||||
QueueFindingJobListener queueListener = new QueueFindingJobListener();
|
||||
|
||||
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
|
||||
|
||||
try {
|
||||
signalWebSocket.connect();
|
||||
while (shouldContinue()) {
|
||||
try {
|
||||
Optional<SignalServiceEnvelope> result = signalWebSocket.readOrEmpty(timeout, envelope -> {
|
||||
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime));
|
||||
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
|
||||
processor.processEnvelope(envelope);
|
||||
}
|
||||
});
|
||||
|
||||
if (!result.isPresent()) {
|
||||
Log.i(TAG, "Hit an empty response. Finished." + timeSuffix(startTime));
|
||||
break;
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
Log.w(TAG, "Websocket timeout." + timeSuffix(startTime));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
signalWebSocket.disconnect();
|
||||
jobManager.removeListener(queueListener);
|
||||
}
|
||||
|
||||
return queueListener.getQueues();
|
||||
}
|
||||
|
||||
|
||||
private boolean shouldContinue() {
|
||||
return !isCanceled();
|
||||
}
|
||||
}
|
Ładowanie…
Reference in New Issue