diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt index ce6ec355a..ce90d7b7a 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt @@ -709,6 +709,7 @@ class Account( fun sendZapPaymentRequestFor( bolt11: String, zappedNote: Note?, + onSent: () -> Unit, onResponse: (Response?) -> Unit, ) { if (!isWriteable()) return @@ -730,6 +731,8 @@ class Account( LocalCache.consume(event, zappedNote) { it.response(signer) { onResponse(it) } } Client.send(event, nip47.relayUri, wcListener.feedTypes) { wcListener.destroy() } + + onSent() } } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt index d5b4c939d..bb8d0d259 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt @@ -28,6 +28,7 @@ import com.vitorpamplona.amethyst.model.LocalCache import com.vitorpamplona.amethyst.model.Note import com.vitorpamplona.amethyst.model.User import com.vitorpamplona.amethyst.service.lnurl.LightningAddressResolver +import com.vitorpamplona.amethyst.ui.screen.loggedIn.collectSuccessfulSigningOperations import com.vitorpamplona.quartz.events.LiveActivitiesEvent import com.vitorpamplona.quartz.events.LnZapEvent import com.vitorpamplona.quartz.events.PayInvoiceErrorResponse @@ -35,7 +36,6 @@ import com.vitorpamplona.quartz.events.ZapSplitSetup import kotlinx.collections.immutable.ImmutableList import kotlinx.collections.immutable.toImmutableList import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlin.math.round @@ -59,9 +59,8 @@ class ZapPaymentHandler(val account: Account) { onPayViaIntent: (ImmutableList) -> Unit, zapType: LnZapEvent.ZapType, ) = withContext(Dispatchers.IO) { - val zapSplitSetup = note.event?.zapSplitSetup() - val noteEvent = note.event + val zapSplitSetup = noteEvent?.zapSplitSetup() val zapsToSend = if (!zapSplitSetup.isNullOrEmpty()) { @@ -84,101 +83,216 @@ class ZapPaymentHandler(val account: Account) { listOf(ZapSplitSetup(lud16, null, weight = 1.0, true)) } - val totalWeight = zapsToSend.sumOf { it.weight } - - val invoicesToPayOnIntent = mutableListOf() - - zapsToSend.forEachIndexed { index, value -> - val outerProgressMin = index / zapsToSend.size.toFloat() - val outerProgressMax = (index + 1) / zapsToSend.size.toFloat() - - val zapValue = round((amountMilliSats * value.weight / totalWeight) / 1000f).toLong() * 1000 - - if (value.isLnAddress) { - innerZap( - lud16 = value.lnAddressOrPubKeyHex, - note = note, - amount = zapValue, - pollOption = pollOption, - message = message, - context = context, - onError = onError, - onProgress = { - onProgress((it * (outerProgressMax - outerProgressMin)) + outerProgressMin) - }, - zapType = zapType, - onPayInvoiceThroughIntent = { - invoicesToPayOnIntent.add( - Payable( - info = value, - user = null, - amountMilliSats = zapValue, - invoice = it, - ), - ) - }, - ) + onProgress(0.02f) + signAllZapRequests(note, pollOption, message, zapType, zapsToSend) { splitZapRequestPairs -> + if (splitZapRequestPairs.isEmpty()) { + onProgress(0.00f) + return@signAllZapRequests } else { - val user = LocalCache.getUserIfExists(value.lnAddressOrPubKeyHex) - val lud16 = user?.info?.lnAddress() + onProgress(0.05f) + } - if (lud16 != null) { - innerZap( - lud16 = lud16, - note = note, - amount = zapValue, - pollOption = pollOption, - message = message, - context = context, - onError = onError, - onProgress = { - onProgress((it * (outerProgressMax - outerProgressMin)) + outerProgressMin) - }, - zapType = zapType, - overrideUser = user, - onPayInvoiceThroughIntent = { - invoicesToPayOnIntent.add( - Payable( - info = value, - user = user, - amountMilliSats = zapValue, - invoice = it, - ), - ) - }, - ) + assembleAllInvoices(splitZapRequestPairs.toList(), amountMilliSats, message, onError, onProgress = { + onProgress(it * 0.7f + 0.05f) // keeps within range. + }, context) { + if (it.isEmpty()) { + onProgress(0.00f) + return@assembleAllInvoices } else { - onError( - context.getString( - R.string.missing_lud16, - ), - context.getString( - R.string.user_x_does_not_have_a_lightning_address_setup_to_receive_sats, - user?.toBestDisplayName() ?: value.lnAddressOrPubKeyHex, - ), + onProgress(0.75f) + } + + if (account.hasWalletConnectSetup()) { + payViaNWC(it.values.map { it.second }, note, onError, onProgress = { + onProgress(it * 0.25f + 0.75f) // keeps within range. + }, context) { + // onProgress(1f) + } + } else { + onPayViaIntent( + it.map { + Payable( + info = it.key.first, + user = null, + amountMilliSats = it.value.first, + invoice = it.value.second, + ) + }.toImmutableList(), ) + + onProgress(0f) } } } + } - if (invoicesToPayOnIntent.isNotEmpty()) { - onPayViaIntent(invoicesToPayOnIntent.toImmutableList()) - onProgress(1f) - } else { - launch(Dispatchers.IO) { - // Awaits for the event to come back to LocalCache. - var count = 0 - while (invoicesToPayOnIntent.size < zapsToSend.size || count < 4) { - count++ - Thread.sleep(5000) - } - if (invoicesToPayOnIntent.isNotEmpty()) { - onPayViaIntent(invoicesToPayOnIntent.toImmutableList()) - onProgress(1f) + private fun calculateZapValue( + amountMilliSats: Long, + weight: Double, + totalWeight: Double, + ): Long { + val shareValue = amountMilliSats * (weight / totalWeight) + val roundedZapValue = round(shareValue / 1000f).toLong() * 1000 + return roundedZapValue + } + + suspend fun signAllZapRequests( + note: Note, + pollOption: Int?, + message: String, + zapType: LnZapEvent.ZapType, + zapsToSend: List, + onAllDone: suspend (MutableMap) -> Unit, + ) { + collectSuccessfulSigningOperations( + operationsInput = zapsToSend, + runRequestFor = { next: ZapSplitSetup, onReady -> + if (next.isLnAddress) { + prepareZapRequestIfNeeded(note, pollOption, message, zapType) { zapRequestJson -> + if (zapRequestJson != null) { + onReady(zapRequestJson) + } + } } else { - onProgress(1f) + val user = LocalCache.getUserIfExists(next.lnAddressOrPubKeyHex) + prepareZapRequestIfNeeded(note, pollOption, message, zapType, user) { zapRequestJson -> + if (zapRequestJson != null) { + onReady(zapRequestJson) + } + } } + }, + onReady = onAllDone, + ) + } + + suspend fun assembleAllInvoices( + invoices: List>, + totalAmountMilliSats: Long, + message: String, + onError: (String, String) -> Unit, + onProgress: (percent: Float) -> Unit, + context: Context, + onAllDone: suspend (MutableMap, Pair>) -> Unit, + ) { + var progressAllPayments = 0.00f + val totalWeight = invoices.sumOf { it.first.weight } + + collectSuccessfulSigningOperations, Pair>( + operationsInput = invoices, + runRequestFor = { splitZapRequestPair: Pair, onReady -> + assembleInvoice( + splitSetup = splitZapRequestPair.first, + nostrZapRequest = splitZapRequestPair.second, + zapValue = calculateZapValue(totalAmountMilliSats, splitZapRequestPair.first.weight, totalWeight), + message = message, + onError = onError, + onProgressStep = { percentStepForThisPayment -> + progressAllPayments += percentStepForThisPayment / invoices.size + onProgress(progressAllPayments) + }, + context = context, + onReady = onReady, + ) + }, + onReady = onAllDone, + ) + } + + suspend fun payViaNWC( + invoices: List, + note: Note, + onError: (String, String) -> Unit, + onProgress: (percent: Float) -> Unit, + context: Context, + onAllDone: suspend (MutableMap) -> Unit, + ) { + var progressAllPayments = 0.00f + + collectSuccessfulSigningOperations( + operationsInput = invoices, + runRequestFor = { invoice: String, onReady -> + account.sendZapPaymentRequestFor( + bolt11 = invoice, + zappedNote = note, + onSent = { + progressAllPayments += 0.5f / invoices.size + onProgress(progressAllPayments) + onReady(true) + }, + onResponse = { response -> + if (response is PayInvoiceErrorResponse) { + progressAllPayments += 0.5f / invoices.size + onProgress(progressAllPayments) + onError( + context.getString(R.string.error_dialog_pay_invoice_error), + context.getString( + R.string.wallet_connect_pay_invoice_error_error, + response.error?.message + ?: response.error?.code?.toString() ?: "Error parsing error message", + ), + ) + } else { + progressAllPayments += 0.5f / invoices.size + onProgress(progressAllPayments) + } + }, + ) + }, + onReady = onAllDone, + ) + } + + private fun assembleInvoice( + splitSetup: ZapSplitSetup, + nostrZapRequest: String, + zapValue: Long, + message: String, + onError: (String, String) -> Unit, + onProgressStep: (percent: Float) -> Unit, + context: Context, + onReady: (Pair) -> Unit, + ) { + var progressThisPayment = 0.00f + + var user: User? = null + val lud16 = + if (splitSetup.isLnAddress) { + splitSetup.lnAddressOrPubKeyHex + } else { + user = LocalCache.getUserIfExists(splitSetup.lnAddressOrPubKeyHex) + user?.info?.lnAddress() } + + if (lud16 != null) { + LightningAddressResolver() + .lnAddressInvoice( + lnaddress = lud16, + milliSats = zapValue, + message = message, + nostrRequest = nostrZapRequest, + onError = onError, + onProgress = { + val step = it - progressThisPayment + progressThisPayment = it + onProgressStep(step) + }, + context = context, + onSuccess = { + onProgressStep(1 - progressThisPayment) + onReady(Pair(zapValue, it)) + }, + ) + } else { + onError( + context.getString( + R.string.missing_lud16, + ), + context.getString( + R.string.user_x_does_not_have_a_lightning_address_setup_to_receive_sats, + user?.toBestDisplayName() ?: splitSetup.lnAddressOrPubKeyHex, + ), + ) } } @@ -198,63 +312,4 @@ class ZapPaymentHandler(val account: Account) { onReady(null) } } - - private suspend fun innerZap( - lud16: String, - note: Note, - amount: Long, - pollOption: Int?, - message: String, - context: Context, - onError: (String, String) -> Unit, - onProgress: (percent: Float) -> Unit, - onPayInvoiceThroughIntent: (String) -> Unit, - zapType: LnZapEvent.ZapType, - overrideUser: User? = null, - ) { - onProgress(0.05f) - - prepareZapRequestIfNeeded(note, pollOption, message, zapType, overrideUser) { zapRequestJson -> - onProgress(0.10f) - - LightningAddressResolver() - .lnAddressInvoice( - lud16, - amount, - message, - zapRequestJson, - onSuccess = { - onProgress(0.7f) - if (account.hasWalletConnectSetup()) { - account.sendZapPaymentRequestFor( - bolt11 = it, - note, - onResponse = { response -> - if (response is PayInvoiceErrorResponse) { - onProgress(0.0f) - onError( - context.getString(R.string.error_dialog_pay_invoice_error), - context.getString( - R.string.wallet_connect_pay_invoice_error_error, - response.error?.message - ?: response.error?.code?.toString() ?: "Error parsing error message", - ), - ) - } else { - onProgress(1f) - } - }, - ) - onProgress(0.8f) - } else { - onPayInvoiceThroughIntent(it) - onProgress(0f) - } - }, - onError = onError, - onProgress = onProgress, - context = context, - ) - } - } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt index d6cf49914..8153b49d6 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt @@ -27,7 +27,6 @@ import com.vitorpamplona.quartz.events.EventInterface import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.util.UUID @@ -125,47 +124,8 @@ object Client : RelayPool.Listener { } else if (relay == null) { RelayPool.send(signedEvent) } else { - val useConnectedRelayIfPresent = RelayPool.getRelays(relay) - - if (useConnectedRelayIfPresent.isNotEmpty()) { - useConnectedRelayIfPresent.forEach { it.send(signedEvent) } - } else { - /** temporary connection */ - newSporadicRelay( - relay, - feedTypes, - onConnected = { myRelay -> myRelay.send(signedEvent) }, - onDone = onDone, - ) - } - } - } - - @OptIn(DelicateCoroutinesApi::class) - private fun newSporadicRelay( - url: String, - feedTypes: Set?, - onConnected: (Relay) -> Unit, - onDone: (() -> Unit)?, - ) { - val relay = Relay(url, true, true, feedTypes ?: emptySet()) - RelayPool.addRelay(relay) - - relay.connectAndRun { - allSubscriptions().forEach { - relay.sendFilter(it.key, it.value) - } - - onConnected(relay) - - GlobalScope.launch(Dispatchers.IO) { - delay(60000) // waits for a reply - relay.disconnect() - RelayPool.removeRelay(relay) - - if (onDone != null) { - onDone() - } + RelayPool.getOrCreateRelay(relay, feedTypes, onDone) { + it.send(signedEvent) } } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt index 734486c58..21eaeb714 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt @@ -90,6 +90,7 @@ class Relay( var afterEOSEPerSubscription = mutableMapOf() val authResponse = mutableMapOf() + val sendWhenReady = mutableListOf() fun register(listener: Listener) { listeners = listeners.plus(listener) @@ -167,6 +168,13 @@ class Relay( // Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url") onConnected(this@Relay) + synchronized(sendWhenReady) { + sendWhenReady.forEach { + send(it) + } + sendWhenReady.clear() + } + listeners.forEach { it.onRelayStateChange(this@Relay, StateType.CONNECT, null) } } @@ -272,6 +280,7 @@ class Relay( val event = Event.fromJson(msgArray.get(2)) // Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}") + listeners.forEach { it.onEvent( this@Relay, @@ -456,6 +465,10 @@ class Relay( if (isReady) { socket?.send(event) eventUploadCounterInBytes += event.bytesUsedInMemory() + } else { + synchronized(sendWhenReady) { + sendWhenReady.add(signedEvent) + } } } else { // sends all filters after connection is successful. diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt index 2e717f6b5..faf184671 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt @@ -24,10 +24,15 @@ import androidx.compose.runtime.Immutable import com.vitorpamplona.amethyst.service.checkNotInMainThread import com.vitorpamplona.quartz.events.Event import com.vitorpamplona.quartz.events.EventInterface +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch /** * RelayPool manages the connection to multiple Relays and lets consumers deal with simple events. @@ -58,6 +63,57 @@ object RelayPool : Relay.Listener { return relays.filter { it.url == url } } + fun getOrCreateRelay( + url: String, + feedTypes: Set? = null, + onDone: (() -> Unit)? = null, + whenConnected: (Relay) -> Unit, + ) { + synchronized(this) { + val matching = getRelays(url) + if (matching.isNotEmpty()) { + matching.forEach { whenConnected(it) } + } else { + /** temporary connection */ + newSporadicRelay( + url, + feedTypes, + onConnected = whenConnected, + onDone = onDone, + ) + } + } + } + + @OptIn(DelicateCoroutinesApi::class) + fun newSporadicRelay( + url: String, + feedTypes: Set?, + onConnected: (Relay) -> Unit, + onDone: (() -> Unit)?, + ) { + val relay = Relay(url, true, true, feedTypes ?: emptySet()) + addRelay(relay) + + relay.connectAndRun { + Client.allSubscriptions().forEach { + relay.sendFilter(it.key, it.value) + } + + onConnected(relay) + + GlobalScope.launch(Dispatchers.IO) { + delay(60000) // waits for a reply + relay.disconnect() + removeRelay(relay) + + if (onDone != null) { + onDone() + } + } + } + } + fun loadRelays(relayList: List) { if (!relayList.isNullOrEmpty()) { relayList.forEach { addRelay(it) } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt index f77b86d0b..1c15b6063 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt @@ -93,9 +93,12 @@ import kotlinx.collections.immutable.toImmutableSet import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext @@ -291,26 +294,27 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View onNewState: (ImmutableList) -> Unit, ) { viewModelScope.launch(Dispatchers.IO) { - val myList = zaps.toList() - val initialResults = - myList - .associate { - it.request to - ZapAmountCommentNotification( - it.request.author, - it.request.event?.content()?.ifBlank { null }, - showAmountAxis((it.response?.event as? LnZapEvent)?.amount), - ) - } + zaps.associate { + it.request to + ZapAmountCommentNotification( + it.request.author, + it.request.event?.content()?.ifBlank { null }, + showAmountAxis((it.response.event as? LnZapEvent)?.amount), + ) + } .toMutableMap() collectSuccessfulSigningOperations( - operationsInput = myList, + operationsInput = zaps.filter { (it.request.event as? LnZapRequestEvent)?.isPrivateZap() == true }, runRequestFor = { next, onReady -> + checkNotInMainThread() + innerDecryptAmountMessage(next.request, next.response, onReady) }, ) { + checkNotInMainThread() + it.forEach { decrypted -> initialResults[decrypted.key.request] = decrypted.value } onNewState(initialResults.values.toImmutableList()) @@ -477,7 +481,9 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View message, context, onError, - onProgress, + onProgress = { + onProgress(it) + }, onPayViaIntent, zapType, ) @@ -1392,48 +1398,41 @@ class HasNotificationDot(bottomNavigationItems: ImmutableList) { @Immutable data class LoadedBechLink(val baseNote: Note?, val nip19: Nip19Bech32.ParseReturn) -public fun allOrNothingSigningOperations( - remainingTos: List, - runRequestFor: (T, (K) -> Unit) -> Unit, - output: MutableList = mutableListOf(), - onReady: (List) -> Unit, -) { - if (remainingTos.isEmpty()) { - onReady(output) - return - } - - val next = remainingTos.first() - - runRequestFor(next) { result: K -> - output.add(result) - allOrNothingSigningOperations(remainingTos.minus(next), runRequestFor, output, onReady) - } -} - public suspend fun collectSuccessfulSigningOperations( operationsInput: List, runRequestFor: (T, (K) -> Unit) -> Unit, output: MutableMap = mutableMapOf(), - onReady: (MutableMap) -> Unit, + onReady: suspend (MutableMap) -> Unit, ) { if (operationsInput.isEmpty()) { onReady(output) return } - for (input in operationsInput) { - // runs in sequence to avoid overcrowding Amber. - val result = - withTimeoutOrNull(100) { - suspendCancellableCoroutine { continuation -> - runRequestFor(input) { result: K -> continuation.resume(result) } + val (value, elapsed) = + measureTimedValue { + coroutineScope { + val jobs = + operationsInput.map { + async { + val result = + withTimeoutOrNull(10000) { + suspendCancellableCoroutine { continuation -> + runRequestFor(it) { result: K -> continuation.resume(result) } + } + } + if (result != null) { + output[it] = result + } + } + } + + // runs in parallel to avoid overcrowding Amber. + withTimeoutOrNull(15000) { + jobs.joinAll() } } - if (result != null) { - output[input] = result } - } onReady(output) } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/ProfileScreen.kt b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/ProfileScreen.kt index e8a8eab60..c19313acf 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/ProfileScreen.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/ProfileScreen.kt @@ -1182,7 +1182,7 @@ fun DisplayLNAddress( zapExpanded = false // pay directly if (accountViewModel.account.hasWalletConnectSetup()) { - accountViewModel.account.sendZapPaymentRequestFor(it, null) { response -> + accountViewModel.account.sendZapPaymentRequestFor(it, null, onSent = {}) { response -> if (response is PayInvoiceSuccessResponse) { showInfoMessageDialog = context.getString(R.string.payment_successful) } else if (response is PayInvoiceErrorResponse) {