kopia lustrzana https://gitlab.com/eliggett/wfview
546 wiersze
18 KiB
C++
546 wiersze
18 KiB
C++
#include "udpbase.h"
|
|
#include "logcategories.h"
|
|
|
|
void udpBase::init(quint16 lport)
|
|
{
|
|
//timeStarted.start();
|
|
udp = new QUdpSocket(this);
|
|
udp->bind(lport); // Bind to random port.
|
|
localPort = udp->localPort();
|
|
qInfo(logUdp()) << "UDP Stream bound to local port:" << localPort << " remote port:" << port;
|
|
uint32_t addr = localIP.toIPv4Address();
|
|
myId = (addr >> 8 & 0xff) << 24 | (addr & 0xff) << 16 | (localPort & 0xffff);
|
|
|
|
retransmitTimer = new QTimer();
|
|
connect(retransmitTimer, &QTimer::timeout, this, &udpBase::sendRetransmitRequest);
|
|
retransmitTimer->start(RETRANSMIT_PERIOD);
|
|
}
|
|
|
|
udpBase::~udpBase()
|
|
{
|
|
qInfo(logUdp()) << "Closing UDP stream :" << radioIP.toString() << ":" << port;
|
|
if (udp != Q_NULLPTR) {
|
|
sendControl(false, 0x05, 0x00); // Send disconnect
|
|
udp->close();
|
|
delete udp;
|
|
}
|
|
if (areYouThereTimer != Q_NULLPTR)
|
|
{
|
|
areYouThereTimer->stop();
|
|
delete areYouThereTimer;
|
|
}
|
|
|
|
if (pingTimer != Q_NULLPTR)
|
|
{
|
|
pingTimer->stop();
|
|
delete pingTimer;
|
|
}
|
|
if (idleTimer != Q_NULLPTR)
|
|
{
|
|
idleTimer->stop();
|
|
delete idleTimer;
|
|
}
|
|
if (retransmitTimer != Q_NULLPTR)
|
|
{
|
|
retransmitTimer->stop();
|
|
delete retransmitTimer;
|
|
}
|
|
|
|
pingTimer = Q_NULLPTR;
|
|
idleTimer = Q_NULLPTR;
|
|
areYouThereTimer = Q_NULLPTR;
|
|
retransmitTimer = Q_NULLPTR;
|
|
|
|
}
|
|
|
|
// Base class!
|
|
|
|
void udpBase::dataReceived(QByteArray r)
|
|
{
|
|
if (r.length() < 0x10)
|
|
{
|
|
return; // Packet too small do to anything with?
|
|
}
|
|
|
|
switch (r.length())
|
|
{
|
|
case (CONTROL_SIZE): // Empty response used for simple comms and retransmit requests.
|
|
{
|
|
control_packet_t in = (control_packet_t)r.constData();
|
|
if (in->type == 0x01 && in->len == 0x10)
|
|
{
|
|
// Single packet request
|
|
packetsLost++;
|
|
congestion = static_cast<double>(packetsSent) / packetsLost * 100;
|
|
txBufferMutex.lock();
|
|
auto match = txSeqBuf.find(in->seq);
|
|
if (match != txSeqBuf.end()) {
|
|
// Found matching entry?
|
|
// Send "untracked" as it has already been sent once.
|
|
// Don't constantly retransmit the same packet, give-up eventually
|
|
qDebug(logUdp()) << this->metaObject()->className() << ": Sending (single packet) retransmit of " << QString("0x%1").arg(match->seqNum, 0, 16);
|
|
match->retransmitCount++;
|
|
udpMutex.lock();
|
|
udp->writeDatagram(match->data, radioIP, port);
|
|
udpMutex.unlock();
|
|
}
|
|
else {
|
|
qDebug(logUdp()) << this->metaObject()->className() << ": Remote requested packet"
|
|
<< QString("0x%1").arg(in->seq, 0, 16) <<
|
|
"not found, have " << QString("0x%1").arg(txSeqBuf.firstKey(), 0, 16) <<
|
|
"to" << QString("0x%1").arg(txSeqBuf.lastKey(), 0, 16);
|
|
}
|
|
txBufferMutex.unlock();
|
|
}
|
|
if (in->type == 0x04) {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": Received I am here ";
|
|
areYouThereCounter = 0;
|
|
// I don't think that we will ever receive an "I am here" other than in response to "Are you there?"
|
|
remoteId = in->sentid;
|
|
if (areYouThereTimer != Q_NULLPTR && areYouThereTimer->isActive()) {
|
|
// send ping packets every second
|
|
areYouThereTimer->stop();
|
|
}
|
|
sendControl(false, 0x06, 0x01); // Send Are you ready - untracked.
|
|
}
|
|
else if (in->type == 0x06)
|
|
{
|
|
// Just get the seqnum and ignore the rest.
|
|
}
|
|
break;
|
|
}
|
|
case (PING_SIZE): // ping packet
|
|
{
|
|
ping_packet_t in = (ping_packet_t)r.constData();
|
|
if (in->type == 0x07)
|
|
{
|
|
// It is a ping request/response
|
|
if (in->reply == 0x00)
|
|
{
|
|
ping_packet p;
|
|
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
|
|
p.len = sizeof(p);
|
|
p.type = 0x07;
|
|
p.sentid = myId;
|
|
p.rcvdid = remoteId;
|
|
p.reply = 0x01;
|
|
p.seq = in->seq;
|
|
p.time = in->time;
|
|
udpMutex.lock();
|
|
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
|
|
udpMutex.unlock();
|
|
}
|
|
else if (in->reply == 0x01) {
|
|
if (in->seq == pingSendSeq)
|
|
{
|
|
// This is response to OUR request so increment counter
|
|
pingSendSeq++;
|
|
}
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << "Unhandled response to ping. I have never seen this! 0x10=" << r[16];
|
|
}
|
|
|
|
}
|
|
break;
|
|
}
|
|
default:
|
|
{
|
|
|
|
// All packets "should" be added to the incoming buffer.
|
|
// First check that we haven't already received it.
|
|
|
|
|
|
}
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
// All packets except ping and retransmit requests should trigger this.
|
|
control_packet_t in = (control_packet_t)r.constData();
|
|
|
|
// This is a variable length retransmit request!
|
|
if (in->type == 0x01 && in->len != 0x10)
|
|
{
|
|
|
|
for (quint16 i = 0x10; i < r.length(); i = i + 2)
|
|
{
|
|
quint16 seq = (quint8)r[i] | (quint8)r[i + 1] << 8;
|
|
auto match = txSeqBuf.find(seq);
|
|
if (match == txSeqBuf.end()) {
|
|
qDebug(logUdp()) << this->metaObject()->className() << ": Remote requested packet"
|
|
<< QString("0x%1").arg(seq, 0, 16) <<
|
|
"not found, have " << QString("0x%1").arg(txSeqBuf.firstKey(), 0, 16) <<
|
|
"to" << QString("0x%1").arg(txSeqBuf.lastKey(), 0, 16);
|
|
// Just send idle packet.
|
|
sendControl(false, 0, seq);
|
|
}
|
|
else {
|
|
// Found matching entry?
|
|
// Send "untracked" as it has already been sent once.
|
|
qDebug(logUdp()) << this->metaObject()->className() << ": Sending (multiple packet) retransmit of " << QString("0x%1").arg(match->seqNum, 0, 16);
|
|
match->retransmitCount++;
|
|
udpMutex.lock();
|
|
udp->writeDatagram(match->data, radioIP, port);
|
|
udpMutex.unlock();
|
|
packetsLost++;
|
|
congestion = static_cast<double>(packetsSent) / packetsLost * 100;
|
|
}
|
|
}
|
|
}
|
|
else if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00)
|
|
{
|
|
rxBufferMutex.lock();
|
|
if (rxSeqBuf.isEmpty()) {
|
|
rxSeqBuf.insert(in->seq, QTime::currentTime());
|
|
}
|
|
else
|
|
{
|
|
if (in->seq < rxSeqBuf.firstKey() || in->seq - rxSeqBuf.lastKey() > MAX_MISSING)
|
|
{
|
|
qInfo(logUdp()) << this->metaObject()->className() << "Large seq number gap detected, previous highest: " <<
|
|
QString("0x%1").arg(rxSeqBuf.lastKey(), 0, 16) << " current: " << QString("0x%1").arg(in->seq, 0, 16);
|
|
//seqPrefix++;
|
|
// Looks like it has rolled over so clear buffer and start again.
|
|
rxSeqBuf.clear();
|
|
// Add this packet to the incoming buffer
|
|
rxSeqBuf.insert(in->seq, QTime::currentTime());
|
|
rxBufferMutex.unlock();
|
|
missingMutex.lock();
|
|
rxMissing.clear();
|
|
missingMutex.unlock();
|
|
return;
|
|
}
|
|
|
|
if (!rxSeqBuf.contains(in->seq))
|
|
{
|
|
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
|
|
|
|
if (in->seq > rxSeqBuf.lastKey() + 1) {
|
|
qInfo(logUdp()) << this->metaObject()->className() << "1 or more missing packets detected, previous: " <<
|
|
QString("0x%1").arg(rxSeqBuf.lastKey(), 0, 16) << " current: " << QString("0x%1").arg(in->seq, 0, 16);
|
|
// We are likely missing packets then!
|
|
missingMutex.lock();
|
|
//int missCounter = 0;
|
|
// Sanity check!
|
|
for (quint16 f = rxSeqBuf.lastKey() + 1; f <= in->seq; f++)
|
|
{
|
|
if (rxSeqBuf.size() > BUFSIZE)
|
|
{
|
|
rxSeqBuf.erase(rxSeqBuf.begin());
|
|
}
|
|
rxSeqBuf.insert(f, QTime::currentTime());
|
|
if (f != in->seq && !rxMissing.contains(f))
|
|
{
|
|
rxMissing.insert(f, 0);
|
|
}
|
|
}
|
|
missingMutex.unlock();
|
|
}
|
|
else {
|
|
if (rxSeqBuf.size() > BUFSIZE)
|
|
{
|
|
rxSeqBuf.erase(rxSeqBuf.begin());
|
|
}
|
|
rxSeqBuf.insert(in->seq, QTime::currentTime());
|
|
|
|
}
|
|
}
|
|
else {
|
|
// This is probably one of our missing packets!
|
|
missingMutex.lock();
|
|
auto s = rxMissing.find(in->seq);
|
|
if (s != rxMissing.end())
|
|
{
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << QString("0x%1").arg(in->seq, 0, 16);
|
|
|
|
s = rxMissing.erase(s);
|
|
}
|
|
missingMutex.unlock();
|
|
|
|
}
|
|
|
|
}
|
|
rxBufferMutex.unlock();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
void udpBase::sendRetransmitRequest()
|
|
{
|
|
// Find all gaps in received packets and then send requests for them.
|
|
// This will run every 100ms so out-of-sequence packets will not trigger a retransmit request.
|
|
if (rxMissing.isEmpty()) {
|
|
return;
|
|
}
|
|
else if (rxMissing.size() > MAX_MISSING) {
|
|
qInfo(logUdp()) << "Too many missing packets," << rxMissing.size() << "flushing all buffers";
|
|
missingMutex.lock();
|
|
rxMissing.clear();
|
|
missingMutex.unlock();
|
|
|
|
rxBufferMutex.lock();
|
|
rxSeqBuf.clear();
|
|
rxBufferMutex.unlock();
|
|
return;
|
|
}
|
|
|
|
QByteArray missingSeqs;
|
|
|
|
missingMutex.lock();
|
|
auto it = rxMissing.begin();
|
|
while (it != rxMissing.end())
|
|
{
|
|
if (it.key() != 0x0)
|
|
{
|
|
if (it.value() < 4)
|
|
{
|
|
missingSeqs.append(it.key() & 0xff);
|
|
missingSeqs.append(it.key() >> 8 & 0xff);
|
|
missingSeqs.append(it.key() & 0xff);
|
|
missingSeqs.append(it.key() >> 8 & 0xff);
|
|
it.value()++;
|
|
it++;
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": No response for missing packet" << QString("0x%1").arg(it.key(), 0, 16) << "deleting";
|
|
it = rxMissing.erase(it);
|
|
}
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": found empty key in missing buffer";
|
|
it++;
|
|
}
|
|
}
|
|
missingMutex.unlock();
|
|
|
|
if (missingSeqs.length() != 0)
|
|
{
|
|
control_packet p;
|
|
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
|
|
p.len = sizeof(p);
|
|
p.type = 0x01;
|
|
p.seq = 0x0000;
|
|
p.sentid = myId;
|
|
p.rcvdid = remoteId;
|
|
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
|
|
{
|
|
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << QString("0x%1").arg(p.seq, 0, 16);
|
|
udpMutex.lock();
|
|
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
|
|
udpMutex.unlock();
|
|
}
|
|
else
|
|
{
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex(':');
|
|
missingMutex.lock();
|
|
p.len = sizeof(p) + missingSeqs.size();
|
|
missingSeqs.insert(0, p.packet, sizeof(p));
|
|
missingMutex.unlock();
|
|
|
|
udpMutex.lock();
|
|
udp->writeDatagram(missingSeqs, radioIP, port);
|
|
udpMutex.unlock();
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Used to send idle and other "control" style messages
|
|
void udpBase::sendControl(bool tracked = true, quint8 type = 0, quint16 seq = 0)
|
|
{
|
|
control_packet p;
|
|
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
|
|
p.len = sizeof(p);
|
|
p.type = type;
|
|
p.sentid = myId;
|
|
p.rcvdid = remoteId;
|
|
|
|
if (!tracked) {
|
|
p.seq = seq;
|
|
udpMutex.lock();
|
|
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
|
|
udpMutex.unlock();
|
|
}
|
|
else {
|
|
sendTrackedPacket(QByteArray::fromRawData((const char*)p.packet, sizeof(p)));
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Send periodic ping packets
|
|
void udpBase::sendPing()
|
|
{
|
|
ping_packet p;
|
|
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
|
|
p.len = sizeof(p);
|
|
p.type = 0x07;
|
|
p.sentid = myId;
|
|
p.rcvdid = remoteId;
|
|
p.seq = pingSendSeq;
|
|
QTime now = QTime::currentTime();
|
|
p.time = (quint32)now.msecsSinceStartOfDay();
|
|
lastPingSentTime = QDateTime::currentDateTime();
|
|
udpMutex.lock();
|
|
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
|
|
udpMutex.unlock();
|
|
return;
|
|
}
|
|
|
|
|
|
void udpBase::sendTrackedPacket(QByteArray d)
|
|
{
|
|
// As the radio can request retransmission of these packets, store them in a buffer
|
|
d[6] = sendSeq & 0xff;
|
|
d[7] = (sendSeq >> 8) & 0xff;
|
|
SEQBUFENTRY s;
|
|
s.seqNum = sendSeq;
|
|
s.timeSent = QTime::currentTime();
|
|
s.retransmitCount = 0;
|
|
s.data = d;
|
|
if (txBufferMutex.tryLock(100))
|
|
{
|
|
|
|
if (sendSeq == 0) {
|
|
// We are either the first ever sent packet or have rolled-over so clear the buffer.
|
|
txSeqBuf.clear();
|
|
congestion = 0;
|
|
}
|
|
if (txSeqBuf.size() > BUFSIZE)
|
|
{
|
|
txSeqBuf.erase(txSeqBuf.begin());
|
|
}
|
|
txSeqBuf.insert(sendSeq, s);
|
|
|
|
txBufferMutex.unlock();
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": txBuffer mutex is locked";
|
|
}
|
|
// Stop using purgeOldEntries() as it is likely slower than just removing the earliest packet.
|
|
//qInfo(logUdp()) << this->metaObject()->className() << "RX:" << rxSeqBuf.size() << "TX:" <<txSeqBuf.size() << "MISS:" << rxMissing.size();
|
|
//purgeOldEntries(); // Delete entries older than PURGE_SECONDS seconds (currently 5)
|
|
sendSeq++;
|
|
|
|
udpMutex.lock();
|
|
udp->writeDatagram(d, radioIP, port);
|
|
if (congestion > 10) { // Poor quality connection?
|
|
udp->writeDatagram(d, radioIP, port);
|
|
if (congestion > 20) // Even worse so send again.
|
|
udp->writeDatagram(d, radioIP, port);
|
|
}
|
|
if (idleTimer != Q_NULLPTR && idleTimer->isActive()) {
|
|
idleTimer->start(IDLE_PERIOD); // Reset idle counter if it's running
|
|
}
|
|
udpMutex.unlock();
|
|
packetsSent++;
|
|
return;
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// Once a packet has reached PURGE_SECONDS old (currently 10) then it is not likely to be any use.
|
|
/// </summary>
|
|
void udpBase::purgeOldEntries()
|
|
{
|
|
// Erase old entries from the tx packet buffer
|
|
if (txBufferMutex.tryLock(100))
|
|
{
|
|
if (!txSeqBuf.isEmpty())
|
|
{
|
|
// Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS
|
|
for (auto it = txSeqBuf.begin(); it != txSeqBuf.end();) {
|
|
if (it.value().timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS) {
|
|
txSeqBuf.erase(it++);
|
|
}
|
|
else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
txBufferMutex.unlock();
|
|
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": txBuffer mutex is locked";
|
|
}
|
|
|
|
|
|
|
|
if (rxBufferMutex.tryLock(100))
|
|
{
|
|
if (!rxSeqBuf.isEmpty()) {
|
|
// Loop through the earliest items in the buffer and delete if older than PURGE_SECONDS
|
|
for (auto it = rxSeqBuf.begin(); it != rxSeqBuf.end();) {
|
|
if (it.value().secsTo(QTime::currentTime()) > PURGE_SECONDS) {
|
|
rxSeqBuf.erase(it++);
|
|
}
|
|
else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
rxBufferMutex.unlock();
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": rxBuffer mutex is locked";
|
|
}
|
|
|
|
if (missingMutex.tryLock(100))
|
|
{
|
|
// Erase old entries from the missing packets buffer
|
|
if (!rxMissing.isEmpty() && rxMissing.size() > 50) {
|
|
for (size_t i = 0; i < 25; ++i) {
|
|
rxMissing.erase(rxMissing.begin());
|
|
}
|
|
}
|
|
missingMutex.unlock();
|
|
}
|
|
else {
|
|
qInfo(logUdp()) << this->metaObject()->className() << ": missingBuffer mutex is locked";
|
|
}
|
|
}
|
|
|
|
void udpBase::printHex(const QByteArray& pdata)
|
|
{
|
|
printHex(pdata, false, true);
|
|
}
|
|
|
|
void udpBase::printHex(const QByteArray& pdata, bool printVert, bool printHoriz)
|
|
{
|
|
qDebug(logUdp()) << "---- Begin hex dump -----:";
|
|
QString sdata("DATA: ");
|
|
QString index("INDEX: ");
|
|
QStringList strings;
|
|
|
|
for (int i = 0; i < pdata.length(); i++)
|
|
{
|
|
strings << QString("[%1]: %2").arg(i, 8, 10, QChar('0')).arg((unsigned char)pdata[i], 2, 16, QChar('0'));
|
|
sdata.append(QString("%1 ").arg((unsigned char)pdata[i], 2, 16, QChar('0')));
|
|
index.append(QString("%1 ").arg(i, 2, 10, QChar('0')));
|
|
}
|
|
|
|
if (printVert)
|
|
{
|
|
for (int i = 0; i < strings.length(); i++)
|
|
{
|
|
//sdata = QString(strings.at(i));
|
|
qDebug(logUdp()) << strings.at(i);
|
|
}
|
|
}
|
|
|
|
if (printHoriz)
|
|
{
|
|
qDebug(logUdp()) << index;
|
|
qDebug(logUdp()) << sdata;
|
|
}
|
|
qDebug(logUdp()) << "----- End hex dump -----";
|
|
}
|
|
|
|
|