feat: drop packet from queue after 3 failed attempts

master
andrekir 2023-02-13 18:38:22 -03:00
rodzic 8a0b59a0d1
commit 8a6361d72c
1 zmienionych plików z 14 dodań i 11 usunięć

Wyświetl plik

@ -43,7 +43,7 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import javax.inject.Inject import javax.inject.Inject
@ -781,7 +781,7 @@ class MeshService : Service(), Logging {
} }
} }
private val queuedPackets = ConcurrentLinkedDeque<MeshPacket>() private val queuedPackets = ConcurrentLinkedQueue<MeshPacket>()
private val queueResponse = mutableMapOf<Int, CompletableFuture<Boolean>>() private val queueResponse = mutableMapOf<Int, CompletableFuture<Boolean>>()
private var queueJob: Job? = null private var queueJob: Job? = null
@ -807,22 +807,25 @@ class MeshService : Service(), Logging {
queueJob = serviceScope.handledLaunch { queueJob = serviceScope.handledLaunch {
debug("packet queueJob started") debug("packet queueJob started")
while (connectionState == ConnectionState.CONNECTED) { while (connectionState == ConnectionState.CONNECTED) {
var retryCount = 0
// take the first packet from the queue head // take the first packet from the queue head
val packet = queuedPackets.poll() ?: break val packet = queuedPackets.poll() ?: break
// send packet to the radio and wait for response while (retryCount < 3) try {
val response = sendPacket(packet) // send packet to the radio and wait for response
try { val response = sendPacket(packet)
debug("queueJob packet id=${packet.id.toUInt()} waiting") debug("queueJob packet id=${packet.id.toUInt()} waiting (retry $retryCount)")
@Suppress("BlockingMethodInNonBlockingContext") @Suppress("BlockingMethodInNonBlockingContext")
val success = response.get(45, TimeUnit.SECONDS) val success = response.get(45, TimeUnit.SECONDS)
debug("queueJob packet id=${packet.id.toUInt()} success $success") debug("queueJob packet id=${packet.id.toUInt()} success $success")
if (!success) { if (success) break
// if send operation fails, add packet back to queue head and retry retryCount++ // if send operation fails, retry
queuedPackets.addFirst(packet)
}
} catch (e: TimeoutException) { } catch (e: TimeoutException) {
debug("queueJob timeout waiting packet id=${packet.id.toUInt()}") debug("queueJob timeout waiting packet id=${packet.id.toUInt()}")
queuedPackets.addFirst(packet) retryCount++ // if send operation fails, retry
}
if (retryCount >= 3) {
debug("queueJob packet id=${packet.id.toUInt()} failed")
handleAckNak(false, myNodeID, packet.id)
} }
} }
} }