feat: add packet transmit queue (#566)

master
Andre K 2023-01-17 18:46:04 -03:00 zatwierdzone przez GitHub
rodzic 24e5454fae
commit 620100b0d3
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
1 zmienionych plików z 78 dodań i 5 usunięć

Wyświetl plik

@ -42,6 +42,10 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.Json
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.inject.Inject
import kotlin.math.absoluteValue
@ -219,9 +223,8 @@ class MeshService : Service(), Logging {
* Send a mesh packet to the radio, if the radio is not currently connected this function will throw NotConnectedException
*/
private fun sendToRadio(packet: MeshPacket) {
sendToRadio(ToRadio.newBuilder().apply {
this.packet = packet
})
queuedPackets.add(packet)
startPacketQueue()
}
private fun updateMessageNotification(message: DataPacket) =
@ -657,6 +660,7 @@ class MeshService : Service(), Logging {
val u = MeshProtos.Routing.parseFrom(data.payload)
val isAck = u.errorReasonValue == MeshProtos.Routing.Error.NONE_VALUE
handleAckNak(isAck, fromId, data.requestId)
queueResponse.remove(data.requestId)?.complete(true)
}
Portnums.PortNum.ADMIN_APP_VALUE -> {
@ -776,6 +780,63 @@ class MeshService : Service(), Logging {
}
}
private val queuedPackets = ConcurrentLinkedDeque<MeshPacket>()
private val queueResponse = mutableMapOf<Int, CompletableFuture<Boolean>>()
private var queueJob: Job? = null
private fun sendPacket(packet: MeshPacket): CompletableFuture<Boolean> {
// send the packet to the radio and return a CompletableFuture that will be completed with the result
val future = CompletableFuture<Boolean>()
queueResponse[packet.id] = future
try {
sendToRadio(ToRadio.newBuilder().apply {
this.packet = packet
})
// FIXME remove when MeshPacketQueue is fixed
if (!packet.wantAck) future.complete(true)
} catch (ex: Exception) {
errormsg("sendToRadio error:", ex)
future.complete(false)
}
return future
}
private fun startPacketQueue() {
if (queueJob?.isActive == true) return
queueJob = serviceScope.handledLaunch {
debug("packet queueJob started")
while (connectionState == ConnectionState.CONNECTED) {
// take the first packet from the queue head
val packet = queuedPackets.poll() ?: break
// send packet to the radio and wait for response
val response = sendPacket(packet)
try {
debug("queueJob packet id=${packet.id.toUInt()} waiting")
@Suppress("BlockingMethodInNonBlockingContext")
val success = response.get(45, TimeUnit.SECONDS)
debug("queueJob packet id=${packet.id.toUInt()} success $success")
if (!success) {
// if send operation fails, add packet back to queue head and retry
queuedPackets.addFirst(packet)
}
} catch (e: TimeoutException) {
debug("queueJob timeout waiting packet id=${packet.id.toUInt()}")
queuedPackets.addFirst(packet)
}
}
}
}
private fun stopPacketQueue() {
if (queueJob?.isActive == true) {
debug("Stopping packet queueJob")
queueJob?.cancel()
queueJob = null
queuedPackets.clear()
queueResponse.clear()
}
}
private fun sendNow(p: DataPacket) {
val packet = toMeshPacket(p)
p.status = MessageStatus.ENROUTE
@ -950,7 +1011,7 @@ class MeshService : Service(), Logging {
// Just in case the user uncleanly reboots the phone, save now (we normally save in onDestroy)
saveSettings()
// lost radio connection, therefore no need to keep listening to GPS
stopPacketQueue()
stopLocationRequests()
if (connectTimeMsec != 0L) {
@ -986,7 +1047,7 @@ class MeshService : Service(), Logging {
// Just in case the user uncleanly reboots the phone, save now (we normally save in onDestroy)
saveSettings()
// lost radio connection, therefore no need to keep listening to GPS
stopPacketQueue()
stopLocationRequests()
GeeksvilleApplication.analytics.track(
@ -1084,6 +1145,7 @@ class MeshService : Service(), Logging {
MeshProtos.FromRadio.CHANNEL_FIELD_NUMBER -> handleChannel(proto.channel)
MeshProtos.FromRadio.CONFIG_FIELD_NUMBER -> handleDeviceConfig(proto.config)
MeshProtos.FromRadio.MODULECONFIG_FIELD_NUMBER -> handleModuleConfig(proto.moduleConfig)
MeshProtos.FromRadio.QUEUESTATUS_FIELD_NUMBER -> handleQueueStatus(proto.queueStatus)
else -> errormsg("Unexpected FromRadio variant")
}
} catch (ex: InvalidProtocolBufferException) {
@ -1124,6 +1186,17 @@ class MeshService : Service(), Logging {
setLocalModuleConfig(config)
}
private fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) {
debug("queueStatus ${queueStatus.toOneLineString()}")
val (success, isFull, requestId) = with(queueStatus) {
// FIXME use "free == 0" when MeshPacketQueue is fixed
Triple(res == 0, free <= 16, meshPacketId)
}
if (success && isFull) return // Queue is full, wait for free != 0
if (requestId != 0) queueResponse.remove(requestId)?.complete(success)
else queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success)
}
private fun handleChannel(ch: ChannelProtos.Channel) {
debug("Received channel ${ch.index}")
val packetToSave = MeshLog(