package org.thoughtcrime.securesms.conversation; import android.app.Application; import androidx.annotation.MainThread; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.lifecycle.LiveData; import androidx.lifecycle.LiveDataReactiveStreams; import androidx.lifecycle.MutableLiveData; import androidx.lifecycle.Observer; import androidx.lifecycle.Transformations; import androidx.lifecycle.ViewModel; import androidx.lifecycle.ViewModelProvider; import com.annimon.stream.Stream; import org.greenrobot.eventbus.EventBus; import org.greenrobot.eventbus.Subscribe; import org.greenrobot.eventbus.ThreadMode; import org.signal.core.util.MapUtil; import org.signal.core.util.logging.Log; import org.signal.paging.ObservablePagedData; import org.signal.paging.PagedData; import org.signal.paging.PagingConfig; import org.signal.paging.PagingController; import org.signal.paging.ProxyPagingController; import org.signal.libsignal.protocol.util.Pair; import org.thoughtcrime.securesms.components.settings.app.notifications.profiles.NotificationProfilesRepository; import org.thoughtcrime.securesms.conversation.colors.ChatColors; import org.thoughtcrime.securesms.conversation.colors.ChatColorsPalette; import org.thoughtcrime.securesms.conversation.colors.NameColor; import org.thoughtcrime.securesms.database.DatabaseObserver; import org.thoughtcrime.securesms.database.GroupDatabase; import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.database.model.MessageId; import org.thoughtcrime.securesms.database.model.StoryViewState; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.groups.GroupId; import org.thoughtcrime.securesms.mediasend.Media; import org.thoughtcrime.securesms.mediasend.MediaRepository; import org.thoughtcrime.securesms.notifications.profiles.NotificationProfile; import org.thoughtcrime.securesms.notifications.profiles.NotificationProfiles; import org.thoughtcrime.securesms.ratelimit.RecaptchaRequiredEvent; import org.thoughtcrime.securesms.recipients.Recipient; import org.thoughtcrime.securesms.recipients.RecipientId; import org.thoughtcrime.securesms.util.SignalLocalMetrics; import org.thoughtcrime.securesms.util.SingleLiveEvent; import org.thoughtcrime.securesms.util.Util; import org.thoughtcrime.securesms.util.ViewUtil; import org.thoughtcrime.securesms.util.livedata.LiveDataUtil; import org.thoughtcrime.securesms.util.livedata.Store; import org.thoughtcrime.securesms.wallpaper.ChatWallpaper; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers; import io.reactivex.rxjava3.core.BackpressureStrategy; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.CompositeDisposable; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.BehaviorSubject; public class ConversationViewModel extends ViewModel { private static final String TAG = Log.tag(ConversationViewModel.class); private final Application context; private final MediaRepository mediaRepository; private final ConversationRepository conversationRepository; private final MutableLiveData> recentMedia; private final BehaviorSubject threadId; private final Observable messageData; private final MutableLiveData showScrollButtons; private final MutableLiveData hasUnreadMentions; private final Observable canShowAsBubble; private final ProxyPagingController pagingController; private final DatabaseObserver.Observer conversationObserver; private final DatabaseObserver.MessageObserver messageUpdateObserver; private final DatabaseObserver.MessageObserver messageInsertObserver; private final BehaviorSubject recipientId; private final Observable> wallpaper; private final SingleLiveEvent events; private final Observable chatColors; private final MutableLiveData toolbarBottom; private final MutableLiveData inlinePlayerHeight; private final LiveData conversationTopMargin; private final Store threadAnimationStateStore; private final Observer threadAnimationStateStoreDriver; private final NotificationProfilesRepository notificationProfilesRepository; private final MutableLiveData searchQuery; private final Map> sessionMemberCache = new HashMap<>(); private ConversationIntents.Args args; private int jumpToPosition; private ConversationViewModel() { this.context = ApplicationDependencies.getApplication(); this.mediaRepository = new MediaRepository(); this.conversationRepository = new ConversationRepository(); this.recentMedia = new MutableLiveData<>(); this.showScrollButtons = new MutableLiveData<>(false); this.hasUnreadMentions = new MutableLiveData<>(false); this.events = new SingleLiveEvent<>(); this.pagingController = new ProxyPagingController<>(); this.conversationObserver = pagingController::onDataInvalidated; this.messageUpdateObserver = pagingController::onDataItemChanged; this.messageInsertObserver = messageId -> pagingController.onDataItemInserted(messageId, 0); this.toolbarBottom = new MutableLiveData<>(); this.inlinePlayerHeight = new MutableLiveData<>(); this.conversationTopMargin = Transformations.distinctUntilChanged(LiveDataUtil.combineLatest(toolbarBottom, inlinePlayerHeight, Integer::sum)); this.threadAnimationStateStore = new Store<>(new ThreadAnimationState(-1L, null, false)); this.notificationProfilesRepository = new NotificationProfilesRepository(); this.searchQuery = new MutableLiveData<>(); this.recipientId = BehaviorSubject.create(); this.threadId = BehaviorSubject.create(); BehaviorSubject recipientCache = BehaviorSubject.create(); recipientId .observeOn(Schedulers.io()) .distinctUntilChanged() .map(Recipient::resolved) .subscribe(recipientCache); BehaviorSubject conversationMetadata = BehaviorSubject.create(); Observable.combineLatest(threadId, recipientCache, Pair::new) .observeOn(Schedulers.io()) .distinctUntilChanged() .map(threadIdAndRecipient -> { SignalLocalMetrics.ConversationOpen.onMetadataLoadStarted(); ConversationData conversationData = conversationRepository.getConversationData(threadIdAndRecipient.first(), threadIdAndRecipient.second(), jumpToPosition); SignalLocalMetrics.ConversationOpen.onMetadataLoaded(); jumpToPosition = -1; return conversationData; }) .subscribe(conversationMetadata); ApplicationDependencies.getDatabaseObserver().registerMessageUpdateObserver(messageUpdateObserver); messageData = conversationMetadata .observeOn(Schedulers.io()) .switchMap(data -> { int startPosition; ConversationData.MessageRequestData messageRequestData = data.getMessageRequestData(); if (data.shouldJumpToMessage()) { startPosition = data.getJumpToPosition(); } else if (messageRequestData.isMessageRequestAccepted() && data.shouldScrollToLastSeen()) { startPosition = data.getLastSeenPosition(); } else if (messageRequestData.isMessageRequestAccepted()) { startPosition = data.getLastScrolledPosition(); } else { startPosition = data.getThreadSize(); } ApplicationDependencies.getDatabaseObserver().unregisterObserver(conversationObserver); ApplicationDependencies.getDatabaseObserver().unregisterObserver(messageInsertObserver); ApplicationDependencies.getDatabaseObserver().registerConversationObserver(data.getThreadId(), conversationObserver); ApplicationDependencies.getDatabaseObserver().registerMessageInsertObserver(data.getThreadId(), messageInsertObserver); ConversationDataSource dataSource = new ConversationDataSource(context, data.getThreadId(), messageRequestData, data.showUniversalExpireTimerMessage(), data.getThreadSize()); PagingConfig config = new PagingConfig.Builder().setPageSize(25) .setBufferPages(2) .setStartIndex(Math.max(startPosition, 0)) .build(); Log.d(TAG, "Starting at position: " + startPosition + " || jumpToPosition: " + data.getJumpToPosition() + ", lastSeenPosition: " + data.getLastSeenPosition() + ", lastScrolledPosition: " + data.getLastScrolledPosition()); ObservablePagedData pagedData = PagedData.createForObservable(dataSource, config); pagingController.set(pagedData.getController()); return pagedData.getData(); }) .observeOn(Schedulers.io()) .withLatestFrom(conversationMetadata, (messages, metadata) -> new MessageData(metadata, messages)) .doOnNext(a -> SignalLocalMetrics.ConversationOpen.onDataLoaded()); Observable liveRecipient = recipientId.distinctUntilChanged().switchMap(id -> Recipient.live(id).asObservable()); canShowAsBubble = threadId.observeOn(Schedulers.io()).map(conversationRepository::canShowAsBubble); wallpaper = liveRecipient.map(r -> Optional.ofNullable(r.getWallpaper())).distinctUntilChanged(); chatColors = liveRecipient.map(Recipient::getChatColors).distinctUntilChanged(); threadAnimationStateStore.update(threadId, (id, state) -> { if (state.getThreadId() == id) { return state; } else { return new ThreadAnimationState(id, null, false); } }); threadAnimationStateStore.update(conversationMetadata, (m, state) -> { if (state.getThreadId() == m.getThreadId()) { return state.copy(state.getThreadId(), m, state.getHasCommittedNonEmptyMessageList()); } else { return state.copy(m.getThreadId(), m, false); } }); this.threadAnimationStateStoreDriver = state -> {}; threadAnimationStateStore.getStateLiveData().observeForever(threadAnimationStateStoreDriver); EventBus.getDefault().register(this); } Observable getStoryViewState() { return recipientId .subscribeOn(Schedulers.io()) .switchMap(StoryViewState::getForRecipientId) .distinctUntilChanged() .observeOn(AndroidSchedulers.mainThread()); } void onMessagesCommitted(@NonNull List conversationMessages) { if (Util.hasItems(conversationMessages)) { threadAnimationStateStore.update(state -> { long threadId = conversationMessages.stream() .filter(Objects::nonNull) .findFirst() .map(c -> c.getMessageRecord().getThreadId()) .orElse(-2L); if (state.getThreadId() == threadId) { return state.copy(state.getThreadId(), state.getThreadMetadata(), true); } else { return state; } }); } } boolean shouldPlayMessageAnimations() { return threadAnimationStateStore.getState().shouldPlayMessageAnimations(); } void setToolbarBottom(int bottom) { toolbarBottom.setValue(bottom); } void setInlinePlayerVisible(boolean isVisible) { inlinePlayerHeight.setValue(isVisible ? ViewUtil.dpToPx(36) : 0); } void onAttachmentKeyboardOpen() { mediaRepository.getMediaInBucket(context, Media.ALL_MEDIA_BUCKET_ID, recentMedia::postValue); } @MainThread void onConversationDataAvailable(@NonNull RecipientId recipientId, long threadId, int startingPosition) { Log.d(TAG, "[onConversationDataAvailable] recipientId: " + recipientId + ", threadId: " + threadId + ", startingPosition: " + startingPosition); this.jumpToPosition = startingPosition; this.threadId.onNext(threadId); this.recipientId.onNext(recipientId); } void clearThreadId() { this.jumpToPosition = -1; this.threadId.onNext(-1L); } void setSearchQuery(@Nullable String query) { searchQuery.setValue(query); } @NonNull LiveData getSearchQuery() { return searchQuery; } @NonNull LiveData getConversationTopMargin() { return conversationTopMargin; } @NonNull Observable canShowAsBubble() { return canShowAsBubble .observeOn(AndroidSchedulers.mainThread()); } @NonNull LiveData getShowScrollToBottom() { return Transformations.distinctUntilChanged(showScrollButtons); } @NonNull LiveData getShowMentionsButton() { return Transformations.distinctUntilChanged(LiveDataUtil.combineLatest(showScrollButtons, hasUnreadMentions, (a, b) -> a && b)); } @NonNull Observable> getWallpaper() { return wallpaper .observeOn(AndroidSchedulers.mainThread()); } @NonNull LiveData getEvents() { return events; } @NonNull Observable getChatColors() { return chatColors .observeOn(AndroidSchedulers.mainThread()); } void setHasUnreadMentions(boolean hasUnreadMentions) { this.hasUnreadMentions.setValue(hasUnreadMentions); } boolean getShowScrollButtons() { return this.showScrollButtons.getValue(); } void setShowScrollButtons(boolean showScrollButtons) { this.showScrollButtons.setValue(showScrollButtons); } @NonNull LiveData> getRecentMedia() { return recentMedia; } @NonNull Observable getMessageData() { return messageData .observeOn(AndroidSchedulers.mainThread()); } @NonNull PagingController getPagingController() { return pagingController; } @NonNull Observable> getNameColorsMap() { return recipientId .observeOn(Schedulers.io()) .distinctUntilChanged() .map(Recipient::resolved) .map(Recipient::getGroupId) .map(groupId -> { if (groupId.isPresent()) { List fullMembers = SignalDatabase.groups().getGroupMembers(groupId.get(), GroupDatabase.MemberSet.FULL_MEMBERS_INCLUDING_SELF); Set cachedMembers = MapUtil.getOrDefault(sessionMemberCache, groupId.get(), new HashSet<>()); cachedMembers.addAll(fullMembers); sessionMemberCache.put(groupId.get(), cachedMembers); return cachedMembers; } else { return Collections.emptySet(); } }) .map(members -> { List sorted = Stream.of(members) .filter(member -> !Objects.equals(member, Recipient.self())) .sortBy(Recipient::requireStringId) .toList(); List names = ChatColorsPalette.Names.getAll(); Map colors = new HashMap<>(); for (int i = 0; i < sorted.size(); i++) { colors.put(sorted.get(i).getId(), names.get(i % names.size())); } return colors; }) .observeOn(AndroidSchedulers.mainThread()); } @NonNull LiveData> getActiveNotificationProfile() { final Observable> activeProfile = Observable.combineLatest(Observable.interval(0, 30, TimeUnit.SECONDS), notificationProfilesRepository.getProfiles(), (interval, profiles) -> profiles) .map(profiles -> Optional.ofNullable(NotificationProfiles.getActiveProfile(profiles))); return LiveDataReactiveStreams.fromPublisher(activeProfile.toFlowable(BackpressureStrategy.LATEST)); } void setArgs(@NonNull ConversationIntents.Args args) { this.args = args; } @NonNull ConversationIntents.Args getArgs() { return Objects.requireNonNull(args); } @Subscribe(threadMode = ThreadMode.POSTING) public void onRecaptchaRequiredEvent(@NonNull RecaptchaRequiredEvent event) { events.postValue(Event.SHOW_RECAPTCHA); } @Override protected void onCleared() { super.onCleared(); threadAnimationStateStore.getStateLiveData().removeObserver(threadAnimationStateStoreDriver); ApplicationDependencies.getDatabaseObserver().unregisterObserver(conversationObserver); ApplicationDependencies.getDatabaseObserver().unregisterObserver(messageUpdateObserver); ApplicationDependencies.getDatabaseObserver().unregisterObserver(messageInsertObserver); EventBus.getDefault().unregister(this); } enum Event { SHOW_RECAPTCHA } static class MessageData { private final List messages; private final ConversationData metadata; MessageData(@NonNull ConversationData metadata, @NonNull List messages) { this.metadata = metadata; this.messages = messages; } public @NonNull List getMessages() { return messages; } public @NonNull ConversationData getMetadata() { return metadata; } } static class Factory extends ViewModelProvider.NewInstanceFactory { @Override public @NonNull T create(@NonNull Class modelClass) { //noinspection ConstantConditions return modelClass.cast(new ConversationViewModel()); } } }