From 36c778961e5db4812e1276c3f4fd6e79b84f0adc Mon Sep 17 00:00:00 2001 From: Phil Taylor Date: Tue, 1 Jun 2021 17:49:48 +0100 Subject: [PATCH] Mutex work in udpserver --- udpserver.cpp | 771 +++++++++++++++++++++++++------------------------- 1 file changed, 388 insertions(+), 383 deletions(-) diff --git a/udpserver.cpp b/udpserver.cpp index 14ae346..98ea5c3 100644 --- a/udpserver.cpp +++ b/udpserver.cpp @@ -44,7 +44,7 @@ void udpServer::init() qInfo(logUdpServer()) << "Server Binding Control to: " << config.controlPort; udpControl = new QUdpSocket(this); - udpControl->bind(config.controlPort); + udpControl->bind(config.controlPort); QUdpSocket::connect(udpControl, &QUdpSocket::readyRead, this, &udpServer::controlReceived); qInfo(logUdpServer()) << "Server Binding CIV to: " << config.civPort; @@ -156,7 +156,7 @@ udpServer::~udpServer() connMutex.unlock(); - + } @@ -232,218 +232,218 @@ void udpServer::controlReceived() switch (r.length()) { - - case (CONTROL_SIZE): - { - control_packet_t in = (control_packet_t)r.constData(); - if (in->type == 0x05) - { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received 'disconnect' request"; - sendControl(current, 0x00, in->seq); - //current->wdTimer->stop(); // Keep watchdog running to delete stale connection. - deleteConnection(&controlClients, current); - } - break; - } - case (PING_SIZE): - { - ping_packet_t in = (ping_packet_t)r.constData(); - if (in->type == 0x07) - { - // It is a ping request/response - if (in->reply == 0x00) + case (CONTROL_SIZE): + { + control_packet_t in = (control_packet_t)r.constData(); + if (in->type == 0x05) + { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received 'disconnect' request"; + sendControl(current, 0x00, in->seq); + //current->wdTimer->stop(); // Keep watchdog running to delete stale connection. + deleteConnection(&controlClients, current); + } + break; + } + case (PING_SIZE): + { + ping_packet_t in = (ping_packet_t)r.constData(); + if (in->type == 0x07) + { + // It is a ping request/response + + if (in->reply == 0x00) + { + current->rxPingTime = in->time; + sendPing(&controlClients, current, in->seq, true); + } + else if (in->reply == 0x01) { + if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) { - current->rxPingTime = in->time; - sendPing(&controlClients, current, in->seq, true); - } - else if (in->reply == 0x01) { - if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) - { - // A Reply to our ping! - if (in->seq == current->pingSeq) { - current->pingSeq++; - } - else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; - } + // A Reply to our ping! + if (in->seq == current->pingSeq) { + current->pingSeq++; + } + else { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; } } } - break; } - case (TOKEN_SIZE): - { - // Token request - token_packet_t in = (token_packet_t)r.constData(); - current->rxSeq = in->seq; - current->authInnerSeq = in->innerseq; - current->identa = in->identa; - current->identb = in->identb; - if (in->res == 0x02) { - // Request for new token - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received create token request"; - sendCapabilities(current); - sendConnectionInfo(current); - } - else if (in->res == 0x01) { - // Token disconnect - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received token disconnect request"; - sendTokenResponse(current, in->res); - } - else if (in->res == 0x04) { - // Disconnect audio/civ - sendTokenResponse(current, in->res); - current->isStreaming = false; - sendConnectionInfo(current); - } - else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received token request"; - sendTokenResponse(current, in->res); - } - break; - } - case (LOGIN_SIZE): - { - login_packet_t in = (login_packet_t)r.constData(); - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received 'login'"; - bool userOk = false; - foreach(SERVERUSER user, config.users) - { - QByteArray usercomp; - passcode(user.username, usercomp); - QByteArray passcomp; - passcode(user.password, passcomp); - if (!strcmp(in->username, usercomp.constData()) && !strcmp(in->password, passcomp.constData())) - { - userOk = true; - current->user = user; - break; - } - - - } - // Generate login response - current->rxSeq = in->seq; - current->clientName = in->name; - current->authInnerSeq = in->innerseq; - current->tokenRx = in->tokrequest; - current->tokenTx =(quint8)rand() | (quint8)rand() << 8 | (quint8)rand() << 16 | (quint8)rand() << 24; - - if (userOk) { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": User " << current->user.username << " login OK"; - sendLoginResponse(current, true); - } - else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Incorrect username/password"; - - sendLoginResponse(current, false); - } - break; - } - case (CONNINFO_SIZE): - { - conninfo_packet_t in = (conninfo_packet_t)r.constData(); - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received request for radio connection"; - // Request to start audio and civ! - current->isStreaming = true; - current->rxSeq = in->seq; - current->rxCodec = in->rxcodec; - current->txCodec = in->txcodec; - current->rxSampleRate = qFromBigEndian(in->rxsample); - current->txSampleRate = qFromBigEndian(in->txsample); - current->txBufferLen = qFromBigEndian(in->txbuffer); - current->authInnerSeq = in->innerseq; - current->identa = in->identa; - current->identb = in->identb; - sendStatus(current); - current->authInnerSeq = 0x00; + break; + } + case (TOKEN_SIZE): + { + // Token request + token_packet_t in = (token_packet_t)r.constData(); + current->rxSeq = in->seq; + current->authInnerSeq = in->innerseq; + current->identa = in->identa; + current->identb = in->identb; + if (in->res == 0x02) { + // Request for new token + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received create token request"; + sendCapabilities(current); sendConnectionInfo(current); - qInfo(logUdpServer()) << current->ipAddress.toString() << ": rxCodec:" << current->rxCodec << " txCodec:" << current->txCodec << - " rxSampleRate" << current->rxSampleRate << - " txSampleRate" << current->rxSampleRate << - " txBufferLen" << current->txBufferLen; - - if (!config.lan) { - // Radio is connected by USB/Serial and we assume that audio is connected as well. - // Create audio TX/RX threads if they don't already exist (first client chooses samplerate/codec) - if (txaudio == Q_NULLPTR) - { - bool uLaw = false; - quint8 channels = 1; - quint8 samples = 8; - txSampleRate = current->txSampleRate; - txCodec = current->txCodec; - - if (current->txCodec == 0x01 || current->txCodec == 0x20) { - uLaw = true; - } - if (current->txCodec == 0x08 || current->txCodec == 0x10 || current->txCodec == 0x20) { - channels = 2; - } - if (current->txCodec == 0x04 || current->txCodec == 0x10) { - samples = 16; - } - - - txaudio = new audioHandler(); - txAudioThread = new QThread(this); - txaudio->moveToThread(txAudioThread); - - txAudioThread->start(); - - connect(this, SIGNAL(setupTxAudio(quint8,quint8,quint16,quint16,bool,bool,int,quint8)), txaudio, SLOT(init(quint8,quint8,quint16,quint16,bool,bool,int,quint8))); - connect(txAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater())); - - emit setupTxAudio(samples, channels, current->txSampleRate, current->txBufferLen, uLaw, false, config.audioOutput, config.resampleQuality); - hasTxAudio=datagram.senderAddress(); - - connect(this, SIGNAL(haveAudioData(audioPacket)), txaudio, SLOT(incomingAudio(audioPacket))); - - } - if (rxaudio == Q_NULLPTR) - { - bool uLaw = false; - quint8 channels = 1; - quint8 samples = 8; - rxSampleRate = current->rxSampleRate; - rxCodec = current->rxCodec; - - if (current->rxCodec == 0x01 || current->rxCodec == 0x20) { - uLaw = true; - } - if (current->rxCodec == 0x08 || current->rxCodec == 0x10 || current->rxCodec == 0x20) { - channels = 2; - } - if (current->rxCodec == 0x04 || current->rxCodec == 0x10) { - samples = 16; - } - - - rxaudio = new audioHandler(); - rxAudioThread = new QThread(this); - rxaudio->moveToThread(rxAudioThread); - rxAudioThread->start(); - - connect(this, SIGNAL(setupRxAudio(quint8,quint8,quint16,quint16,bool,bool,int,quint8)), rxaudio, SLOT(init(quint8,quint8,quint16,quint16,bool,bool,int,quint8))); - connect(rxAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater())); - - emit setupRxAudio(samples, channels, current->rxSampleRate,150, uLaw, true, config.audioInput, config.resampleQuality); - - rxAudioTimer = new QTimer(); - rxAudioTimer->setTimerType(Qt::PreciseTimer); - connect(rxAudioTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRxAudio, this)); - rxAudioTimer->start(20); - } - + } + else if (in->res == 0x01) { + // Token disconnect + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received token disconnect request"; + sendTokenResponse(current, in->res); + } + else if (in->res == 0x04) { + // Disconnect audio/civ + sendTokenResponse(current, in->res); + current->isStreaming = false; + sendConnectionInfo(current); + } + else { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received token request"; + sendTokenResponse(current, in->res); + } + break; + } + case (LOGIN_SIZE): + { + login_packet_t in = (login_packet_t)r.constData(); + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received 'login'"; + bool userOk = false; + foreach(SERVERUSER user, config.users) + { + QByteArray usercomp; + passcode(user.username, usercomp); + QByteArray passcomp; + passcode(user.password, passcomp); + if (!strcmp(in->username, usercomp.constData()) && !strcmp(in->password, passcomp.constData())) + { + userOk = true; + current->user = user; + break; } - break; + } - default: - { - break; + // Generate login response + current->rxSeq = in->seq; + current->clientName = in->name; + current->authInnerSeq = in->innerseq; + current->tokenRx = in->tokrequest; + current->tokenTx = (quint8)rand() | (quint8)rand() << 8 | (quint8)rand() << 16 | (quint8)rand() << 24; + + if (userOk) { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": User " << current->user.username << " login OK"; + sendLoginResponse(current, true); } + else { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Incorrect username/password"; + + sendLoginResponse(current, false); + } + break; + } + case (CONNINFO_SIZE): + { + conninfo_packet_t in = (conninfo_packet_t)r.constData(); + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received request for radio connection"; + // Request to start audio and civ! + current->isStreaming = true; + current->rxSeq = in->seq; + current->rxCodec = in->rxcodec; + current->txCodec = in->txcodec; + current->rxSampleRate = qFromBigEndian(in->rxsample); + current->txSampleRate = qFromBigEndian(in->txsample); + current->txBufferLen = qFromBigEndian(in->txbuffer); + current->authInnerSeq = in->innerseq; + current->identa = in->identa; + current->identb = in->identb; + sendStatus(current); + current->authInnerSeq = 0x00; + sendConnectionInfo(current); + qInfo(logUdpServer()) << current->ipAddress.toString() << ": rxCodec:" << current->rxCodec << " txCodec:" << current->txCodec << + " rxSampleRate" << current->rxSampleRate << + " txSampleRate" << current->rxSampleRate << + " txBufferLen" << current->txBufferLen; + + if (!config.lan) { + // Radio is connected by USB/Serial and we assume that audio is connected as well. + // Create audio TX/RX threads if they don't already exist (first client chooses samplerate/codec) + if (txaudio == Q_NULLPTR) + { + bool uLaw = false; + quint8 channels = 1; + quint8 samples = 8; + txSampleRate = current->txSampleRate; + txCodec = current->txCodec; + + if (current->txCodec == 0x01 || current->txCodec == 0x20) { + uLaw = true; + } + if (current->txCodec == 0x08 || current->txCodec == 0x10 || current->txCodec == 0x20) { + channels = 2; + } + if (current->txCodec == 0x04 || current->txCodec == 0x10) { + samples = 16; + } + + + txaudio = new audioHandler(); + txAudioThread = new QThread(this); + txaudio->moveToThread(txAudioThread); + + txAudioThread->start(); + + connect(this, SIGNAL(setupTxAudio(quint8, quint8, quint16, quint16, bool, bool, int, quint8)), txaudio, SLOT(init(quint8, quint8, quint16, quint16, bool, bool, int, quint8))); + connect(txAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater())); + + emit setupTxAudio(samples, channels, current->txSampleRate, current->txBufferLen, uLaw, false, config.audioOutput, config.resampleQuality); + hasTxAudio = datagram.senderAddress(); + + connect(this, SIGNAL(haveAudioData(audioPacket)), txaudio, SLOT(incomingAudio(audioPacket))); + + } + if (rxaudio == Q_NULLPTR) + { + bool uLaw = false; + quint8 channels = 1; + quint8 samples = 8; + rxSampleRate = current->rxSampleRate; + rxCodec = current->rxCodec; + + if (current->rxCodec == 0x01 || current->rxCodec == 0x20) { + uLaw = true; + } + if (current->rxCodec == 0x08 || current->rxCodec == 0x10 || current->rxCodec == 0x20) { + channels = 2; + } + if (current->rxCodec == 0x04 || current->rxCodec == 0x10) { + samples = 16; + } + + + rxaudio = new audioHandler(); + rxAudioThread = new QThread(this); + rxaudio->moveToThread(rxAudioThread); + rxAudioThread->start(); + + connect(this, SIGNAL(setupRxAudio(quint8, quint8, quint16, quint16, bool, bool, int, quint8)), rxaudio, SLOT(init(quint8, quint8, quint16, quint16, bool, bool, int, quint8))); + connect(rxAudioThread, SIGNAL(finished()), txaudio, SLOT(deleteLater())); + + emit setupRxAudio(samples, channels, current->rxSampleRate, 150, uLaw, true, config.audioInput, config.resampleQuality); + + rxAudioTimer = new QTimer(); + rxAudioTimer->setTimerType(Qt::PreciseTimer); + connect(rxAudioTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRxAudio, this)); + rxAudioTimer->start(20); + } + + } + + break; + } + default: + { + break; + } } commonReceived(&controlClients, current, r); @@ -518,66 +518,66 @@ void udpServer::civReceived() { } */ - case (PING_SIZE): + case (PING_SIZE): + { + ping_packet_t in = (ping_packet_t)r.constData(); + if (in->type == 0x07) { - ping_packet_t in = (ping_packet_t)r.constData(); - if (in->type == 0x07) + // It is a ping request/response + + if (in->reply == 0x00) { - // It is a ping request/response - - if (in->reply == 0x00) - { - current->rxPingTime = in->time; - sendPing(&civClients, current, in->seq, true); - } - else if (in->reply == 0x01) { - if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) - { - // A Reply to our ping! - if (in->seq == current->pingSeq) { - current->pingSeq++; - } - else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; - } - } - } + current->rxPingTime = in->time; + sendPing(&civClients, current, in->seq, true); } - break; - } - default: - { - - if (r.length() > 0x18) { - data_packet_t in = (data_packet_t)r.constData(); - if (in->type != 0x01) + else if (in->reply == 0x01) { + if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) { - if (quint16(in->datalen + 0x15) == (quint16)in->len) - { - // Strip all '0xFE' command preambles first: - int lastFE = r.lastIndexOf((char)0xfe); - //qInfo(logUdpServer()) << "Got:" << r.mid(lastFE); - if (current->civId == 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] > (quint8)0xdf && (quint8)r[lastFE + 2] < (quint8)0xef) { - // This is (should be) the remotes CIV id. - current->civId = (quint8)r[lastFE + 2]; - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Detected remote CI-V:" << hex << current->civId; - } - else if (current->civId != 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] != current->civId) - { - current->civId = (quint8)r[lastFE + 2]; - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Detected different remote CI-V:" << hex << current->civId; - } - - emit haveDataFromServer(r.mid(0x15)); + // A Reply to our ping! + if (in->seq == current->pingSeq) { + current->pingSeq++; } else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": Datalen mismatch " << quint16(in->datalen + 0x15) << ":" << (quint16)in->len; - + qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; } } } - //break; } + break; + } + default: + { + + if (r.length() > 0x18) { + data_packet_t in = (data_packet_t)r.constData(); + if (in->type != 0x01) + { + if (quint16(in->datalen + 0x15) == (quint16)in->len) + { + // Strip all '0xFE' command preambles first: + int lastFE = r.lastIndexOf((char)0xfe); + //qInfo(logUdpServer()) << "Got:" << r.mid(lastFE); + if (current->civId == 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] > (quint8)0xdf && (quint8)r[lastFE + 2] < (quint8)0xef) { + // This is (should be) the remotes CIV id. + current->civId = (quint8)r[lastFE + 2]; + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Detected remote CI-V:" << hex << current->civId; + } + else if (current->civId != 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] != current->civId) + { + current->civId = (quint8)r[lastFE + 2]; + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Detected different remote CI-V:" << hex << current->civId; + } + + emit haveDataFromServer(r.mid(0x15)); + } + else { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": Datalen mismatch " << quint16(in->datalen + 0x15) << ":" << (quint16)in->len; + + } + } + } + //break; + } } if (current != Q_NULLPTR) { udpServer::commonReceived(&civClients, current, r); @@ -620,7 +620,7 @@ void udpServer::audioReceived() current->remoteId = qFromLittleEndian(r.mid(8, 4)); current->socket = udpAudio; current->pingSeq = (quint8)rand() << 8 | (quint8)rand(); - + current->pingTimer = new QTimer(); connect(current->pingTimer, &QTimer::timeout, this, std::bind(&udpServer::sendPing, this, &audioClients, current, (quint16)0x00, false)); current->pingTimer->start(100); @@ -643,65 +643,65 @@ void udpServer::audioReceived() switch (r.length()) { - case (PING_SIZE): + case (PING_SIZE): + { + ping_packet_t in = (ping_packet_t)r.constData(); + if (in->type == 0x07) { - ping_packet_t in = (ping_packet_t)r.constData(); - if (in->type == 0x07) - { - // It is a ping request/response + // It is a ping request/response - if (in->reply == 0x00) + if (in->reply == 0x00) + { + current->rxPingTime = in->time; + sendPing(&audioClients, current, in->seq, true); + } + else if (in->reply == 0x01) { + if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) { - current->rxPingTime = in->time; - sendPing(&audioClients, current, in->seq, true); - } - else if (in->reply == 0x01) { - if (in->seq == current->pingSeq || in->seq == current->pingSeq - 1) - { - // A Reply to our ping! - if (in->seq == current->pingSeq) { - current->pingSeq++; - } - else { - qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; - } + // A Reply to our ping! + if (in->seq == current->pingSeq) { + current->pingSeq++; + } + else { + qInfo(logUdpServer()) << current->ipAddress.toString() << ": got out of sequence ping reply. Got: " << in->seq << " expecting: " << current->pingSeq; } } } - break; } - default: - { - /* Audio packets start as follows: - PCM 16bit and PCM8/uLAW stereo: 0x44,0x02 for first packet and 0x6c,0x05 for second. - uLAW 8bit/PCM 8bit 0xd8,0x03 for all packets - PCM 16bit stereo 0x6c,0x05 first & second 0x70,0x04 third + break; + } + default: + { + /* Audio packets start as follows: + PCM 16bit and PCM8/uLAW stereo: 0x44,0x02 for first packet and 0x6c,0x05 for second. + uLAW 8bit/PCM 8bit 0xd8,0x03 for all packets + PCM 16bit stereo 0x6c,0x05 first & second 0x70,0x04 third - */ - control_packet_t in = (control_packet_t)r.constData(); + */ + control_packet_t in = (control_packet_t)r.constData(); - if (in->type != 0x01 && in->len >= 0xAC) { - if (in->seq == 0) - { - // Seq number has rolled over. - current->seqPrefix++; - } - - if (hasTxAudio == current->ipAddress) - { - // 0xac is the smallest possible audio packet. - audioPacket tempAudio; - tempAudio.seq = (quint32)current->seqPrefix << 16 | in->seq; - tempAudio.time = QTime::currentTime();; - tempAudio.sent = 0; - tempAudio.data = r.mid(0x18); - //qInfo(logUdpServer()) << "sending tx audio " << in->seq; - emit haveAudioData(tempAudio); - } + if (in->type != 0x01 && in->len >= 0xAC) { + if (in->seq == 0) + { + // Seq number has rolled over. + current->seqPrefix++; + } + + if (hasTxAudio == current->ipAddress) + { + // 0xac is the smallest possible audio packet. + audioPacket tempAudio; + tempAudio.seq = (quint32)current->seqPrefix << 16 | in->seq; + tempAudio.time = QTime::currentTime();; + tempAudio.sent = 0; + tempAudio.data = r.mid(0x18); + //qInfo(logUdpServer()) << "sending tx audio " << in->seq; + emit haveAudioData(tempAudio); } - break; } + break; + } } if (current != Q_NULLPTR) { @@ -711,7 +711,7 @@ void udpServer::audioReceived() } -void udpServer::commonReceived(QList* l,CLIENT* current, QByteArray r) +void udpServer::commonReceived(QList* l, CLIENT* current, QByteArray r) { Q_UNUSED(l); // We might need it later! if (current == Q_NULLPTR || r.isNull()) { @@ -727,51 +727,52 @@ void udpServer::commonReceived(QList* l,CLIENT* current, QByteArray r) switch (r.length()) { - case (CONTROL_SIZE): + case (CONTROL_SIZE): + { + control_packet_t in = (control_packet_t)r.constData(); + if (in->type == 0x03) { + qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'are you there'"; + current->remoteId = in->sentid; + sendControl(current, 0x04, in->seq); + } // This is This is "Are you ready" in response to "I am here". + else if (in->type == 0x06) { - control_packet_t in = (control_packet_t)r.constData(); - if (in->type == 0x03) { - qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'are you there'"; - current->remoteId = in->sentid; - sendControl(current, 0x04, in->seq); - } // This is This is "Are you ready" in response to "I am here". - else if (in->type == 0x06) - { - qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'Are you ready'"; - current->remoteId = in->sentid; - sendControl(current, 0x06, in->seq); - if (current->idleTimer != Q_NULLPTR && !current->idleTimer->isActive()) { - current->idleTimer->start(100); - } - } // This is a retransmit request - else if (in->type == 0x01) - { - // Single packet request - qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'retransmit' request for " << in->seq; - - auto match = std::find_if(current->txSeqBuf.begin(), current->txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) { - return s.seqNum == cs; - }); - - if (match != current->txSeqBuf.end() && match->retransmitCount < 5) { - // Found matching entry? - // Don't constantly retransmit the same packet, give-up eventually - qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum; - match->retransmitCount++; - udpMutex.lock(); - current->socket->writeDatagram(match->data, current->ipAddress, current->port); - udpMutex.unlock(); - } else { - // Just send an idle! - sendControl(current, 0x00, in->seq); - } + qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'Are you ready'"; + current->remoteId = in->sentid; + sendControl(current, 0x06, in->seq); + if (current->idleTimer != Q_NULLPTR && !current->idleTimer->isActive()) { + current->idleTimer->start(100); } - break; - } - default: + } // This is a retransmit request + else if (in->type == 0x01) { - //break; + // Single packet request + qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'retransmit' request for " << in->seq; + + auto match = std::find_if(current->txSeqBuf.begin(), current->txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) { + return s.seqNum == cs; + }); + + if (match != current->txSeqBuf.end() && match->retransmitCount < 5) { + // Found matching entry? + // Don't constantly retransmit the same packet, give-up eventually + qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum; + match->retransmitCount++; + udpMutex.lock(); + current->socket->writeDatagram(match->data, current->ipAddress, current->port); + udpMutex.unlock(); + } + else { + // Just send an idle! + sendControl(current, 0x00, in->seq); + } } + break; + } + default: + { + //break; + } } // The packet is at least 0x10 in length so safe to cast it to control_packet for processing @@ -888,7 +889,7 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq) -void udpServer::sendPing(QList *l,CLIENT* c, quint16 seq, bool reply) +void udpServer::sendPing(QList* l, CLIENT* c, quint16 seq, bool reply) { // Also use to detect "stale" connections QDateTime now = QDateTime::currentDateTime(); @@ -962,7 +963,7 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed) c->wdTimer->stop(); } else { - strcpy(p.connection,"WFVIEW"); + strcpy(p.connection, "WFVIEW"); } c->txMutex.lock(); @@ -1063,7 +1064,7 @@ void udpServer::sendCapabilities(CLIENT* c) // I still don't know what these are? p.enableb = 0x01; // 0x01 doesn't seem to do anything? p.enablec = 0x01; // 0x01 doesn't seem to do anything? - p.capf = 0x5001; + p.capf = 0x5001; p.capg = 0x0190; c->txMutex.lock(); @@ -1182,34 +1183,35 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type) void udpServer::watchdog(CLIENT* c) { - c->txMutex.lock(); - //qInfo(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length(); - // Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these? - if (!c->txSeqBuf.isEmpty()) - { - c->txSeqBuf.erase(std::remove_if(c->txSeqBuf.begin(), c->txSeqBuf.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end()); - } - c->txMutex.unlock(); - - // Erase old entries from the missing packets buffer - c->missMutex.lock(); - if (!c->rxMissing.isEmpty()) { - c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end()); - } - c->missMutex.unlock(); - - c->rxMutex.lock(); - if (!c->rxSeqBuf.isEmpty()) { - std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end()); - - if (c->rxSeqBuf.length() > 400) + if (c->txMutex.tryLock()) { + //qInfo(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length(); + // Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these? + if (!c->txSeqBuf.isEmpty()) { - c->rxSeqBuf.remove(0, 200); + c->txSeqBuf.erase(std::remove_if(c->txSeqBuf.begin(), c->txSeqBuf.end(), [](const SEQBUFENTRY& v) + { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end()); } + c->txMutex.unlock(); + } + // Erase old entries from the missing packets buffer + if (c->missMutex.tryLock()) { + if (!c->rxMissing.isEmpty()) { + c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v) + { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end()); + } + c->missMutex.unlock(); + } + if (c->rxMutex.tryLock()) { + if (!c->rxSeqBuf.isEmpty()) { + std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end()); + + if (c->rxSeqBuf.length() > 400) + { + c->rxSeqBuf.remove(0, 200); + } + } + c->rxMutex.unlock(); } - c->rxMutex.unlock(); } void udpServer::sendStatus(CLIENT* c) @@ -1234,8 +1236,8 @@ void udpServer::sendStatus(CLIENT* c) p.identa = c->identa; p.identb = c->identb; - p.civport=qToBigEndian(c->civPort); - p.audioport=qToBigEndian(c->audioPort); + p.civport = qToBigEndian(c->civPort); + p.audioport = qToBigEndian(c->audioPort); // Send this to reject the request to tx/rx audio/civ //memcpy(p + 0x30, QByteArrayLiteral("\xff\xff\xff\xfe").constData(), 4); @@ -1264,7 +1266,7 @@ void udpServer::dataForServer(QByteArray d) foreach(CLIENT * client, civClients) { int lastFE = d.lastIndexOf((quint8)0xfe); - if (client != Q_NULLPTR && client->connected && d.length() > lastFE+2 && ((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId)) { + if (client != Q_NULLPTR && client->connected && d.length() > lastFE + 2 && ((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId)) { data_packet p; memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00! p.len = (quint16)d.length() + sizeof(p); @@ -1314,7 +1316,7 @@ void udpServer::sendRxAudio() -void udpServer::receiveAudioData(const audioPacket &d) +void udpServer::receiveAudioData(const audioPacket& d) { //qInfo(logUdpServer()) << "Server got:" << d.data.length(); foreach(CLIENT * client, audioClients) @@ -1339,10 +1341,13 @@ void udpServer::receiveAudioData(const audioPacket &d) client->txSeqBuf.last().data = t; client->txMutex.unlock(); - udpMutex.lock(); - client->socket->writeDatagram(t, client->ipAddress, client->port); - udpMutex.unlock(); - + if (udpMutex.tryLock()) { + client->socket->writeDatagram(t, client->ipAddress, client->port); + udpMutex.unlock(); + } + else { + qDebug(logUdpServer()) << "Failed to lock udpMutex()"; + } client->txSeq++; client->sendAudioSeq++; } @@ -1356,7 +1361,7 @@ void udpServer::receiveAudioData(const audioPacket &d) /// This will run every 100ms so out-of-sequence packets will not trigger a retransmit request. /// /// -void udpServer::sendRetransmitRequest(CLIENT *c) +void udpServer::sendRetransmitRequest(CLIENT* c) { c->missMutex.lock(); @@ -1377,7 +1382,7 @@ void udpServer::sendRetransmitRequest(CLIENT *c) if (s == c->rxMissing.end()) { // We haven't seen this missing packet before - qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len="<< c->rxMissing.length() << "): " << j; + qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len=" << c->rxMissing.length() << "): " << j; c->rxMissing.append(SEQBUFENTRY()); c->rxMissing.last().seqNum = j; c->rxMissing.last().retransmitCount = 0; @@ -1457,7 +1462,7 @@ void udpServer::sendRetransmitRequest(CLIENT *c) /// /// /// -void udpServer::deleteConnection(QList *l, CLIENT* c) +void udpServer::deleteConnection(QList* l, CLIENT* c) { connMutex.lock(); @@ -1515,7 +1520,7 @@ void udpServer::deleteConnection(QList *l, CLIENT* c) txaudio = Q_NULLPTR; txAudioThread = Q_NULLPTR; } - + } connMutex.unlock();