Improve missing packet handling

merge-requests/9/merge
Phil Taylor 2022-01-27 19:11:16 +00:00
rodzic 4a1be30c40
commit 2f4fe061b3
5 zmienionych plików z 389 dodań i 409 usunięć

Wyświetl plik

@ -14,7 +14,7 @@
#define RETRANSMIT_PERIOD 100 // How often to attempt retransmit #define RETRANSMIT_PERIOD 100 // How often to attempt retransmit
#define LOCK_PERIOD 10 // How long to try to lock mutex (ms) #define LOCK_PERIOD 10 // How long to try to lock mutex (ms)
#define STALE_CONNECTION 15 // Not heard from in this many seconds #define STALE_CONNECTION 15 // Not heard from in this many seconds
#define BUFSIZE 50 // Number of packets to buffer #define BUFSIZE 500 // Number of packets to buffer
#define TXAUDIO_PERIOD 20 #define TXAUDIO_PERIOD 20

Wyświetl plik

@ -48,14 +48,16 @@ servermain::servermain(const QString serialPortCL, const QString hostCL, const Q
servermain::~servermain() servermain::~servermain()
{ {
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (radio.rigThread != Q_NULLPTR) if (radio->rigThread != Q_NULLPTR)
{ {
radio.rigThread->quit(); radio->rigThread->quit();
radio.rigThread->wait(); radio->rigThread->wait();
} }
delete radio; // This has been created by new in loadSettings();
} }
serverConfig.rigs.clear();
if (serverThread != Q_NULLPTR) { if (serverThread != Q_NULLPTR) {
serverThread->quit(); serverThread->quit();
serverThread->wait(); serverThread->wait();
@ -79,26 +81,14 @@ void servermain::openRig()
makeRig(); makeRig();
if( (prefs.serialPortRadio.toLower() == QString("auto")) && (serialPortCL.isEmpty())) for (RIGCONFIG* radio : serverConfig.rigs)
{
findSerialPort();
} else {
if(serialPortCL.isEmpty())
{
serialPortRig = prefs.serialPortRadio;
} else {
serialPortRig = serialPortCL;
}
}
for (RIGCONFIG& radio : serverConfig.rigs)
{ {
//qInfo(logSystem()) << "Opening rig"; //qInfo(logSystem()) << "Opening rig";
if (radio.rigThread != Q_NULLPTR) if (radio->rigThread != Q_NULLPTR)
{ {
//qInfo(logSystem()) << "Got rig"; //qInfo(logSystem()) << "Got rig";
QMetaObject::invokeMethod(radio.rig, [=]() { QMetaObject::invokeMethod(radio->rig, [=]() {
radio.rig->commSetup(radio.civAddr, radio.serialPort, radio.baudRate, QString("none")); radio->rig->commSetup(radio->civAddr, radio->serialPort, radio->baudRate, QString("none"));
}, Qt::QueuedConnection); }, Qt::QueuedConnection);
} }
} }
@ -106,49 +96,49 @@ void servermain::openRig()
void servermain::makeRig() void servermain::makeRig()
{ {
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (radio.rigThread == Q_NULLPTR) if (radio->rigThread == Q_NULLPTR)
{ {
qInfo(logSystem()) << "Creating new rigThread()"; qInfo(logSystem()) << "Creating new rigThread()";
radio.rig = new rigCommander(radio.guid); radio->rig = new rigCommander(radio->guid);
radio.rigThread = new QThread(this); radio->rigThread = new QThread(this);
// Thread: // Thread:
radio.rig->moveToThread(radio.rigThread); radio->rig->moveToThread(radio->rigThread);
connect(radio.rigThread, SIGNAL(started()), radio.rig, SLOT(process())); connect(radio->rigThread, SIGNAL(started()), radio->rig, SLOT(process()));
connect(radio.rigThread, SIGNAL(finished()), radio.rig, SLOT(deleteLater())); connect(radio->rigThread, SIGNAL(finished()), radio->rig, SLOT(deleteLater()));
radio.rigThread->start(); radio->rigThread->start();
// Rig status and Errors: // Rig status and Errors:
connect(radio.rig, SIGNAL(haveSerialPortError(QString, QString)), this, SLOT(receiveSerialPortError(QString, QString))); connect(radio->rig, SIGNAL(haveSerialPortError(QString, QString)), this, SLOT(receiveSerialPortError(QString, QString)));
connect(radio.rig, SIGNAL(haveStatusUpdate(networkStatus)), this, SLOT(receiveStatusUpdate(networkStatus))); connect(radio->rig, SIGNAL(haveStatusUpdate(networkStatus)), this, SLOT(receiveStatusUpdate(networkStatus)));
// Rig comm setup: // Rig comm setup:
//connect(this, SIGNAL(sendCommSetup(unsigned char, udpPreferences, audioSetup, audioSetup, QString)), radio.rig, SLOT(commSetup(unsigned char, udpPreferences, audioSetup, audioSetup, QString))); //connect(this, SIGNAL(sendCommSetup(unsigned char, udpPreferences, audioSetup, audioSetup, QString)), radio->rig, SLOT(commSetup(unsigned char, udpPreferences, audioSetup, audioSetup, QString)));
//connect(this, SIGNAL(sendCommSetup(unsigned char, QString, quint32, QString)), radio.rig, SLOT(commSetup(unsigned char, QString, quint32, QString))); //connect(this, SIGNAL(sendCommSetup(unsigned char, QString, quint32, QString)), radio->rig, SLOT(commSetup(unsigned char, QString, quint32, QString)));
connect(this, SIGNAL(setRTSforPTT(bool)), radio.rig, SLOT(setRTSforPTT(bool))); connect(this, SIGNAL(setRTSforPTT(bool)), radio->rig, SLOT(setRTSforPTT(bool)));
connect(radio.rig, SIGNAL(haveBaudRate(quint32)), this, SLOT(receiveBaudRate(quint32))); connect(radio->rig, SIGNAL(haveBaudRate(quint32)), this, SLOT(receiveBaudRate(quint32)));
connect(this, SIGNAL(sendCloseComm()), radio.rig, SLOT(closeComm())); connect(this, SIGNAL(sendCloseComm()), radio->rig, SLOT(closeComm()));
connect(this, SIGNAL(sendChangeLatency(quint16)), radio.rig, SLOT(changeLatency(quint16))); connect(this, SIGNAL(sendChangeLatency(quint16)), radio->rig, SLOT(changeLatency(quint16)));
//connect(this, SIGNAL(getRigCIV()), radio.rig, SLOT(findRigs())); //connect(this, SIGNAL(getRigCIV()), radio->rig, SLOT(findRigs()));
//connect(this, SIGNAL(setRigID(unsigned char)), radio.rig, SLOT(setRigID(unsigned char))); //connect(this, SIGNAL(setRigID(unsigned char)), radio->rig, SLOT(setRigID(unsigned char)));
connect(radio.rig, SIGNAL(discoveredRigID(rigCapabilities)), this, SLOT(receiveFoundRigID(rigCapabilities))); connect(radio->rig, SIGNAL(discoveredRigID(rigCapabilities)), this, SLOT(receiveFoundRigID(rigCapabilities)));
connect(radio.rig, SIGNAL(commReady()), this, SLOT(receiveCommReady())); connect(radio->rig, SIGNAL(commReady()), this, SLOT(receiveCommReady()));
connect(this, SIGNAL(requestRigState()), radio.rig, SLOT(sendState())); connect(this, SIGNAL(requestRigState()), radio->rig, SLOT(sendState()));
connect(this, SIGNAL(stateUpdated()), radio.rig, SLOT(stateUpdated())); connect(this, SIGNAL(stateUpdated()), radio->rig, SLOT(stateUpdated()));
connect(radio.rig, SIGNAL(stateInfo(rigstate*)), this, SLOT(receiveStateInfo(rigstate*))); connect(radio->rig, SIGNAL(stateInfo(rigstate*)), this, SLOT(receiveStateInfo(rigstate*)));
//Other connections //Other connections
connect(this, SIGNAL(setCIVAddr(unsigned char)), radio.rig, SLOT(setCIVAddr(unsigned char))); connect(this, SIGNAL(setCIVAddr(unsigned char)), radio->rig, SLOT(setCIVAddr(unsigned char)));
connect(radio.rig, SIGNAL(havePTTStatus(bool)), this, SLOT(receivePTTstatus(bool))); connect(radio->rig, SIGNAL(havePTTStatus(bool)), this, SLOT(receivePTTstatus(bool)));
connect(this, SIGNAL(setPTT(bool)), radio.rig, SLOT(setPTT(bool))); connect(this, SIGNAL(setPTT(bool)), radio->rig, SLOT(setPTT(bool)));
connect(this, SIGNAL(getPTT()), radio.rig, SLOT(getPTT())); connect(this, SIGNAL(getPTT()), radio->rig, SLOT(getPTT()));
connect(this, SIGNAL(getDebug()), radio.rig, SLOT(getDebug())); connect(this, SIGNAL(getDebug()), radio->rig, SLOT(getDebug()));
if (radio.rigThread->isRunning()) { if (radio->rigThread->isRunning()) {
qInfo(logSystem()) << "Rig thread is running"; qInfo(logSystem()) << "Rig thread is running";
} }
else { else {
@ -163,15 +153,15 @@ void servermain::makeRig()
void servermain::removeRig() void servermain::removeRig()
{ {
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (radio.rigThread != Q_NULLPTR) if (radio->rigThread != Q_NULLPTR)
{ {
radio.rigThread->disconnect(); radio->rigThread->disconnect();
radio.rig->disconnect(); radio->rig->disconnect();
delete radio.rigThread; delete radio->rigThread;
delete radio.rig; delete radio->rig;
radio.rig = Q_NULLPTR; radio->rig = Q_NULLPTR;
} }
} }
} }
@ -242,28 +232,28 @@ void servermain::receiveCommReady()
// Use the GUID to determine which radio the response is from // Use the GUID to determine which radio the response is from
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (sender != Q_NULLPTR && radio.rig != Q_NULLPTR && !memcmp(sender->getGUID(), radio.guid, sizeof(radio.guid))) if (sender != Q_NULLPTR && radio->rig != Q_NULLPTR && !memcmp(sender->getGUID(), radio->guid, sizeof(radio->guid)))
{ {
qInfo(logSystem()) << "Received CommReady!! "; qInfo(logSystem()) << "Received CommReady!! ";
if (radio.civAddr == 0) if (radio->civAddr == 0)
{ {
// tell rigCommander to broadcast a request for all rig IDs. // tell rigCommander to broadcast a request for all rig IDs.
// qInfo(logSystem()) << "Beginning search from wfview for rigCIV (auto-detection broadcast)"; // qInfo(logSystem()) << "Beginning search from wfview for rigCIV (auto-detection broadcast)";
if (!radio.rigAvailable) { if (!radio->rigAvailable) {
QMetaObject::invokeMethod(radio.rig, [=]() { QMetaObject::invokeMethod(radio->rig, [=]() {
radio.rig->findRigs(); radio->rig->findRigs();
}, Qt::QueuedConnection); }, Qt::QueuedConnection);
} }
} }
else { else {
// don't bother, they told us the CIV they want, stick with it. // don't bother, they told us the CIV they want, stick with it.
// We still query the rigID to find the model, but at least we know the CIV. // We still query the rigID to find the model, but at least we know the CIV.
qInfo(logSystem()) << "Skipping automatic CIV, using user-supplied value of " << radio.civAddr; qInfo(logSystem()) << "Skipping automatic CIV, using user-supplied value of " << radio->civAddr;
QMetaObject::invokeMethod(radio.rig, [=]() { QMetaObject::invokeMethod(radio->rig, [=]() {
radio.rig->setRigID(radio.civAddr); radio->rig->setRigID(radio->civAddr);
}, Qt::QueuedConnection); }, Qt::QueuedConnection);
} }
} }
@ -279,10 +269,10 @@ void servermain::receiveFoundRigID(rigCapabilities rigCaps)
rigCommander* sender = qobject_cast<rigCommander*>(QObject::sender()); rigCommander* sender = qobject_cast<rigCommander*>(QObject::sender());
// Use the GUID to determine which radio the response is from // Use the GUID to determine which radio the response is from
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (sender != Q_NULLPTR && radio.rig != Q_NULLPTR && !radio.rigAvailable && !memcmp(sender->getGUID(), radio.guid, sizeof(radio.guid))) if (sender != Q_NULLPTR && radio->rig != Q_NULLPTR && !radio->rigAvailable && !memcmp(sender->getGUID(), radio->guid, sizeof(radio->guid)))
{ {
qDebug(logSystem()) << "Rig name: " << rigCaps.modelName; qDebug(logSystem()) << "Rig name: " << rigCaps.modelName;
@ -309,7 +299,7 @@ void servermain::receiveFoundRigID(rigCapabilities rigCaps)
.arg(rigCaps.guid[14], 2, 16, QLatin1Char('0')) .arg(rigCaps.guid[14], 2, 16, QLatin1Char('0'))
.arg(rigCaps.guid[15], 2, 16, QLatin1Char('0')) .arg(rigCaps.guid[15], 2, 16, QLatin1Char('0'))
; ;
radio.rigCaps = rigCaps; radio->rigCaps = rigCaps;
// Added so that server receives rig capabilities. // Added so that server receives rig capabilities.
emit sendRigCaps(rigCaps); emit sendRigCaps(rigCaps);
} }
@ -372,7 +362,7 @@ void servermain::setServerToPrefs()
udp = Q_NULLPTR; udp = Q_NULLPTR;
} }
udp = new udpServer(&serverConfig, serverTxSetup, serverRxSetup); udp = new udpServer(serverConfig, serverTxSetup, serverRxSetup);
serverThread = new QThread(this); serverThread = new QThread(this);
@ -385,15 +375,15 @@ void servermain::setServerToPrefs()
// Step through all radios and connect them to the server, // Step through all radios and connect them to the server,
// server will then use GUID to determine which actual radio it belongs to. // server will then use GUID to determine which actual radio it belongs to.
for (RIGCONFIG& radio : serverConfig.rigs) for (RIGCONFIG* radio : serverConfig.rigs)
{ {
if (radio.rigThread != Q_NULLPTR) if (radio->rigThread != Q_NULLPTR)
{ {
if (radio.rig != Q_NULLPTR) { if (radio->rig != Q_NULLPTR) {
connect(radio.rig, SIGNAL(haveAudioData(audioPacket)), udp, SLOT(receiveAudioData(audioPacket))); connect(radio->rig, SIGNAL(haveAudioData(audioPacket)), udp, SLOT(receiveAudioData(audioPacket)));
connect(radio.rig, SIGNAL(haveDataForServer(QByteArray)), udp, SLOT(dataForServer(QByteArray))); connect(radio->rig, SIGNAL(haveDataForServer(QByteArray)), udp, SLOT(dataForServer(QByteArray)));
//connect(udp, SIGNAL(haveDataFromServer(QByteArray)), radio.rig, SLOT(dataFromServer(QByteArray))); //connect(udp, SIGNAL(haveDataFromServer(QByteArray)), radio->rig, SLOT(dataFromServer(QByteArray)));
connect(this, SIGNAL(sendRigCaps(rigCapabilities)), udp, SLOT(receiveRigCaps(rigCapabilities))); connect(this, SIGNAL(sendRigCaps(rigCapabilities)), udp, SLOT(receiveRigCaps(rigCapabilities)));
} }
} }
@ -508,21 +498,21 @@ void servermain::loadSettings()
else { else {
settings->setArrayIndex(i); settings->setArrayIndex(i);
} }
RIGCONFIG tempPrefs; RIGCONFIG* tempPrefs = new RIGCONFIG();
tempPrefs.civAddr = (unsigned char)settings->value("RigCIVuInt", defPrefs.radioCIVAddr).toInt(); tempPrefs->civAddr = (unsigned char)settings->value("RigCIVuInt", defPrefs.radioCIVAddr).toInt();
tempPrefs.forceRTSasPTT = (bool)settings->value("ForceRTSasPTT", defPrefs.forceRTSasPTT).toBool(); tempPrefs->forceRTSasPTT = (bool)settings->value("ForceRTSasPTT", defPrefs.forceRTSasPTT).toBool();
tempPrefs.serialPort = settings->value("SerialPortRadio", defPrefs.serialPortRadio).toString(); tempPrefs->serialPort = settings->value("SerialPortRadio", defPrefs.serialPortRadio).toString();
tempPrefs.rigName = settings->value("RigName", "<NONE>").toString(); tempPrefs->rigName = settings->value("RigName", "<NONE>").toString();
tempPrefs.baudRate = (quint32)settings->value("SerialPortBaud", defPrefs.serialPortBaud).toInt(); tempPrefs->baudRate = (quint32)settings->value("SerialPortBaud", defPrefs.serialPortBaud).toInt();
if (tempPrefs.rigName=="<NONE>") if (tempPrefs->rigName=="<NONE>")
{ {
foreach(const QSerialPortInfo & serialPortInfo, QSerialPortInfo::availablePorts()) foreach(const QSerialPortInfo & serialPortInfo, QSerialPortInfo::availablePorts())
{ {
//qInfo(logSystem()) << "Serial Port found: " << serialPortInfo.portName() << "Manufacturer:" << serialPortInfo.manufacturer() << "Product ID" << serialPortInfo.description() << "S/N" << serialPortInfo.serialNumber(); //qInfo(logSystem()) << "Serial Port found: " << serialPortInfo.portName() << "Manufacturer:" << serialPortInfo.manufacturer() << "Product ID" << serialPortInfo.description() << "S/N" << serialPortInfo.serialNumber();
if (serialPortInfo.portName() == tempPrefs.serialPort && !serialPortInfo.serialNumber().isEmpty()) if (serialPortInfo.portName() == tempPrefs->serialPort && !serialPortInfo.serialNumber().isEmpty())
{ {
tempPrefs.rigName = serialPortInfo.serialNumber(); tempPrefs->rigName = serialPortInfo.serialNumber();
} }
} }
} }
@ -531,30 +521,33 @@ void servermain::loadSettings()
guid = QUuid::createUuid().toString(); guid = QUuid::createUuid().toString();
settings->setValue("GUID", guid); settings->setValue("GUID", guid);
} }
memcpy(tempPrefs.guid, QUuid::fromString(guid).toRfc4122().constData(), sizeof(tempPrefs.guid)); memcpy(tempPrefs->guid, QUuid::fromString(guid).toRfc4122().constData(), sizeof(tempPrefs->guid));
tempPrefs.rxAudioSetup.isinput = true; tempPrefs->rxAudioSetup.isinput = true;
tempPrefs.txAudioSetup.isinput = false; tempPrefs->txAudioSetup.isinput = false;
tempPrefs.rxAudioSetup.localAFgain = 255; tempPrefs->rxAudioSetup.localAFgain = 255;
tempPrefs.txAudioSetup.localAFgain = 255; tempPrefs->txAudioSetup.localAFgain = 255;
tempPrefs.rxAudioSetup.resampleQuality = 4; tempPrefs->rxAudioSetup.resampleQuality = 4;
tempPrefs.txAudioSetup.resampleQuality = 4; tempPrefs->txAudioSetup.resampleQuality = 4;
tempPrefs.rxAudioSetup.name = settings->value("AudioInput", "").toString();
tempPrefs.txAudioSetup.name = settings->value("AudioOutput", "").toString();
tempPrefs->rxAudioSetup.name = settings->value("AudioInput", "").toString();
tempPrefs->txAudioSetup.name = settings->value("AudioOutput", "").toString();
bool rxDeviceFound = false;
bool txDeviceFound = false;
// Find the actual audio devices // Find the actual audio devices
#if defined(RTAUDIO) #if defined(RTAUDIO)
for (unsigned int i = 1; i < devices; i++) { for (unsigned int i = 1; i < devices; i++) {
info = audio->getDeviceInfo(i); info = audio->getDeviceInfo(i);
if (info.outputChannels > 0) { if (info.outputChannels > 0) {
if (tempPrefs.txAudio.name == info->name) { if (tempPrefs->txAudio.name == info->name) {
tempPrefs.txAudio.port = i; tempPrefs->txAudio.port = i;
txDeviceFound = true;
} }
} }
if (info.inputChannels > 0) { if (info.inputChannels > 0) {
if (tempPrefs.rxAudio.name == info->name) { if (tempPrefs->rxAudio.name == info->name) {
tempPrefs.rxAudio.port = i; tempPrefs->rxAudio.port = i;
rxDeviceFound = true;
} }
} }
} }
@ -563,14 +556,16 @@ void servermain::loadSettings()
{ {
info = Pa_GetDeviceInfo(i); info = Pa_GetDeviceInfo(i);
if (info->maxInputChannels > 0) { if (info->maxInputChannels > 0) {
if (tempPrefs.txAudio.name == info->name) { if (tempPrefs->txAudio.name == info->name) {
tempPrefs.txAudio.port = i; tempPrefs->txAudio.port = i;
txDeviceFound = true;
} }
} }
if (info->maxOutputChannels > 0) { if (info->maxOutputChannels > 0) {
if (tempPrefs.rxAudio.name == info->name) { if (tempPrefs->rxAudio.name == info->name) {
tempPrefs.rxAudio.port = i; tempPrefs->rxAudio.port = i;
} rxDeviceFound = true;
}
} }
} }
#else #else
@ -581,20 +576,29 @@ void servermain::loadSettings()
//qInfo(logAudio()) << "Looking for audio output devices"; //qInfo(logAudio()) << "Looking for audio output devices";
for (const QAudioDeviceInfo& deviceInfo : audioOutputs) { for (const QAudioDeviceInfo& deviceInfo : audioOutputs) {
if (deviceInfo.deviceName() == tempPrefs.txAudioSetup.name) { if (deviceInfo.deviceName() == tempPrefs->txAudioSetup.name) {
tempPrefs.txAudioSetup.port = deviceInfo; tempPrefs->txAudioSetup.port = deviceInfo;
txDeviceFound = true;
} }
} }
//qInfo(logAudio()) << "Looking for audio input devices"; //qInfo(logAudio()) << "Looking for audio input devices";
for (const QAudioDeviceInfo& deviceInfo : audioInputs) { for (const QAudioDeviceInfo& deviceInfo : audioInputs) {
if (deviceInfo.deviceName() == tempPrefs.rxAudioSetup.name) { if (deviceInfo.deviceName() == tempPrefs->rxAudioSetup.name) {
tempPrefs.rxAudioSetup.port = deviceInfo; tempPrefs->rxAudioSetup.port = deviceInfo;
rxDeviceFound = true;
} }
} }
#endif #endif
tempPrefs.rig = Q_NULLPTR;
tempPrefs.rigThread = Q_NULLPTR; if (!txDeviceFound) {
qInfo() << "Cannot find txAudioDevice" << tempPrefs->txAudioSetup.name;
}
if (!rxDeviceFound) {
qInfo() << "Cannot find rxAudioDevice" << tempPrefs->rxAudioSetup.name;
}
tempPrefs->rig = Q_NULLPTR;
tempPrefs->rigThread = Q_NULLPTR;
serverConfig.rigs.append(tempPrefs); serverConfig.rigs.append(tempPrefs);
if (tempNum == 0) { if (tempNum == 0) {
settings->endGroup(); settings->endGroup();

Wyświetl plik

@ -1246,7 +1246,6 @@ void udpBase::dataReceived(QByteArray r)
} }
else else
{ {
//std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
if (in->seq < rxSeqBuf.firstKey()) if (in->seq < rxSeqBuf.firstKey())
{ {
qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.lastKey() << " current: " << hex << in->seq; qInfo(logUdp()) << this->metaObject()->className() << ": ******* seq number has rolled over ****** previous highest: " << hex << rxSeqBuf.lastKey() << " current: " << hex << in->seq;
@ -1261,10 +1260,43 @@ void udpBase::dataReceived(QByteArray r)
if (!rxSeqBuf.contains(in->seq)) if (!rxSeqBuf.contains(in->seq))
{ {
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it. // Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
rxSeqBuf.insert(in->seq, QTime::currentTime());
if (rxSeqBuf.size() > BUFSIZE)
{ if (in->seq > rxSeqBuf.lastKey() + 1) {
rxSeqBuf.erase(rxSeqBuf.begin()); // We are likely missing packets then!
missingMutex.lock();
int missCounter = 0;
for (quint16 f = rxSeqBuf.lastKey() + 1; f < in->seq; f++)
{
if (missCounter > 50) {
// More than 50 packets missing, something horrific has happened!
qDebug(logUdp()) << "Too many missing packets, full reset!";
rxSeqBuf.clear();
rxMissing.clear();
missingMutex.unlock();
break;
}
qDebug(logUdp()) << "Detected missing packet" << f;
if (rxSeqBuf.size() > BUFSIZE)
{
rxSeqBuf.erase(rxSeqBuf.begin());
}
rxSeqBuf.insert(f, QTime::currentTime());
if (!rxMissing.contains(f))
{
rxMissing.insert(f, 0);
}
}
missingMutex.unlock();
}
else {
if (rxSeqBuf.size() > BUFSIZE)
{
rxSeqBuf.erase(rxSeqBuf.begin());
}
rxSeqBuf.insert(in->seq, QTime::currentTime());
} }
} }
else { else {

Wyświetl plik

@ -4,8 +4,8 @@
#define STALE_CONNECTION 15 #define STALE_CONNECTION 15
#define LOCK_PERIOD 10 // time to attempt to lock Mutex in ms #define LOCK_PERIOD 10 // time to attempt to lock Mutex in ms
#define AUDIO_SEND_PERIOD 20 // #define AUDIO_SEND_PERIOD 20 //
udpServer::udpServer(SERVERCONFIG* config, audioSetup outAudio, audioSetup inAudio) : udpServer::udpServer(SERVERCONFIG& config, audioSetup outAudio, audioSetup inAudio) :
config(*config), config(config),
outAudio(outAudio), outAudio(outAudio),
inAudio(inAudio) inAudio(inAudio)
{ {
@ -15,29 +15,29 @@ udpServer::udpServer(SERVERCONFIG* config, audioSetup outAudio, audioSetup inAud
void udpServer::init() void udpServer::init()
{ {
for (RIGCONFIG rig : config.rigs) for (RIGCONFIG* rig : config.rigs)
{ {
qDebug(logUdpServer()) << "CIV:" << rig.civAddr; qDebug(logUdpServer()) << "CIV:" << rig->civAddr;
qDebug(logUdpServer()) << "Model:" << rig.modelName; qDebug(logUdpServer()) << "Model:" << rig->modelName;
qDebug(logUdpServer()) << "Name:" << rig.rigName; qDebug(logUdpServer()) << "Name:" << rig->rigName;
qDebug(logUdpServer()) << "CIV:" << rig.civAddr; qDebug(logUdpServer()) << "CIV:" << rig->civAddr;
qDebug(logUdpServer()).noquote() << QString("GUID: {%1%2%3%4-%5%6-%7%8-%9%10-%11%12%13%14%15%16}") qDebug(logUdpServer()).noquote() << QString("GUID: {%1%2%3%4-%5%6-%7%8-%9%10-%11%12%13%14%15%16}")
.arg(rig.guid[0], 2, 16, QLatin1Char('0')) .arg(rig->guid[0], 2, 16, QLatin1Char('0'))
.arg(rig.guid[1], 2, 16, QLatin1Char('0')) .arg(rig->guid[1], 2, 16, QLatin1Char('0'))
.arg(rig.guid[2], 2, 16, QLatin1Char('0')) .arg(rig->guid[2], 2, 16, QLatin1Char('0'))
.arg(rig.guid[3], 2, 16, QLatin1Char('0')) .arg(rig->guid[3], 2, 16, QLatin1Char('0'))
.arg(rig.guid[4], 2, 16, QLatin1Char('0')) .arg(rig->guid[4], 2, 16, QLatin1Char('0'))
.arg(rig.guid[5], 2, 16, QLatin1Char('0')) .arg(rig->guid[5], 2, 16, QLatin1Char('0'))
.arg(rig.guid[6], 2, 16, QLatin1Char('0')) .arg(rig->guid[6], 2, 16, QLatin1Char('0'))
.arg(rig.guid[7], 2, 16, QLatin1Char('0')) .arg(rig->guid[7], 2, 16, QLatin1Char('0'))
.arg(rig.guid[8], 2, 16, QLatin1Char('0')) .arg(rig->guid[8], 2, 16, QLatin1Char('0'))
.arg(rig.guid[9], 2, 16, QLatin1Char('0')) .arg(rig->guid[9], 2, 16, QLatin1Char('0'))
.arg(rig.guid[10], 2, 16, QLatin1Char('0')) .arg(rig->guid[10], 2, 16, QLatin1Char('0'))
.arg(rig.guid[11], 2, 16, QLatin1Char('0')) .arg(rig->guid[11], 2, 16, QLatin1Char('0'))
.arg(rig.guid[12], 2, 16, QLatin1Char('0')) .arg(rig->guid[12], 2, 16, QLatin1Char('0'))
.arg(rig.guid[13], 2, 16, QLatin1Char('0')) .arg(rig->guid[13], 2, 16, QLatin1Char('0'))
.arg(rig.guid[14], 2, 16, QLatin1Char('0')) .arg(rig->guid[14], 2, 16, QLatin1Char('0'))
.arg(rig.guid[15], 2, 16, QLatin1Char('0')) .arg(rig->guid[15], 2, 16, QLatin1Char('0'))
; ;
} }
srand(time(NULL)); // Generate random key srand(time(NULL)); // Generate random key
@ -85,13 +85,6 @@ void udpServer::init()
udpAudio->bind(config.audioPort); udpAudio->bind(config.audioPort);
QUdpSocket::connect(udpAudio, &QUdpSocket::readyRead, this, &udpServer::audioReceived); QUdpSocket::connect(udpAudio, &QUdpSocket::readyRead, this, &udpServer::audioReceived);
#if !defined(PORTAUDIO) && !defined(RTAUDIO)
qInfo(logUdpServer()) << "Server audio input (RX):" << inAudio.port.deviceName();
qInfo(logUdpServer()) << "Server audio output (TX):" << outAudio.port.deviceName();
#else
qInfo(logUdpServer()) << "Server audio input (RX):" << inAudio.name;
qInfo(logUdpServer()) << "Server audio output (TX):" << outAudio.name;
#endif
wdTimer = new QTimer(); wdTimer = new QTimer();
connect(wdTimer, &QTimer::timeout, this, &udpServer::watchdog); connect(wdTimer, &QTimer::timeout, this, &udpServer::watchdog);
wdTimer->start(500); wdTimer->start(500);
@ -137,14 +130,14 @@ udpServer::~udpServer()
void udpServer::receiveRigCaps(rigCapabilities caps) void udpServer::receiveRigCaps(rigCapabilities caps)
{ {
for (RIGCONFIG &rig: config.rigs) { for (RIGCONFIG* rig: config.rigs) {
if (!memcmp(rig.guid, caps.guid, sizeof(rig.guid))) { if (!memcmp(rig->guid, caps.guid, sizeof(rig->guid))) {
// Matching rig, fill-in missing details // Matching rig, fill-in missing details
rig.rigAvailable = true; rig->rigAvailable = true;
rig.modelName = caps.modelName; rig->modelName = caps.modelName;
rig.civAddr = caps.civ; rig->civAddr = caps.civ;
if (rig.rigName=="<NONE>") { if (rig->rigName=="<NONE>") {
rig.rigName = caps.modelName; rig->rigName = caps.modelName;
} }
} }
} }
@ -267,8 +260,8 @@ void udpServer::controlReceived()
// Request for new token // Request for new token
qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received create token request"; qInfo(logUdpServer()) << current->ipAddress.toString() << ": Received create token request";
sendCapabilities(current); sendCapabilities(current);
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
sendConnectionInfo(current, radio.guid); sendConnectionInfo(current, radio->guid);
} }
} }
else if (in->res == 0x01) { else if (in->res == 0x01) {
@ -280,10 +273,10 @@ void udpServer::controlReceived()
// Disconnect audio/civ // Disconnect audio/civ
sendTokenResponse(current, in->res); sendTokenResponse(current, in->res);
current->isStreaming = false; current->isStreaming = false;
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
if (!memcmp(radio.guid, current->guid, sizeof(radio.guid))) if (!memcmp(radio->guid, current->guid, sizeof(radio->guid)))
{ {
sendConnectionInfo(current, radio.guid); sendConnectionInfo(current, radio->guid);
} }
} }
} }
@ -350,68 +343,71 @@ void udpServer::controlReceived()
" txSampleRate" << current->txSampleRate << " txSampleRate" << current->txSampleRate <<
" txBufferLen" << current->txBufferLen; " txBufferLen" << current->txBufferLen;
if (!config.lan) {
// Radio is connected by USB/Serial and we assume that audio is connected as well.
// Create audio TX/RX threads if they don't already exist (first client chooses samplerate/codec)
audioSetup setup; audioSetup setup;
setup.resampleQuality = config.resampleQuality; setup.resampleQuality = config.resampleQuality;
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
if (!memcmp(radio.guid, current->guid, sizeof(radio.guid)) && radio.txaudio == Q_NULLPTR) if (!memcmp(radio->guid, current->guid, sizeof(radio->guid)) && radio->txaudio == Q_NULLPTR)
{ {
radio.txAudioSetup.codec = current->txCodec; radio->txAudioSetup.codec = current->txCodec;
radio.txAudioSetup.samplerate = current->txSampleRate; radio->txAudioSetup.samplerate = current->txSampleRate;
radio.txAudioSetup.isinput = false; radio->txAudioSetup.isinput = false;
radio.txAudioSetup.latency = current->txBufferLen; radio->txAudioSetup.latency = current->txBufferLen;
outAudio.isinput = false; outAudio.isinput = false;
radio.txaudio = new audioHandler(); radio->txaudio = new audioHandler();
radio.txAudioThread = new QThread(this); radio->txAudioThread = new QThread(this);
radio.txaudio->moveToThread(radio.txAudioThread); radio->txaudio->moveToThread(radio->txAudioThread);
radio.txAudioThread->start(QThread::TimeCriticalPriority); radio->txAudioThread->start(QThread::TimeCriticalPriority);
//connect(this, SIGNAL(setupTxAudio(audioSetup)), txaudio, SLOT(init(audioSetup))); //connect(this, SIGNAL(setupTxAudio(audioSetup)), txaudio, SLOT(init(audioSetup)));
connect(radio.txAudioThread, SIGNAL(finished()), radio.txaudio, SLOT(deleteLater())); connect(radio->txAudioThread, SIGNAL(finished()), radio->txaudio, SLOT(deleteLater()));
QMetaObject::invokeMethod(radio.txaudio, [=]() { QMetaObject::invokeMethod(radio->txaudio, [=]() {
radio.txaudio->init(radio.txAudioSetup); radio->txaudio->init(radio->txAudioSetup);
}, Qt::QueuedConnection); }, Qt::QueuedConnection);
emit setupTxAudio(outAudio); emit setupTxAudio(outAudio);
hasTxAudio = datagram.senderAddress(); hasTxAudio = datagram.senderAddress();
connect(this, SIGNAL(haveAudioData(audioPacket)), radio.txaudio, SLOT(incomingAudio(audioPacket))); connect(this, SIGNAL(haveAudioData(audioPacket)), radio->txaudio, SLOT(incomingAudio(audioPacket)));
} }
if (!memcmp(radio.guid, current->guid, sizeof(radio.guid)) && radio.rxaudio == Q_NULLPTR) if (!memcmp(radio->guid, current->guid, sizeof(radio->guid)) && radio->rxaudio == Q_NULLPTR)
{ {
radio.rxAudioSetup.codec = current->rxCodec; #if !defined(PORTAUDIO) && !defined(RTAUDIO)
radio.rxAudioSetup.samplerate = current->rxSampleRate; qInfo(logUdpServer()) << "Radio" << radio->rigName << "audio input(RX) :" << radio->rxAudioSetup.port.deviceName();
radio.rxAudioSetup.latency = current->txBufferLen; qInfo(logUdpServer()) << "Radio" << radio->rigName << "audio output(TX) :" << radio->txAudioSetup.port.deviceName();
radio.rxAudioSetup.isinput = true; #else
qInfo(logUdpServer()) << "Server audio input (RX):" << inAudio.name;
qInfo(logUdpServer()) << "Server audio output (TX):" << outAudio.name;
#endif
radio.rxaudio = new audioHandler(); radio->rxAudioSetup.codec = current->rxCodec;
radio->rxAudioSetup.samplerate = current->rxSampleRate;
radio->rxAudioSetup.latency = current->txBufferLen;
radio->rxAudioSetup.isinput = true;
radio.rxAudioThread = new QThread(this); radio->rxaudio = new audioHandler();
radio.rxaudio->moveToThread(radio.rxAudioThread); radio->rxAudioThread = new QThread(this);
radio.rxAudioThread->start(QThread::TimeCriticalPriority); radio->rxaudio->moveToThread(radio->rxAudioThread);
//connect(this, SIGNAL(setupRxAudio(audioSetup)), rxaudio, SLOT(init(audioSetup))); radio->rxAudioThread->start(QThread::TimeCriticalPriority);
connect(radio.rxAudioThread, SIGNAL(finished()), radio.rxaudio, SLOT(deleteLater()));
QMetaObject::invokeMethod(radio.rxaudio, [=]() { connect(radio->rxAudioThread, SIGNAL(finished()), radio->rxaudio, SLOT(deleteLater()));
radio.rxaudio->init(radio.rxAudioSetup);
}, Qt::QueuedConnection);
radio.rxAudioTimer = new QTimer(); QMetaObject::invokeMethod(radio->rxaudio, [=]() {
radio.rxAudioTimer->setTimerType(Qt::PreciseTimer); radio->rxaudio->init(radio->rxAudioSetup);
connect(radio.rxAudioTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRxAudio, this)); }, Qt::QueuedConnection);
radio.rxAudioTimer->start(TXAUDIO_PERIOD);
} radio->rxAudioTimer = new QTimer();
radio->rxAudioTimer->setTimerType(Qt::PreciseTimer);
connect(radio->rxAudioTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRxAudio, this));
radio->rxAudioTimer->start(TXAUDIO_PERIOD);
} }
} }
@ -568,12 +564,12 @@ void udpServer::civReceived()
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Detected invalid remote CI-V:" << hex << (quint8)r[lastFE+2]; qDebug(logUdpServer()) << current->ipAddress.toString() << ": Detected invalid remote CI-V:" << hex << (quint8)r[lastFE+2];
} }
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
if (!memcmp(radio.guid, current->guid, sizeof(radio.guid))) if (!memcmp(radio->guid, current->guid, sizeof(radio->guid)))
{ {
// Only send to the rig that it belongs to! // Only send to the rig that it belongs to!
QMetaObject::invokeMethod(radio.rig, [=]() { QMetaObject::invokeMethod(radio->rig, [=]() {
radio.rig->dataFromServer(r.mid(0x15));; radio->rig->dataFromServer(r.mid(0x15));;
}, Qt::DirectConnection); }, Qt::DirectConnection);
} }
} }
@ -886,14 +882,53 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
if (!current->rxSeqBuf.contains(in->seq)) if (!current->rxSeqBuf.contains(in->seq))
{ {
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
if (current->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD))) if (current->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
if (current->rxSeqBuf.size() > BUFSIZE) // Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
{ int missCounter = 0;
current->rxSeqBuf.remove(current->rxSeqBuf.firstKey()); if (in->seq > current->rxSeqBuf.lastKey() + 1) {
// We are likely missing packets then!
if (current->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
for (quint16 f = current->rxSeqBuf.lastKey() + 1; f < in->seq; f++)
{
if (missCounter > 50) {
// More than 50 packets missing, something horrific has happened!
qDebug(logUdpServer()) << "Too many missing packets, full reset!";
current->rxSeqBuf.clear();
current->rxMissing.clear();
current->missMutex.unlock();
break;
}
qInfo(logUdpServer()) << "Detected missing packet" << f;
if (current->rxSeqBuf.size() > BUFSIZE)
{
current->rxSeqBuf.remove(current->rxSeqBuf.firstKey());
}
current->rxSeqBuf.insert(f, QTime::currentTime());
if (!current->rxMissing.contains(f))
{
current->rxMissing.insert(f, 0);
}
}
current->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
}
else {
if (current->rxSeqBuf.size() > BUFSIZE)
{
current->rxSeqBuf.remove(current->rxSeqBuf.firstKey());
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
} }
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
current->rxMutex.unlock(); current->rxMutex.unlock();
} }
else { else {
@ -944,6 +979,11 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD))) if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
if (c->txSeqBuf.size() > BUFSIZE)
{
c->txSeqBuf.remove(c->txSeqBuf.firstKey());
}
c->txSeqBuf.insert(seq, s); c->txSeqBuf.insert(seq, s);
c->txSeq++; c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
@ -1069,6 +1109,11 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD))) if (c->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
if (c->txSeqBuf.size() > BUFSIZE)
{
c->txSeqBuf.remove(c->txSeqBuf.firstKey());
}
c->txSeqBuf.insert(c->txSeq, s); c->txSeqBuf.insert(c->txSeq, s);
c->txSeq++; c->txSeq++;
c->txMutex.unlock(); c->txMutex.unlock();
@ -1111,24 +1156,24 @@ void udpServer::sendCapabilities(CLIENT* c)
s.timeSent = QTime::currentTime(); s.timeSent = QTime::currentTime();
s.retransmitCount = 0; s.retransmitCount = 0;
for (RIGCONFIG &rig : config.rigs) { for (RIGCONFIG* rig : config.rigs) {
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Capabilities :" << c->txSeq << "for" << rig.modelName; qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Capabilities :" << c->txSeq << "for" << rig->modelName;
radio_cap_packet r; radio_cap_packet r;
memset(r.packet, 0x0, sizeof(r)); // We can't be sure it is initialized with 0x00! memset(r.packet, 0x0, sizeof(r)); // We can't be sure it is initialized with 0x00!
memcpy(r.guid, rig.guid, sizeof(r.guid)); memcpy(r.guid, rig->guid, sizeof(r.guid));
memcpy(r.name, rig.rigName.toLocal8Bit(), sizeof(r.name)); memcpy(r.name, rig->rigName.toLocal8Bit(), sizeof(r.name));
memcpy(r.audio, QByteArrayLiteral("ICOM_VAUDIO").constData(), 11); memcpy(r.audio, QByteArrayLiteral("ICOM_VAUDIO").constData(), 11);
if (rig.hasWiFi && !rig.hasEthernet) { if (rig->hasWiFi && !rig->hasEthernet) {
r.conntype = 0x0707; // 0x0707 for wifi rig. r.conntype = 0x0707; // 0x0707 for wifi rig->
} }
else { else {
r.conntype = 0x073f; // 0x073f for ethernet rig. r.conntype = 0x073f; // 0x073f for ethernet rig->
} }
r.civ = rig.civAddr; r.civ = rig->civAddr;
r.baudrate = (quint32)qToBigEndian(rig.baudRate); r.baudrate = (quint32)qToBigEndian(rig->baudRate);
/* /*
0x80 = 12K only 0x80 = 12K only
0x40 = 44.1K only 0x40 = 44.1K only
@ -1139,7 +1184,7 @@ void udpServer::sendCapabilities(CLIENT* c)
0x02 = 16K only 0x02 = 16K only
0x01 = 8K only 0x01 = 8K only
*/ */
if (rig.rxaudio == Q_NULLPTR) { if (rig->rxaudio == Q_NULLPTR) {
r.rxsample = 0x8b01; // all rx sample frequencies supported r.rxsample = 0x8b01; // all rx sample frequencies supported
} }
else { else {
@ -1160,7 +1205,7 @@ void udpServer::sendCapabilities(CLIENT* c)
} }
} }
if (rig.txaudio == Q_NULLPTR) { if (rig->txaudio == Q_NULLPTR) {
r.txsample = 0x8b01; // all tx sample frequencies supported r.txsample = 0x8b01; // all tx sample frequencies supported
r.enablea = 0x01; // 0x01 enables TX 24K mode? r.enablea = 0x01; // 0x01 enables TX 24K mode?
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Client will have TX audio"; qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Client will have TX audio";
@ -1217,8 +1262,8 @@ void udpServer::sendCapabilities(CLIENT* c)
// Also used to display currently connected used information. // Also used to display currently connected used information.
void udpServer::sendConnectionInfo(CLIENT* c, quint8 guid[16]) void udpServer::sendConnectionInfo(CLIENT* c, quint8 guid[16])
{ {
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
if (!memcmp(guid, radio.guid, sizeof(guid))) if (!memcmp(guid, radio->guid, sizeof(guid)))
{ {
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending ConnectionInfo :" << c->txSeq; qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending ConnectionInfo :" << c->txSeq;
conninfo_packet p; conninfo_packet p;
@ -1232,10 +1277,10 @@ void udpServer::sendConnectionInfo(CLIENT* c, quint8 guid[16])
p.tokrequest = c->tokenRx; p.tokrequest = c->tokenRx;
p.token = c->tokenTx; p.token = c->tokenTx;
p.code = 0x0380; p.code = 0x0380;
memcpy(p.guid, radio.guid, sizeof(p.guid)); memcpy(p.guid, radio->guid, sizeof(p.guid));
memcpy(p.name, radio.rigName.toLocal8Bit(), sizeof(p.name)); memcpy(p.name, radio->rigName.toLocal8Bit(), sizeof(p.name));
if (radio.rigAvailable) { if (radio->rigAvailable) {
if (c->isStreaming) { if (c->isStreaming) {
p.busy = 0x01; p.busy = 0x01;
} }
@ -1465,7 +1510,6 @@ void udpServer::sendStatus(CLIENT* c)
void udpServer::dataForServer(QByteArray d) void udpServer::dataForServer(QByteArray d)
{ {
rigCommander* sender = qobject_cast<rigCommander*>(QObject::sender()); rigCommander* sender = qobject_cast<rigCommander*>(QObject::sender());
if (sender == Q_NULLPTR) if (sender == Q_NULLPTR)
@ -1479,67 +1523,63 @@ void udpServer::dataForServer(QByteArray d)
{ {
continue; continue;
} }
for (RIGCONFIG& radio : config.rigs) // Use the GUID to determine which radio the response is from
if (memcmp(sender->getGUID(), client->guid, sizeof(client->guid)))
{ {
continue; // Rig guid doesn't match the one requested by the client.
if (memcmp(radio.guid, client->guid, sizeof(radio.guid))) }
int lastFE = d.lastIndexOf((quint8)0xfe);
//qInfo(logUdpServer()) << "Server got CIV data from" << radio->rigName << "length" << d.length();
if (client->connected && d.length() > lastFE + 2 &&
((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId ||
(quint8)d[lastFE + 1] == 0x00 || (quint8)d[lastFE + 2] == 0x00 || (quint8)d[lastFE + 1] == 0xE1 || (quint8)d[lastFE + 2] == 0xE1))
{
data_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = (quint16)d.length() + sizeof(p);
p.seq = client->txSeq;
p.sentid = client->myId;
p.rcvdid = client->remoteId;
p.reply = (char)0xc1;
p.datalen = (quint16)d.length();
p.sendseq = client->innerSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d);
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = t;
if (client->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
continue; if (client->txSeqBuf.size() > BUFSIZE)
}
int lastFE = d.lastIndexOf((quint8)0xfe);
// Use the GUID to determine which radio the response is from
qInfo(logUdpServer()) << "Server got CIV data from" << radio.rigName << "length" << d.length();
if (client->connected && d.length() > lastFE + 2 &&
((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId ||
(quint8)d[lastFE + 1] == 0x00 || (quint8)d[lastFE + 2] == 0x00 || (quint8)d[lastFE + 1] == 0xE1 || (quint8)d[lastFE + 2] == 0xE1))
{
data_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = (quint16)d.length() + sizeof(p);
p.seq = client->txSeq;
p.sentid = client->myId;
p.rcvdid = client->remoteId;
p.reply = (char)0xc1;
p.datalen = (quint16)d.length();
p.sendseq = client->innerSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d);
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = t;
if (client->txMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
if (client->txSeqBuf.size() > BUFSIZE) client->txSeqBuf.remove(client->txSeqBuf.firstKey());
{
client->txSeqBuf.remove(client->txSeqBuf.firstKey());
}
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->innerSeq++;
client->txMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock txMutex()";
}
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
} }
client->txSeqBuf.insert(p.seq, s);
client->txSeq++;
client->innerSeq++;
client->txMutex.unlock();
} }
else { else {
qInfo(logUdpServer()) << "Got data for different ID" << hex << (quint8)d[lastFE + 1] << ":" << hex << (quint8)d[lastFE + 2]; qInfo(logUdpServer()) << "Unable to lock txMutex()";
} }
if (udpMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock udpMutex()";
}
}
else {
qInfo(logUdpServer()) << "Got data for different ID" << hex << (quint8)d[lastFE + 1] << ":" << hex << (quint8)d[lastFE + 2];
} }
} }
return; return;
@ -1548,13 +1588,13 @@ void udpServer::dataForServer(QByteArray d)
void udpServer::sendRxAudio() void udpServer::sendRxAudio()
{ {
QByteArray audio; QByteArray audio;
for (RIGCONFIG &rig : config.rigs) { for (RIGCONFIG* rig : config.rigs) {
if (rig.rxaudio != Q_NULLPTR) { if (rig->rxaudio != Q_NULLPTR) {
if (audioMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD))) if (audioMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
audio.clear(); audio.clear();
rig.rxaudio->getNextAudioChunk(audio); rig->rxaudio->getNextAudioChunk(audio);
int len = 0; int len = 0;
while (len < audio.length()) { while (len < audio.length()) {
audioPacket partial; audioPacket partial;
@ -1641,122 +1681,26 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
QByteArray missingSeqs; QByteArray missingSeqs;
QTime missingTime = QTime::currentTime(); QTime missingTime = QTime::currentTime();
if (!c->rxSeqBuf.empty() && c->rxSeqBuf.size() <= c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey())
{
if ((c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size()) > 20)
{
// Too many packets to process, flush buffers and start again!
qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxMissing.clear();
c->missMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
if (c->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
c->rxSeqBuf.clear();
c->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
}
else {
// We have at least 1 missing packet!
qDebug(logUdp()) << "Missing Seq: size=" << c->rxSeqBuf.size() << "firstKey=" << c->rxSeqBuf.firstKey() << "lastKey=" << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
// We are missing packets so iterate through the buffer and add the missing ones to missing packet list
if (c->rxMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{
int missCounter = 0;
auto i = std::adjacent_find(c->rxSeqBuf.keys().begin(), c->rxSeqBuf.keys().end(), [](int l, int r) {return l + 1 < r; });
while (i != c->rxSeqBuf.keys().end())
{
quint16 j = 1 + *i;
++i;
if (i == c->rxSeqBuf.keys().end())
{
continue;
}
if (c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() == 0 && c->type == "AUDIO" &&
(c->txCodec == 0x40 || c->txCodec == 0x80))
{
// Single missing audio packet ignore it!
qDebug(logUdpServer()) << "Single missing audio packet will be handled by FEC (" << hex << j << ")";
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer so it doesn't try to retransmit
c->missMutex.unlock();
c->rxMutex.unlock();
return;
}
missCounter++;
if (missCounter > 20) {
// More than 20 packets missing, something horrific has happened!
qDebug(logUdpServer()) << ": Too many missing packets, clearing buffer";
c->rxSeqBuf.clear();
c->rxMissing.clear();
c->missMutex.unlock();
c->rxMutex.unlock();
return;
}
auto s = c->rxMissing.find(j);
if (s == c->rxMissing.end())
{
// We haven't seen this missing packet before
qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j << dec << missingTime.msecsTo(QTime::currentTime()) << "ms";
c->rxMissing.insert(j, 0);
if (c->rxSeqBuf.size() > BUFSIZE)
{
c->rxSeqBuf.remove(c->rxSeqBuf.firstKey());
}
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
}
else {
if (s.value() == 4)
{
// We have tried 4 times to request this packet, time to give up!
s = c->rxMissing.erase(s);
}
}
}
}
else {
qInfo(logUdpServer()) << "Unable to lock missMutex()";
}
c->rxMutex.unlock();
}
else {
qInfo(logUdpServer()) << "Unable to lock rxMutex()";
}
c->missMutex.unlock();
}
}
if (missingTime.msecsTo(QTime::currentTime()) > 10) {
qInfo(logUdpServer()) << "Initial missing processing has been running for" << missingTime.msecsTo(QTime::currentTime()) << "(ms)";
}
if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD))) if (c->missMutex.try_lock_for(std::chrono::milliseconds(LOCK_PERIOD)))
{ {
for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it) for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it)
{ {
if (it.value() < 10) if (it.value() < 4)
{ {
missingSeqs.append(it.key() & 0xff); missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff); missingSeqs.append(it.key() >> 8 & 0xff);
missingSeqs.append(it.key() & 0xff); missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff); missingSeqs.append(it.key() >> 8 & 0xff);
it.value()++; it.value()++;
}
else {
// We have tried 4 times to request this packet, time to give up!
qDebug(logUdp()) << this->metaObject()->className() << ": No response for missing packet" << it.key() << "deleting";
it = c->rxMissing.erase(it);
} }
} }
if (missingSeqs.length() != 0) if (missingSeqs.length() != 0)
@ -1889,27 +1833,27 @@ void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
} }
if (len == 0) { if (len == 0) {
for (RIGCONFIG& radio : config.rigs) { for (RIGCONFIG* radio : config.rigs) {
if (!memcmp(radio.guid, guid, sizeof(radio.guid))) if (!memcmp(radio->guid, guid, sizeof(radio->guid)))
{ {
if (radio.rxAudioTimer != Q_NULLPTR) { if (radio->rxAudioTimer != Q_NULLPTR) {
radio.rxAudioTimer->stop(); radio->rxAudioTimer->stop();
delete radio.rxAudioTimer; delete radio->rxAudioTimer;
radio.rxAudioTimer = Q_NULLPTR; radio->rxAudioTimer = Q_NULLPTR;
} }
if (radio.rxAudioThread != Q_NULLPTR) { if (radio->rxAudioThread != Q_NULLPTR) {
radio.rxAudioThread->quit(); radio->rxAudioThread->quit();
radio.rxAudioThread->wait(); radio->rxAudioThread->wait();
radio.rxaudio = Q_NULLPTR; radio->rxaudio = Q_NULLPTR;
radio.rxAudioThread = Q_NULLPTR; radio->rxAudioThread = Q_NULLPTR;
} }
if (radio.txAudioThread != Q_NULLPTR) { if (radio->txAudioThread != Q_NULLPTR) {
radio.txAudioThread->quit(); radio->txAudioThread->quit();
radio.txAudioThread->wait(); radio->txAudioThread->wait();
radio.txaudio = Q_NULLPTR; radio->txaudio = Q_NULLPTR;
radio.txAudioThread = Q_NULLPTR; radio->txAudioThread = Q_NULLPTR;
} }
} }
} }

Wyświetl plik

@ -79,7 +79,7 @@ struct SERVERCONFIG {
quint8 resampleQuality; quint8 resampleQuality;
quint32 baudRate; quint32 baudRate;
QList <SERVERUSER> users; QList <SERVERUSER> users;
QList <RIGCONFIG> rigs; QList <RIGCONFIG*> rigs;
}; };
@ -88,7 +88,7 @@ class udpServer : public QObject
Q_OBJECT Q_OBJECT
public: public:
udpServer(SERVERCONFIG* config, audioSetup outAudio, audioSetup inAudio); udpServer(SERVERCONFIG& config, audioSetup outAudio, audioSetup inAudio);
~udpServer(); ~udpServer();
public slots: public slots: