From ae3005b8f8f995fc748bf81ef3d279eb4b406561 Mon Sep 17 00:00:00 2001 From: Phil Taylor Date: Mon, 22 Feb 2021 22:25:09 +0000 Subject: [PATCH] Changes to retransmit code --- packettypes.h | 19 ++++- udphandler.cpp | 223 +++++++++++++++++++++++++++++++++---------------- udphandler.h | 12 ++- 3 files changed, 175 insertions(+), 79 deletions(-) diff --git a/packettypes.h b/packettypes.h index 95a2f8b..ed2ec6c 100644 --- a/packettypes.h +++ b/packettypes.h @@ -9,7 +9,7 @@ #define WATCHDOG_SIZE 0x14 #define PING_SIZE 0x15 #define OPENCLOSE_SIZE 0x16 -#define RETRANSMIT_SIZE 0x18 +#define RETRANSMIT_RANGE_SIZE 0x18 #define TOKEN_SIZE 0x40 #define STATUS_SIZE 0x50 #define LOGIN_RESPONSE_SIZE 0x60 @@ -96,6 +96,23 @@ typedef union txaudio_packet { char packet[TXAUDIO_SIZE]; } *txaudio_packet_t; +// 0x18 length retransmit_range packet +typedef union retransmit_range_packet { + struct + { + quint32 len; // 0x00 + quint16 type; // 0x04 + quint16 seq; // 0x06 + quint32 sentid; // 0x08 + quint32 rcvdid; // 0x0c + quint16 first; // 0x10 + quint16 second; // 0x12 + quint16 third; // 0x14 + quint16 fourth; // 0x16 + }; + char packet[RETRANSMIT_RANGE_SIZE]; +} *retransmit_range_packet_t; + // 0x18 length txaudio packet /* tx[0] = static_cast(tx.length() & 0xff); diff --git a/udphandler.cpp b/udphandler.cpp index 7284bdd..b1860f0 100644 --- a/udphandler.cpp +++ b/udphandler.cpp @@ -97,6 +97,7 @@ udpHandler::~udpHandler() } } + void udpHandler::changeBufferSize(quint16 value) { emit haveChangeBufferSize(value); @@ -267,7 +268,7 @@ void udpHandler::dataReceived() QHostAddress ip = QHostAddress(qToBigEndian(in->ipaddress)); if (!streamOpened && in->busy) { - if (strcmp(in->computer,compName.toLocal8Bit())) + if (in->ipaddress != 0x00 && strcmp(in->computer,compName.toLocal8Bit())) { emit haveNetworkStatus(devName + " in use by: " + in->computer + " (" + ip.toString() + ")"); sendControl(false, 0x00, in->seq); // Respond with an idle @@ -278,6 +279,7 @@ void udpHandler::dataReceived() QObject::connect(civ, SIGNAL(receive(QByteArray)), this, SLOT(receiveFromCivStream(QByteArray))); QObject::connect(this, SIGNAL(haveChangeBufferSize(quint16)), audio, SLOT(changeBufferSize(quint16))); + streamOpened = true; @@ -532,19 +534,8 @@ void udpCivData::dataReceived() default: { if (r.length() > 21) { - // First check if we are missing any packets? - uint16_t gotSeq = qFromLittleEndian(r.mid(6, 2)); - if (lastReceivedSeq == 0 || lastReceivedSeq > gotSeq) { - lastReceivedSeq = gotSeq; - } - - for (uint16_t f = lastReceivedSeq + 1; f < gotSeq; f++) { - // Do we need to request a retransmit? - qDebug() << this->metaObject()->className() << ": Missing Sequence: (" << r.length() << ") " << f; - } - - - lastReceivedSeq = gotSeq; + // Process this packet, any re-transmit requests will happen later. + //uint16_t gotSeq = qFromLittleEndian(r.mid(6, 2)); quint8 temp = r[0] - 0x15; if ((quint8)r[16] == 0xc1 && (quint8)r[17] == temp) @@ -731,17 +722,6 @@ void udpAudio::dataReceived() r.mid(0, 2) == QByteArrayLiteral("\x70\x04")) { // First check if we are missing any packets as seq should be sequential. - uint16_t gotSeq = qFromLittleEndian(r.mid(6, 2)); - if (lastReceivedSeq == 0 || lastReceivedSeq > gotSeq) { - lastReceivedSeq = gotSeq; - } - - for (uint16_t f = lastReceivedSeq + 1; f < gotSeq; f++) { - // Do we need to request a retransmit? - qDebug() << this->metaObject()->className() << ": Missing Sequence: (" << r.length() << ") " << f; - } - - lastReceivedSeq = gotSeq; rxaudio->incomingAudio(r.mid(24)); } @@ -795,14 +775,17 @@ udpBase::~udpBase() void udpBase::dataReceived(QByteArray r) { + if (r.length() < 0x10) + { + return; // Packet too small do to anything with? + } + switch (r.length()) { case (CONTROL_SIZE): // Empty response used for simple comms and retransmit requests. { control_packet_t in = (control_packet_t)r.constData(); - // We should check for missing packets here - // for now just store received seq. - lastReceivedSeq = in->seq; + if (in->type == 0x04) { qDebug() << this->metaObject()->className() << ": Received I am here"; areYouThereCounter = 0; @@ -818,22 +801,23 @@ void udpBase::dataReceived(QByteArray r) { // retransmit request // Send an idle with the requested seqnum if not found. - bool found = false; - for (int f = txSeqBuf.length() - 1; f >= 0; f--) - { - packetsLost++; - if (txSeqBuf[f].seqNum == in->seq) { - //qDebug() << this->metaObject()->className() << ": retransmitting packet :" << gotSeq << " (len=" << txSeqBuf[f].data.length() << ")"; - QMutexLocker locker(&mutex); - udp->writeDatagram(txSeqBuf[f].data, radioIP, port); - found = true; - break; - } + packetsLost++; + + auto match = std::find_if(txSeqBuf.cbegin(), txSeqBuf.cend(), [&cs = in->seq](const SEQBUFENTRY& s) { + return s.seqNum == cs; + }); + + if (match != txSeqBuf.cend()) { + // Found matching entry? + // Send "untracked" as it has already been sent once. + QMutexLocker locker(&mutex); + qDebug() << this->metaObject()->className() << ": Sending retransmit of " << match->seqNum; + udp->writeDatagram(match->data, radioIP, port); + break; } - if (!found) - { + else { // Packet was not found in buffer - //qDebug() << this->metaObject()->className() << ": Could not find requested packet " << gotSeq << ", sending idle."; + qDebug() << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle."; sendControl(false, 0, in->seq); } } @@ -845,7 +829,6 @@ void udpBase::dataReceived(QByteArray r) if (in->type == 0x07) { // It is a ping request/response - //uint16_t gotSeq = qFromLittleEndian(r.mid(6, 2)); if (in->reply == 0x00) { ping_packet p; @@ -860,17 +843,16 @@ void udpBase::dataReceived(QByteArray r) QMutexLocker locker(&mutex); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); } - else if (r[0x10] == (char)0x01) { + else if (in->reply == 0x01) { if (in->seq == pingSendSeq) { // This is response to OUR request so increment counter pingSendSeq++; } else { - // Not sure what to do here, need to spend more time with the protocol but try sending ping with same seq next time? - //qDebug() << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << gotSeq; + // Not sure what to do here, need to spend more time with the protocol but will try sending ping with same seq next time. + qDebug() << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << in->seq; } - } else { qDebug() << "Unhandled response to ping. I have never seen this! 0x10=" << r[16]; @@ -879,42 +861,111 @@ void udpBase::dataReceived(QByteArray r) } break; } - case (0x18): + case (RETRANSMIT_RANGE_SIZE): { - if (r.mid(0, 6) == QByteArrayLiteral("\x18\x00\x00\x00\x01\x00")) - { // retransmit range request, can contain multiple ranges. - for (int f = 16; f < r.length() - 4; f = f + 4) - { - quint16 start = qFromLittleEndian(r.mid(f, 2)); - quint16 end = qFromLittleEndian(r.mid(f + 2, 2)); - packetsLost = packetsLost + (end - start); - qDebug() << this->metaObject()->className() << ": Retransmit range request for:" << start << " to " << end; - for (quint16 gotSeq = start; gotSeq <= end; gotSeq++) - { - bool found = false; - for (int h = txSeqBuf.length() - 1; h >= 0; h--) - if (txSeqBuf[h].seqNum == gotSeq) { - //qDebug() << this->metaObject()->className() << ": retransmitting packet :" << gotSeq << " (len=" << txSeqBuf[f].data.length() << ")"; - QMutexLocker locker(&mutex); - udp->writeDatagram(txSeqBuf[h].data, radioIP, port); - found = true; - break; - } - if (!found) - { - //qDebug() << this->metaObject()->className() << ": Could not find requested packet " << gotSeq << ", sending idle."; - sendControl(false, 0, gotSeq); - } + retransmit_range_packet_t in = (retransmit_range_packet_t)r.constData(); + + if (in->type==0x01) + { // retransmit range request + qDebug() << this->metaObject()->className() << ": Retransmit range request for:" << in->first << ", " << in->second << ", " << in->third << ", " << in->fourth << ", "; + + auto match = std::find_if(txSeqBuf.cbegin(), txSeqBuf.cend(), [&ca = in->first, &cb = in->second, &cc = in->third, &cd = in->fourth](const SEQBUFENTRY& s) { + return s.seqNum == ca || s.seqNum == cb || s.seqNum == cc || s.seqNum == cd; + }); + + if (match == txSeqBuf.cend()) { + qDebug() << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle."; + sendControl(false, 0, in->seq); + } + else { + while (match != txSeqBuf.cend()) { + // Found matching entry? + // Send "untracked" as it has already been sent once. + qDebug() << this->metaObject()->className() << ": Sending retransmit of " << match->seqNum; + QMutexLocker locker(&mutex); + udp->writeDatagram(match->data, radioIP, port); + udp->writeDatagram(match->data, radioIP, port); + match++; + packetsLost++; } } } break; } default: - break; + { + + // All packets "should" be added to the incoming buffer. + // First check that we haven't already received it. + + + } + break; } + + // All packets except ping and retransmit requests should trigger this. + control_packet_t in = (control_packet_t)r.constData(); + if (r.length() != PING_SIZE && in->type != (char)0x01 && in->seq != 0) + { + + if (in->seq < lastReceivedSeq) + { + qDebug() << this->metaObject()->className() << ": ******* seq number may have rolled over ****** previous highest: " << rxSeqBuf.back() << " current: " << in->seq; + + // Looks like it has rolled over so clear buffer and start again. + rxSeqBuf.clear(); + rxSeqBuf.append(in->seq); + lastReceivedSeq = in->seq; + return; + } + + if (!rxSeqBuf.contains(in->seq)) + { + rxSeqBuf.append(in->seq); + } + + if (!rxSeqBuf.isEmpty()) + { + std::sort(rxSeqBuf.begin(), rxSeqBuf.end()); + // Find all gaps in received packets (in reverse order) + quint16 first, second, third,count=0; + auto i = std::adjacent_find(rxSeqBuf.begin(), rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; }); + while (i != rxSeqBuf.end()) + { + if (count == 0) + first = *i; + else if (count == 1) + second = *i; + else if (count == 2) + third = *i; + else { + break; + } + count++; + i++; + } + + if (count == 1) + { + qDebug() << this->metaObject()->className() << ": Requesting retransmit of: " << first; + sendControl(false, 0x01, first); + } + else if (count == 2) { + count = 3; + third=second; + } + + if (count == 3) + { + qDebug() << this->metaObject()->className() << ": Requesting retransmit of: " << first << ", " << second << ", " << third; + sendRetransmitRange(first, second, third); + } + } + } + + } // Used to send idle and other "control" style messages @@ -952,7 +1003,23 @@ void udpBase::sendPing() lastPingSentTime = QDateTime::currentDateTime(); QMutexLocker locker(&mutex); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); - innerSendSeq++; + return; +} + +void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third) +{ + retransmit_range_packet p; + memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00! + p.len = sizeof(p); + p.type = 0x00; + p.sentid = myId; + p.rcvdid = remoteId; + p.first = first; + p.second = first; + p.third = second; + p.fourth = third; + QMutexLocker locker(&mutex); + udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); return; } @@ -988,6 +1055,14 @@ void udpBase::purgeOldEntries() txSeqBuf.removeAt(f); } } + + if (rxSeqBuf.length() > 2048) { + // If the buffer is over 2K, remove the first 1K. + std::sort(rxSeqBuf.begin(), rxSeqBuf.end()); + rxSeqBuf.remove(0,1024); + lastReceivedSeq = *rxSeqBuf.begin(); + } + } /// diff --git a/udphandler.h b/udphandler.h index b04563e..60a0c7f 100644 --- a/udphandler.h +++ b/udphandler.h @@ -9,6 +9,7 @@ #include #include #include +#include // Allow easy endian-ness conversions #include @@ -44,7 +45,9 @@ public: void init(); void dataReceived(QByteArray r); - void sendPing(); // Periodic type 0x07 ping packet sending + void sendPing(); + void sendRetransmitRange(quint16 first, quint16 second, quint16 third); + void sendControl(bool tracked,quint8 id, quint16 seq); QTime timeStarted; @@ -56,7 +59,7 @@ public: uint16_t innerSendSeq = 0x8304; // Not sure why? uint16_t sendSeqB = 0; uint16_t sendSeq = 1; - uint16_t lastReceivedSeq = 0; + uint16_t lastReceivedSeq = 1; uint16_t pkt0SendSeq = 0; uint16_t periodicSeq = 0; quint64 latency = 0; @@ -79,8 +82,9 @@ public: QByteArray data; }; - QList txSeqBuf = QList(); - std::vector< quint16 > rxSeqBuf; + QVector txSeqBuf = QVector(); + + QVector< quint16 > rxSeqBuf = QVector(); void sendTrackedPacket(QByteArray d); void purgeOldEntries();