Use qMap instead of qVector for buffers as they are auto-sorted.

merge-requests/4/head
Phil Taylor 2021-05-17 16:03:53 +01:00
rodzic c56a5c0f05
commit 02b9311931
2 zmienionych plików z 155 dodań i 105 usunięć

Wyświetl plik

@ -545,10 +545,10 @@ udpCivData::~udpCivData()
void udpCivData::watchdog()
{
static bool alerted = false;
if (lastReceived.msecsTo(QTime::currentTime()) > 500)
if (lastReceived.msecsTo(QTime::currentTime()) > 2000)
{
if (!alerted) {
qInfo(logUdp()) << " CIV Watchdog: no CIV data received for 500ms, requesting data start.";
qInfo(logUdp()) << " CIV Watchdog: no CIV data received for 2s, requesting data start.";
if (startCivDataTimer != Q_NULLPTR)
{
startCivDataTimer->start(100);
@ -804,13 +804,13 @@ udpAudio::~udpAudio()
void udpAudio::watchdog()
{
static bool alerted = false;
if (lastReceived.msecsTo(QTime::currentTime()) > 500)
if (lastReceived.msecsTo(QTime::currentTime()) > 2000)
{
if (!alerted) {
/* Just log it at the moment, maybe try signalling the control channel that it needs to
try requesting civ/audio again? */
qInfo(logUdp()) << " Audio Watchdog: no audio data received for 500ms, restart required";
qInfo(logUdp()) << " Audio Watchdog: no audio data received for 2s, restart required";
alerted = true;
}
}
@ -997,23 +997,22 @@ void udpBase::dataReceived(QByteArray r)
control_packet_t in = (control_packet_t)r.constData();
if (in->type == 0x01)
{
QMutexLocker txlocker(&txBufferMutex);
// Single packet request
packetsLost++;
auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) {
return s.seqNum == cs;
});
congestion ++;
txBufferMutex.lock();
QMap<quint16,SEQBUFENTRY>::iterator match = txSeqBuf.find(in->seq);
if (match != txSeqBuf.end()) {
// Found matching entry?
// Send "untracked" as it has already been sent once.
// Don't constantly retransmit the same packet, give-up eventually
//qInfo(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum;
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum;
match->retransmitCount++;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(match->data, radioIP, port);
udpMutex.unlock();
}
txBufferMutex.unlock();
}
if (in->type == 0x04) {
qInfo(logUdp()) << this->metaObject()->className() << ": Received I am here ";
@ -1049,8 +1048,9 @@ void udpBase::dataReceived(QByteArray r)
p.reply = 0x01;
p.seq = in->seq;
p.time = in->time;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
udpMutex.unlock();
}
else if (in->reply == 0x01) {
if (in->seq == pingSendSeq)
@ -1089,66 +1089,79 @@ void udpBase::dataReceived(QByteArray r)
// This is a variable length retransmit request!
if (in->type == 0x01 && in->len != 0x10)
{
QMutexLocker txlocker(&txBufferMutex);
for (quint16 i = 0x10; i < r.length(); i = i + 2)
{
quint16 seq = (quint8)r[i] | (quint8)r[i + 1] << 8;
auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = seq](SEQBUFENTRY& s) {
return s.seqNum == cs;
});
txBufferMutex.lock();
QMap<quint16, SEQBUFENTRY>::iterator match = txSeqBuf.find(seq);
if (match == txSeqBuf.end()) {
qInfo(logUdp()) << this->metaObject()->className() << ": Requested packet " << hex << seq << " not found";
qDebug(logUdp()) << this->metaObject()->className() << ": Requested packet " << hex << seq << " not found";
// Just send idle packet.
sendControl(false, 0, match->seqNum);
}
else {
// Found matching entry?
// Send "untracked" as it has already been sent once.
//qInfo(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << hex << match->seqNum;
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << hex << match->seqNum;
match->retransmitCount++;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(match->data, radioIP, port);
udpMutex.unlock();
match++;
packetsLost++;
congestion++;
}
txBufferMutex.unlock();
}
}
else if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00)
{
QMutexLocker rxlocker(&rxBufferMutex);
rxBufferMutex.lock();
if (rxSeqBuf.isEmpty()) {
rxSeqBuf.append(in->seq);
rxSeqBuf.insert(in->seq, QTime::currentTime());
}
else
{
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
if (in->seq < rxSeqBuf.front())
//std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
if (in->seq < rxSeqBuf.firstKey())
{
qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.back() << " current: " << hex << in->seq;
qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.lastKey() << " current: " << hex << in->seq;
//seqPrefix++;
// Looks like it has rolled over so clear buffer and start again.
rxSeqBuf.clear();
rxMissing.clear();
rxBufferMutex.unlock();
return;
}
if (!rxSeqBuf.contains(in->seq))
{
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
rxSeqBuf.append(in->seq);
// Check whether this is one of our missing ones!
auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; });
rxSeqBuf.insert(in->seq, QTime::currentTime());
}
else {
// This is probably one of our missing packets!
missingMutex.lock();
QMap<quint16,int>::iterator s = rxMissing.find(in->seq);
if (s != rxMissing.end())
{
qInfo(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq;
qDebug(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq;
s = rxMissing.erase(s);
}
missingMutex.unlock();
}
}
rxBufferMutex.unlock();
}
}
bool missing(quint16 i, quint16 j) {
return (i + 1 != j);
}
void udpBase::sendRetransmitRequest()
{
@ -1157,63 +1170,52 @@ void udpBase::sendRetransmitRequest()
QByteArray missingSeqs;
QMutexLocker rxlocker(&rxBufferMutex);
auto i = std::adjacent_find(rxSeqBuf.begin(), rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; });
while (i != rxSeqBuf.end())
{
if (i + 1 != rxSeqBuf.end())
{
if (*(i + 1) - *i < 30)
{
for (quint16 j = *i + 1; j < *(i + 1); j++)
rxBufferMutex.lock();
if (!rxSeqBuf.empty() && rxSeqBuf.size() <= rxSeqBuf.lastKey() - rxSeqBuf.firstKey())
{
// We have at least 1 missing packet!
qDebug(logUdp()) << "Missing Seq: size=" << rxSeqBuf.size() << "firstKey=" << rxSeqBuf.firstKey() << "lastKey=" << rxSeqBuf.lastKey() << "missing=" << rxSeqBuf.lastKey()-rxSeqBuf.firstKey()- rxSeqBuf.size()+1;
// We are missing packets so iterate through the buffer and add the missing ones to missing packet list
for (int i = 0; i < rxSeqBuf.keys().length() - 1; i++) {
missingMutex.lock();
for (quint16 j = rxSeqBuf.keys()[i] + 1; j < rxSeqBuf.keys()[i + 1]; j++) {
auto s = rxMissing.find(j);
if (s == rxMissing.end())
{
//qInfo(logUdp()) << this->metaObject()->className() << ": Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")";
auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; });
if (s == rxMissing.end())
// We haven't seen this missing packet before
qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << rxMissing.size() << "): " << j;
rxMissing.insert(j, 0);
rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
packetsLost++;
}
else {
if (s.value() == 4)
{
// We haven't seen this missing packet before
//qInfo(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len="<< rxMissing.length() << "): " << j;
SEQBUFENTRY b;
b.seqNum = j;
b.retransmitCount = 0;
b.timeSent = QTime::currentTime();
rxMissing.append(b);
packetsLost++;
// We have tried 4 times to request this packet, time to give up!
s = rxMissing.erase(s);
}
else {
if (s->retransmitCount == 4)
{
// We have tried 4 times to request this packet, time to give up!
s = rxMissing.erase(s);
rxSeqBuf.append(j); // Final thing is to add to received buffer!
}
}
}
}
else {
qInfo(logUdp()) << this->metaObject()->className() << ": Too many missing, flushing buffers";
rxSeqBuf.clear();
missingSeqs.clear();
break;
}
missingMutex.unlock();
}
i++;
}
rxBufferMutex.unlock();
missingMutex.lock();
for (auto it = rxMissing.begin(); it != rxMissing.end(); ++it)
{
if (it->retransmitCount < 10)
if (it.value() < 10)
{
missingSeqs.append(it->seqNum & 0xff);
missingSeqs.append(it->seqNum >> 8 & 0xff);
missingSeqs.append(it->seqNum & 0xff);
missingSeqs.append(it->seqNum >> 8 & 0xff);
it->retransmitCount++;
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
it.value()++;
}
}
missingMutex.unlock();
if (missingSeqs.length() != 0)
{
control_packet p;
@ -1225,16 +1227,21 @@ void udpBase::sendRetransmitRequest()
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qInfo(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
QMutexLocker udplocker(&udpMutex);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
udpMutex.lock();
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
udpMutex.unlock();
}
else
{
qInfo(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
missingMutex.lock();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
QMutexLocker udplocker(&udpMutex);
missingMutex.unlock();
udpMutex.lock();
udp->writeDatagram(missingSeqs, radioIP, port);
udpMutex.unlock();
}
}
@ -1243,7 +1250,7 @@ void udpBase::sendRetransmitRequest()
// Used to send idle and other "control" style messages
void udpBase::sendControl(bool tracked=true, quint8 type=0, quint16 seq=0)
void udpBase::sendControl(bool tracked = true, quint8 type = 0, quint16 seq = 0)
{
control_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -1254,8 +1261,9 @@ void udpBase::sendControl(bool tracked=true, quint8 type=0, quint16 seq=0)
if (!tracked) {
p.seq = seq;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
udpMutex.unlock();
}
else {
sendTrackedPacket(QByteArray::fromRawData((const char*)p.packet, sizeof(p)));
@ -1275,12 +1283,13 @@ void udpBase::sendPing()
p.seq = pingSendSeq;
p.time = timeStarted.msecsSinceStartOfDay();
lastPingSentTime = QDateTime::currentDateTime();
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
udpMutex.unlock();
return;
}
void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third,quint16 fourth)
void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third, quint16 fourth)
{
retransmit_range_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -1292,8 +1301,9 @@ void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third,q
p.second = second;
p.third = third;
p.fourth = fourth;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
udpMutex.unlock();
return;
}
@ -1308,17 +1318,29 @@ void udpBase::sendTrackedPacket(QByteArray d)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = d;
QMutexLocker txlocker(&txBufferMutex);
txSeqBuf.append(s);
QMutexLocker rxlocker(&rxBufferMutex);
txBufferMutex.lock();
if (sendSeq == 0) {
// We are either the first ever sent packet or have rolled-over so clear the buffer.
txSeqBuf.clear();
congestion = 0;
}
txSeqBuf.insert(sendSeq,s);
txBufferMutex.unlock();
purgeOldEntries(); // Delete entries older than PURGE_SECONDS seconds (currently 5)
sendSeq++;
QMutexLocker udplocker(&udpMutex);
udpMutex.lock();
udp->writeDatagram(d, radioIP, port);
if (congestion>10) { // Poor quality connection?
udp->writeDatagram(d, radioIP, port);
if (congestion>20) // Even worse so send again.
udp->writeDatagram(d, radioIP, port);
}
if (idleTimer != Q_NULLPTR && idleTimer->isActive()) {
idleTimer->start(IDLE_PERIOD); // Reset idle counter if it's running
}
udpMutex.unlock();
packetsSent++;
return;
}
@ -1330,26 +1352,46 @@ void udpBase::sendTrackedPacket(QByteArray d)
void udpBase::purgeOldEntries()
{
// Erase old entries from the tx packet buffer
txBufferMutex.lock();
if (!txSeqBuf.isEmpty())
{
txSeqBuf.erase(std::remove_if(txSeqBuf.begin(), txSeqBuf.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), txSeqBuf.end());
}
// Erase old entries from the missing packets buffer
if (!rxMissing.isEmpty()) {
rxMissing.erase(std::remove_if(rxMissing.begin(), rxMissing.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), rxMissing.end());
}
if (!rxSeqBuf.isEmpty()) {
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
if (rxSeqBuf.length() > 400)
{
rxSeqBuf.remove(0, 200);
// Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS
for (auto it = txSeqBuf.begin(); it != txSeqBuf.end();) {
if (it.value().timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS) {
txSeqBuf.erase(it++);
}
else {
break;
}
}
}
txBufferMutex.unlock();
rxBufferMutex.lock();
if (!rxSeqBuf.isEmpty()) {
// Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS
for (auto it = rxSeqBuf.begin(); it != rxSeqBuf.end();) {
if (it.value().secsTo(QTime::currentTime()) > PURGE_SECONDS) {
rxSeqBuf.erase(it++);
}
else {
break;
}
}
}
rxBufferMutex.unlock();
missingMutex.lock();
// Erase old entries from the missing packets buffer
if (!rxMissing.isEmpty() && rxMissing.size() > 50) {
for (size_t i = 0; i < 25; ++i) {
rxMissing.erase(rxMissing.begin());
}
}
missingMutex.unlock();
}
/// <summary>

Wyświetl plik

@ -10,6 +10,7 @@
#include <QDateTime>
#include <QByteArray>
#include <QVector>
#include <QMap>
// Allow easy endian-ness conversions
#include <QtEndian>
@ -98,6 +99,7 @@ public:
QMutex udpMutex;
QMutex txBufferMutex;
QMutex rxBufferMutex;
QMutex missingMutex;
struct SEQBUFENTRY {
QTime timeSent;
@ -106,11 +108,14 @@ public:
quint8 retransmitCount;
};
QVector<SEQBUFENTRY> txSeqBuf;
QMap<quint16, QTime> rxSeqBuf;
QMap<quint16, SEQBUFENTRY> txSeqBuf;
QMap<quint16, int> rxMissing;
//QVector<SEQBUFENTRY> txSeqBuf;
QVector<quint16> rxSeqBuf;
//QVector<quint16> rxSeqBuf;
QVector<SEQBUFENTRY> rxMissing;
//QVector<SEQBUFENTRY> rxMissing;
void sendTrackedPacket(QByteArray d);
void purgeOldEntries();
@ -132,6 +137,9 @@ public:
quint16 seqPrefix = 0;
int congestion = 0;
private:
void sendRetransmitRequest();