Refactoring Channel -> Subscription

pull/90/head
Vitor Pamplona 2023-02-06 10:56:10 -05:00
rodzic a5f156ec49
commit a6dcf64f04
17 zmienionych plików z 119 dodań i 74 usunięć

Wyświetl plik

@ -1,16 +0,0 @@
package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.UUID
import nostr.postr.JsonFilter
data class Channel (
val id: String = UUID.randomUUID().toString().substring(0,4),
val onEOSE: ((Long) -> Unit)? = null
) {
var filter: List<TypedFilter>? = null // Inactive when null
fun updateEOSE(l: Long) {
onEOSE?.let { it(l) }
}
}

Wyświetl plik

@ -3,7 +3,6 @@ package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.UserState
import com.vitorpamplona.amethyst.service.model.ReportEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
@ -69,7 +68,7 @@ object NostrAccountDataSource: NostrDataSource<Note>("AccountData") {
override fun updateChannelFilters() {
// gets everthing about the user logged in
accountChannel.filter = listOf(
accountChannel.typedFilters = listOf(
createAccountMetadataFilter(),
createAccountContactListFilter(),
createNotificationFilter(),

Wyświetl plik

@ -39,6 +39,6 @@ object NostrChannelDataSource: NostrDataSource<Note>("ChatroomFeed") {
}
override fun updateChannelFilters() {
messagesChannel.filter = listOfNotNull(createMessagesToChannelFilter()).ifEmpty { null }
messagesChannel.typedFilters = listOfNotNull(createMessagesToChannelFilter()).ifEmpty { null }
}
}

Wyświetl plik

@ -62,6 +62,6 @@ object NostrChatRoomDataSource: NostrDataSource<Note>("ChatroomFeed") {
}
override fun updateChannelFilters() {
inandoutChannel.filter = listOfNotNull(createMessagesToMeFilter(), createMessagesFromMeFilter()).ifEmpty { null }
inandoutChannel.typedFilters = listOfNotNull(createMessagesToMeFilter(), createMessagesFromMeFilter()).ifEmpty { null }
}
}

Wyświetl plik

@ -1,7 +1,6 @@
package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.service.model.ChannelCreateEvent
import com.vitorpamplona.amethyst.service.model.ChannelMessageEvent
@ -97,7 +96,7 @@ object NostrChatroomListDataSource: NostrDataSource<Note>("MailBoxFeed") {
createMyChannelsFilter()
)
chatroomListChannel.filter = listOfNotNull(
chatroomListChannel.typedFilters = listOfNotNull(
list,
createLastChannelInfoFilter(),
createLastMessageOfEachChannelFilter()

Wyświetl plik

@ -13,6 +13,7 @@ import com.vitorpamplona.amethyst.service.model.ReportEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.Client
import com.vitorpamplona.amethyst.service.relays.Relay
import com.vitorpamplona.amethyst.service.relays.Subscription
import java.util.Date
import java.util.UUID
import kotlinx.coroutines.CoroutineScope
@ -29,7 +30,7 @@ import nostr.postr.events.RecommendRelayEvent
import nostr.postr.events.TextNoteEvent
abstract class NostrDataSource<T>(val debugName: String) {
private var channels = mapOf<String, Channel>()
private var subscriptions = mapOf<String, Subscription>()
private var eventCounter = mapOf<String, Int>()
fun printCounter() {
@ -40,7 +41,7 @@ abstract class NostrDataSource<T>(val debugName: String) {
private val clientListener = object : Client.Listener() {
override fun onEvent(event: Event, subscriptionId: String, relay: Relay) {
if (subscriptionId in channels.keys) {
if (subscriptionId in subscriptions.keys) {
val key = "${debugName} ${subscriptionId} ${event.kind}"
val keyValue = eventCounter.get(key)
if (keyValue != null) {
@ -95,7 +96,7 @@ abstract class NostrDataSource<T>(val debugName: String) {
if (type == Relay.Type.EOSE && channel != null) {
// updates a per subscripton since date
channels[channel]?.updateEOSE(Date().time / 1000)
subscriptions[channel]?.updateEOSE(Date().time / 1000)
}
}
@ -113,8 +114,8 @@ abstract class NostrDataSource<T>(val debugName: String) {
}
open fun stop() {
channels.values.forEach { channel ->
if (channel.filter != null) // if it is active, close
subscriptions.values.forEach { channel ->
if (channel.typedFilters != null) // if it is active, close
Client.close(channel.id)
}
}
@ -139,15 +140,15 @@ abstract class NostrDataSource<T>(val debugName: String) {
}
}
fun requestNewChannel(onEOSE: ((Long) -> Unit)? = null): Channel {
val newChannel = Channel(UUID.randomUUID().toString().substring(0,4), onEOSE)
channels = channels + Pair(newChannel.id, newChannel)
return newChannel
fun requestNewChannel(onEOSE: ((Long) -> Unit)? = null): Subscription {
val newSubscription = Subscription(UUID.randomUUID().toString().substring(0,4), onEOSE)
subscriptions = subscriptions + Pair(newSubscription.id, newSubscription)
return newSubscription
}
fun dismissChannel(channel: Channel) {
Client.close(channel.id)
channels = channels.minus(channel.id)
fun dismissChannel(subscription: Subscription) {
Client.close(subscription.id)
subscriptions = subscriptions.minus(subscription.id)
}
var handlerWaiting = false
@ -173,37 +174,37 @@ abstract class NostrDataSource<T>(val debugName: String) {
fun resetFiltersSuspend() {
// saves the channels that are currently active
val activeChannels = channels.values.filter { it.filter != null }
val activeSubscriptions = subscriptions.values.filter { it.typedFilters != null }
// saves the current content to only update if it changes
val currentFilter = activeChannels.associate { it.id to it.filter!!.joinToString("|") { it.filter.toJson() } }
val currentFilters = activeSubscriptions.associate { it.id to it.toJson() }
updateChannelFilters()
// Makes sure to only send an updated filter when it actually changes.
channels.values.forEach { channel ->
val channelsNewFilter = channel.filter
subscriptions.values.forEach { updatedSubscription ->
val updatedSubscriotionNewFilters = updatedSubscription.typedFilters
if (channel in activeChannels) {
if (channelsNewFilter == null) {
if (updatedSubscription.id in currentFilters.keys) {
if (updatedSubscriotionNewFilters == null) {
// was active and is not active anymore, just close.
Client.close(channel.id)
Client.close(updatedSubscription.id)
} else {
// was active and is still active, check if it has changed.
if (channelsNewFilter.joinToString("|") { it.filter.toJson() } != currentFilter[channel.id]) {
Client.close(channel.id)
Client.sendFilter(channel.id, channelsNewFilter)
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
Client.close(updatedSubscription.id)
Client.sendFilter(updatedSubscription.id, updatedSubscriotionNewFilters)
} else {
// hasn't changed, does nothing.
Client.sendFilterOnlyIfDisconnected(channel.id, channelsNewFilter)
Client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriotionNewFilters)
}
}
} else {
if (channelsNewFilter == null) {
if (updatedSubscriotionNewFilters == null) {
// was not active and is still not active, does nothing
} else {
// was not active and becomes active, sends the filter.
if (channelsNewFilter.joinToString("|") { it.filter.toJson() } != currentFilter[channel.id]) {
Client.sendFilter(channel.id, channelsNewFilter)
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
Client.sendFilter(updatedSubscription.id, updatedSubscriotionNewFilters)
}
}
}

Wyświetl plik

@ -31,6 +31,6 @@ object NostrGlobalDataSource: NostrDataSource<Note>("GlobalFeed") {
.reversed()
override fun updateChannelFilters() {
globalFeedChannel.filter = listOf(createGlobalFilter()).ifEmpty { null }
globalFeedChannel.typedFilters = listOf(createGlobalFilter()).ifEmpty { null }
}
}

Wyświetl plik

@ -64,6 +64,6 @@ object NostrHomeDataSource: NostrDataSource<Note>("HomeFeed") {
}
override fun updateChannelFilters() {
followAccountChannel.filter = listOf(createFollowAccountsFilter()).ifEmpty { null }
followAccountChannel.typedFilters = listOf(createFollowAccountsFilter()).ifEmpty { null }
}
}

Wyświetl plik

@ -1,16 +1,11 @@
package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.decodePublicKey
import com.vitorpamplona.amethyst.service.model.ReactionEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.Collections
import nostr.postr.JsonFilter
import nostr.postr.bechToBytes
import nostr.postr.events.TextNoteEvent
import nostr.postr.toHex
object NostrSearchEventOrUserDataSource: NostrDataSource<Note>("SingleEventFeed") {
@ -43,7 +38,7 @@ object NostrSearchEventOrUserDataSource: NostrDataSource<Note>("SingleEventFeed"
}
override fun updateChannelFilters() {
searchChannel.filter = createAnythingWithIDFilter()
searchChannel.typedFilters = createAnythingWithIDFilter()
}
fun search(eventId: String) {

Wyświetl plik

@ -3,15 +3,10 @@ package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.service.model.ChannelCreateEvent
import com.vitorpamplona.amethyst.service.model.ChannelMessageEvent
import com.vitorpamplona.amethyst.service.model.ChannelMetadataEvent
import com.vitorpamplona.amethyst.service.model.ReactionEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.Collections
import nostr.postr.JsonFilter
import nostr.postr.events.TextNoteEvent
object NostrSingleChannelDataSource: NostrDataSource<Note>("SingleChannelFeed") {
private var channelsToWatch = setOf<String>()
@ -64,7 +59,7 @@ object NostrSingleChannelDataSource: NostrDataSource<Note>("SingleChannelFeed")
val reactions = createRepliesAndReactionsFilter()
val missing = createLoadEventsIfNotLoadedFilter()
singleChannelChannel.filter = listOfNotNull(reactions, missing).ifEmpty { null }
singleChannelChannel.typedFilters = listOfNotNull(reactions, missing).ifEmpty { null }
}
fun add(eventId: String) {

Wyświetl plik

@ -10,10 +10,8 @@ import com.vitorpamplona.amethyst.service.model.ReportEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.Collections
import java.util.Date
import nostr.postr.JsonFilter
import nostr.postr.events.MetadataEvent
import nostr.postr.events.TextNoteEvent
object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
@ -100,7 +98,7 @@ object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
val reactions = createRepliesAndReactionsFilter()
val missing = createLoadEventsIfNotLoadedFilter()
singleEventChannel.filter = listOfNotNull(reactions, missing).flatten().ifEmpty { null }
singleEventChannel.typedFilters = listOfNotNull(reactions, missing).flatten().ifEmpty { null }
}
fun add(eventId: String) {

Wyświetl plik

@ -1,12 +1,10 @@
package com.vitorpamplona.amethyst.service
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.service.model.ReportEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.Collections
import nostr.postr.JsonFilter
import nostr.postr.events.MetadataEvent
@ -57,7 +55,7 @@ object NostrSingleUserDataSource: NostrDataSource<User>("SingleUserFeed") {
}
override fun updateChannelFilters() {
userChannel.filter = listOfNotNull(createUserFilter(), createUserReportFilter()).flatten()
userChannel.typedFilters = listOfNotNull(createUserFilter(), createUserReportFilter()).flatten()
}
fun add(userId: String) {

Wyświetl plik

@ -6,7 +6,6 @@ import com.vitorpamplona.amethyst.service.model.ReactionEvent
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import java.util.Collections
import nostr.postr.JsonFilter
import nostr.postr.events.TextNoteEvent
@ -58,7 +57,7 @@ object NostrThreadDataSource: NostrDataSource<Note>("SingleThreadFeed") {
}
override fun updateChannelFilters() {
loadEventsChannel.filter = listOfNotNull(createLoadEventsIfNotLoadedFilter(), createRepliesAndReactionsFilter()).ifEmpty { null }
loadEventsChannel.typedFilters = listOfNotNull(createLoadEventsIfNotLoadedFilter(), createRepliesAndReactionsFilter()).ifEmpty { null }
}
fun searchRoot(note: Note, testedNotes: MutableSet<Note> = mutableSetOf()): Note? {

Wyświetl plik

@ -4,7 +4,6 @@ import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.model.toByteArray
import com.vitorpamplona.amethyst.service.relays.FeedType
import com.vitorpamplona.amethyst.service.relays.TypedFilter
import nostr.postr.JsonFilter
@ -73,7 +72,7 @@ object NostrUserProfileDataSource: NostrDataSource<Note>("UserProfileFeed") {
}
override fun updateChannelFilters() {
userInfoChannel.filter = listOf(
userInfoChannel.typedFilters = listOf(
createUserInfoFilter(),
createUserPostsFilter(),
createFollowFilter(),

Wyświetl plik

@ -170,7 +170,7 @@ class Relay(
if (filters.isNotEmpty()) {
val request =
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.filter.toJson() }}]"""
//println("FILTERSSENT ${url} " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""")
//println("FILTERSSENT ${url} ${request}")
socket?.send(request)
}
}

Wyświetl plik

@ -0,0 +1,30 @@
package com.vitorpamplona.amethyst.service.relays
import com.google.gson.GsonBuilder
import com.google.gson.JsonArray
import com.google.gson.JsonObject
import java.util.UUID
data class Subscription (
val id: String = UUID.randomUUID().toString().substring(0,4),
val onEOSE: ((Long) -> Unit)? = null
) {
var typedFilters: List<TypedFilter>? = null // Inactive when null
fun updateEOSE(l: Long) {
onEOSE?.let { it(l) }
}
fun toJson(): String {
return GsonBuilder().create().toJson(toJsonObject())
}
fun toJsonObject(): JsonObject {
val jsonObject = JsonObject()
jsonObject.addProperty("id", id)
typedFilters?.run {
jsonObject.add("typedFilters", JsonArray().apply { typedFilters?.forEach { add(it.toJsonObject()) } })
}
return jsonObject
}
}

Wyświetl plik

@ -1,8 +1,56 @@
package com.vitorpamplona.amethyst.service.relays
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonArray
import com.google.gson.JsonObject
import nostr.postr.JsonFilter
class TypedFilter(
val types: Set<FeedType>,
val filter: JsonFilter
)
) {
fun toJson(): String {
return GsonBuilder().create().toJson(toJsonObject())
}
fun toJsonObject(): JsonObject {
val jsonObject = JsonObject()
jsonObject.add("types", typesToJson(types))
jsonObject.add("filter", filterToJson(filter))
return jsonObject
}
fun typesToJson(types: Set<FeedType>): JsonArray {
return JsonArray().apply { types.forEach { add(it.name.toLowerCase()) } }
}
fun filterToJson(filter: JsonFilter): JsonObject {
val jsonObject = JsonObject()
filter.ids?.run {
jsonObject.add("ids", JsonArray().apply { filter.ids?.forEach { add(it) } })
}
filter.authors?.run {
jsonObject.add("authors", JsonArray().apply { filter.authors?.forEach { add(it) } })
}
filter.kinds?.run {
jsonObject.add("kinds", JsonArray().apply { filter.kinds?.forEach { add(it) } })
}
filter.tags?.run {
entries.forEach { kv ->
jsonObject.add("#${kv.key}", JsonArray().apply { kv.value.forEach { add(it) } })
}
}
filter.since?.run {
jsonObject.addProperty("since", filter.since)
}
filter.until?.run {
jsonObject.addProperty("until", filter.until)
}
filter.limit?.run {
jsonObject.addProperty("limit", filter.limit)
}
return jsonObject
}
}