Tidy up server mutexes

merge-requests/5/head
Phil Taylor 2021-06-02 10:43:52 +01:00
rodzic 0f9e008507
commit 66c0d18b75
1 zmienionych plików z 43 dodań i 31 usunięć

Wyświetl plik

@ -805,14 +805,11 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
//if (current->type == "CIV") { //if (current->type == "CIV") {
// qInfo(logUdpServer()) << "Got:" << in->seq; // qInfo(logUdpServer()) << "Got:" << in->seq;
//} //}
current->rxMutex.lock();
if (current->rxSeqBuf.isEmpty()) if (current->rxSeqBuf.isEmpty())
{ {
if (current->rxSeqBuf.size() > 400) current->rxMutex.lock();
{
current->rxSeqBuf.remove(0);
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime()); current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
} }
else else
{ {
@ -821,8 +818,9 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
{ {
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.lastKey() << " current: " << hex << in->seq; qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.lastKey() << " current: " << hex << in->seq;
// Looks like it has rolled over so clear buffer and start again. // Looks like it has rolled over so clear buffer and start again.
current->rxMutex.lock();
current->rxSeqBuf.clear(); current->rxSeqBuf.clear();
current->rxMutex.unlock(); // Must unlock the Mutex! current->rxMutex.unlock();
current->missMutex.lock(); current->missMutex.lock();
current->rxMissing.clear(); current->rxMissing.clear();
current->missMutex.unlock(); current->missMutex.unlock();
@ -832,11 +830,13 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
if (!current->rxSeqBuf.contains(in->seq)) if (!current->rxSeqBuf.contains(in->seq))
{ {
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it. // Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
current->rxMutex.lock();
if (current->rxSeqBuf.size() > 400) if (current->rxSeqBuf.size() > 400)
{ {
current->rxSeqBuf.remove(0); current->rxSeqBuf.remove(0);
} }
current->rxSeqBuf.insert(in->seq, QTime::currentTime()); current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock();
} else{ } else{
// Check whether this is one of our missing ones! // Check whether this is one of our missing ones!
current->missMutex.lock(); current->missMutex.lock();
@ -849,7 +849,6 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
current->missMutex.unlock(); current->missMutex.unlock();
} }
} }
current->rxMutex.unlock();
} }
} }
@ -877,14 +876,12 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock(); c->txMutex.lock();
c->txSeqBuf.insert(seq, s); c->txSeqBuf.insert(seq, s);
c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
c->txSeq++;
//if (c->idleTimer != Q_NULLPTR) {
// c->idleTimer->start(100);
//}
} }
else { else {
p.seq = seq; p.seq = seq;
@ -982,12 +979,13 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock(); c->txMutex.lock();
c->txSeqBuf.insert(c->txSeq, s); c->txSeqBuf.insert(c->txSeq, s);
c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
c->txSeq++;
if (c->idleTimer != Q_NULLPTR) if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100); c->idleTimer->start(100);
@ -1090,7 +1088,9 @@ void udpServer::sendCapabilities(CLIENT* c)
c->txSeqBuf.remove(0); c->txSeqBuf.remove(0);
} }
c->txSeqBuf.insert(p.seq, s); c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
@ -1098,8 +1098,6 @@ void udpServer::sendCapabilities(CLIENT* c)
if (c->idleTimer != Q_NULLPTR) if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100); c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return; return;
} }
@ -1143,13 +1141,16 @@ void udpServer::sendConnectionInfo(CLIENT* c)
s.timeSent = QTime::currentTime(); s.timeSent = QTime::currentTime();
s.retransmitCount = 0; s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock(); c->txMutex.lock();
if (c->txSeqBuf.size() > 400) if (c->txSeqBuf.size() > 400)
{ {
c->txSeqBuf.remove(0); c->txSeqBuf.remove(0);
} }
c->txSeqBuf.insert(p.seq, s); c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
@ -1157,8 +1158,6 @@ void udpServer::sendConnectionInfo(CLIENT* c)
if (c->idleTimer != Q_NULLPTR) if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100); c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return; return;
} }
@ -1188,13 +1187,16 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
s.timeSent = QTime::currentTime(); s.timeSent = QTime::currentTime();
s.retransmitCount = 0; s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock(); c->txMutex.lock();
if (c->txSeqBuf.size() > 400) if (c->txSeqBuf.size() > 400)
{ {
c->txSeqBuf.remove(0); c->txSeqBuf.remove(0);
} }
c->txSeqBuf.insert(p.seq, s); c->txSeqBuf.insert(p.seq, s);
c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
@ -1203,8 +1205,6 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
if (c->idleTimer != Q_NULLPTR) if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100); c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return; return;
} }
@ -1255,13 +1255,14 @@ void udpServer::sendStatus(CLIENT* c)
{ {
c->txSeqBuf.remove(0); c->txSeqBuf.remove(0);
} }
c->txSeq++;
c->txSeqBuf.insert(p.seq, s); c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock(); c->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
c->txSeq++;
} }
@ -1290,18 +1291,20 @@ void udpServer::dataForServer(QByteArray d)
s.timeSent = QTime::currentTime(); s.timeSent = QTime::currentTime();
s.retransmitCount = 0; s.retransmitCount = 0;
s.data = t; s.data = t;
client->txMutex.lock(); client->txMutex.lock();
if (client->txSeqBuf.size() > 400) if (client->txSeqBuf.size() > 400)
{ {
client->txSeqBuf.remove(0); client->txSeqBuf.remove(0);
} }
client->txSeqBuf.insert(p.seq, s); client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->innerSeq++;
client->txMutex.unlock(); client->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port); client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock(); udpMutex.unlock();
client->txSeq++;
client->innerSeq++;
} }
} }
@ -1355,12 +1358,13 @@ void udpServer::receiveAudioData(const audioPacket& d)
client->txSeqBuf.remove(0); client->txSeqBuf.remove(0);
} }
client->txSeqBuf.insert(p.seq, s); client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->sendAudioSeq++;
client->txMutex.unlock(); client->txMutex.unlock();
udpMutex.lock(); udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port); client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock(); udpMutex.unlock();
client->txSeq++;
client->sendAudioSeq++;
} }
} }
@ -1379,7 +1383,6 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
QByteArray missingSeqs; QByteArray missingSeqs;
c->rxMutex.lock();
if (!c->rxSeqBuf.empty() && c->rxSeqBuf.size() <= c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey()) if (!c->rxSeqBuf.empty() && c->rxSeqBuf.size() <= c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey())
{ {
@ -1389,42 +1392,50 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1; qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
c->missMutex.lock(); c->missMutex.lock();
c->rxMissing.clear(); c->rxMissing.clear();
c->rxSeqBuf.clear();
c->missMutex.unlock(); c->missMutex.unlock();
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
} }
else { else {
// We have at least 1 missing packet! // We have at least 1 missing packet!
qDebug(logUdp()) << "Missing Seq: size=" << c->rxSeqBuf.size() << "firstKey=" << c->rxSeqBuf.firstKey() << "lastKey=" << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1; qDebug(logUdp()) << "Missing Seq: size=" << c->rxSeqBuf.size() << "firstKey=" << c->rxSeqBuf.firstKey() << "lastKey=" << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
// We are missing packets so iterate through the buffer and add the missing ones to missing packet list // We are missing packets so iterate through the buffer and add the missing ones to missing packet list
for (int i = 0; i < c->rxSeqBuf.keys().length() - 1; i++) { for (int i = 0; i < c->rxSeqBuf.keys().length() - 1; i++) {
c->missMutex.lock();
for (quint16 j = c->rxSeqBuf.keys()[i] + 1; j < c->rxSeqBuf.keys()[i + 1]; j++) { for (quint16 j = c->rxSeqBuf.keys()[i] + 1; j < c->rxSeqBuf.keys()[i + 1]; j++) {
auto s = c->rxMissing.find(j); auto s = c->rxMissing.find(j);
if (s == c->rxMissing.end()) if (s == c->rxMissing.end())
{ {
// We haven't seen this missing packet before // We haven't seen this missing packet before
qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j; qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j;
c->missMutex.lock();
c->rxMissing.insert(j, 0); c->rxMissing.insert(j, 0);
c->missMutex.unlock();
c->rxMutex.lock();
if (c->rxSeqBuf.size() > 400) if (c->rxSeqBuf.size() > 400)
{ {
c->rxSeqBuf.remove(0); c->rxSeqBuf.remove(0);
} }
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it. c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
c->rxMutex.unlock();
} }
else { else {
if (s.value() == 4) if (s.value() == 4)
{ {
// We have tried 4 times to request this packet, time to give up! // We have tried 4 times to request this packet, time to give up!
c->missMutex.lock();
s = c->rxMissing.erase(s); s = c->rxMissing.erase(s);
c->missMutex.unlock();
} }
} }
} }
c->missMutex.unlock();
} }
} }
} }
c->rxMutex.unlock();
c->missMutex.lock(); c->missMutex.lock();
for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it) for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it)
@ -1438,6 +1449,7 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
it.value()++; it.value()++;
} }
} }
c->missMutex.unlock();
if (missingSeqs.length() != 0) if (missingSeqs.length() != 0)
{ {
@ -1451,6 +1463,7 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
{ {
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8); p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq; qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
udpMutex.lock(); udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock(); udpMutex.unlock();
@ -1458,6 +1471,7 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
else else
{ {
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex(); qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
udpMutex.lock(); udpMutex.lock();
missingSeqs.insert(0, p.packet, sizeof(p.packet)); missingSeqs.insert(0, p.packet, sizeof(p.packet));
c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port); c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port);
@ -1465,7 +1479,6 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
} }
} }
c->missMutex.unlock();
} }
@ -1498,8 +1511,6 @@ void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
delete c->retransmitTimer; delete c->retransmitTimer;
} }
connMutex.lock();
c->rxMutex.lock(); c->rxMutex.lock();
c->rxSeqBuf.clear(); c->rxSeqBuf.clear();
c->rxMutex.unlock(); c->rxMutex.unlock();
@ -1513,6 +1524,7 @@ void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
c->missMutex.unlock(); c->missMutex.unlock();
connMutex.lock();
QList<CLIENT*>::iterator it = l->begin(); QList<CLIENT*>::iterator it = l->begin();
while (it != l->end()) { while (it != l->end()) {
CLIENT* client = *it; CLIENT* client = *it;