refactor: improve message status handling in queueJob

pull/584/head
andrekir 2023-02-18 08:20:36 -03:00
rodzic 476ecefe94
commit 603cd85ca4
1 zmienionych plików z 31 dodań i 15 usunięć

Wyświetl plik

@ -41,6 +41,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
@ -207,6 +208,7 @@ class MeshService : Service(), Logging {
if (SoftwareUpdateService.isUpdating) throw IsUpdatingException() if (SoftwareUpdateService.isUpdating) throw IsUpdatingException()
radioInterfaceService.sendToRadio(b) radioInterfaceService.sendToRadio(b)
changeStatus(p.packet.id, MessageStatus.ENROUTE)
if (p.packet.hasDecoded()) { if (p.packet.hasDecoded()) {
val packetToSave = MeshLog( val packetToSave = MeshLog(
@ -790,6 +792,7 @@ class MeshService : Service(), Logging {
val future = CompletableFuture<Boolean>() val future = CompletableFuture<Boolean>()
queueResponse[packet.id] = future queueResponse[packet.id] = future
try { try {
if (connectionState != ConnectionState.CONNECTED) throw RadioNotConnectedException()
sendToRadio(ToRadio.newBuilder().apply { sendToRadio(ToRadio.newBuilder().apply {
this.packet = packet this.packet = packet
}) })
@ -825,7 +828,6 @@ class MeshService : Service(), Logging {
} }
if (retryCount >= 3) { if (retryCount >= 3) {
debug("queueJob packet id=${packet.id.toUInt()} failed") debug("queueJob packet id=${packet.id.toUInt()} failed")
handleAckNak(false, myNodeID, packet.id)
} }
} }
} }
@ -837,40 +839,54 @@ class MeshService : Service(), Logging {
queueJob?.cancel() queueJob?.cancel()
queueJob = null queueJob = null
queuedPackets.clear() queuedPackets.clear()
queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false)
queueResponse.clear() queueResponse.clear()
} }
} }
private fun sendNow(p: DataPacket) { private fun sendNow(p: DataPacket) {
val packet = toMeshPacket(p) val packet = toMeshPacket(p)
p.status = MessageStatus.ENROUTE
p.time = System.currentTimeMillis() // update time to the actual time we started sending p.time = System.currentTimeMillis() // update time to the actual time we started sending
// debug("Sending to radio: ${packet.toPIIString()}") // debug("Sending to radio: ${packet.toPIIString()}")
sendToRadio(packet) sendToRadio(packet)
} }
private fun processQueuedPackets() { private fun processQueuedPackets() = serviceScope.handledLaunch {
val m = MessageStatus.ENROUTE packetRepository.get().getQueuedPackets()?.forEach { p ->
serviceScope.handledLaunch { try {
packetRepository.get().getQueuedPackets()?.forEach { p -> sendNow(p)
try { } catch (ex: Exception) {
sendNow(p) errormsg("Error sending queued message:", ex)
if (p.status == m) return@forEach
packetRepository.get().updateMessageStatus(p, m)
serviceBroadcasts.broadcastMessageStatus(p.id, m)
} catch (ex: Exception) {
errormsg("Error sending queued message:", ex)
}
} }
} }
} }
private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) {
var dataPacket: DataPacket? = null
while (dataPacket == null) {
dataPacket = packetRepository.get().getDataPacketById(packetId)
if (dataPacket == null) delay(100)
}
dataPacket
}
/**
* Change the status on a DataPacket and update watchers
*/
private fun changeStatus(packetId: Int, m: MessageStatus) = serviceScope.handledLaunch {
if (packetId != 0) getDataPacketById(packetId)?.let { p ->
if (p.status == m) return@handledLaunch
packetRepository.get().updateMessageStatus(p, m)
serviceBroadcasts.broadcastMessageStatus(packetId, m)
}
}
/** /**
* Handle an ack/nak packet by updating sent message status * Handle an ack/nak packet by updating sent message status
*/ */
private fun handleAckNak(isAck: Boolean, fromId: String, requestId: Int) { private fun handleAckNak(isAck: Boolean, fromId: String, requestId: Int) {
serviceScope.handledLaunch { serviceScope.handledLaunch {
val p = packetRepository.get().getDataPacketById(requestId) val p = getDataPacketById(requestId)
// distinguish real ACKs coming from the intended receiver // distinguish real ACKs coming from the intended receiver
val m = if (isAck && fromId == p?.to) MessageStatus.RECEIVED val m = if (isAck && fromId == p?.to) MessageStatus.RECEIVED
else if (isAck) MessageStatus.DELIVERED else MessageStatus.ERROR else if (isAck) MessageStatus.DELIVERED else MessageStatus.ERROR