diff --git a/app/src/main/java/org/thoughtcrime/securesms/contacts/ContactChipViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/contacts/ContactChipViewModel.kt index 33e9b20b5..23c7ef21c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/contacts/ContactChipViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/contacts/ContactChipViewModel.kt @@ -42,7 +42,7 @@ class ContactChipViewModel : ViewModel() { disposables += getOrCreateRecipientId(selectedContact).map { Recipient.resolved(it) }.observeOn(Schedulers.io()).subscribe { recipient -> store.update { it + SelectedContacts.Model(selectedContact, recipient) } disposableMap[recipient.id]?.dispose() - disposableMap[recipient.id] = store.update(recipient.live().asObservable().toFlowable(BackpressureStrategy.LATEST)) { changedRecipient, state -> + disposableMap[recipient.id] = store.update(recipient.live().observable().toFlowable(BackpressureStrategy.LATEST)) { changedRecipient, state -> val index = state.indexOfFirst { it.selectedContact.matches(selectedContact) } when { index == 0 -> { diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationParentFragment.java b/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationParentFragment.java index c3589a02d..46fb4ae40 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationParentFragment.java +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationParentFragment.java @@ -285,7 +285,6 @@ import org.thoughtcrime.securesms.util.ContextUtil; import org.thoughtcrime.securesms.util.ConversationUtil; import org.thoughtcrime.securesms.util.Debouncer; import org.thoughtcrime.securesms.util.DrawableUtil; -import org.thoughtcrime.securesms.util.FeatureFlags; import org.thoughtcrime.securesms.util.FullscreenHelper; import org.thoughtcrime.securesms.util.IdentityUtil; import org.thoughtcrime.securesms.util.LifecycleDisposable; @@ -2286,7 +2285,12 @@ public class ConversationParentFragment extends Fragment Log.i(TAG, "[initializeResources] Recipient: " + recipient.getId() + ", Thread: " + threadId); - recipient.observe(getViewLifecycleOwner(), this::onRecipientChanged); + disposables.add( + recipient + .observable() + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(this::onRecipientChanged) + ); } private void initializeLinkPreviewObserver() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationViewModel.java b/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationViewModel.java index 6c1ce3558..0aafb0dd7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationViewModel.java +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/ConversationViewModel.java @@ -31,7 +31,6 @@ import org.thoughtcrime.securesms.database.DatabaseObserver; import org.thoughtcrime.securesms.database.model.MessageId; import org.thoughtcrime.securesms.database.model.StoryViewState; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.mediasend.Media; import org.thoughtcrime.securesms.mediasend.MediaRepository; import org.thoughtcrime.securesms.notifications.profiles.NotificationProfile; @@ -207,7 +206,7 @@ public class ConversationViewModel extends ViewModel { .observeOn(Schedulers.io()) .switchMap(scheduledMessagesRepository::getScheduledMessageCount); - Observable liveRecipient = recipientId.distinctUntilChanged().switchMap(id -> Recipient.live(id).asObservable()); + Observable liveRecipient = recipientId.distinctUntilChanged().switchMap(id -> Recipient.live(id).observable()); canShowAsBubble = threadId.observeOn(Schedulers.io()).map(conversationRepository::canShowAsBubble); wallpaper = liveRecipient.map(r -> Optional.ofNullable(r.getWallpaper())).distinctUntilChanged(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java b/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java index 35e40eb0e..6e37a56c0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java +++ b/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java @@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.subjects.BehaviorSubject; public final class LiveRecipient { @@ -41,15 +42,17 @@ public final class LiveRecipient { private final LiveData observableLiveDataResolved; private final Set observers; private final Observer foreverObserver; - private final AtomicReference recipient; - private final RecipientTable recipientTable; - private final GroupTable groupDatabase; - private final DistributionListTables distributionListTables; - private final MutableLiveData refreshForceNotify; + private final AtomicReference recipient; + private final RecipientTable recipientTable; + private final GroupTable groupDatabase; + private final DistributionListTables distributionListTables; + private final MutableLiveData refreshForceNotify; + private final BehaviorSubject subject; LiveRecipient(@NonNull Context context, @NonNull Recipient defaultRecipient) { this.context = context.getApplicationContext(); this.liveData = new MutableLiveData<>(defaultRecipient); + this.subject = BehaviorSubject.createDefault(defaultRecipient); this.recipient = new AtomicReference<>(defaultRecipient); this.recipientTable = SignalDatabase.recipients(); this.groupDatabase = SignalDatabase.groups(); @@ -80,6 +83,13 @@ public final class LiveRecipient { return recipient.get(); } + /** + * @return An rx-flavored {@link Observable}. + */ + public @NonNull Observable observable() { + return subject.distinctUntilChanged(Recipient::hasSameContent); + } + /** * Watch the recipient for changes. The callback will only be invoked if the provided lifecycle is * in a valid state. No need to remove the observer. If you do wish to remove the observer (if, @@ -97,19 +107,6 @@ public final class LiveRecipient { ThreadUtil.runOnMain(() -> observableLiveData.removeObservers(owner)); } - public Observable asObservable() { - return Observable.create(emitter -> { - Recipient current = recipient.get(); - if (current != null && current.getId() != RecipientId.UNKNOWN) { - emitter.onNext(current); - } - - RecipientForeverObserver foreverObserver = emitter::onNext; - observeForever(foreverObserver); - emitter.setCancellable(() -> removeForeverObserver(foreverObserver)); - }); - } - /** * Watch the recipient for changes. The callback could be invoked at any time. You MUST call * {@link #removeForeverObserver(RecipientForeverObserver)} when finished. You should use @@ -243,6 +240,7 @@ public final class LiveRecipient { synchronized void set(@NonNull Recipient recipient) { this.recipient.set(recipient); this.liveData.postValue(recipient); + this.subject.onNext(recipient); } @Override