Reduces the amount of download from relays, but at increased memory costs.

better_use_of_eose_at_memory_cost
Vitor Pamplona 2023-03-25 12:08:51 -04:00
rodzic 5cc0bf54be
commit 1f4f5e3094
10 zmienionych plików z 50 dodań i 127 usunięć

Wyświetl plik

@ -52,7 +52,7 @@ open class Note(val idHex: String) {
var relays = setOf<String>()
private set
var lastReactionsDownloadTime: Long? = null
var lastReactionsDownloadTime: Map<String, Long> = emptyMap()
fun id() = Hex.decode(idHex)
open fun idNote() = id().toNote()

Wyświetl plik

@ -37,7 +37,7 @@ class User(val pubkeyHex: String) {
var reports = mapOf<User, Set<Note>>()
private set
var latestReportTime: Long = 0
var latestEOSEs: Map<String, Long> = emptyMap()
var zaps = mapOf<Note, Note?>()
private set
@ -130,11 +130,6 @@ class User(val pubkeyHex: String) {
reports = reports + Pair(author, (reports[author] ?: emptySet()) + note)
liveSet?.reports?.invalidateData()
}
val reportTime = note.createdAt() ?: 0
if (reportTime > latestReportTime) {
latestReportTime = reportTime
}
}
fun removeReport(deleteNote: Note) {

Wyświetl plik

@ -117,7 +117,7 @@ abstract class NostrDataSource(val debugName: String) {
if (type == Relay.Type.EOSE && channel != null) {
// updates a per subscripton since date
subscriptions[channel]?.updateEOSE(Date().time / 1000)
subscriptions[channel]?.updateEOSE(Date().time / 1000, relay.url)
}
}
@ -142,7 +142,7 @@ abstract class NostrDataSource(val debugName: String) {
}
}
fun requestNewChannel(onEOSE: ((Long) -> Unit)? = null): Subscription {
fun requestNewChannel(onEOSE: ((Long, String) -> Unit)? = null): Subscription {
val newSubscription = Subscription(UUID.randomUUID().toString().substring(0, 4), onEOSE)
subscriptions = subscriptions + Pair(newSubscription.id, newSubscription)
return newSubscription

Wyświetl plik

@ -33,10 +33,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
val now = Date().time / 1000
return addressesToWatch.filter {
val lastTime = it.lastReactionsDownloadTime
lastTime == null || lastTime < (now - 10)
}.mapNotNull {
return addressesToWatch.mapNotNull {
it.address()?.let { aTag ->
TypedFilter(
types = FeedType.values().toSet(),
@ -64,10 +61,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
val now = Date().time / 1000
return addressesToWatch.filter {
val lastTime = it.lastReactionsDownloadTime
lastTime == null || lastTime < (now - 10)
}.mapNotNull {
return addressesToWatch.mapNotNull {
it.address()?.let { aTag ->
TypedFilter(
types = FeedType.values().toSet(),
@ -90,10 +84,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
val now = Date().time / 1000
return reactionsToWatch.filter {
val lastTime = it.lastReactionsDownloadTime
lastTime == null || lastTime < (now - 10)
}.map {
return reactionsToWatch.map {
TypedFilter(
types = FeedType.values().toSet(),
filter = JsonFilter(
@ -145,9 +136,9 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
)
}
val singleEventChannel = requestNewChannel { time ->
val singleEventChannel = requestNewChannel { time, relayUrl ->
eventsToWatch.forEach {
it.lastReactionsDownloadTime = time
it.lastReactionsDownloadTime = it.lastReactionsDownloadTime + Pair(relayUrl, time)
}
// Many relays operate with limits in the amount of filters.
// As information comes, the filters will be rotated to get more data.

Wyświetl plik

@ -34,13 +34,16 @@ object NostrSingleUserDataSource : NostrDataSource("SingleUserFeed") {
filter = JsonFilter(
kinds = listOf(ReportEvent.kind),
tags = mapOf("p" to listOf(it.pubkeyHex)),
since = it.latestReportTime
since = it.latestEOSEs
)
)
}
}
val userChannel = requestNewChannel() {
val userChannel = requestNewChannel() { time, relayUrl ->
usersToWatch.forEach {
it.latestEOSEs = it.latestEOSEs + Pair(relayUrl, time)
}
// Many relays operate with limits in the amount of filters.
// As information comes, the filters will be rotated to get more data.
invalidateFilters()

Wyświetl plik

@ -25,7 +25,7 @@ object NostrThreadDataSource : NostrDataSource("SingleThreadFeed") {
)
}
val loadEventsChannel = requestNewChannel() {
val loadEventsChannel = requestNewChannel() { eoseTime, relay ->
// Many relays operate with limits in the amount of filters.
// As information comes, the filters will be rotated to get more data.
invalidateFilters()

Wyświetl plik

@ -4,26 +4,19 @@ import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonArray
import com.google.gson.JsonObject
import com.vitorpamplona.amethyst.service.model.Event
import java.io.Serializable
import java.util.*
interface Filter {
fun match(event: Event): Boolean
fun toShortString(): String
}
class JsonFilter(
val ids: List<String>? = null,
val authors: List<String>? = null,
val kinds: List<Int>? = null,
val tags: Map<String, List<String>>? = null,
val since: Long? = null,
val since: Map<String, Long>? = null,
val until: Long? = null,
val limit: Int? = null,
val search: String? = null
) : Filter, Serializable {
fun toJson(): String {
) {
fun toJson(forRelay: String? = null): String {
val jsonObject = JsonObject()
ids?.run {
jsonObject.add("ids", JsonArray().apply { ids.forEach { add(it) } })
@ -40,7 +33,20 @@ class JsonFilter(
}
}
since?.run {
jsonObject.addProperty("since", since)
if (!isEmpty()) {
if (forRelay != null) {
val relaySince = get(forRelay)
if (relaySince != null) {
jsonObject.addProperty("since", relaySince)
}
} else {
val jsonObjectSince = JsonObject()
entries.forEach { sincePairs ->
jsonObjectSince.addProperty(sincePairs.key, "${sincePairs.value}")
}
jsonObject.add("since", jsonObjectSince)
}
}
}
until?.run {
jsonObject.addProperty("until", until)
@ -54,90 +60,7 @@ class JsonFilter(
return gson.toJson(jsonObject)
}
override fun match(event: Event): Boolean {
if (ids?.any { event.id == it } == false) return false
if (kinds?.any { event.kind == it } == false) return false
if (authors?.any { event.pubKey == it } == false) return false
tags?.forEach { tag ->
if (!event.tags.any { it.first() == tag.key && it[1] in tag.value }) return false
}
if (event.createdAt !in (since ?: Long.MIN_VALUE)..(until ?: Long.MAX_VALUE)) {
return false
}
return true
}
override fun toString(): String = "JsonFilter${toJson()}"
override fun toShortString(): String {
val list = ArrayList<String>()
ids?.run {
list.add("ids")
}
authors?.run {
list.add("authors")
}
kinds?.run {
list.add("kinds[${kinds.joinToString()}]")
}
tags?.run {
list.add("tags")
}
since?.run {
list.add("since")
}
until?.run {
list.add("until")
}
limit?.run {
list.add("limit")
}
search?.run {
list.add("search")
}
return list.joinToString()
}
companion object {
val gson: Gson = GsonBuilder().create()
fun fromJson(json: String): JsonFilter {
val jsonFilter = gson.fromJson(json, JsonObject::class.java)
return fromJson(jsonFilter)
}
val declaredFields = JsonFilter::class.java.declaredFields.map { it.name }
fun fromJson(json: JsonObject): JsonFilter {
// sanity check
if (json.keySet().any { !(it.startsWith("#") || it in declaredFields) }) {
println("Filter $json contains unknown parameters.")
}
return JsonFilter(
ids = if (json.has("ids")) json.getAsJsonArray("ids").map { it.asString } else null,
authors = if (json.has("authors")) {
json.getAsJsonArray("authors")
.map { it.asString }
} else {
null
},
kinds = if (json.has("kinds")) {
json.getAsJsonArray("kinds")
.map { it.asInt }
} else {
null
},
tags = json
.entrySet()
.filter { it.key.startsWith("#") }
.associate {
it.key.substring(1) to it.value.asJsonArray.map { it.asString }
}
.ifEmpty { null },
since = if (json.has("since")) json.get("since").asLong else null,
until = if (json.has("until")) json.get("until").asLong else null,
limit = if (json.has("limit")) json.get("limit").asInt else null,
search = if (json.has("search")) json.get("search").asString else null
)
}
}
}

Wyświetl plik

@ -92,6 +92,11 @@ class Relay(
// Log.w("Relay", "Relay onEVENT $url, $channel")
eventDownloadCounterInBytes += text.bytesUsedInMemory()
val event = Event.fromJson(msg[2], Client.lenient)
if (event.kind == 23195 || event.kind == 23196) {
println("AAAAA ${event.toJson()}")
}
listeners.forEach { it.onEvent(this@Relay, channel, event) }
}
"EOSE" -> listeners.forEach {
@ -183,8 +188,8 @@ class Relay(
val filters = Client.getSubscriptionFilters(requestId).filter { activeTypes.intersect(it.types).isNotEmpty() }
if (filters.isNotEmpty()) {
val request =
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.filter.toJson() }}]"""
// println("FILTERSSENT ${url} ${request}")
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.filter.toJson(url) }}]"""
// println("FILTERSSENT $url $request")
socket?.send(request)
eventUploadCounterInBytes += request.bytesUsedInMemory()
}

Wyświetl plik

@ -7,12 +7,12 @@ import java.util.UUID
data class Subscription(
val id: String = UUID.randomUUID().toString().substring(0, 4),
val onEOSE: ((Long) -> Unit)? = null
val onEOSE: ((Long, String) -> Unit)? = null
) {
var typedFilters: List<TypedFilter>? = null // Inactive when null
fun updateEOSE(l: Long) {
onEOSE?.let { it(l) }
fun updateEOSE(time: Long, relay: String) {
onEOSE?.let { it(time, relay) }
}
fun toJson(): String {

Wyświetl plik

@ -40,9 +40,15 @@ class TypedFilter(
jsonObject.add("#${kv.key}", JsonArray().apply { kv.value.forEach { add(it) } })
}
}
/*
Does not include since in the json comparison
filter.since?.run {
jsonObject.addProperty("since", filter.since)
}
val jsonObjectSince = JsonObject()
entries.forEach { sincePairs ->
jsonObjectSince.addProperty(sincePairs.key, "${sincePairs.value}")
}
jsonObject.add("since", jsonObjectSince)
}*/
filter.until?.run {
jsonObject.addProperty("until", filter.until)
}