From 02b93119316dc4fa22548db7d7bf3d925a553d33 Mon Sep 17 00:00:00 2001 From: Phil Taylor Date: Mon, 17 May 2021 16:03:53 +0100 Subject: [PATCH] Use qMap instead of qVector for buffers as they are auto-sorted. --- udphandler.cpp | 246 +++++++++++++++++++++++++++++-------------------- udphandler.h | 14 ++- 2 files changed, 155 insertions(+), 105 deletions(-) diff --git a/udphandler.cpp b/udphandler.cpp index 0db339a..3b93c9c 100644 --- a/udphandler.cpp +++ b/udphandler.cpp @@ -545,10 +545,10 @@ udpCivData::~udpCivData() void udpCivData::watchdog() { static bool alerted = false; - if (lastReceived.msecsTo(QTime::currentTime()) > 500) + if (lastReceived.msecsTo(QTime::currentTime()) > 2000) { if (!alerted) { - qInfo(logUdp()) << " CIV Watchdog: no CIV data received for 500ms, requesting data start."; + qInfo(logUdp()) << " CIV Watchdog: no CIV data received for 2s, requesting data start."; if (startCivDataTimer != Q_NULLPTR) { startCivDataTimer->start(100); @@ -804,13 +804,13 @@ udpAudio::~udpAudio() void udpAudio::watchdog() { static bool alerted = false; - if (lastReceived.msecsTo(QTime::currentTime()) > 500) + if (lastReceived.msecsTo(QTime::currentTime()) > 2000) { if (!alerted) { /* Just log it at the moment, maybe try signalling the control channel that it needs to try requesting civ/audio again? */ - qInfo(logUdp()) << " Audio Watchdog: no audio data received for 500ms, restart required"; + qInfo(logUdp()) << " Audio Watchdog: no audio data received for 2s, restart required"; alerted = true; } } @@ -997,23 +997,22 @@ void udpBase::dataReceived(QByteArray r) control_packet_t in = (control_packet_t)r.constData(); if (in->type == 0x01) { - QMutexLocker txlocker(&txBufferMutex); // Single packet request packetsLost++; - - auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) { - return s.seqNum == cs; - }); - + congestion ++; + txBufferMutex.lock(); + QMap::iterator match = txSeqBuf.find(in->seq); if (match != txSeqBuf.end()) { // Found matching entry? // Send "untracked" as it has already been sent once. // Don't constantly retransmit the same packet, give-up eventually - //qInfo(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum; + qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum; match->retransmitCount++; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(match->data, radioIP, port); + udpMutex.unlock(); } + txBufferMutex.unlock(); } if (in->type == 0x04) { qInfo(logUdp()) << this->metaObject()->className() << ": Received I am here "; @@ -1049,8 +1048,9 @@ void udpBase::dataReceived(QByteArray r) p.reply = 0x01; p.seq = in->seq; p.time = in->time; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); + udpMutex.unlock(); } else if (in->reply == 0x01) { if (in->seq == pingSendSeq) @@ -1089,66 +1089,79 @@ void udpBase::dataReceived(QByteArray r) // This is a variable length retransmit request! if (in->type == 0x01 && in->len != 0x10) { - QMutexLocker txlocker(&txBufferMutex); + for (quint16 i = 0x10; i < r.length(); i = i + 2) { quint16 seq = (quint8)r[i] | (quint8)r[i + 1] << 8; - auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = seq](SEQBUFENTRY& s) { - return s.seqNum == cs; - }); + txBufferMutex.lock(); + QMap::iterator match = txSeqBuf.find(seq); if (match == txSeqBuf.end()) { - qInfo(logUdp()) << this->metaObject()->className() << ": Requested packet " << hex << seq << " not found"; + qDebug(logUdp()) << this->metaObject()->className() << ": Requested packet " << hex << seq << " not found"; // Just send idle packet. sendControl(false, 0, match->seqNum); } else { // Found matching entry? // Send "untracked" as it has already been sent once. - //qInfo(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << hex << match->seqNum; + qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << hex << match->seqNum; match->retransmitCount++; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(match->data, radioIP, port); + udpMutex.unlock(); match++; packetsLost++; + congestion++; } + txBufferMutex.unlock(); } } else if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00) { - QMutexLocker rxlocker(&rxBufferMutex); + rxBufferMutex.lock(); if (rxSeqBuf.isEmpty()) { - rxSeqBuf.append(in->seq); + rxSeqBuf.insert(in->seq, QTime::currentTime()); } else { - std::sort(rxSeqBuf.begin(), rxSeqBuf.end()); - if (in->seq < rxSeqBuf.front()) + //std::sort(rxSeqBuf.begin(), rxSeqBuf.end()); + if (in->seq < rxSeqBuf.firstKey()) { - qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.back() << " current: " << hex << in->seq; + qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.lastKey() << " current: " << hex << in->seq; //seqPrefix++; // Looks like it has rolled over so clear buffer and start again. rxSeqBuf.clear(); + rxMissing.clear(); + rxBufferMutex.unlock(); return; } if (!rxSeqBuf.contains(in->seq)) { // Add incoming packet to the received buffer and if it is in the missing buffer, remove it. - rxSeqBuf.append(in->seq); - // Check whether this is one of our missing ones! - auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; }); + rxSeqBuf.insert(in->seq, QTime::currentTime()); + } + else { + // This is probably one of our missing packets! + missingMutex.lock(); + QMap::iterator s = rxMissing.find(in->seq); if (s != rxMissing.end()) { - qInfo(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq; + qDebug(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq; s = rxMissing.erase(s); } + missingMutex.unlock(); + } } + rxBufferMutex.unlock(); + } } - +bool missing(quint16 i, quint16 j) { + return (i + 1 != j); +} void udpBase::sendRetransmitRequest() { @@ -1157,63 +1170,52 @@ void udpBase::sendRetransmitRequest() QByteArray missingSeqs; - QMutexLocker rxlocker(&rxBufferMutex); - - auto i = std::adjacent_find(rxSeqBuf.begin(), rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; }); - while (i != rxSeqBuf.end()) - { - if (i + 1 != rxSeqBuf.end()) - { - if (*(i + 1) - *i < 30) - { - for (quint16 j = *i + 1; j < *(i + 1); j++) + rxBufferMutex.lock(); + if (!rxSeqBuf.empty() && rxSeqBuf.size() <= rxSeqBuf.lastKey() - rxSeqBuf.firstKey()) + { + // We have at least 1 missing packet! + qDebug(logUdp()) << "Missing Seq: size=" << rxSeqBuf.size() << "firstKey=" << rxSeqBuf.firstKey() << "lastKey=" << rxSeqBuf.lastKey() << "missing=" << rxSeqBuf.lastKey()-rxSeqBuf.firstKey()- rxSeqBuf.size()+1; + // We are missing packets so iterate through the buffer and add the missing ones to missing packet list + for (int i = 0; i < rxSeqBuf.keys().length() - 1; i++) { + missingMutex.lock(); + for (quint16 j = rxSeqBuf.keys()[i] + 1; j < rxSeqBuf.keys()[i + 1]; j++) { + auto s = rxMissing.find(j); + if (s == rxMissing.end()) { - //qInfo(logUdp()) << this->metaObject()->className() << ": Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")"; - auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; }); - if (s == rxMissing.end()) + // We haven't seen this missing packet before + qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << rxMissing.size() << "): " << j; + rxMissing.insert(j, 0); + rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it. + packetsLost++; + } + else { + if (s.value() == 4) { - // We haven't seen this missing packet before - //qInfo(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len="<< rxMissing.length() << "): " << j; - SEQBUFENTRY b; - b.seqNum = j; - b.retransmitCount = 0; - b.timeSent = QTime::currentTime(); - rxMissing.append(b); - packetsLost++; + // We have tried 4 times to request this packet, time to give up! + s = rxMissing.erase(s); } - else { - if (s->retransmitCount == 4) - { - // We have tried 4 times to request this packet, time to give up! - s = rxMissing.erase(s); - rxSeqBuf.append(j); // Final thing is to add to received buffer! - } - } } } - else { - qInfo(logUdp()) << this->metaObject()->className() << ": Too many missing, flushing buffers"; - rxSeqBuf.clear(); - missingSeqs.clear(); - break; - } + missingMutex.unlock(); } - i++; + } + rxBufferMutex.unlock(); - + missingMutex.lock(); for (auto it = rxMissing.begin(); it != rxMissing.end(); ++it) { - if (it->retransmitCount < 10) + if (it.value() < 10) { - missingSeqs.append(it->seqNum & 0xff); - missingSeqs.append(it->seqNum >> 8 & 0xff); - missingSeqs.append(it->seqNum & 0xff); - missingSeqs.append(it->seqNum >> 8 & 0xff); - it->retransmitCount++; + missingSeqs.append(it.key() & 0xff); + missingSeqs.append(it.key() >> 8 & 0xff); + missingSeqs.append(it.key() & 0xff); + missingSeqs.append(it.key() >> 8 & 0xff); + it.value()++; } } + missingMutex.unlock(); if (missingSeqs.length() != 0) { control_packet p; @@ -1225,16 +1227,21 @@ void udpBase::sendRetransmitRequest() if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control. { p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8); - qInfo(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq; - QMutexLocker udplocker(&udpMutex); + qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq; + udpMutex.lock(); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); + udpMutex.unlock(); } else { - qInfo(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex(); + qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex(); + missingMutex.lock(); missingSeqs.insert(0, p.packet, sizeof(p.packet)); - QMutexLocker udplocker(&udpMutex); + missingMutex.unlock(); + + udpMutex.lock(); udp->writeDatagram(missingSeqs, radioIP, port); + udpMutex.unlock(); } } @@ -1243,7 +1250,7 @@ void udpBase::sendRetransmitRequest() // Used to send idle and other "control" style messages -void udpBase::sendControl(bool tracked=true, quint8 type=0, quint16 seq=0) +void udpBase::sendControl(bool tracked = true, quint8 type = 0, quint16 seq = 0) { control_packet p; memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00! @@ -1254,8 +1261,9 @@ void udpBase::sendControl(bool tracked=true, quint8 type=0, quint16 seq=0) if (!tracked) { p.seq = seq; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); + udpMutex.unlock(); } else { sendTrackedPacket(QByteArray::fromRawData((const char*)p.packet, sizeof(p))); @@ -1275,12 +1283,13 @@ void udpBase::sendPing() p.seq = pingSendSeq; p.time = timeStarted.msecsSinceStartOfDay(); lastPingSentTime = QDateTime::currentDateTime(); - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); + udpMutex.unlock(); return; } -void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third,quint16 fourth) +void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third, quint16 fourth) { retransmit_range_packet p; memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00! @@ -1292,8 +1301,9 @@ void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third,q p.second = second; p.third = third; p.fourth = fourth; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port); + udpMutex.unlock(); return; } @@ -1308,17 +1318,29 @@ void udpBase::sendTrackedPacket(QByteArray d) s.timeSent = QTime::currentTime(); s.retransmitCount = 0; s.data = d; - QMutexLocker txlocker(&txBufferMutex); - txSeqBuf.append(s); - QMutexLocker rxlocker(&rxBufferMutex); + txBufferMutex.lock(); + + if (sendSeq == 0) { + // We are either the first ever sent packet or have rolled-over so clear the buffer. + txSeqBuf.clear(); + congestion = 0; + } + txSeqBuf.insert(sendSeq,s); + txBufferMutex.unlock(); purgeOldEntries(); // Delete entries older than PURGE_SECONDS seconds (currently 5) sendSeq++; - QMutexLocker udplocker(&udpMutex); + udpMutex.lock(); udp->writeDatagram(d, radioIP, port); + if (congestion>10) { // Poor quality connection? + udp->writeDatagram(d, radioIP, port); + if (congestion>20) // Even worse so send again. + udp->writeDatagram(d, radioIP, port); + } if (idleTimer != Q_NULLPTR && idleTimer->isActive()) { idleTimer->start(IDLE_PERIOD); // Reset idle counter if it's running } + udpMutex.unlock(); packetsSent++; return; } @@ -1330,26 +1352,46 @@ void udpBase::sendTrackedPacket(QByteArray d) void udpBase::purgeOldEntries() { // Erase old entries from the tx packet buffer + txBufferMutex.lock(); + if (!txSeqBuf.isEmpty()) { - txSeqBuf.erase(std::remove_if(txSeqBuf.begin(), txSeqBuf.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), txSeqBuf.end()); - } - - // Erase old entries from the missing packets buffer - if (!rxMissing.isEmpty()) { - rxMissing.erase(std::remove_if(rxMissing.begin(), rxMissing.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), rxMissing.end()); - } - - if (!rxSeqBuf.isEmpty()) { - std::sort(rxSeqBuf.begin(), rxSeqBuf.end()); - - if (rxSeqBuf.length() > 400) - { - rxSeqBuf.remove(0, 200); + // Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS + for (auto it = txSeqBuf.begin(); it != txSeqBuf.end();) { + if (it.value().timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS) { + txSeqBuf.erase(it++); + } + else { + break; + } } } + + txBufferMutex.unlock(); + + rxBufferMutex.lock(); + if (!rxSeqBuf.isEmpty()) { + // Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS + for (auto it = rxSeqBuf.begin(); it != rxSeqBuf.end();) { + if (it.value().secsTo(QTime::currentTime()) > PURGE_SECONDS) { + rxSeqBuf.erase(it++); + } + else { + break; + } + } + } + rxBufferMutex.unlock(); + + missingMutex.lock(); + // Erase old entries from the missing packets buffer + if (!rxMissing.isEmpty() && rxMissing.size() > 50) { + for (size_t i = 0; i < 25; ++i) { + rxMissing.erase(rxMissing.begin()); + } + } + missingMutex.unlock(); + } /// diff --git a/udphandler.h b/udphandler.h index dc4bc48..903d48a 100644 --- a/udphandler.h +++ b/udphandler.h @@ -10,6 +10,7 @@ #include #include #include +#include // Allow easy endian-ness conversions #include @@ -98,6 +99,7 @@ public: QMutex udpMutex; QMutex txBufferMutex; QMutex rxBufferMutex; + QMutex missingMutex; struct SEQBUFENTRY { QTime timeSent; @@ -106,11 +108,14 @@ public: quint8 retransmitCount; }; - QVector txSeqBuf; + QMap rxSeqBuf; + QMap txSeqBuf; + QMap rxMissing; + //QVector txSeqBuf; - QVector rxSeqBuf; + //QVector rxSeqBuf; - QVector rxMissing; + //QVector rxMissing; void sendTrackedPacket(QByteArray d); void purgeOldEntries(); @@ -132,6 +137,9 @@ public: quint16 seqPrefix = 0; + int congestion = 0; + + private: void sendRetransmitRequest();