Merge pull request #141 from geeksville/reliable

Reliable
1.2-legacy
Kevin Hester 2020-05-21 16:46:01 -07:00 committed by GitHub
commit a753c942b9
No key for this signature was found in the database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 236 additions and 64 deletions


@ -2,6 +2,10 @@
great source of papers and class notes: http://www.cs.jhu.edu/~cs647/
flood routing improvements
- DONE if we don't see anyone rebroadcast our want_ack=true broadcasts, retry as needed.
reliable messaging tasks (stage one for DSR):
- DONE generalize naive flooding
@ -19,9 +23,6 @@ reliable messaging tasks (stage one for DSR):
dsr tasks
- do "hop by hop" routing
- when sending, if destnodeinfo.next_hop is zero (and no message is already waiting for an arp for that node), startRouteDiscovery() for that node. Queue the message in the 'waiting for arp queue' so we can send it later when the arp completes.
- otherwise, use next_hop and start sending a message (with ack request) towards that node.
- Don't use broadcasts for the network pings (close open github issue)
- add ignoreSenders to radioconfig to allow testing different mesh topologies by refusing to see certain senders
- test multihop delivery with the python framework
@ -34,6 +35,12 @@ optimizations / low priority:
- handle 51 day rollover in doRetransmissions
- use a priority queue for the messages waiting to send. Send acks first, then routing messages, then data messages, then broadcasts?
when we send a packet
- do "hop by hop" routing
- when sending, if destnodeinfo.next_hop is zero (and no message is already waiting for an arp for that node), startRouteDiscovery() for that node. Queue the message in the 'waiting for arp queue' so we can send it later when the arp completes.
- otherwise, use next_hop and start sending a message (with ack request) towards that node (starting with next_hop).
when we receive any packet
- sniff and update tables (especially useful to find adjacent nodes). Update user, network and position info.
@ -47,13 +54,13 @@ routeDiscovery
- if the request has already passed through us (or is from us), then ignore it
- use the nodes already mentioned in the request to update our routing table
- if they were looking for us, send back a routereply
- if max_hops is zero and they weren't looking for us, drop (FIXME, send back error - I think not though?)
- if we receive a discovery packet, we use it to populate next_hop (if needed) towards the requester (after decrementing max_hops)
- NOT DOING FOR NOW -if max_hops is zero and they weren't looking for us, drop (FIXME, send back error - I think not though?)
- if we receive a discovery packet, and we don't have next_hop set in our nodedb, we use it to populate next_hop (if needed) towards the requester (after decrementing max_hops)
- if we receive a discovery packet, and we have a next_hop in our nodedb for that destination, we send a (reliable) route reply towards the requester
when sending any reliable packet
- if we get back a nak, send a routeError message back towards the original requester. all nodes eavesdrop on that packet and update their route caches
- if timeout doing retries, send a routeError (nak) message back towards the original requester. all nodes eavesdrop on that packet and update their route caches.
when we receive a routereply packet

2
proto

@ -1 +1 @@
Subproject commit e095ea92e62edc3f5dd6864c3d08d113fd8842e2
Subproject commit bfae47bdc0da23bb1e53fed054d3de2d161389bc


@ -0,0 +1,80 @@
#include "DSRRouter.h"
#include "configuration.h"
/* when we receive any packet
- sniff and update tables (especially useful to find adjacent nodes). Update user, network and position info.
- if we need to route() that packet, resend it to the next_hop based on our nodedb.
- if it is broadcast or destined for our node, deliver locally
- handle routereply/routeerror/routediscovery messages as described below
- then free it
routeDiscovery
- if the request has already passed through us (or is from us), then ignore it
- use the nodes already mentioned in the request to update our routing table
- if they were looking for us, send back a routereply
- if max_hops is zero and they weren't looking for us, drop (FIXME, send back error - I think not though?)
- if we receive a discovery packet, we use it to populate next_hop (if needed) towards the requester (after decrementing max_hops)
- if we receive a discovery packet, and we have a next_hop in our nodedb for that destination, we send a (reliable) route
reply towards the requester
when sending any reliable packet
- if timeout doing retries, send a routeError (nak) message back towards the original requester. all nodes eavesdrop on that
packet and update their route caches.
when we receive a routereply packet
- update next_hop on the node, if the new reply needs fewer hops than the existing one (we prefer shorter paths). fixme, someday
use a better heuristic
when we receive a routeError packet
- delete the route for that failed recipient, restartRouteDiscovery()
- if we receive routeerror in response to a discovery,
- fixme, eventually keep caches of possible other routes.
*/
void DSRRouter::sniffReceived(const MeshPacket *p)
{
// FIXME, update nodedb
// Handle route discovery packets (will be a broadcast message)
if (p->decoded.which_payload == SubPacket_request_tag) {
// FIXME - always start request with the senders nodenum
if (weAreInRoute(p->decoded.request)) {
DEBUG_MSG("Ignoring a route request that contains us\n");
} else {
updateRoutes(p->decoded.request, false); // Update our routing tables based on the route that came in so far on this request
if (p->decoded.dest == getNodeNum()) {
// They were looking for us, send back a route reply (the sender address will be first in the list)
sendRouteReply(p->decoded.request);
} else {
// They were looking for someone else, forward it along (as a zero hop broadcast)
NodeNum nextHop = getNextHop(p->decoded.dest);
if (nextHop) {
// in our route cache, reply to the requester (the sender address will be first in the list)
sendRouteReply(p->decoded.request, nextHop);
} else {
// Not in our route cache, rebroadcast on their behalf (after adding ourselves to the request route)
resendRouteRequest(p);
}
}
}
}
// Handle regular packets
if (p->to == getNodeNum()) { // Destined for us (at least for this hop)
// We need to route this packet
if (p->decoded.dest != p->to) {
// FIXME
}
}
return ReliableRouter::sniffReceived(p);
}
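The branch structure of sniffReceived() above can be sketched as a standalone decision function. This is only an illustration under stated assumptions: `RouteRequest`, `RequestAction` and `classifyRouteRequest` are hypothetical names, the route is assumed to be a plain list of node numbers with the requester first, and a next hop of 0 is taken to mean "no route" (as the getNextHop() doc comment in DSRRouter.h says).

```cpp
#include <algorithm>
#include <cstdint>
#include <unordered_map>
#include <vector>

using NodeNum = uint32_t;

// Hypothetical outcome labels for the four branches in sniffReceived()
enum class RequestAction { Ignore, ReplyAsTarget, ReplyFromCache, Rebroadcast };

// Hypothetical stand-in for the route list carried by a discovery request
struct RouteRequest {
    NodeNum dest;               // node the requester is looking for
    std::vector<NodeNum> route; // nodes visited so far, requester first
};

RequestAction classifyRouteRequest(const RouteRequest &req, NodeNum ourNode,
                                   const std::unordered_map<NodeNum, NodeNum> &nextHops)
{
    // The request already passed through us (or came from us): drop it
    if (std::find(req.route.begin(), req.route.end(), ourNode) != req.route.end())
        return RequestAction::Ignore;

    // They were looking for us: answer directly
    if (req.dest == ourNode)
        return RequestAction::ReplyAsTarget;

    // We know a next hop for the destination: answer from our route cache
    auto it = nextHops.find(req.dest);
    if (it != nextHops.end() && it->second != 0)
        return RequestAction::ReplyFromCache;

    // Otherwise rebroadcast on the requester's behalf
    return RequestAction::Rebroadcast;
}
```

Keeping the decision separate from the side effects (sendRouteReply, resendRouteRequest) makes each branch easy to exercise without a radio attached.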


@ -0,0 +1,39 @@
#include "ReliableRouter.h"
class DSRRouter : public ReliableRouter
{
protected:
/**
* Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to
* update routing tables etc... based on what we overhear (even for messages not destined to our node)
*/
virtual void sniffReceived(const MeshPacket *p);
private:
/**
* Does our node appear in the specified route
*/
bool weAreInRoute(const RouteDiscovery &route);
/**
* Given a DSR route, use that route to update our DB of possible routes
**/
void updateRoutes(const RouteDiscovery &route, bool reverse);
/**
* send back a route reply (the sender address will be first in the list)
*/
void sendRouteReply(const RouteDiscovery &route, NodeNum toAppend = 0);
/**
* Given a nodenum return the next node we should forward to if we want to reach that node.
*
* @return 0 if no route found
*/
NodeNum getNextHop(NodeNum dest);
/** Not in our route cache, rebroadcast on their behalf (after adding ourselves to the request route)
*/
void resendRouteRequest(const MeshPacket *p);
};
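The header above declares getNextHop() and updateRoutes() but this commit does not yet implement them. A minimal sketch of the "prefer shorter paths" rule from the DSRRouter.cpp notes might look like the following. `RouteCache`, `RouteEntry`, `offerRoute` and `removeRoute` are hypothetical names, and hop count as the only metric is the notes' own placeholder heuristic.

```cpp
#include <cstdint>
#include <unordered_map>

using NodeNum = uint32_t;

// Hypothetical cache entry: where to forward, and how far away the destination is
struct RouteEntry {
    NodeNum nextHop;
    uint8_t hopCount;
};

class RouteCache
{
  public:
    // Record a candidate route; keep it only if it is new or strictly shorter.
    // Returns true if the cache changed.
    bool offerRoute(NodeNum dest, NodeNum nextHop, uint8_t hopCount)
    {
        auto it = routes.find(dest);
        if (it != routes.end() && it->second.hopCount <= hopCount)
            return false; // existing route is at least as short
        routes[dest] = RouteEntry{nextHop, hopCount};
        return true;
    }

    // Returns 0 if no route is known, matching the getNextHop() contract
    NodeNum getNextHop(NodeNum dest) const
    {
        auto it = routes.find(dest);
        return it == routes.end() ? 0 : it->second.nextHop;
    }

    // Called when a routeError says the path to dest has failed
    void removeRoute(NodeNum dest) { routes.erase(dest); }

  private:
    std::unordered_map<NodeNum, RouteEntry> routes;
};
```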


@ -11,7 +11,7 @@ FloodingRouter::FloodingRouter() {}
*/
ErrorCode FloodingRouter::send(MeshPacket *p)
{
// Add any messages _we_ send to the seen message list
// Add any messages _we_ send to the seen message list (so we will ignore all retransmissions we see)
wasSeenRecently(p); // FIXME, move this to a sniffSent method
return Router::send(p);


@ -247,7 +247,7 @@ void MeshService::sendToMesh(MeshPacket *p)
}
// Note: We might return !OK if our fifo was full, at that point the only option we have is to drop it
if (router.send(p) != ERRNO_OK) {
if (router.sendLocal(p) != ERRNO_OK) {
DEBUG_MSG("No radio was able to send packet, discarding...\n");
releaseToPool(p);
}


@ -29,7 +29,7 @@ bool PacketHistory::wasSeenRecently(const MeshPacket *p, bool withUpdate)
recentPackets.erase(recentPackets.begin() + i); // delete old record
} else {
if (r.id == p->id && r.sender == p->from) {
DEBUG_MSG("Found existing broadcast record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
DEBUG_MSG("Found existing packet record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
// Update the time on this record to now
if (withUpdate)
@ -48,7 +48,7 @@ bool PacketHistory::wasSeenRecently(const MeshPacket *p, bool withUpdate)
r.sender = p->from;
r.rxTimeMsec = now;
recentPackets.push_back(r);
DEBUG_MSG("Adding broadcast record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
DEBUG_MSG("Adding packet record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
}
return false;
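The hunks above only show fragments of wasSeenRecently(). As a rough, self-contained illustration of the technique (a (sender, id) list with time-based expiry), consider the sketch below. `SeenPackets` and `expireMsec` are hypothetical, the time is passed in explicitly instead of read from millis(), and adding a record only when `withUpdate` is set is an assumption, not something the diff confirms.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct SeenRecord {
    uint32_t sender;
    uint32_t id;
    uint32_t rxTimeMsec;
};

class SeenPackets
{
  public:
    explicit SeenPackets(uint32_t expireMsec) : expireMsec(expireMsec) {}

    // Returns true if (sender, id) was seen within the expiry window.
    // Expired records are dropped during the same scan.
    bool wasSeenRecently(uint32_t sender, uint32_t id, uint32_t now, bool withUpdate = true)
    {
        for (size_t i = 0; i < records.size();) {
            SeenRecord &r = records[i];
            if (now - r.rxTimeMsec >= expireMsec) {
                records.erase(records.begin() + i); // delete old record, do not advance i
            } else {
                if (r.sender == sender && r.id == id) {
                    if (withUpdate)
                        r.rxTimeMsec = now; // refresh the timestamp
                    return true;
                }
                ++i;
            }
        }
        if (withUpdate)
            records.push_back(SeenRecord{sender, id, now});
        return false;
    }

  private:
    uint32_t expireMsec;
    std::vector<SeenRecord> records;
};
```

Matching on both sender and id matters because packet ids are only unique per sending node.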


@ -12,6 +12,11 @@
ErrorCode ReliableRouter::send(MeshPacket *p)
{
if (p->want_ack) {
// If someone asks for acks on broadcast, we need the hop limit to be at least one, so that first node that receives our
// message will rebroadcast
if (p->to == NODENUM_BROADCAST && p->hop_limit == 0)
p->hop_limit = 1;
auto copy = packetPool.allocCopy(*p);
startRetransmission(copy);
}
@ -33,7 +38,19 @@ ErrorCode ReliableRouter::send(MeshPacket *p)
*/
void ReliableRouter::handleReceived(MeshPacket *p)
{
if (p->to == getNodeNum()) { // ignore ack/nak/want_ack packets that are not addressed to us (for now)
NodeNum ourNode = getNodeNum();
if (p->from == ourNode && p->to == NODENUM_BROADCAST) {
DEBUG_MSG("Received someone rebroadcasting for us fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
// We are seeing someone rebroadcast one of our broadcast attempts.
// If this is the first time we saw this, cancel any retransmissions we have queued up and generate an internal ack for
// the original sending process.
if (stopRetransmission(p->from, p->id)) {
DEBUG_MSG("Someone is retransmitting for us, generate implicit ack\n");
sendAckNak(true, p->from, p->id);
}
} else if (p->to == ourNode) { // ignore ack/nak/want_ack packets that are not addressed to us (for now)
if (p->want_ack) {
sendAckNak(true, p->from, p->id);
}
@ -67,10 +84,10 @@ void ReliableRouter::handleReceived(MeshPacket *p)
*/
void ReliableRouter::sendAckNak(bool isAck, NodeNum to, PacketId idFrom)
{
DEBUG_MSG("Sending an ack=%d,to=%d,idFrom=%d", isAck, to, idFrom);
auto p = allocForSending();
p->hop_limit = 0; // Assume just immediate neighbors for now
p->to = to;
DEBUG_MSG("Sending an ack=0x%x,to=0x%x,idFrom=%d,id=%d\n", isAck, to, idFrom, p->id);
if (isAck) {
p->decoded.ack.success_id = idFrom;
@ -80,7 +97,7 @@ void ReliableRouter::sendAckNak(bool isAck, NodeNum to, PacketId idFrom)
p->decoded.which_ack = SubPacket_fail_id_tag;
}
send(p);
sendLocal(p); // we sometimes send directly to the local node
}
#define NUM_RETRANSMISSIONS 3
@ -95,20 +112,22 @@ PendingPacket::PendingPacket(MeshPacket *p)
/**
* Stop any retransmissions we are doing of the specified node/packet ID pair
*/
void ReliableRouter::stopRetransmission(NodeNum from, PacketId id)
bool ReliableRouter::stopRetransmission(NodeNum from, PacketId id)
{
auto key = GlobalPacketId(from, id);
stopRetransmission(key);
return stopRetransmission(key);
}
void ReliableRouter::stopRetransmission(GlobalPacketId key)
bool ReliableRouter::stopRetransmission(GlobalPacketId key)
{
auto old = pending.find(key); // If we have an old record, someone messed up because id got reused
if (old != pending.end()) {
auto numErased = pending.erase(key);
assert(numErased == 1);
packetPool.release(old->second.packet);
}
return true;
} else
return false;
}
/**
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
@ -138,12 +157,17 @@ void ReliableRouter::doRetransmissions()
// FIXME, handle 51 day rollover here!!!
if (p.nextTxMsec <= now) {
if (p.numRetransmissions == 0) {
DEBUG_MSG("Reliable send failed, returning a nak\n");
DEBUG_MSG("Reliable send failed, returning a nak fr=0x%x,to=0x%x,id=%d\n", p.packet->from, p.packet->to,
p.packet->id);
sendAckNak(false, p.packet->from, p.packet->id);
stopRetransmission(it->first);
} else {
DEBUG_MSG("Sending reliable retransmission\n");
send(packetPool.allocCopy(*p.packet));
DEBUG_MSG("Sending reliable retransmission fr=0x%x,to=0x%x,id=%d, tries left=%d\n", p.packet->from, p.packet->to,
p.packet->id, p.numRetransmissions);
// Note: we call the superclass version because we don't want to have our version of send() add a new
// retransmission record
FloodingRouter::send(packetPool.allocCopy(*p.packet));
// Queue again
--p.numRetransmissions;
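The implicit-ack idea in this file (hearing our own broadcast rebroadcast by a neighbour cancels the pending retransmission) can be sketched without the mesh types. Everything below is hypothetical: `Packet`, `PendingTable` and `sawOwnBroadcastRebroadcast` are illustrative names, and plain new/delete stands in for packetPool. One detail worth noting: as the flattened hunk reads, stopRetransmission() appears to call pending.erase(key) before reading old->second.packet. Erasing invalidates iterators to the erased element in standard containers, so this sketch takes the pointer first.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>

using NodeNum = uint32_t;
using PacketId = uint32_t;
using PendingKey = std::pair<NodeNum, PacketId>;

struct Packet {
    NodeNum from;
    NodeNum to;
    PacketId id;
};

class PendingTable
{
  public:
    PendingTable() = default;
    PendingTable(const PendingTable &) = delete;
    PendingTable &operator=(const PendingTable &) = delete;
    ~PendingTable()
    {
        for (auto &kv : pending)
            delete kv.second;
    }

    // Keep a private copy of p to retransmit later
    void start(const Packet &p)
    {
        stop(p.from, p.id); // drop any stale record with the same key
        pending[PendingKey{p.from, p.id}] = new Packet(p);
    }

    // Returns true if we found and removed a record with this key
    bool stop(NodeNum from, PacketId id)
    {
        auto it = pending.find(PendingKey{from, id});
        if (it == pending.end())
            return false;
        Packet *p = it->second; // read through the iterator before erase invalidates it
        pending.erase(it);
        delete p;
        return true;
    }

    size_t size() const { return pending.size(); }

  private:
    std::map<PendingKey, Packet *> pending;
};

// True only the first time we overhear someone rebroadcasting one of our
// pending broadcasts; the caller would then generate the implicit ack.
bool sawOwnBroadcastRebroadcast(PendingTable &t, const Packet &p, NodeNum ourNode, NodeNum broadcastAddr)
{
    if (p.from == ourNode && p.to == broadcastAddr)
        return t.stop(p.from, p.id);
    return false;
}
```

Returning bool from stop() is what lets the caller send exactly one implicit ack, even if several neighbours rebroadcast the same packet.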


@ -39,10 +39,16 @@ struct PendingPacket {
/** Starts at NUM_RETRANSMISSIONS -1(normally 3) and counts down. Once zero it will be removed from the list */
uint8_t numRetransmissions;
/** True if we have started trying to find a route - for DSR usage
* While trying to find a route we don't actually send the data packet. We just leave it here pending until
* we have a route or we've failed to find one.
*/
bool wantRoute = false;
PendingPacket() {}
PendingPacket(MeshPacket *p);
void setNextTx() { nextTxMsec = millis() + random(10 * 1000, 12 * 1000); }
void setNextTx() { nextTxMsec = millis() + random(20 * 1000, 22 * 1000); }
};
class GlobalPacketIdHashFunction
@ -98,9 +104,11 @@ class ReliableRouter : public FloodingRouter
/**
* Stop any retransmissions we are doing of the specified node/packet ID pair
*
* @return true if we found and removed a transmission with this ID
*/
void stopRetransmission(NodeNum from, PacketId id);
void stopRetransmission(GlobalPacketId p);
bool stopRetransmission(NodeNum from, PacketId id);
bool stopRetransmission(GlobalPacketId p);
/**
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
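The rollover FIXME in doRetransmissions() concerns the `p.nextTxMsec <= now` comparison: a 32-bit millisecond counter wraps after 2^32 ms (roughly 49.7 days), and a deadline computed just before the wrap then compares as "already due". One common workaround, shown here only as a sketch and not as what this commit does, is to compare the difference as a signed value. `deadlineReached` is a hypothetical helper name.

```cpp
#include <cstdint>

// True once `now` has reached or passed `deadline`, even across a wrap of the
// 32-bit millisecond counter. Valid while the two values are less than 2^31 ms
// (about 24.8 days) apart. Relies on two's-complement conversion, which is
// guaranteed from C++20 and is the behaviour of common compilers before that.
inline bool deadlineReached(uint32_t now, uint32_t deadline)
{
    return static_cast<int32_t>(now - deadline) >= 0;
}
```

With this helper the check would read `if (deadlineReached(now, p.nextTxMsec))`, assuming nextTxMsec is stored as a 32-bit value.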


@ -77,6 +77,16 @@ MeshPacket *Router::allocForSending()
return p;
}
ErrorCode Router::sendLocal(MeshPacket *p)
{
if (p->to == nodeDB.getNodeNum()) {
DEBUG_MSG("Enqueuing internal message for the receive queue\n");
fromRadioQueue.enqueue(p);
return ERRNO_OK;
} else
return send(p);
}
/**
* Send a packet on a suitable interface. This routine will
* later free() the packet to pool. This routine is not allowed to stall.
@ -84,40 +94,39 @@ MeshPacket *Router::allocForSending()
*/
ErrorCode Router::send(MeshPacket *p)
{
// If this packet was destined only to apps on our node, don't send it out into the network
if (p->to == nodeDB.getNodeNum()) {
DEBUG_MSG("Dropping locally processed message\n");
packetPool.release(p);
return ERRNO_OK;
assert(p->to != nodeDB.getNodeNum()); // should have already been handled by sendLocal
// Never set the want_ack flag on broadcast packets sent over the air.
if (p->to == NODENUM_BROADCAST)
p->want_ack = false;
// If the packet hasn't yet been encrypted, do so now (it might already be encrypted if we are just forwarding it)
assert(p->which_payload == MeshPacket_encrypted_tag ||
p->which_payload == MeshPacket_decoded_tag); // I _think_ all packets should have a payload by now
// First convert from protobufs to raw bytes
if (p->which_payload == MeshPacket_decoded_tag) {
static uint8_t bytes[MAX_RHPACKETLEN]; // we have to use a scratch buffer because a union
size_t numbytes = pb_encode_to_bytes(bytes, sizeof(bytes), SubPacket_fields, &p->decoded);
assert(numbytes <= MAX_RHPACKETLEN);
crypto->encrypt(p->from, p->id, numbytes, bytes);
// Copy back into the packet and set the variant type
memcpy(p->encrypted.bytes, bytes, numbytes);
p->encrypted.size = numbytes;
p->which_payload = MeshPacket_encrypted_tag;
}
if (iface) {
// DEBUG_MSG("Sending packet via interface fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
return iface->send(p);
} else {
// If the packet hasn't yet been encrypted, do so now (it might already be encrypted if we are just forwarding it)
assert(p->which_payload == MeshPacket_encrypted_tag ||
p->which_payload == MeshPacket_decoded_tag); // I _think_ all packets should have a payload by now
// First convert from protobufs to raw bytes
if (p->which_payload == MeshPacket_decoded_tag) {
static uint8_t bytes[MAX_RHPACKETLEN]; // we have to use a scratch buffer because a union
size_t numbytes = pb_encode_to_bytes(bytes, sizeof(bytes), SubPacket_fields, &p->decoded);
assert(numbytes <= MAX_RHPACKETLEN);
crypto->encrypt(p->from, p->id, numbytes, bytes);
// Copy back into the packet and set the variant type
memcpy(p->encrypted.bytes, bytes, numbytes);
p->encrypted.size = numbytes;
p->which_payload = MeshPacket_encrypted_tag;
}
if (iface) {
// DEBUG_MSG("Sending packet via interface fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
return iface->send(p);
} else {
DEBUG_MSG("Dropping packet - no interfaces - fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
packetPool.release(p);
return ERRNO_NO_INTERFACES;
}
DEBUG_MSG("Dropping packet - no interfaces - fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
packetPool.release(p);
return ERRNO_NO_INTERFACES;
}
}
@ -125,9 +134,9 @@ ErrorCode Router::send(MeshPacket *p)
* Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to
* update routing tables etc... based on what we overhear (even for messages not destined to our node)
*/
void Router::sniffReceived(MeshPacket *p)
void Router::sniffReceived(const MeshPacket *p)
{
DEBUG_MSG("Sniffing packet not sent to us fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
DEBUG_MSG("FIXME-update-db Sniffing packet fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
}
bool Router::perhapsDecode(MeshPacket *p)
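The split between sendLocal() and send() in this file can be illustrated with a small stand-in. This is a sketch under assumptions: `MiniRouter`, `Packet`, and the two queues are hypothetical, `kBroadcast` is an assumed placeholder for NODENUM_BROADCAST (its real value is not shown in this diff), and encryption and the radio interface are left out entirely.

```cpp
#include <cstdint>
#include <deque>

using NodeNum = uint32_t;
constexpr NodeNum kBroadcast = 0xffffffffu; // assumed placeholder for NODENUM_BROADCAST

struct Packet {
    NodeNum to;
    bool wantAck;
};

class MiniRouter
{
  public:
    explicit MiniRouter(NodeNum ourNode) : ourNode(ourNode) {}

    // Public entry point: packets addressed to ourselves go straight to the
    // receive queue, everything else goes out via send()
    bool sendLocal(Packet p)
    {
        if (p.to == ourNode) {
            rxQueue.push_back(p);
            return true;
        }
        return send(p);
    }

    std::deque<Packet> rxQueue; // stand-in for fromRadioQueue
    std::deque<Packet> txQueue; // stand-in for the radio interface

  protected:
    // Over-the-air path: never reached for packets addressed to ourselves
    bool send(Packet p)
    {
        if (p.to == kBroadcast)
            p.wantAck = false; // never set want_ack on broadcasts sent over the air
        txQueue.push_back(p);
        return true;
    }

  private:
    NodeNum ourNode;
};
```

Making send() protected, as Router.h does in this commit, forces outside callers through sendLocal(), so the assert at the top of Router::send() documents an invariant and is not a runtime filter.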


@ -47,11 +47,9 @@ class Router
virtual void loop();
/**
* Send a packet on a suitable interface. This routine will
* later free() the packet to pool. This routine is not allowed to stall.
* If the txmit queue is full it might return an error
* Works like send, but if we are sending to the local node, we directly put the message in the receive queue
*/
virtual ErrorCode send(MeshPacket *p);
ErrorCode sendLocal(MeshPacket *p);
/// Allocate and return a meshpacket which defaults as send to broadcast from the current node.
MeshPacket *allocForSending();
@ -61,6 +59,13 @@ class Router
NodeNum getNodeNum();
protected:
/**
* Send a packet on a suitable interface. This routine will
* later free() the packet to pool. This routine is not allowed to stall.
* If the txmit queue is full it might return an error
*/
virtual ErrorCode send(MeshPacket *p);
/**
* Called from loop()
* Handle any packet that is received by an interface on this node.
@ -75,7 +80,7 @@ class Router
* Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to
* update routing tables etc... based on what we overhear (even for messages not destined to our node)
*/
virtual void sniffReceived(MeshPacket *p);
virtual void sniffReceived(const MeshPacket *p);
/**
* Remove any encryption and decode the protobufs inside this packet (if necessary).