kopia lustrzana https://github.com/vitorpamplona/amethyst
Avoids race conditions when the data source is supposed to have stopped
rodzic
7f25e1c434
commit
ab93a39ad6
|
@ -10,6 +10,7 @@ import com.vitorpamplona.quartz.events.Event
|
||||||
import com.vitorpamplona.quartz.utils.TimeUtils
|
import com.vitorpamplona.quartz.utils.TimeUtils
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.GlobalScope
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.cancel
|
import kotlinx.coroutines.cancel
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
|
@ -25,6 +26,8 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
private var eventCounter = mapOf<String, Counter>()
|
private var eventCounter = mapOf<String, Counter>()
|
||||||
var changingFilters = AtomicBoolean()
|
var changingFilters = AtomicBoolean()
|
||||||
|
|
||||||
|
private var active: Boolean = false
|
||||||
|
|
||||||
fun printCounter() {
|
fun printCounter() {
|
||||||
eventCounter.forEach {
|
eventCounter.forEach {
|
||||||
Log.d("STATE DUMP", "Received Events ${it.key}: ${it.value.counter}")
|
Log.d("STATE DUMP", "Received Events ${it.key}: ${it.value.counter}")
|
||||||
|
@ -91,6 +94,7 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fun destroy() {
|
fun destroy() {
|
||||||
|
// makes sure to run
|
||||||
stop()
|
stop()
|
||||||
Client.unsubscribe(clientListener)
|
Client.unsubscribe(clientListener)
|
||||||
scope.cancel()
|
scope.cancel()
|
||||||
|
@ -99,14 +103,19 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
|
|
||||||
open fun start() {
|
open fun start() {
|
||||||
println("DataSource: ${this.javaClass.simpleName} Start")
|
println("DataSource: ${this.javaClass.simpleName} Start")
|
||||||
|
active = true
|
||||||
resetFilters()
|
resetFilters()
|
||||||
}
|
}
|
||||||
|
|
||||||
open fun stop() {
|
open fun stop() {
|
||||||
|
active = false
|
||||||
println("DataSource: ${this.javaClass.simpleName} Stop")
|
println("DataSource: ${this.javaClass.simpleName} Stop")
|
||||||
subscriptions.values.forEach { channel ->
|
|
||||||
Client.close(channel.id)
|
GlobalScope.launch(Dispatchers.IO) {
|
||||||
channel.typedFilters = null
|
subscriptions.values.forEach { subscription ->
|
||||||
|
Client.close(subscription.id)
|
||||||
|
subscription.typedFilters = null
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,6 +152,7 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fun resetFiltersSuspend() {
|
fun resetFiltersSuspend() {
|
||||||
|
println("DataSource: ${this.javaClass.simpleName} resetFiltersSuspend $active")
|
||||||
checkNotInMainThread()
|
checkNotInMainThread()
|
||||||
|
|
||||||
// saves the channels that are currently active
|
// saves the channels that are currently active
|
||||||
|
@ -166,10 +176,14 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
// was active and is still active, check if it has changed.
|
// was active and is still active, check if it has changed.
|
||||||
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
|
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
|
||||||
Client.close(updatedSubscription.id)
|
Client.close(updatedSubscription.id)
|
||||||
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
|
if (active) {
|
||||||
|
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// hasn't changed, does nothing.
|
// hasn't changed, does nothing.
|
||||||
Client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters)
|
if (active) {
|
||||||
|
Client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -178,7 +192,9 @@ abstract class NostrDataSource(val debugName: String) {
|
||||||
} else {
|
} else {
|
||||||
// was not active and becomes active, sends the filter.
|
// was not active and becomes active, sends the filter.
|
||||||
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
|
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
|
||||||
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
|
if (active) {
|
||||||
|
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Ładowanie…
Reference in New Issue