Continues the migration from LiveData to Flow

pull/823/head
Vitor Pamplona 2024-04-04 08:56:15 -04:00
rodzic 2509d639bd
commit 7475143506
2 zmienionych plików z 100 dodań i 135 usunięć

Wyświetl plik

@ -108,11 +108,15 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.launch
@ -216,179 +220,127 @@ class Account(
val communities: ImmutableSet<String> = persistentSetOf(),
)
class ListNameNotePair(val listName: String, val event: GeneralListEvent?)
@OptIn(ExperimentalCoroutinesApi::class)
val liveKind3Follows: StateFlow<LiveFollowLists> by lazy {
userProfile()
.live()
.follows
.asFlow()
.transformLatest {
emit(
LiveFollowLists(
userProfile().cachedFollowingKeySet().toImmutableSet(),
userProfile().cachedFollowingTagSet().toImmutableSet(),
userProfile().cachedFollowingGeohashSet().toImmutableSet(),
userProfile().cachedFollowingCommunitiesSet().toImmutableSet(),
),
)
}
.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
val liveKind3FollowsFlow: Flow<LiveFollowLists> =
userProfile().flow().follows.stateFlow.transformLatest {
emit(
LiveFollowLists(
it.user.cachedFollowingKeySet().toImmutableSet(),
it.user.cachedFollowingTagSet().toImmutableSet(),
it.user.cachedFollowingGeohashSet().toImmutableSet(),
it.user.cachedFollowingCommunitiesSet().toImmutableSet(),
),
)
}
val liveKind3Follows = liveKind3FollowsFlow.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
@OptIn(ExperimentalCoroutinesApi::class)
private val liveHomeList: Flow<ListNameNotePair> by lazy {
defaultHomeFollowList.flatMapLatest { listName ->
println("AABBCC liveHomeList $listName changed")
loadPeopleListFlowFromListName(listName)
}.transformLatest {
println("AABBCC liveHomeList after FlattenMerge")
emit(it)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
private val liveHomeList: StateFlow<NoteState?> by lazy {
defaultHomeFollowList
.transformLatest {
if (it != GLOBAL_FOLLOWS && it != KIND3_FOLLOWS) {
LocalCache.checkGetOrCreateAddressableNote(it)?.flow()?.metadata?.stateFlow?.let {
emit(it)
}
fun loadPeopleListFlowFromListName(listName: String): Flow<ListNameNotePair> {
return if (listName != GLOBAL_FOLLOWS && listName != KIND3_FOLLOWS) {
val note = LocalCache.checkGetOrCreateAddressableNote(listName)
println("AABBCC loadPeopleListFlowFromListName $listName ${note?.idHex} ${note?.flow()?.metadata?.stateFlow?.subscriptionCount?.value}")
note?.flow()?.metadata?.stateFlow?.mapLatest {
println("AABBCC loadPeopleListFlowFromListName running")
val noteEvent = it.note.event as? GeneralListEvent
println("AABBCC loadPeopleListFlowFromListName emitting ${noteEvent?.id}")
ListNameNotePair(listName, noteEvent)
} ?: MutableStateFlow(ListNameNotePair(listName, null))
} else {
MutableStateFlow(ListNameNotePair(listName, null))
}
}
fun combinePeopleListFlows(
kind3FollowsSource: Flow<LiveFollowLists>,
peopleListFollowsSource: Flow<ListNameNotePair>,
): Flow<LiveFollowLists?> {
return combineTransform(kind3FollowsSource, peopleListFollowsSource) { kind3Follows, peopleListFollows ->
if (peopleListFollows.listName == GLOBAL_FOLLOWS) {
println("AABBCC combinePeopleListFlows ${peopleListFollows.listName} Global")
emit(null)
} else if (peopleListFollows.listName == KIND3_FOLLOWS) {
println("AABBCC combinePeopleListFlows ${peopleListFollows.listName} Kind 3")
emit(kind3Follows)
} else if (peopleListFollows.event == null) {
println("AABBCC combinePeopleListFlows ${peopleListFollows.listName} Note is null")
emit(LiveFollowLists())
} else {
println("AABBCC combinePeopleListFlows ${peopleListFollows.listName} ${peopleListFollows.event.id}")
val result = waitToDecrypt(peopleListFollows.event)
if (result == null) {
println("AABBCC combinePeopleListFlows ${peopleListFollows.listName} returning null")
emit(LiveFollowLists())
} else {
emit(result)
}
}
.flattenMerge()
.stateIn(scope, SharingStarted.Eagerly, null)
}
}
val liveHomeFollowLists: StateFlow<LiveFollowLists?> by lazy {
combineTransform(defaultHomeFollowList, liveKind3Follows, liveHomeList) {
listName,
kind3Follows,
peopleListFollows,
->
if (listName == GLOBAL_FOLLOWS) {
emit(null)
} else if (listName == KIND3_FOLLOWS) {
emit(kind3Follows)
} else {
val result =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) { continuation.resume(it) }
}
}
result?.let { emit(it) } ?: run { emit(LiveFollowLists()) }
}
}
combinePeopleListFlows(liveKind3FollowsFlow, liveHomeList)
.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
}
@OptIn(ExperimentalCoroutinesApi::class)
private val liveNotificationList: StateFlow<NoteState?> by lazy {
private val liveNotificationList: Flow<ListNameNotePair> by lazy {
defaultNotificationFollowList
.transformLatest {
if (it != GLOBAL_FOLLOWS && it != KIND3_FOLLOWS) {
LocalCache.checkGetOrCreateAddressableNote(it)?.flow()?.metadata?.stateFlow?.let {
emit(it)
}
}
}
.flattenMerge()
.stateIn(scope, SharingStarted.Eagerly, null)
.transformLatest { listName ->
emit(loadPeopleListFlowFromListName(listName))
}.flattenMerge()
}
val liveNotificationFollowLists: StateFlow<LiveFollowLists?> by lazy {
combineTransform(defaultNotificationFollowList, liveKind3Follows, liveNotificationList) {
listName,
kind3Follows,
peopleListFollows,
->
if (listName == GLOBAL_FOLLOWS) {
emit(null)
} else if (listName == KIND3_FOLLOWS) {
emit(kind3Follows)
} else {
val result =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) { continuation.resume(it) }
}
}
result?.let { emit(it) } ?: run { emit(LiveFollowLists()) }
}
}
combinePeopleListFlows(liveKind3FollowsFlow, liveNotificationList)
.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
}
@OptIn(ExperimentalCoroutinesApi::class)
private val liveStoriesList: StateFlow<NoteState?> by lazy {
private val liveStoriesList: Flow<ListNameNotePair> by lazy {
defaultStoriesFollowList
.transformLatest {
if (it != GLOBAL_FOLLOWS && it != KIND3_FOLLOWS) {
LocalCache.checkGetOrCreateAddressableNote(it)?.flow()?.metadata?.stateFlow?.let {
emit(it)
}
}
}
.flattenMerge()
.stateIn(scope, SharingStarted.Eagerly, null)
.transformLatest { listName ->
emit(loadPeopleListFlowFromListName(listName))
}.flattenMerge()
}
val liveStoriesFollowLists: StateFlow<LiveFollowLists?> by lazy {
combineTransform(defaultStoriesFollowList, liveKind3Follows, liveStoriesList) {
listName,
kind3Follows,
peopleListFollows,
->
if (listName == GLOBAL_FOLLOWS) {
emit(null)
} else if (listName == KIND3_FOLLOWS) {
emit(kind3Follows)
} else {
val result =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) { continuation.resume(it) }
}
}
result?.let { emit(it) } ?: run { emit(LiveFollowLists()) }
}
}
combinePeopleListFlows(liveKind3FollowsFlow, liveStoriesList)
.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
}
@OptIn(ExperimentalCoroutinesApi::class)
private val liveDiscoveryList: StateFlow<NoteState?> by lazy {
private val liveDiscoveryList: Flow<ListNameNotePair> by lazy {
defaultDiscoveryFollowList
.transformLatest {
if (it != GLOBAL_FOLLOWS && it != KIND3_FOLLOWS) {
LocalCache.checkGetOrCreateAddressableNote(it)?.flow()?.metadata?.stateFlow?.let {
emit(it)
}
}
}
.flattenMerge()
.stateIn(scope, SharingStarted.Eagerly, null)
.transformLatest { listName ->
emit(loadPeopleListFlowFromListName(listName))
}.flattenMerge()
}
val liveDiscoveryFollowLists: StateFlow<LiveFollowLists?> by lazy {
combineTransform(defaultDiscoveryFollowList, liveKind3Follows, liveDiscoveryList) {
listName,
kind3Follows,
peopleListFollows,
->
if (listName == GLOBAL_FOLLOWS) {
emit(null)
} else if (listName == KIND3_FOLLOWS) {
emit(kind3Follows)
} else {
val result =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) { continuation.resume(it) }
}
}
result?.let { emit(it) } ?: run { emit(LiveFollowLists()) }
}
}
combinePeopleListFlows(liveKind3FollowsFlow, liveDiscoveryList)
.stateIn(scope, SharingStarted.Eagerly, LiveFollowLists())
}
private fun decryptLiveFollows(
peopleListFollows: NoteState?,
listEvent: GeneralListEvent,
onReady: (LiveFollowLists) -> Unit,
) {
val listEvent = (peopleListFollows?.note?.event as? GeneralListEvent)
listEvent?.privateTags(signer) { privateTagList ->
listEvent.privateTags(signer) { privateTagList ->
onReady(
LiveFollowLists(
users =
@ -406,6 +358,16 @@ class Account(
}
}
suspend fun waitToDecrypt(peopleListFollows: GeneralListEvent): LiveFollowLists? {
return withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) {
continuation.resume(it)
}
}
}
}
@Immutable
data class LiveHiddenUsers(
val hiddenUsers: ImmutableSet<String>,

Wyświetl plik

@ -127,6 +127,7 @@ class User(val pubkeyHex: String) {
// Update following of the current user
liveSet?.innerFollows?.invalidateData()
flowSet?.follows?.invalidateData()
// Update Followers of the past user list
// Update Followers of the new contact list
@ -474,14 +475,16 @@ class User(val pubkeyHex: String) {
@Stable
class UserFlowSet(u: User) {
// Observers line up here.
val follows = UserBundledRefresherFlow(u)
val relays = UserBundledRefresherFlow(u)
fun isInUse(): Boolean {
return relays.stateFlow.subscriptionCount.value > 0
return relays.stateFlow.subscriptionCount.value > 0 || follows.stateFlow.subscriptionCount.value > 0
}
fun destroy() {
relays.destroy()
follows.destroy()
}
}