Changes to retransmit code

merge-requests/2/head
Phil Taylor 2021-02-22 22:25:09 +00:00
rodzic 6c54421f16
commit ae3005b8f8
3 zmienionych plików z 175 dodań i 79 usunięć

Wyświetl plik

@ -9,7 +9,7 @@
#define WATCHDOG_SIZE 0x14
#define PING_SIZE 0x15
#define OPENCLOSE_SIZE 0x16
#define RETRANSMIT_SIZE 0x18
#define RETRANSMIT_RANGE_SIZE 0x18
#define TOKEN_SIZE 0x40
#define STATUS_SIZE 0x50
#define LOGIN_RESPONSE_SIZE 0x60
@ -96,6 +96,23 @@ typedef union txaudio_packet {
char packet[TXAUDIO_SIZE];
} *txaudio_packet_t;
// 0x18 length retransmit_range packet
typedef union retransmit_range_packet {
struct
{
quint32 len; // 0x00
quint16 type; // 0x04
quint16 seq; // 0x06
quint32 sentid; // 0x08
quint32 rcvdid; // 0x0c
quint16 first; // 0x10
quint16 second; // 0x12
quint16 third; // 0x14
quint16 fourth; // 0x16
};
char packet[RETRANSMIT_RANGE_SIZE];
} *retransmit_range_packet_t;
// 0x18 length txaudio packet
/* tx[0] = static_cast<quint8>(tx.length() & 0xff);

Wyświetl plik

@ -97,6 +97,7 @@ udpHandler::~udpHandler()
}
}
void udpHandler::changeBufferSize(quint16 value)
{
emit haveChangeBufferSize(value);
@ -267,7 +268,7 @@ void udpHandler::dataReceived()
QHostAddress ip = QHostAddress(qToBigEndian(in->ipaddress));
if (!streamOpened && in->busy)
{
if (strcmp(in->computer,compName.toLocal8Bit()))
if (in->ipaddress != 0x00 && strcmp(in->computer,compName.toLocal8Bit()))
{
emit haveNetworkStatus(devName + " in use by: " + in->computer + " (" + ip.toString() + ")");
sendControl(false, 0x00, in->seq); // Respond with an idle
@ -278,6 +279,7 @@ void udpHandler::dataReceived()
QObject::connect(civ, SIGNAL(receive(QByteArray)), this, SLOT(receiveFromCivStream(QByteArray)));
QObject::connect(this, SIGNAL(haveChangeBufferSize(quint16)), audio, SLOT(changeBufferSize(quint16)));
streamOpened = true;
@ -532,19 +534,8 @@ void udpCivData::dataReceived()
default:
{
if (r.length() > 21) {
// First check if we are missing any packets?
uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
if (lastReceivedSeq == 0 || lastReceivedSeq > gotSeq) {
lastReceivedSeq = gotSeq;
}
for (uint16_t f = lastReceivedSeq + 1; f < gotSeq; f++) {
// Do we need to request a retransmit?
qDebug() << this->metaObject()->className() << ": Missing Sequence: (" << r.length() << ") " << f;
}
lastReceivedSeq = gotSeq;
// Process this packet, any re-transmit requests will happen later.
//uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
quint8 temp = r[0] - 0x15;
if ((quint8)r[16] == 0xc1 && (quint8)r[17] == temp)
@ -731,17 +722,6 @@ void udpAudio::dataReceived()
r.mid(0, 2) == QByteArrayLiteral("\x70\x04"))
{
// First check if we are missing any packets as seq should be sequential.
uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
if (lastReceivedSeq == 0 || lastReceivedSeq > gotSeq) {
lastReceivedSeq = gotSeq;
}
for (uint16_t f = lastReceivedSeq + 1; f < gotSeq; f++) {
// Do we need to request a retransmit?
qDebug() << this->metaObject()->className() << ": Missing Sequence: (" << r.length() << ") " << f;
}
lastReceivedSeq = gotSeq;
rxaudio->incomingAudio(r.mid(24));
}
@ -795,14 +775,17 @@ udpBase::~udpBase()
void udpBase::dataReceived(QByteArray r)
{
if (r.length() < 0x10)
{
return; // Packet too small do to anything with?
}
switch (r.length())
{
case (CONTROL_SIZE): // Empty response used for simple comms and retransmit requests.
{
control_packet_t in = (control_packet_t)r.constData();
// We should check for missing packets here
// for now just store received seq.
lastReceivedSeq = in->seq;
if (in->type == 0x04) {
qDebug() << this->metaObject()->className() << ": Received I am here";
areYouThereCounter = 0;
@ -818,22 +801,23 @@ void udpBase::dataReceived(QByteArray r)
{
// retransmit request
// Send an idle with the requested seqnum if not found.
bool found = false;
for (int f = txSeqBuf.length() - 1; f >= 0; f--)
{
packetsLost++;
if (txSeqBuf[f].seqNum == in->seq) {
//qDebug() << this->metaObject()->className() << ": retransmitting packet :" << gotSeq << " (len=" << txSeqBuf[f].data.length() << ")";
QMutexLocker locker(&mutex);
udp->writeDatagram(txSeqBuf[f].data, radioIP, port);
found = true;
break;
}
packetsLost++;
auto match = std::find_if(txSeqBuf.cbegin(), txSeqBuf.cend(), [&cs = in->seq](const SEQBUFENTRY& s) {
return s.seqNum == cs;
});
if (match != txSeqBuf.cend()) {
// Found matching entry?
// Send "untracked" as it has already been sent once.
QMutexLocker locker(&mutex);
qDebug() << this->metaObject()->className() << ": Sending retransmit of " << match->seqNum;
udp->writeDatagram(match->data, radioIP, port);
break;
}
if (!found)
{
else {
// Packet was not found in buffer
//qDebug() << this->metaObject()->className() << ": Could not find requested packet " << gotSeq << ", sending idle.";
qDebug() << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle.";
sendControl(false, 0, in->seq);
}
}
@ -845,7 +829,6 @@ void udpBase::dataReceived(QByteArray r)
if (in->type == 0x07)
{
// It is a ping request/response
//uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
if (in->reply == 0x00)
{
ping_packet p;
@ -860,17 +843,16 @@ void udpBase::dataReceived(QByteArray r)
QMutexLocker locker(&mutex);
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
}
else if (r[0x10] == (char)0x01) {
else if (in->reply == 0x01) {
if (in->seq == pingSendSeq)
{
// This is response to OUR request so increment counter
pingSendSeq++;
}
else {
// Not sure what to do here, need to spend more time with the protocol but try sending ping with same seq next time?
//qDebug() << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << gotSeq;
// Not sure what to do here, need to spend more time with the protocol but will try sending ping with same seq next time.
qDebug() << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << in->seq;
}
}
else {
qDebug() << "Unhandled response to ping. I have never seen this! 0x10=" << r[16];
@ -879,42 +861,111 @@ void udpBase::dataReceived(QByteArray r)
}
break;
}
case (0x18):
case (RETRANSMIT_RANGE_SIZE):
{
if (r.mid(0, 6) == QByteArrayLiteral("\x18\x00\x00\x00\x01\x00"))
{ // retransmit range request, can contain multiple ranges.
for (int f = 16; f < r.length() - 4; f = f + 4)
{
quint16 start = qFromLittleEndian<quint16>(r.mid(f, 2));
quint16 end = qFromLittleEndian<quint16>(r.mid(f + 2, 2));
packetsLost = packetsLost + (end - start);
qDebug() << this->metaObject()->className() << ": Retransmit range request for:" << start << " to " << end;
for (quint16 gotSeq = start; gotSeq <= end; gotSeq++)
{
bool found = false;
for (int h = txSeqBuf.length() - 1; h >= 0; h--)
if (txSeqBuf[h].seqNum == gotSeq) {
//qDebug() << this->metaObject()->className() << ": retransmitting packet :" << gotSeq << " (len=" << txSeqBuf[f].data.length() << ")";
QMutexLocker locker(&mutex);
udp->writeDatagram(txSeqBuf[h].data, radioIP, port);
found = true;
break;
}
if (!found)
{
//qDebug() << this->metaObject()->className() << ": Could not find requested packet " << gotSeq << ", sending idle.";
sendControl(false, 0, gotSeq);
}
retransmit_range_packet_t in = (retransmit_range_packet_t)r.constData();
if (in->type==0x01)
{ // retransmit range request
qDebug() << this->metaObject()->className() << ": Retransmit range request for:" << in->first << ", " << in->second << ", " << in->third << ", " << in->fourth << ", ";
auto match = std::find_if(txSeqBuf.cbegin(), txSeqBuf.cend(), [&ca = in->first, &cb = in->second, &cc = in->third, &cd = in->fourth](const SEQBUFENTRY& s) {
return s.seqNum == ca || s.seqNum == cb || s.seqNum == cc || s.seqNum == cd;
});
if (match == txSeqBuf.cend()) {
qDebug() << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle.";
sendControl(false, 0, in->seq);
}
else {
while (match != txSeqBuf.cend()) {
// Found matching entry?
// Send "untracked" as it has already been sent once.
qDebug() << this->metaObject()->className() << ": Sending retransmit of " << match->seqNum;
QMutexLocker locker(&mutex);
udp->writeDatagram(match->data, radioIP, port);
udp->writeDatagram(match->data, radioIP, port);
match++;
packetsLost++;
}
}
}
break;
}
default:
break;
{
// All packets "should" be added to the incoming buffer.
// First check that we haven't already received it.
}
break;
}
// All packets except ping and retransmit requests should trigger this.
control_packet_t in = (control_packet_t)r.constData();
if (r.length() != PING_SIZE && in->type != (char)0x01 && in->seq != 0)
{
if (in->seq < lastReceivedSeq)
{
qDebug() << this->metaObject()->className() << ": ******* seq number may have rolled over ****** previous highest: " << rxSeqBuf.back() << " current: " << in->seq;
// Looks like it has rolled over so clear buffer and start again.
rxSeqBuf.clear();
rxSeqBuf.append(in->seq);
lastReceivedSeq = in->seq;
return;
}
if (!rxSeqBuf.contains(in->seq))
{
rxSeqBuf.append(in->seq);
}
if (!rxSeqBuf.isEmpty())
{
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
// Find all gaps in received packets (in reverse order)
quint16 first, second, third,count=0;
auto i = std::adjacent_find(rxSeqBuf.begin(), rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; });
while (i != rxSeqBuf.end())
{
if (count == 0)
first = *i;
else if (count == 1)
second = *i;
else if (count == 2)
third = *i;
else {
break;
}
count++;
i++;
}
if (count == 1)
{
qDebug() << this->metaObject()->className() << ": Requesting retransmit of: " << first;
sendControl(false, 0x01, first);
}
else if (count == 2) {
count = 3;
third=second;
}
if (count == 3)
{
qDebug() << this->metaObject()->className() << ": Requesting retransmit of: " << first << ", " << second << ", " << third;
sendRetransmitRange(first, second, third);
}
}
}
}
// Used to send idle and other "control" style messages
@ -952,7 +1003,23 @@ void udpBase::sendPing()
lastPingSentTime = QDateTime::currentDateTime();
QMutexLocker locker(&mutex);
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
innerSendSeq++;
return;
}
void udpBase::sendRetransmitRange(quint16 first, quint16 second, quint16 third)
{
retransmit_range_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = sizeof(p);
p.type = 0x00;
p.sentid = myId;
p.rcvdid = remoteId;
p.first = first;
p.second = first;
p.third = second;
p.fourth = third;
QMutexLocker locker(&mutex);
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
return;
}
@ -988,6 +1055,14 @@ void udpBase::purgeOldEntries()
txSeqBuf.removeAt(f);
}
}
if (rxSeqBuf.length() > 2048) {
// If the buffer is over 2K, remove the first 1K.
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
rxSeqBuf.remove(0,1024);
lastReceivedSeq = *rxSeqBuf.begin();
}
}
/// <summary>

Wyświetl plik

@ -9,6 +9,7 @@
#include <QMutex>
#include <QDateTime>
#include <QByteArray>
#include <QVector>
// Allow easy endian-ness conversions
#include <QtEndian>
@ -44,7 +45,9 @@ public:
void init();
void dataReceived(QByteArray r);
void sendPing(); // Periodic type 0x07 ping packet sending
void sendPing();
void sendRetransmitRange(quint16 first, quint16 second, quint16 third);
void sendControl(bool tracked,quint8 id, quint16 seq);
QTime timeStarted;
@ -56,7 +59,7 @@ public:
uint16_t innerSendSeq = 0x8304; // Not sure why?
uint16_t sendSeqB = 0;
uint16_t sendSeq = 1;
uint16_t lastReceivedSeq = 0;
uint16_t lastReceivedSeq = 1;
uint16_t pkt0SendSeq = 0;
uint16_t periodicSeq = 0;
quint64 latency = 0;
@ -79,8 +82,9 @@ public:
QByteArray data;
};
QList <SEQBUFENTRY> txSeqBuf = QList<SEQBUFENTRY>();
std::vector< quint16 > rxSeqBuf;
QVector<SEQBUFENTRY> txSeqBuf = QVector<SEQBUFENTRY>();
QVector< quint16 > rxSeqBuf = QVector<quint16>();
void sendTrackedPacket(QByteArray d);
void purgeOldEntries();