diff --git a/proto b/proto index a76ceb150..0221e83d6 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit a76ceb1509b2ec3d844af0378e998e7d4737492c +Subproject commit 0221e83d689f7930ed3e5c474eff4fbb8697efbb diff --git a/src/mesh/MeshPacketQueue.cpp b/src/mesh/MeshPacketQueue.cpp new file mode 100644 index 000000000..7d3d2aa2b --- /dev/null +++ b/src/mesh/MeshPacketQueue.cpp @@ -0,0 +1,79 @@ +#include "MeshPacketQueue.h" + +/// @return the priority of the specified packet +inline uint32_t getPriority(MeshPacket *p) +{ + auto pri = p->priority; + return pri; +} + +/// @return "true" if "p1" is ordered before "p2" +bool CompareMeshPacket::operator()(MeshPacket *p1, MeshPacket *p2) +{ + assert(p1 && p2); + auto p1p = getPriority(p1), p2p = getPriority(p2); + + // If priorities differ, use that + // for equal priorities, order by id (older packets have higher priority - this will briefly be wrong when IDs roll over but + // no big deal) + return (p1p != p2p) ? (p1p < p2p) // prefer bigger priorities + : (p1->id >= p2->id); // prefer smaller packet ids +} + +MeshPacketQueue::MeshPacketQueue(size_t _maxLen) : maxLen(_maxLen) +{ +} + +/** enqueue a packet, return false if full */ +bool MeshPacketQueue::enqueue(MeshPacket *p) +{ + + // We might receive acks from other nodes (and since generated remotely, they won't have priority assigned. Check for that + // and fix it + if (p->priority == MeshPacket_Priority_UNSET) + p->priority = p->decoded.which_ackVariant ? MeshPacket_Priority_ACK : MeshPacket_Priority_DEFAULT; + + if (size() >= maxLen) + return false; + else { + push(p); + return true; + } +} + +MeshPacket *MeshPacketQueue::dequeue() +{ + if (empty()) + return NULL; + else { + auto p = top(); + pop(); // remove the first item + return p; + } +} + +// this is kinda yucky, but I'm not sure if all arduino c++ compilers support closuers. And we only have one +// thread that can run at a time - so safe +static NodeNum findFrom; +static PacketId findId; + +static bool isMyPacket(MeshPacket *p) +{ + return p->id == findId && p->from == findFrom; +} + +/** Attempt to find and remove a packet from this queue. Returns true the packet which was removed from the queue */ +MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id) +{ + findFrom = from; + findId = id; + auto it = std::find_if(this->c.begin(), this->c.end(), isMyPacket); + if (it != this->c.end()) { + auto p = *it; + this->c.erase(it); + std::make_heap(this->c.begin(), this->c.end(), this->comp); + return p; + } else { + return NULL; + } +} diff --git a/src/mesh/MeshPacketQueue.h b/src/mesh/MeshPacketQueue.h new file mode 100644 index 000000000..f04649cb5 --- /dev/null +++ b/src/mesh/MeshPacketQueue.h @@ -0,0 +1,33 @@ +#pragma once + +#include "MeshTypes.h" + +#include +#include + +// this is an strucure which implements the +// operator overloading +struct CompareMeshPacket { + bool operator()(MeshPacket *p1, MeshPacket *p2); +}; + +/** + * A priority queue of packets. + * + */ +class MeshPacketQueue : public std::priority_queue, CompareMeshPacket> +{ + size_t maxLen; + public: + MeshPacketQueue(size_t _maxLen); + + /** enqueue a packet, return false if full */ + bool enqueue(MeshPacket *p); + + // bool isEmpty(); + + MeshPacket *dequeue(); + + /** Attempt to find and remove a packet from this queue. Returns true the packet which was removed from the queue */ + MeshPacket *remove(NodeNum from, PacketId id); +}; \ No newline at end of file diff --git a/src/mesh/RadioInterface.cpp b/src/mesh/RadioInterface.cpp index 34e04298f..972e67678 100644 --- a/src/mesh/RadioInterface.cpp +++ b/src/mesh/RadioInterface.cpp @@ -161,6 +161,9 @@ void printPacket(const char *prefix, const MeshPacket *p) if (p->rx_snr != 0.0) { DEBUG_MSG(" rxSNR=%g", p->rx_snr); } + if(p->priority != 0) + DEBUG_MSG(" priority=%d", p->priority); + DEBUG_MSG(")\n"); } diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 744762c08..84fd37ee6 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -100,7 +100,7 @@ ErrorCode RadioLibInterface::send(MeshPacket *p) uint32_t xmitMsec = getPacketTime(p); DEBUG_MSG("txGood=%d,rxGood=%d,rxBad=%d\n", txGood, rxGood, rxBad); - ErrorCode res = txQueue.enqueue(p, 0) ? ERRNO_OK : ERRNO_UNKNOWN; + ErrorCode res = txQueue.enqueue(p) ? ERRNO_OK : ERRNO_UNKNOWN; if (res != ERRNO_OK) { // we weren't able to queue it, so we must drop it to prevent leaks packetPool.release(p); @@ -125,7 +125,7 @@ ErrorCode RadioLibInterface::send(MeshPacket *p) bool RadioLibInterface::canSleep() { - bool res = txQueue.isEmpty(); + bool res = txQueue.empty(); if (!res) // only print debug messages if we are vetoing sleep DEBUG_MSG("radio wait to sleep, txEmpty=%d\n", res); @@ -134,8 +134,13 @@ bool RadioLibInterface::canSleep() /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ bool RadioLibInterface::cancelSending(NodeNum from, PacketId id) { - assert(0); - return false; + auto p = txQueue.remove(from, id); + if(p) + packetPool.release(p); // free the packet we just removed + + bool result = (p != NULL); + DEBUG_MSG("cancelSending id=0x%x, removed=%d", id, result); + return result; } @@ -172,12 +177,12 @@ void RadioLibInterface::onNotify(uint32_t notification) // If we are not currently in receive mode, then restart the timer and try again later (this can happen if the main thread // has placed the unit into standby) FIXME, how will this work if the chipset is in sleep mode? - if (!txQueue.isEmpty()) { + if (!txQueue.empty()) { if (!canSendImmediately()) { startTransmitTimer(); // try again in a little while } else { // Send any outgoing packets we have ready - MeshPacket *txp = txQueue.dequeuePtr(0); + MeshPacket *txp = txQueue.dequeue(); assert(txp); startSend(txp); } @@ -193,7 +198,7 @@ void RadioLibInterface::onNotify(uint32_t notification) void RadioLibInterface::startTransmitTimer(bool withDelay) { // If we have work to do and the timer wasn't already scheduled, schedule it now - if (!txQueue.isEmpty()) { + if (!txQueue.empty()) { uint32_t delay = !withDelay ? 1 : getTxDelayMsec(); // DEBUG_MSG("xmit timer %d\n", delay); notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index b002f49bc..726e81b37 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -2,6 +2,7 @@ #include "../concurrency/OSThread.h" #include "RadioInterface.h" +#include "MeshPacketQueue.h" #ifdef CubeCell_BoardPlus #define RADIOLIB_SOFTWARE_SERIAL_UNSUPPORTED @@ -74,7 +75,7 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified */ uint32_t rxBad = 0, rxGood = 0, txGood = 0; - PointerQueue txQueue = PointerQueue(MAX_TX_QUEUE); + MeshPacketQueue txQueue = MeshPacketQueue(MAX_TX_QUEUE); protected: @@ -138,7 +139,7 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id); - + private: /** if we have something waiting to send, start a short random timer so we can come check for collision before actually doing * the transmit diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 121d42a21..43d939c31 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -120,6 +120,7 @@ void Router::sendAckNak(ErrorReason err, NodeNum to, PacketId idFrom) p->decoded.which_payloadVariant = SubPacket_error_reason_tag; p->decoded.error_reason = err; } + p->priority = MeshPacket_Priority_ACK; sendLocal(p); // we sometimes send directly to the local node } diff --git a/src/mesh/generated/deviceonly.pb.h b/src/mesh/generated/deviceonly.pb.h index fb503be49..ad6b31738 100644 --- a/src/mesh/generated/deviceonly.pb.h +++ b/src/mesh/generated/deviceonly.pb.h @@ -80,7 +80,7 @@ extern const pb_msgdesc_t DeviceState_msg; #define DeviceState_fields &DeviceState_msg /* Maximum encoded size of messages (where known) */ -#define DeviceState_size 6262 +#define DeviceState_size 6266 #ifdef __cplusplus } /* extern "C" */ diff --git a/src/mesh/generated/mesh.pb.c b/src/mesh/generated/mesh.pb.c index 3bc2b0800..0bf5fbe38 100644 --- a/src/mesh/generated/mesh.pb.c +++ b/src/mesh/generated/mesh.pb.c @@ -58,3 +58,4 @@ PB_BIND(ToRadio, ToRadio, 2) + diff --git a/src/mesh/generated/mesh.pb.h b/src/mesh/generated/mesh.pb.h index b9de66a98..7dd097cef 100644 --- a/src/mesh/generated/mesh.pb.h +++ b/src/mesh/generated/mesh.pb.h @@ -83,6 +83,15 @@ typedef enum _CriticalErrorCode { CriticalErrorCode_TransmitFailed = 8 } CriticalErrorCode; +typedef enum _MeshPacket_Priority { + MeshPacket_Priority_UNSET = 0, + MeshPacket_Priority_MIN = 1, + MeshPacket_Priority_BACKGROUND = 10, + MeshPacket_Priority_DEFAULT = 64, + MeshPacket_Priority_ACK = 120, + MeshPacket_Priority_MAX = 127 +} MeshPacket_Priority; + typedef enum _ChannelSettings_ModemConfig { ChannelSettings_ModemConfig_Bw125Cr45Sf128 = 0, ChannelSettings_ModemConfig_Bw500Cr45Sf128 = 1, @@ -265,6 +274,7 @@ typedef struct _MeshPacket { uint32_t rx_time; uint32_t hop_limit; bool want_ack; + MeshPacket_Priority priority; } MeshPacket; typedef struct _FromRadio { @@ -323,6 +333,10 @@ typedef struct _ToRadio { #define _CriticalErrorCode_MAX CriticalErrorCode_TransmitFailed #define _CriticalErrorCode_ARRAYSIZE ((CriticalErrorCode)(CriticalErrorCode_TransmitFailed+1)) +#define _MeshPacket_Priority_MIN MeshPacket_Priority_UNSET +#define _MeshPacket_Priority_MAX MeshPacket_Priority_MAX +#define _MeshPacket_Priority_ARRAYSIZE ((MeshPacket_Priority)(MeshPacket_Priority_MAX+1)) + #define _ChannelSettings_ModemConfig_MIN ChannelSettings_ModemConfig_Bw125Cr45Sf128 #define _ChannelSettings_ModemConfig_MAX ChannelSettings_ModemConfig_Bw125Cr48Sf4096 #define _ChannelSettings_ModemConfig_ARRAYSIZE ((ChannelSettings_ModemConfig)(ChannelSettings_ModemConfig_Bw125Cr48Sf4096+1)) @@ -342,7 +356,7 @@ extern "C" { #define User_init_default {"", "", "", {0}} #define RouteDiscovery_init_default {0, {0, 0, 0, 0, 0, 0, 0, 0}} #define SubPacket_init_default {0, {Position_init_default}, 0, 0, 0, 0, {0}, 0} -#define MeshPacket_init_default {0, 0, 0, {SubPacket_init_default}, 0, 0, 0, 0, 0, 0} +#define MeshPacket_init_default {0, 0, 0, {SubPacket_init_default}, 0, 0, 0, 0, 0, 0, _MeshPacket_Priority_MIN} #define ChannelSettings_init_default {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0} #define RadioConfig_init_default {false, RadioConfig_UserPreferences_init_default, false, ChannelSettings_init_default} #define RadioConfig_UserPreferences_init_default {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} @@ -356,7 +370,7 @@ extern "C" { #define User_init_zero {"", "", "", {0}} #define RouteDiscovery_init_zero {0, {0, 0, 0, 0, 0, 0, 0, 0}} #define SubPacket_init_zero {0, {Position_init_zero}, 0, 0, 0, 0, {0}, 0} -#define MeshPacket_init_zero {0, 0, 0, {SubPacket_init_zero}, 0, 0, 0, 0, 0, 0} +#define MeshPacket_init_zero {0, 0, 0, {SubPacket_init_zero}, 0, 0, 0, 0, 0, 0, _MeshPacket_Priority_MIN} #define ChannelSettings_init_zero {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0} #define RadioConfig_init_zero {false, RadioConfig_UserPreferences_init_zero, false, ChannelSettings_init_zero} #define RadioConfig_UserPreferences_init_zero {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} @@ -479,6 +493,7 @@ extern "C" { #define MeshPacket_rx_time_tag 9 #define MeshPacket_hop_limit_tag 10 #define MeshPacket_want_ack_tag 11 +#define MeshPacket_priority_tag 12 #define FromRadio_num_tag 1 #define FromRadio_packet_tag 2 #define FromRadio_my_info_tag 3 @@ -554,7 +569,8 @@ X(a, STATIC, SINGULAR, UINT32, id, 6) \ X(a, STATIC, SINGULAR, FLOAT, rx_snr, 7) \ X(a, STATIC, SINGULAR, FIXED32, rx_time, 9) \ X(a, STATIC, SINGULAR, UINT32, hop_limit, 10) \ -X(a, STATIC, SINGULAR, BOOL, want_ack, 11) +X(a, STATIC, SINGULAR, BOOL, want_ack, 11) \ +X(a, STATIC, SINGULAR, UENUM, priority, 12) #define MeshPacket_CALLBACK NULL #define MeshPacket_DEFAULT NULL #define MeshPacket_payloadVariant_decoded_MSGTYPE SubPacket @@ -734,7 +750,7 @@ extern const pb_msgdesc_t ToRadio_msg; #define User_size 72 #define RouteDiscovery_size 88 #define SubPacket_size 275 -#define MeshPacket_size 320 +#define MeshPacket_size 322 #define ChannelSettings_size 95 #define RadioConfig_size 405 #define RadioConfig_UserPreferences_size 305 diff --git a/src/plugins/NodeInfoPlugin.cpp b/src/plugins/NodeInfoPlugin.cpp index 494387dd5..6c236ed34 100644 --- a/src/plugins/NodeInfoPlugin.cpp +++ b/src/plugins/NodeInfoPlugin.cpp @@ -35,8 +35,9 @@ void NodeInfoPlugin::sendOurNodeInfo(NodeNum dest, bool wantReplies) MeshPacket *p = allocReply(); p->to = dest; p->decoded.want_response = wantReplies; + p->priority = MeshPacket_Priority_BACKGROUND; prevPacketId = p->id; - + service.sendToMesh(p); } diff --git a/src/plugins/PositionPlugin.cpp b/src/plugins/PositionPlugin.cpp index ab89eddc1..4ea7298e6 100644 --- a/src/plugins/PositionPlugin.cpp +++ b/src/plugins/PositionPlugin.cpp @@ -40,10 +40,11 @@ void PositionPlugin::sendOurPosition(NodeNum dest, bool wantReplies) // cancel any not yet sent (now stale) position packets if(prevPacketId) // if we wrap around to zero, we'll simply fail to cancel in that rare case (no big deal) service.cancelSending(prevPacketId); - + MeshPacket *p = allocReply(); p->to = dest; p->decoded.want_response = wantReplies; + p->priority = MeshPacket_Priority_BACKGROUND; prevPacketId = p->id; service.sendToMesh(p);