From 9f4d8ac12c29774075b78dd8a56adc612717ac92 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Wed, 27 Jul 2022 12:48:51 -0400 Subject: [PATCH] Fix contact discovery refresh crash. --- .../sync/ContactDiscoveryRefreshV1.java | 43 ++++++++++++------- core-util/build.gradle | 1 + .../core/util/concurrent/RxExtensions.kt | 26 +++++++++++ 3 files changed, 54 insertions(+), 16 deletions(-) create mode 100644 core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/ContactDiscoveryRefreshV1.java b/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/ContactDiscoveryRefreshV1.java index 99dcea461..867842f15 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/ContactDiscoveryRefreshV1.java +++ b/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/ContactDiscoveryRefreshV1.java @@ -9,6 +9,7 @@ import com.annimon.stream.Collectors; import com.annimon.stream.Stream; import org.signal.contacts.SystemContactsRepository; +import org.signal.core.util.concurrent.RxExtensions; import org.signal.core.util.logging.Log; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.util.Pair; @@ -190,23 +191,33 @@ class ContactDiscoveryRefreshV1 { .onErrorReturn(t -> new Pair<>(r, ServiceResponse.forUnknownError(t)))) .toList(); - return Observable.mergeDelayError(requests) - .observeOn(Schedulers.io(), true) - .scan(new UnlistedResult.Builder(), (builder, pair) -> { - Recipient recipient = pair.first(); - ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second()); - if (processor.hasResult()) { - builder.potentiallyActiveIds.add(recipient.getId()); - } else if (processor.genericIoError() || !processor.notFound()) { - builder.retries.add(recipient.getId()); - builder.potentiallyActiveIds.add(recipient.getId()); - } + try { + return RxExtensions.safeBlockingGet( + Observable.mergeDelayError(requests) + .observeOn(Schedulers.io(), true) + .scan(new UnlistedResult.Builder(), (builder, pair) -> { + Recipient recipient = pair.first(); + ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second()); + if (processor.hasResult()) { + builder.potentiallyActiveIds.add(recipient.getId()); + } else if (processor.genericIoError() || !processor.notFound()) { + builder.retries.add(recipient.getId()); + builder.potentiallyActiveIds.add(recipient.getId()); + } - return builder; - }) - .lastOrError() - .map(UnlistedResult.Builder::build) - .blockingGet(); + return builder; + }) + .lastOrError() + .map(UnlistedResult.Builder::build) + ); + } catch (InterruptedException e) { + Log.i(TAG, "Filter for unlisted profile fetches interrupted, fetch via job instead"); + UnlistedResult.Builder builder = new UnlistedResult.Builder(); + for (Recipient recipient : possiblyUnlisted) { + builder.retries.add(recipient.getId()); + } + return builder.build(); + } } private static boolean hasCommunicatedWith(@NonNull Recipient recipient) { diff --git a/core-util/build.gradle b/core-util/build.gradle index 4740d0383..a6232e51c 100644 --- a/core-util/build.gradle +++ b/core-util/build.gradle @@ -50,6 +50,7 @@ dependencies { implementation libs.androidx.core.ktx implementation libs.google.protobuf.javalite implementation libs.androidx.sqlite + implementation libs.rxjava3.rxjava testImplementation testLibs.junit.junit testImplementation testLibs.mockito.core diff --git a/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt b/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt new file mode 100644 index 000000000..c2536d293 --- /dev/null +++ b/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt @@ -0,0 +1,26 @@ +@file:JvmName("RxExtensions") + +package org.signal.core.util.concurrent + +import io.reactivex.rxjava3.core.Single +import java.lang.RuntimeException + +/** + * Throw an [InterruptedException] if a [Single.blockingGet] call is interrupted. This can + * happen when being called by code already within an Rx chain that is disposed. + * + * [Single.blockingGet] is considered harmful and should not be used. + */ +@Throws(InterruptedException::class) +fun Single.safeBlockingGet(): T { + try { + return blockingGet() + } catch (e: RuntimeException) { + val cause = e.cause + if (cause is InterruptedException) { + throw cause + } else { + throw e + } + } +}