diff --git a/src/mesh/FloodingRouter.cpp b/src/mesh/FloodingRouter.cpp index e29c596df..f94540905 100644 --- a/src/mesh/FloodingRouter.cpp +++ b/src/mesh/FloodingRouter.cpp @@ -24,11 +24,15 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) printPacket("Ignore dupe incoming msg", p); rxDupe++; if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER && - config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { + config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER && + config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER_LATE) { // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! if (Router::cancelSending(p->from, p->id)) txRelayCanceled++; } + if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_LATE && iface) { + iface->clampToLateRebroadcastWindow(getFrom(p), p->id); + } /* If the original transmitter is doing retransmissions (hopStart equals hopLimit) for a reliable transmission, e.g., when the ACK got lost, we will handle the packet again to make sure it gets an ACK to its packet. */ diff --git a/src/mesh/MeshPacketQueue.cpp b/src/mesh/MeshPacketQueue.cpp index 99ef41c1e..d7ee65800 100644 --- a/src/mesh/MeshPacketQueue.cpp +++ b/src/mesh/MeshPacketQueue.cpp @@ -16,6 +16,12 @@ inline uint32_t getPriority(const meshtastic_MeshPacket *p) bool CompareMeshPacketFunc(const meshtastic_MeshPacket *p1, const meshtastic_MeshPacket *p2) { assert(p1 && p2); + + // If one packet is in the late transmit window, prefer the other one + if ((bool)p1->tx_after != (bool)p2->tx_after) { + return !p1->tx_after; + } + auto p1p = getPriority(p1), p2p = getPriority(p2); // If priorities differ, use that // for equal priorities, prefer packets already on mesh. @@ -94,11 +100,11 @@ meshtastic_MeshPacket *MeshPacketQueue::getFront() } /** Attempt to find and remove a packet from this queue. Returns a pointer to the removed packet, or NULL if not found */ -meshtastic_MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id) +meshtastic_MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id, bool tx_normal, bool tx_late) { for (auto it = queue.begin(); it != queue.end(); it++) { auto p = (*it); - if (getFrom(p) == from && p->id == id) { + if (getFrom(p) == from && p->id == id && ((tx_normal && !p->tx_after) || (tx_late && p->tx_after))) { queue.erase(it); return p; } @@ -114,9 +120,10 @@ bool MeshPacketQueue::replaceLowerPriorityPacket(meshtastic_MeshPacket *p) if (queue.empty()) { return false; // No packets to replace } + // Check if the packet at the back has a lower priority than the new packet auto &backPacket = queue.back(); - if (backPacket->priority < p->priority) { + if (!backPacket->tx_after && backPacket->priority < p->priority) { // Remove the back packet packetPool.release(backPacket); queue.pop_back(); @@ -125,6 +132,19 @@ bool MeshPacketQueue::replaceLowerPriorityPacket(meshtastic_MeshPacket *p) return true; } + if (backPacket->tx_after) { + // Check if there's a non-late packet with lower priority + auto it = queue.end(); + auto refPacket = *--it; + for (; refPacket->tx_after && it != queue.begin(); refPacket = *--it) + ; + if (!refPacket->tx_after && refPacket->priority < p->priority) { + packetPool.release(refPacket); + enqueue(refPacket); + return true; + } + } + // If the back packet's priority is not lower, no replacement occurs return false; } \ No newline at end of file diff --git a/src/mesh/MeshPacketQueue.h b/src/mesh/MeshPacketQueue.h index 3c28fc5ce..b41a214b9 100644 --- a/src/mesh/MeshPacketQueue.h +++ b/src/mesh/MeshPacketQueue.h @@ -36,5 +36,5 @@ class MeshPacketQueue meshtastic_MeshPacket *getFront(); /** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */ - meshtastic_MeshPacket *remove(NodeNum from, PacketId id); -}; + meshtastic_MeshPacket *remove(NodeNum from, PacketId id, bool tx_normal = true, bool tx_late = true); +}; \ No newline at end of file diff --git a/src/mesh/RadioInterface.cpp b/src/mesh/RadioInterface.cpp index 5a18ab0c0..b1403f3b6 100644 --- a/src/mesh/RadioInterface.cpp +++ b/src/mesh/RadioInterface.cpp @@ -254,8 +254,8 @@ uint32_t RadioInterface::getTxDelayMsec() return random(0, pow(2, CWsize)) * slotTimeMsec; } -/** The delay to use when we want to flood a message */ -uint32_t RadioInterface::getTxDelayMsecWeighted(float snr) +/** The CW size to use when calculating SNR_based delays */ +uint8_t RadioInterface::getCWsize(float snr) { // The minimum value for a LoRa SNR const uint32_t SNR_MIN = -20; @@ -263,10 +263,24 @@ uint32_t RadioInterface::getTxDelayMsecWeighted(float snr) // The maximum value for a LoRa SNR const uint32_t SNR_MAX = 15; + return map(snr, SNR_MIN, SNR_MAX, CWmin, CWmax); +} + +/** The worst-case SNR_based packet delay */ +uint32_t RadioInterface::getTxDelayMsecWeightedWorst(float snr) +{ + uint8_t CWsize = getCWsize(snr); + // offset the maximum delay for routers: (2 * CWmax * slotTimeMsec) + return (2 * CWmax * slotTimeMsec) + pow(2, CWsize) * slotTimeMsec; +} + +/** The delay to use when we want to flood a message */ +uint32_t RadioInterface::getTxDelayMsecWeighted(float snr) +{ // high SNR = large CW size (Long Delay) // low SNR = small CW size (Short Delay) uint32_t delay = 0; - uint8_t CWsize = map(snr, SNR_MIN, SNR_MAX, CWmin, CWmax); + uint8_t CWsize = getCWsize(snr); // LOG_DEBUG("rx_snr of %f so setting CWsize to:%d", snr, CWsize); if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER || config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER) { diff --git a/src/mesh/RadioInterface.h b/src/mesh/RadioInterface.h index 89a4c7087..652b2269c 100644 --- a/src/mesh/RadioInterface.h +++ b/src/mesh/RadioInterface.h @@ -173,9 +173,18 @@ class RadioInterface /** The delay to use when we want to send something */ uint32_t getTxDelayMsec(); + /** The CW to use when calculating SNR_based delays */ + uint8_t getCWsize(float snr); + + /** The worst-case SNR_based packet delay */ + uint32_t getTxDelayMsecWeightedWorst(float snr); + /** The delay to use when we want to flood a message. Use a weighted scale based on SNR */ uint32_t getTxDelayMsecWeighted(float snr); + /** If the packet is not already in the late rebroadcast window, move it there */ + virtual void clampToLateRebroadcastWindow(NodeNum from, PacketId id) { return; } + /** * Calculate airtime per * https://www.rs-online.com/designspark/rel-assets/ds-assets/uploads/knowledge-items/application-notes-for-the-internet-of-things/LoRa%20Design%20Guide.pdf diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index e416160eb..997b1d6fe 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -235,12 +235,12 @@ void RadioLibInterface::onNotify(uint32_t notification) case ISR_TX: handleTransmitInterrupt(); startReceive(); - startTransmitTimer(); + setTransmitDelay(); break; case ISR_RX: handleReceiveInterrupt(); startReceive(); - startTransmitTimer(); + setTransmitDelay(); break; case TRANSMIT_DELAY_COMPLETED: @@ -250,23 +250,32 @@ void RadioLibInterface::onNotify(uint32_t notification) if (!canSendImmediately()) { setTransmitDelay(); // currently Rx/Tx-ing: reset random delay } else { - if (isChannelActive()) { // check if there is currently a LoRa packet on the channel - startReceive(); // try receiving this packet, afterwards we'll be trying to transmit again - setTransmitDelay(); + meshtastic_MeshPacket *txp = txQueue.getFront(); + assert(txp); + long delay_remaining = txp->tx_after ? txp->tx_after - millis() : 0; + if (delay_remaining > 0) { + // There's still some delay pending on this packet, so resume waiting for it to elapse + notifyLater(delay_remaining, TRANSMIT_DELAY_COMPLETED, false); } else { - // Send any outgoing packets we have ready as fast as possible to keep the time between channel scan and - // actual transmission as short as possible - meshtastic_MeshPacket *txp = txQueue.dequeue(); - assert(txp); - bool sent = startSend(txp); - if (sent) { - // Packet has been sent, count it toward our TX airtime utilization. - uint32_t xmitMsec = getPacketTime(txp); - airTime->logAirtime(TX_LOG, xmitMsec); + if (isChannelActive()) { // check if there is currently a LoRa packet on the channel + startReceive(); // try receiving this packet, afterwards we'll be trying to transmit again + setTransmitDelay(); + } else { + // Send any outgoing packets we have ready as fast as possible to keep the time between channel scan and + // actual transmission as short as possible + txp = txQueue.dequeue(); + assert(txp); + bool sent = startSend(txp); + if (sent) { + // Packet has been sent, count it toward our TX airtime utilization. + uint32_t xmitMsec = getPacketTime(txp); + airTime->logAirtime(TX_LOG, xmitMsec); + } } } } } else { + // Do nothing, because the queue is empty } break; default: @@ -277,15 +286,24 @@ void RadioLibInterface::onNotify(uint32_t notification) void RadioLibInterface::setTransmitDelay() { meshtastic_MeshPacket *p = txQueue.getFront(); + if (!p) { + return; // noop if there's nothing in the queue + } + // We want all sending/receiving to be done by our daemon thread. // We use a delay here because this packet might have been sent in response to a packet we just received. // So we want to make sure the other side has had a chance to reconfigure its radio. - /* We assume if rx_snr = 0 and rx_rssi = 0, the packet was generated locally. - * This assumption is valid because of the offset generated by the radio to account for the noise - * floor. - */ - if (p->rx_snr == 0 && p->rx_rssi == 0) { + if (p->tx_after) { + unsigned long add_delay = p->rx_rssi ? getTxDelayMsecWeighted(p->rx_snr) : getTxDelayMsec(); + unsigned long now = millis(); + p->tx_after = max(p->tx_after + add_delay, now + add_delay); + notifyLater(now - p->tx_after, TRANSMIT_DELAY_COMPLETED, false); + } else if (p->rx_snr == 0 && p->rx_rssi == 0) { + /* We assume if rx_snr = 0 and rx_rssi = 0, the packet was generated locally. + * This assumption is valid because of the offset generated by the radio to account for the noise + * floor. + */ startTransmitTimer(true); } else { // If there is a SNR, start a timer scaled based on that SNR. @@ -312,6 +330,20 @@ void RadioLibInterface::startTransmitTimerSNR(float snr) } } +/** + * If the packet is not already in the late rebroadcast window, move it there + */ +void RadioLibInterface::clampToLateRebroadcastWindow(NodeNum from, PacketId id) +{ + // Look for non-late packets only, so we don't do this twice! + meshtastic_MeshPacket *p = txQueue.remove(from, id, true, false); + if (p) { + p->tx_after = millis() + getTxDelayMsecWeightedWorst(p->rx_snr); + txQueue.enqueue(p); + LOG_DEBUG("Move existing queued packet to the late rebroadcast window %dms from now", p->tx_after - millis()); + } +} + void RadioLibInterface::handleTransmitInterrupt() { // This can be null if we forced the device to enter standby mode. In that case diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index d6101ae37..dff58c9ad 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -140,10 +140,16 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified * doing the transmit */ void setTransmitDelay(); - /** random timer with certain min. and max. settings */ + /** + * random timer with certain min. and max. settings + * @return Timestamp after which the packet may be sent + */ void startTransmitTimer(bool withDelay = true); - /** timer scaled to SNR of to be flooded packet */ + /** + * timer scaled to SNR of to be flooded packet + * @return Timestamp after which the packet may be sent + */ void startTransmitTimerSNR(float snr); void handleTransmitInterrupt(); @@ -193,4 +199,9 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified virtual void setStandby(); const char *radioLibErr = "RadioLib err="; + + /** + * If the packet is not already in the late rebroadcast window, move it there + */ + void clampToLateRebroadcastWindow(NodeNum from, PacketId id); }; \ No newline at end of file