Mutex work in udpserver

merge-requests/5/head
Phil Taylor 2021-06-01 17:49:48 +01:00
rodzic c7c4a326da
commit 36c778961e
1 zmienionych plików z 388 dodań i 383 usunięć

Wyświetl plik

@ -328,7 +328,7 @@ void udpServer::controlReceived()
current->clientName = in->name;
current->authInnerSeq = in->innerseq;
current->tokenRx = in->tokrequest;
current->tokenTx =(quint8)rand() | (quint8)rand() << 8 | (quint8)rand() << 16 | (quint8)rand() << 24;
current->tokenTx = (quint8)rand() | (quint8)rand() << 8 | (quint8)rand() << 16 | (quint8)rand() << 24;
if (userOk) {
qInfo(logUdpServer()) << current->ipAddress.toString() << ": User " << current->user.username << " login OK";
@ -392,11 +392,11 @@ void udpServer::controlReceived()
txAudioThread->start();
connect(this, SIGNAL(setupTxAudio(quint8,quint8,quint16,quint16,bool,bool,int,quint8)), txaudio, SLOT(init(quint8,quint8,quint16,quint16,bool,bool,int,quint8)));
connect(this, SIGNAL(setupTxAudio(quint8, quint8, quint16, quint16, bool, bool, int, quint8)), txaudio, SLOT(init(quint8, quint8, quint16, quint16, bool, bool, int, quint8)));
connect(txAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater()));
emit setupTxAudio(samples, channels, current->txSampleRate, current->txBufferLen, uLaw, false, config.audioOutput, config.resampleQuality);
hasTxAudio=datagram.senderAddress();
hasTxAudio = datagram.senderAddress();
connect(this, SIGNAL(haveAudioData(audioPacket)), txaudio, SLOT(incomingAudio(audioPacket)));
@ -425,10 +425,10 @@ void udpServer::controlReceived()
rxaudio->moveToThread(rxAudioThread);
rxAudioThread->start();
connect(this, SIGNAL(setupRxAudio(quint8,quint8,quint16,quint16,bool,bool,int,quint8)), rxaudio, SLOT(init(quint8,quint8,quint16,quint16,bool,bool,int,quint8)));
connect(this, SIGNAL(setupRxAudio(quint8, quint8, quint16, quint16, bool, bool, int, quint8)), rxaudio, SLOT(init(quint8, quint8, quint16, quint16, bool, bool, int, quint8)));
connect(rxAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater()));
emit setupRxAudio(samples, channels, current->rxSampleRate,150, uLaw, true, config.audioInput, config.resampleQuality);
emit setupRxAudio(samples, channels, current->rxSampleRate, 150, uLaw, true, config.audioInput, config.resampleQuality);
rxAudioTimer = new QTimer();
rxAudioTimer->setTimerType(Qt::PreciseTimer);
@ -711,7 +711,7 @@ void udpServer::audioReceived()
}
void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
{
Q_UNUSED(l); // We might need it later!
if (current == Q_NULLPTR || r.isNull()) {
@ -761,7 +761,8 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
udpMutex.lock();
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
} else {
}
else {
// Just send an idle!
sendControl(current, 0x00, in->seq);
}
@ -888,7 +889,7 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
void udpServer::sendPing(QList<CLIENT*> *l,CLIENT* c, quint16 seq, bool reply)
void udpServer::sendPing(QList<CLIENT*>* l, CLIENT* c, quint16 seq, bool reply)
{
// Also use to detect "stale" connections
QDateTime now = QDateTime::currentDateTime();
@ -962,7 +963,7 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
c->wdTimer->stop();
}
else {
strcpy(p.connection,"WFVIEW");
strcpy(p.connection, "WFVIEW");
}
c->txMutex.lock();
@ -1182,7 +1183,7 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
void udpServer::watchdog(CLIENT* c)
{
c->txMutex.lock();
if (c->txMutex.tryLock()) {
//qInfo(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length();
// Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these?
if (!c->txSeqBuf.isEmpty())
@ -1191,16 +1192,16 @@ void udpServer::watchdog(CLIENT* c)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end());
}
c->txMutex.unlock();
}
// Erase old entries from the missing packets buffer
c->missMutex.lock();
if (c->missMutex.tryLock()) {
if (!c->rxMissing.isEmpty()) {
c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end());
}
c->missMutex.unlock();
c->rxMutex.lock();
}
if (c->rxMutex.tryLock()) {
if (!c->rxSeqBuf.isEmpty()) {
std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end());
@ -1210,6 +1211,7 @@ void udpServer::watchdog(CLIENT* c)
}
}
c->rxMutex.unlock();
}
}
void udpServer::sendStatus(CLIENT* c)
@ -1234,8 +1236,8 @@ void udpServer::sendStatus(CLIENT* c)
p.identa = c->identa;
p.identb = c->identb;
p.civport=qToBigEndian(c->civPort);
p.audioport=qToBigEndian(c->audioPort);
p.civport = qToBigEndian(c->civPort);
p.audioport = qToBigEndian(c->audioPort);
// Send this to reject the request to tx/rx audio/civ
//memcpy(p + 0x30, QByteArrayLiteral("\xff\xff\xff\xfe").constData(), 4);
@ -1264,7 +1266,7 @@ void udpServer::dataForServer(QByteArray d)
foreach(CLIENT * client, civClients)
{
int lastFE = d.lastIndexOf((quint8)0xfe);
if (client != Q_NULLPTR && client->connected && d.length() > lastFE+2 && ((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId)) {
if (client != Q_NULLPTR && client->connected && d.length() > lastFE + 2 && ((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId)) {
data_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = (quint16)d.length() + sizeof(p);
@ -1314,7 +1316,7 @@ void udpServer::sendRxAudio()
void udpServer::receiveAudioData(const audioPacket &d)
void udpServer::receiveAudioData(const audioPacket& d)
{
//qInfo(logUdpServer()) << "Server got:" << d.data.length();
foreach(CLIENT * client, audioClients)
@ -1339,10 +1341,13 @@ void udpServer::receiveAudioData(const audioPacket &d)
client->txSeqBuf.last().data = t;
client->txMutex.unlock();
udpMutex.lock();
if (udpMutex.tryLock()) {
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qDebug(logUdpServer()) << "Failed to lock udpMutex()";
}
client->txSeq++;
client->sendAudioSeq++;
}
@ -1356,7 +1361,7 @@ void udpServer::receiveAudioData(const audioPacket &d)
/// This will run every 100ms so out-of-sequence packets will not trigger a retransmit request.
/// </summary>
/// <param name="c"></param>
void udpServer::sendRetransmitRequest(CLIENT *c)
void udpServer::sendRetransmitRequest(CLIENT* c)
{
c->missMutex.lock();
@ -1377,7 +1382,7 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
if (s == c->rxMissing.end())
{
// We haven't seen this missing packet before
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len="<< c->rxMissing.length() << "): " << j;
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len=" << c->rxMissing.length() << "): " << j;
c->rxMissing.append(SEQBUFENTRY());
c->rxMissing.last().seqNum = j;
c->rxMissing.last().retransmitCount = 0;
@ -1457,7 +1462,7 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
/// </summary>
/// <param name="l"></param>
/// <param name="c"></param>
void udpServer::deleteConnection(QList<CLIENT*> *l, CLIENT* c)
void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
{
connMutex.lock();