From 603cd85ca4a0734c47c84f3aec07a85bd598c4de Mon Sep 17 00:00:00 2001 From: andrekir Date: Sat, 18 Feb 2023 08:20:36 -0300 Subject: [PATCH] refactor: improve message status handling in queueJob --- .../geeksville/mesh/service/MeshService.kt | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index 2cacf9227..ef4593db3 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -41,6 +41,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.withTimeoutOrNull import kotlinx.serialization.json.Json import java.util.* import java.util.concurrent.ConcurrentLinkedQueue @@ -207,6 +208,7 @@ class MeshService : Service(), Logging { if (SoftwareUpdateService.isUpdating) throw IsUpdatingException() radioInterfaceService.sendToRadio(b) + changeStatus(p.packet.id, MessageStatus.ENROUTE) if (p.packet.hasDecoded()) { val packetToSave = MeshLog( @@ -790,6 +792,7 @@ class MeshService : Service(), Logging { val future = CompletableFuture() queueResponse[packet.id] = future try { + if (connectionState != ConnectionState.CONNECTED) throw RadioNotConnectedException() sendToRadio(ToRadio.newBuilder().apply { this.packet = packet }) @@ -825,7 +828,6 @@ class MeshService : Service(), Logging { } if (retryCount >= 3) { debug("queueJob packet id=${packet.id.toUInt()} failed") - handleAckNak(false, myNodeID, packet.id) } } } @@ -837,40 +839,54 @@ class MeshService : Service(), Logging { queueJob?.cancel() queueJob = null queuedPackets.clear() + queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false) queueResponse.clear() } } private fun sendNow(p: DataPacket) { val packet = toMeshPacket(p) - p.status = MessageStatus.ENROUTE p.time = System.currentTimeMillis() // update time to the actual time we started sending // debug("Sending to radio: ${packet.toPIIString()}") sendToRadio(packet) } - private fun processQueuedPackets() { - val m = MessageStatus.ENROUTE - serviceScope.handledLaunch { - packetRepository.get().getQueuedPackets()?.forEach { p -> - try { - sendNow(p) - if (p.status == m) return@forEach - packetRepository.get().updateMessageStatus(p, m) - serviceBroadcasts.broadcastMessageStatus(p.id, m) - } catch (ex: Exception) { - errormsg("Error sending queued message:", ex) - } + private fun processQueuedPackets() = serviceScope.handledLaunch { + packetRepository.get().getQueuedPackets()?.forEach { p -> + try { + sendNow(p) + } catch (ex: Exception) { + errormsg("Error sending queued message:", ex) } } } + private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) { + var dataPacket: DataPacket? = null + while (dataPacket == null) { + dataPacket = packetRepository.get().getDataPacketById(packetId) + if (dataPacket == null) delay(100) + } + dataPacket + } + + /** + * Change the status on a DataPacket and update watchers + */ + private fun changeStatus(packetId: Int, m: MessageStatus) = serviceScope.handledLaunch { + if (packetId != 0) getDataPacketById(packetId)?.let { p -> + if (p.status == m) return@handledLaunch + packetRepository.get().updateMessageStatus(p, m) + serviceBroadcasts.broadcastMessageStatus(packetId, m) + } + } + /** * Handle an ack/nak packet by updating sent message status */ private fun handleAckNak(isAck: Boolean, fromId: String, requestId: Int) { serviceScope.handledLaunch { - val p = packetRepository.get().getDataPacketById(requestId) + val p = getDataPacketById(requestId) // distinguish real ACKs coming from the intended receiver val m = if (isAck && fromId == p?.to) MessageStatus.RECEIVED else if (isAck) MessageStatus.DELIVERED else MessageStatus.ERROR