combine acks works

1.2-legacy
Kevin Hester 2021-03-05 11:44:45 +08:00
rodzic 950b32232f
commit 0c0c0babba
11 zmienionych plików z 91 dodań i 63 usunięć

Wyświetl plik

@ -31,7 +31,7 @@ You probably don't care about this section - skip to the next one.
* DONE fix setch-fast in python tool
* age out pendingrequests in the python API
* DONE stress test channel download from python, sometimes it seems like we don't get all replies, bug was due to simultaneous android connection
* combine acks and responses in a single message if possible (do routing plugin LAST and drop ACK if someone else has already replied)
* DONE combine acks and responses in a single message if possible (do routing plugin LAST and drop ACK if someone else has already replied)
* DONE don't send packets we received from the phone BACK TOWARDS THE PHONE (possibly use fromnode 0 for packets the phone sends?)
* fix 1.1.50 android debug panel display
* DONE warn in android app about unset regions

Wyświetl plik

@ -31,7 +31,7 @@ void FloodingRouter::sniffReceived(const MeshPacket *p, const Routing *c)
{
// If a broadcast, possibly _also_ send copies out into the mesh.
// (FIXME, do something smarter than naive flooding here)
if (p->to == NODENUM_BROADCAST && p->hop_limit > 0 && p->from != getNodeNum()) {
if (p->to == NODENUM_BROADCAST && p->hop_limit > 0 && getFrom(p) != getNodeNum()) {
if (p->id != 0) {
MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it

Wyświetl plik

@ -1,23 +1,28 @@
#include "MeshPlugin.h"
#include "NodeDB.h"
#include "MeshService.h"
#include "NodeDB.h"
#include <assert.h>
std::vector<MeshPlugin *> *MeshPlugin::plugins;
const MeshPacket *MeshPlugin::currentRequest;
/**
* If any of the current chain of plugins has already sent a reply, it will be here. This is useful to allow
* the RoutingPlugin to avoid sending redundant acks
*/
MeshPacket *MeshPlugin::currentReply;
MeshPlugin::MeshPlugin(const char *_name) : name(_name)
{
// Can't trust static initalizer order, so we check each time
if(!plugins)
if (!plugins)
plugins = new std::vector<MeshPlugin *>();
plugins->push_back(this);
}
void MeshPlugin::setup() {
}
void MeshPlugin::setup() {}
MeshPlugin::~MeshPlugin()
{
@ -31,6 +36,8 @@ void MeshPlugin::callPlugins(const MeshPacket &mp)
assert(mp.which_payloadVariant == MeshPacket_decoded_tag); // I think we are guarnteed the packet is decoded by this point?
currentReply = NULL; // No reply yet
// Was this message directed to us specifically? Will be false if we are sniffing someone elses packets
auto ourNodeNum = nodeDB.getNodeNum();
bool toUs = mp.to == NODENUM_BROADCAST || mp.to == ourNodeNum;
@ -48,15 +55,16 @@ void MeshPlugin::callPlugins(const MeshPacket &mp)
bool handled = pi.handleReceived(mp);
// Possibly send replies (but only if the message was directed to us specifically, i.e. not for promiscious sniffing)
// also: we only let the one plugin send a reply, once that happens, remaining plugins are not considered
// NOTE: we send a reply *even if the (non broadcast) request was from us* which is unfortunate but necessary because currently when the phone
// sends things, it sends things using the local node ID as the from address. A better solution (FIXME) would be to let phones
// have their own distinct addresses and we 'route' to them like any other node.
if (mp.decoded.want_response && toUs && (getFrom(&mp) != ourNodeNum || mp.to == ourNodeNum)) {
// NOTE: we send a reply *even if the (non broadcast) request was from us* which is unfortunate but necessary because
// currently when the phone sends things, it sends things using the local node ID as the from address. A better
// solution (FIXME) would be to let phones have their own distinct addresses and we 'route' to them like any other
// node.
if (mp.decoded.want_response && toUs && (getFrom(&mp) != ourNodeNum || mp.to == ourNodeNum) && !currentReply) {
pi.sendResponse(mp);
DEBUG_MSG("Plugin %s sent a response\n", pi.name);
}
else {
} else {
DEBUG_MSG("Plugin %s considered\n", pi.name);
}
if (handled) {
@ -64,11 +72,17 @@ void MeshPlugin::callPlugins(const MeshPacket &mp)
break;
}
}
pi.currentRequest = NULL;
}
if(!pluginFound)
if(currentReply) {
DEBUG_MSG("Sending response\n");
service.sendToMesh(currentReply);
currentReply = NULL;
}
if (!pluginFound)
DEBUG_MSG("No plugins interested in portnum=%d\n", mp.decoded.portnum);
}
@ -76,39 +90,39 @@ void MeshPlugin::callPlugins(const MeshPacket &mp)
* so that subclasses can (optionally) send a response back to the original sender. Implementing this method
* is optional
*/
void MeshPlugin::sendResponse(const MeshPacket &req) {
void MeshPlugin::sendResponse(const MeshPacket &req)
{
auto r = allocReply();
if(r) {
DEBUG_MSG("Sending response\n");
if (r) {
setReplyTo(r, req);
service.sendToMesh(r);
}
else {
currentReply = r;
} else {
// Ignore - this is now expected behavior for routing plugin (because it ignores some replies)
// DEBUG_MSG("WARNING: Client requested response but this plugin did not provide\n");
}
}
/** set the destination and packet parameters of packet p intended as a reply to a particular "to" packet
/** set the destination and packet parameters of packet p intended as a reply to a particular "to" packet
* This ensures that if the request packet was sent reliably, the reply is sent that way as well.
*/
void setReplyTo(MeshPacket *p, const MeshPacket &to) {
*/
void setReplyTo(MeshPacket *p, const MeshPacket &to)
{
assert(p->which_payloadVariant == MeshPacket_decoded_tag); // Should already be set by now
p->to = getFrom(&to);
p->want_ack = to.want_ack;
p->decoded.request_id = to.id;
}
std::vector<MeshPlugin *> MeshPlugin::GetMeshPluginsWithUIFrames() {
std::vector<MeshPlugin *> MeshPlugin::GetMeshPluginsWithUIFrames()
{
std::vector<MeshPlugin *> pluginsWithUIFrames;
std::vector<MeshPlugin *> pluginsWithUIFrames;
for (auto i = plugins->begin(); i != plugins->end(); ++i) {
auto &pi = **i;
if ( pi.wantUIFrame()) {
if (pi.wantUIFrame()) {
DEBUG_MSG("Plugin wants a UI Frame\n");
pluginsWithUIFrames.push_back(&pi);
}
}
return pluginsWithUIFrames;
}

Wyświetl plik

@ -82,6 +82,13 @@ class MeshPlugin
private:
/**
* If any of the current chain of plugins has already sent a reply, it will be here. This is useful to allow
* the RoutingPlugin to avoid sending redundant acks
*/
static MeshPacket *currentReply;
friend class ReliableRouter;
/** Messages can be received that have the want_response bit set. If set, this callback will be invoked
* so that subclasses can (optionally) send a response back to the original sender. This method calls allocReply()
* to generate the reply message, and if !NULL that message will be delivered to whoever sent req

Wyświetl plik

@ -69,26 +69,21 @@ int MeshService::handleFromRadio(const MeshPacket *mp)
{
powerFSM.trigger(EVENT_RECEIVED_PACKET); // Possibly keep the node from sleeping
if (mp->from != 0) {
printPacket("Forwarding to phone", mp);
nodeDB.updateFrom(*mp); // update our DB state based off sniffing every RX packet from the radio
printPacket("Forwarding to phone", mp);
nodeDB.updateFrom(*mp); // update our DB state based off sniffing every RX packet from the radio
fromNum++;
fromNum++;
if (toPhoneQueue.numFree() == 0) {
DEBUG_MSG("NOTE: tophone queue is full, discarding oldest\n");
MeshPacket *d = toPhoneQueue.dequeuePtr(0);
if (d)
releaseToPool(d);
}
MeshPacket *copied = packetPool.allocCopy(*mp);
assert(toPhoneQueue.enqueue(copied, 0)); // FIXME, instead of failing for full queue, delete the oldest mssages
}
else {
DEBUG_MSG("Packet originally from phone, no need to send back that way...\n");
if (toPhoneQueue.numFree() == 0) {
DEBUG_MSG("NOTE: tophone queue is full, discarding oldest\n");
MeshPacket *d = toPhoneQueue.dequeuePtr(0);
if (d)
releaseToPool(d);
}
MeshPacket *copied = packetPool.allocCopy(*mp);
assert(toPhoneQueue.enqueue(copied, 0)); // FIXME, instead of failing for full queue, delete the oldest mssages
return 0;
}

Wyświetl plik

@ -59,6 +59,7 @@ void PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
}
// return (lastContactMsec != 0) &&
memset(&toRadioScratch, 0, sizeof(toRadioScratch));
if (pb_decode_from_bytes(buf, bufLength, ToRadio_fields, &toRadioScratch)) {
switch (toRadioScratch.which_payloadVariant) {
case ToRadio_packet_tag: {

Wyświetl plik

@ -58,11 +58,12 @@ template <class T> class ProtobufPlugin : protected SinglePortPlugin
// it would be better to update even if the message was destined to others.
auto &p = mp.decoded;
DEBUG_MSG("Received %s from=0x%0x, id=0x%x, payloadlen=%d\n", name, mp.from, mp.id, p.payload.size);
DEBUG_MSG("Received %s from=0x%0x, id=0x%x, portnum=%d, payloadlen=%d\n", name, mp.from, mp.id, p.portnum, p.payload.size);
T scratch;
T *decoded = NULL;
if(mp.decoded.portnum == ourPortNum) {
memset(&scratch, 0, sizeof(scratch));
if (pb_decode_from_bytes(p.payload.bytes, p.payload.size, fields, &scratch))
decoded = &scratch;
else

Wyświetl plik

@ -2,6 +2,7 @@
#include "MeshTypes.h"
#include "configuration.h"
#include "mesh-pb-constants.h"
#include "MeshPlugin.h"
// ReliableRouter::ReliableRouter() {}
@ -26,7 +27,8 @@ ErrorCode ReliableRouter::send(MeshPacket *p)
bool ReliableRouter::shouldFilterReceived(const MeshPacket *p)
{
if (p->to == NODENUM_BROADCAST && getFrom(p) == getNodeNum()) {
// Note: do not use getFrom() here, because we want to ignore messages sent from phone
if (p->to == NODENUM_BROADCAST && p->from == getNodeNum()) {
printPacket("Rx someone rebroadcasting for us", p);
// We are seeing someone rebroadcast one of our broadcast attempts.
@ -34,7 +36,8 @@ bool ReliableRouter::shouldFilterReceived(const MeshPacket *p)
// the original sending process.
if (stopRetransmission(getFrom(p), p->id)) {
DEBUG_MSG("Someone is retransmitting for us, generate implicit ack\n");
sendAckNak(Routing_Error_NONE, getFrom(p), p->id);
if(p->want_ack)
sendAckNak(Routing_Error_NONE, getFrom(p), p->id);
}
}
@ -60,23 +63,26 @@ void ReliableRouter::sniffReceived(const MeshPacket *p, const Routing *c)
if (p->to == ourNode) { // ignore ack/nak/want_ack packets that are not address to us (we only handle 0 hop reliability
// - not DSR routing)
if (p->want_ack) {
sendAckNak(Routing_Error_NONE, getFrom(p), p->id);
if(MeshPlugin::currentReply)
DEBUG_MSG("Someone else has replied to this message, no need for a 2nd ack");
else
sendAckNak(Routing_Error_NONE, getFrom(p), p->id);
}
// If the payload is valid, look for ack/nak
if (c) {
PacketId ackId = c->error_reason == Routing_Error_NONE ? p->decoded.request_id : 0;
PacketId nakId = c->error_reason != Routing_Error_NONE ? p->decoded.request_id : 0;
// We consider an ack to be either a !routing packet with a request ID or a routing packet with !error
PacketId ackId = ((c && c->error_reason == Routing_Error_NONE) || !c) ? p->decoded.request_id : 0;
// We intentionally don't check wasSeenRecently, because it is harmless to delete non existent retransmission records
if (ackId || nakId) {
if (ackId) {
DEBUG_MSG("Received a ack=%d, stopping retransmissions\n", ackId);
stopRetransmission(p->to, ackId);
} else {
DEBUG_MSG("Received a nak=%d, stopping retransmissions\n", nakId);
stopRetransmission(p->to, nakId);
}
// A nak is a routing packt that has an error code
PacketId nakId = (c && c->error_reason != Routing_Error_NONE) ? p->decoded.request_id : 0;
// We intentionally don't check wasSeenRecently, because it is harmless to delete non existent retransmission records
if (ackId || nakId) {
if (ackId) {
DEBUG_MSG("Received a ack for 0x%x, stopping retransmissions\n", ackId);
stopRetransmission(p->to, ackId);
} else {
DEBUG_MSG("Received a nak for 0x%x, stopping retransmissions\n", nakId);
stopRetransmission(p->to, nakId);
}
}
}

Wyświetl plik

@ -231,6 +231,7 @@ bool Router::perhapsDecode(MeshPacket *p)
crypto->decrypt(p->from, p->id, p->encrypted.size, bytes);
// Take those raw bytes and convert them back into a well structured protobuf we can understand
memset(p->decoded, 0, sizeof(p->decoded));
if (!pb_decode_from_bytes(bytes, p->encrypted.size, Data_fields, &p->decoded)) {
DEBUG_MSG("Invalid protobufs in received mesh packet (bad psk?!\n");
} else {

Wyświetl plik

@ -20,7 +20,6 @@
*/
void setupPlugins()
{
routingPlugin = new RoutingPlugin();
adminPlugin = new AdminPlugin();
nodeInfoPlugin = new NodeInfoPlugin();
positionPlugin = new PositionPlugin();
@ -48,4 +47,7 @@ void setupPlugins()
// new StoreForwardPlugin();
new EnvironmentalMeasurementPlugin();
#endif
// NOTE! This plugin must be added LAST because it likes to check for replies from other plugins and avoid sending extra acks
routingPlugin = new RoutingPlugin();
}

Wyświetl plik

@ -9,11 +9,12 @@ RoutingPlugin *routingPlugin;
bool RoutingPlugin::handleReceivedProtobuf(const MeshPacket &mp, const Routing *r)
{
DEBUG_MSG("Routing sniffing", &mp);
printPacket("Routing sniffing", &mp);
router->sniffReceived(&mp, r);
// FIXME - move this to a non promsicious PhoneAPI plugin?
if (mp.to == NODENUM_BROADCAST || mp.to == nodeDB.getNodeNum()) {
// Note: we are careful not to send back packets that started with the phone back to the phone
if ((mp.to == NODENUM_BROADCAST || mp.to == nodeDB.getNodeNum()) && (mp.from != 0)) {
printPacket("Delivering rx packet", &mp);
service.handleFromRadio(&mp);
}