Server only tries mutex lock for 10ms before giving up.

monitor
Phil Taylor 2021-11-10 20:45:59 +00:00
rodzic c99439e759
commit 0cc2945d28
1 zmienionych plików z 420 dodań i 194 usunięć

Wyświetl plik

@ -2,6 +2,7 @@
#include "logcategories.h"
#define STALE_CONNECTION 15
#define LOCK_PERIOD 10 // time to attempt to lock Mutex in ms
udpServer::udpServer(SERVERCONFIG config, audioSetup outAudio, audioSetup inAudio) :
config(config),
outAudio(outAudio),
@ -157,10 +158,15 @@ void udpServer::controlReceived()
current->commonCap = 0x8010;
qInfo(logUdpServer()) << current->ipAddress.toString() << ": New Control connection created";
connMutex.lock();
controlClients.append(current);
connMutex.unlock();
if (connMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
controlClients.append(current);
connMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock connMutex()";
}
}
current->lastHeard = QDateTime::currentDateTime();
@ -443,9 +449,15 @@ void udpServer::civReceived()
current->retransmitTimer->start(RETRANSMIT_PERIOD);
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): New connection created";
connMutex.lock();
civClients.append(current);
connMutex.unlock();
if (connMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
civClients.append(current);
connMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock connMutex()";
}
}
@ -589,9 +601,14 @@ void udpServer::audioReceived()
current->retransmitTimer->start(RETRANSMIT_PERIOD);
current->seqPrefix = 0;
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): New connection created";
connMutex.lock();
audioClients.append(current);
connMutex.unlock();
if (connMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
audioClients.append(current);
connMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock connMutex()";
}
}
@ -713,9 +730,15 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
// Don't constantly retransmit the same packet, give-up eventually
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum;
match->retransmitCount++;
udpMutex.lock();
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
else {
// Just send an idle!
@ -751,9 +774,15 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
// Send "untracked" as it has already been sent once.
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum;
match->retransmitCount++;
udpMutex.lock();
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
match++;
}
}
@ -765,9 +794,15 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
//}
if (current->rxSeqBuf.isEmpty())
{
current->rxMutex.lock();
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
if (current->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
}
else
{
@ -776,35 +811,58 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
{
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.lastKey() << " current: " << hex << in->seq;
// Looks like it has rolled over so clear buffer and start again.
current->rxMutex.lock();
current->rxSeqBuf.clear();
current->rxMutex.unlock();
current->missMutex.lock();
current->rxMissing.clear();
current->missMutex.unlock();
if (current->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->rxSeqBuf.clear();
current->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
if (current->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->rxMissing.clear();
current->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
return;
}
if (!current->rxSeqBuf.contains(in->seq))
{
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
current->rxMutex.lock();
if (current->rxSeqBuf.size() > 400)
if (current->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
current->rxSeqBuf.remove(0);
if (current->rxSeqBuf.size() > 400)
{
current->rxSeqBuf.remove(0);
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
} else{
// Check whether this is one of our missing ones!
current->missMutex.lock();
QMap<quint16, int>::iterator s = current->rxMissing.find(in->seq);
if (s != current->rxMissing.end())
if (current->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Missing SEQ has been received! " << hex << in->seq;
s = current->rxMissing.erase(s);
QMap<quint16, int>::iterator s = current->rxMissing.find(in->seq);
if (s != current->rxMissing.end())
{
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Missing SEQ has been received! " << hex << in->seq;
s = current->rxMissing.erase(s);
}
current->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
current->missMutex.unlock();
}
}
}
@ -832,20 +890,37 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.insert(seq, s);
c->txSeq++;
c->txMutex.unlock();
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.insert(seq, s);
c->txSeq++;
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
p.seq = seq;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
return;
@ -893,9 +968,14 @@ void udpServer::sendPing(QList<CLIENT*>* l, CLIENT* c, quint16 seq, bool reply)
p.time = pingTime;
p.reply = (char)reply;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
return;
}
@ -935,14 +1015,24 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.insert(c->txSeq, s);
c->txSeq++;
c->txMutex.unlock();
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.insert(c->txSeq, s);
c->txSeq++;
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
@ -1040,18 +1130,29 @@ void udpServer::sendCapabilities(CLIENT* c)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.remove(0);
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
@ -1100,18 +1201,30 @@ void udpServer::sendConnectionInfo(CLIENT* c)
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.remove(0);
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
@ -1146,18 +1259,29 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.remove(0);
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
if (c->idleTimer != Q_NULLPTR)
@ -1253,18 +1377,29 @@ void udpServer::sendStatus(CLIENT* c)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.remove(0);
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeq++;
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
c->txSeq++;
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
@ -1297,19 +1432,31 @@ void udpServer::dataForServer(QByteArray d)
s.retransmitCount = 0;
s.data = t;
client->txMutex.lock();
if (client->txSeqBuf.size() > 400)
if (client->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->txSeqBuf.remove(0);
if (client->txSeqBuf.size() > 400)
{
client->txSeqBuf.remove(0);
}
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->innerSeq++;
client->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->innerSeq++;
client->txMutex.unlock();
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
} else {
qInfo(logUdpServer()) << "Got data for different ID" << hex << (quint8)d[lastFE+1] << ":" << hex << (quint8)d[lastFE+2];
}
@ -1359,19 +1506,31 @@ void udpServer::receiveAudioData(const audioPacket& d)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = t;
client->txMutex.lock();
if (client->txSeqBuf.size() > 400)
if (client->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->txSeqBuf.remove(0);
if (client->txSeqBuf.size() > 400)
{
client->txSeqBuf.remove(0);
}
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->sendAudioSeq++;
client->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->sendAudioSeq++;
client->txMutex.unlock();
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
}
@ -1397,13 +1556,24 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
{
// Too many packets to process, flush buffers and start again!
qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
c->missMutex.lock();
c->rxMissing.clear();
c->missMutex.unlock();
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxMissing.clear();
c->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
if (c->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxSeqBuf.clear();
c->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
}
else {
// We have at least 1 missing packet!
@ -1417,25 +1587,43 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
// We haven't seen this missing packet before
qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j;
c->missMutex.lock();
c->rxMissing.insert(j, 0);
c->missMutex.unlock();
c->rxMutex.lock();
if (c->rxSeqBuf.size() > 400)
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxSeqBuf.remove(0);
c->rxMissing.insert(j, 0);
c->missMutex.unlock();
}
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
c->rxMutex.unlock();
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
if (c->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
if (c->rxSeqBuf.size() > 400)
{
c->rxSeqBuf.remove(0);
}
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
c->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
}
else {
if (s.value() == 4)
{
// We have tried 4 times to request this packet, time to give up!
c->missMutex.lock();
s = c->rxMissing.erase(s);
c->missMutex.unlock();
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
s = c->rxMissing.erase(s);
c->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
}
}
@ -1444,48 +1632,64 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
}
}
c->missMutex.lock();
for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it)
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
if (it.value() < 10)
for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it)
{
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
it.value()++;
if (it.value() < 10)
{
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
it.value()++;
}
}
if (missingSeqs.length() != 0)
{
control_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.type = 0x01;
p.seq = 0x0000;
p.sentid = c->myId;
p.rcvdid = c->remoteId;
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
else
{
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
missingSeqs.insert(0, p.packet, sizeof(p.packet));
c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
}
c->missMutex.unlock();
}
if (missingSeqs.length() != 0)
{
control_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.type = 0x01;
p.seq = 0x0000;
p.sentid = c->myId;
p.rcvdid = c->remoteId;
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else
{
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
udpMutex.lock();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
c->missMutex.unlock();
}
@ -1514,35 +1718,57 @@ void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
delete c->retransmitTimer;
}
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
c->txMutex.lock();
c->txSeqBuf.clear();
c->txMutex.unlock();
c->missMutex.lock();
c->rxMissing.clear();
c->missMutex.unlock();
connMutex.lock();
QList<CLIENT*>::iterator it = l->begin();
while (it != l->end()) {
CLIENT* client = *it;
if (client != Q_NULLPTR && client == c) {
qInfo(logUdpServer()) << "Found" << client->type << "connection to: " << client->ipAddress.toString() << ":" << QString::number(client->port);
it = l->erase(it);
}
else {
++it;
}
if (c->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxSeqBuf.clear();
c->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->txSeqBuf.clear();
c->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxMissing.clear();
c->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
if (connMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
QList<CLIENT*>::iterator it = l->begin();
while (it != l->end()) {
CLIENT* client = *it;
if (client != Q_NULLPTR && client == c) {
qInfo(logUdpServer()) << "Found" << client->type << "connection to: " << client->ipAddress.toString() << ":" << QString::number(client->port);
it = l->erase(it);
}
else {
++it;
}
}
delete c; // Is this needed or will the erase have done it?
c = Q_NULLPTR;
qInfo(logUdpServer()) << "Current Number of clients connected: " << l->length();
connMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock connMutex()";
}
delete c; // Is this needed or will the erase have done it?
c = Q_NULLPTR;
qInfo(logUdpServer()) << "Current Number of clients connected: " << l->length();
connMutex.unlock();
if (l->length() == 0) {