kopia lustrzana https://gitlab.com/eliggett/wfview
382 wiersze
13 KiB
C++
382 wiersze
13 KiB
C++
/*
 * This is a singleton queue (only one instance can be created).
 *
 * Although subclassing QThread is often not recommended, I think that this is
 * a valid use case for subclassing.
 */
|
|
#include "logcategories.h"
|
|
#include "cachingqueue.h"
|
|
|
|
// Pointer to the single shared queue; value-initialised here and assigned in getInstance().
cachingQueue* cachingQueue::instance{};
|
|
// Held by getInstance() while it checks for and creates the shared instance.
QMutex cachingQueue::instanceMutex;
|
|
|
|
/**
 * Returns the shared cachingQueue, creating and starting it on first use.
 *
 * The check-and-create sequence runs with instanceMutex held. On creation the
 * object is named "Command Queue", its finished() signal is wired to its own
 * deleteLater() slot, and the thread is started at QThread::HighPriority.
 *
 * @param parent Passed to the constructor when the instance is first created.
 *               Its objectName() is logged on every call ("<unknown>" when null).
 * @return Pointer to the shared instance.
 */
cachingQueue *cachingQueue::getInstance(QObject* parent)
{
    QMutexLocker guard(&instanceMutex);

    if (instance == Q_NULLPTR)
    {
        instance = new cachingQueue(parent);
        cachingQueue* const created = instance;
        created->setObjectName(("Command Queue"));
        connect(created, SIGNAL(finished()), created, SLOT(deleteLater()));
        created->start(QThread::HighPriority);
    }

    // Work out who asked for the queue, purely for the log line below.
    QString caller("<unknown>");
    if (parent != Q_NULLPTR)
    {
        caller = parent->objectName();
    }
    qInfo() << "Returning instance of cachingQueue() to calling process:" << caller;

    return instance;
}
|
|
|
|
/**
 * Flags the worker loop to stop and wakes it so that run() can observe the flag.
 *
 * NOTE(review): this destructor does not wait for run() to return and does not
 * reset the static `instance` pointer - confirm that callers never use
 * getInstance() after the queue has been destroyed.
 */
cachingQueue::~cachingQueue()
{
    // The flag must be raised before the wake-up: run() tests it as soon as its wait returns.
    aborted = true;
    waiting.wakeOne();

    qInfo() << "Destroying caching queue (parent closing)";
}
|
|
|
|
void cachingQueue::run()
|
|
{
|
|
// Setup queue.
|
|
qInfo() << "Starting caching queue handler thread (ThreadId:" << QThread::currentThreadId() << ")";
|
|
|
|
QElapsedTimer timer;
|
|
timer.start();
|
|
|
|
QDeadlineTimer deadline(queueInterval);
|
|
QMutexLocker locker(&mutex);
|
|
quint64 counter=0; // Will run for many years!
|
|
while (!aborted)
|
|
{
|
|
if (!waiting.wait(&mutex, deadline.remainingTime()))
|
|
{
|
|
// Time to process the queue - mutex is locked
|
|
queuePriority prio = priorityImmediate;
|
|
|
|
// priorityNone=0, priorityImmediate=1, priorityHighest=2, priorityHigh=3, priorityMediumHigh=5, priorityMedium=7, priorityMediumLow=11, priorityLow=19, priorityLowest=23
|
|
|
|
// If no immediate commands then process the rest of the queue
|
|
if (!queue.contains(prio)) {
|
|
if (counter % priorityHighest == 0)
|
|
prio = priorityHighest;
|
|
else if (counter % priorityHigh == 0)
|
|
prio = priorityHigh;
|
|
else if (counter % priorityMediumHigh == 0)
|
|
prio = priorityMediumHigh;
|
|
else if (counter % priorityMedium == 0)
|
|
prio = priorityMedium;
|
|
else if (counter % priorityMediumLow == 0)
|
|
prio = priorityMediumLow;
|
|
else if (counter % priorityLow == 0)
|
|
prio = priorityLow;
|
|
else if (counter % priorityLowest == 0)
|
|
prio = priorityLowest;
|
|
}
|
|
counter++;
|
|
|
|
|
|
auto it = queue.find(prio);
|
|
if (it != queue.end())
|
|
{
|
|
while (it != queue.end() && it.key() == prio)
|
|
{
|
|
it++;
|
|
}
|
|
it--;
|
|
auto item = it.value();
|
|
emit haveCommand(item.command,item.param,item.sub);
|
|
it=queue.erase(it);
|
|
if (item.recurring && prio != priorityImmediate) {
|
|
queue.insert(prio,item);
|
|
}
|
|
updateCache(false,item.command,item.param,item.sub);
|
|
}
|
|
|
|
QCoreApplication::processEvents();
|
|
deadline.setRemainingTime(queueInterval); // reset the deadline to the poll frequency
|
|
|
|
}
|
|
else if (!aborted) {
|
|
// Mutex is locked
|
|
while (!items.isEmpty()) {
|
|
emit sendValue(items.dequeue());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Stores a new polling interval and wakes the worker thread.
 *
 * run() reads queueInterval the next time it re-arms its deadline.
 *
 * @param val New interval; logged with an "ms" suffix.
 */
void cachingQueue::interval(quint64 val)
{
    queueInterval = val;
    waiting.wakeOne();
    qInfo() << "Changing queue interval to" << val << "ms";
}
|
|
|
|
/**
 * Convenience overload: wraps the arguments in a queueItem and forwards to
 * add(queuePriority, queueItem).
 *
 * @param prio      Priority to queue the command at.
 * @param func      Command to queue.
 * @param recurring Whether the command should be re-queued after being sent.
 * @param sub       Stored in the item's `sub` field.
 */
void cachingQueue::add(queuePriority prio ,funcs func, bool recurring, bool sub)
{
    add(prio, queueItem(func, recurring, sub));
}
|
|
|
|
/**
 * Adds a command to the queue.
 *
 * Items whose command is funcNone are ignored. A recurring item is skipped
 * when the same command is already recurring at the same priority for the
 * same `sub` value, and is rejected with a warning when the requested
 * priority is priorityImmediate. When a recurring item is accepted, a
 * non-recurring copy is also queued at priorityHighest so that the current
 * value is fetched without waiting for the recurring slot.
 *
 * @param prio Priority to queue the command at.
 * @param item Command description (command, param, recurring, sub).
 */
void cachingQueue::add(queuePriority prio ,queueItem item)
{
    if (item.command == funcNone)
    {
        return;
    }

    QMutexLocker locker(&mutex);

    // The duplicate check must look at the entry for this item's own `sub` value;
    // previously `sub` was left to isRecurring()'s default, so the other receiver's
    // entry could be consulted instead.
    if (item.recurring && isRecurring(item.command, item.sub) == prio)
    {
        return;
    }

    if (item.recurring && prio == queuePriority::priorityImmediate)
    {
        qWarning() << "Warning, cannot add recurring command with immediate priority!" << funcString[item.command];
        return;
    }

    if (item.recurring)
    {
        // also insert a one-shot command to get the current value "now" (removes the need to get rigstate)
        queueItem oneShot = item;
        oneShot.recurring = false;
        queue.insert(queue.cend(), priorityHighest, oneShot);
        qInfo() << "adding" << funcString[item.command] << "recurring" << item.recurring << "priority" << prio << "sub" << item.sub;
    }

    queue.insert(prio, item);
}
|
|
|
|
/**
 * Convenience overload: wraps the arguments in a queueItem and forwards to
 * addUnique(queuePriority, queueItem).
 *
 * @param prio      Priority to queue the command at.
 * @param func      Command to queue.
 * @param recurring Whether the command should be re-queued after being sent.
 * @param sub       Stored in the item's `sub` field.
 */
void cachingQueue::addUnique(queuePriority prio ,funcs func, bool recurring, bool sub)
{
    addUnique(prio, queueItem(func, recurring, sub));
}
|
|
|
|
|
|
|
|
/**
 * Adds a command after removing every queued entry that matches it.
 *
 * An existing entry matches when its command, recurring flag and sub flag are
 * equal to the new item's and both params are either valid or invalid.
 * Items whose command is funcNone are ignored, and a recurring item at
 * priorityImmediate is rejected with a warning (nothing is removed in that case).
 * When a recurring item is accepted, a non-recurring copy is also queued at
 * priorityHighest.
 *
 * @param prio Priority to queue the command at.
 * @param item Command description (command, param, recurring, sub).
 */
void cachingQueue::addUnique(queuePriority prio ,queueItem item)
{
    if (item.command == funcNone)
    {
        return;
    }

    QMutexLocker locker(&mutex);

    if (item.recurring && prio == queuePriority::priorityImmediate)
    {
        qWarning() << "Warning, cannot add unique recurring command with immediate priority!" << funcString[item.command];
        return;
    }

    // This is quite slow but a new unique command is only added in response to user interaction (mode change etc.)
    for (auto pos = queue.begin(); pos != queue.end(); )
    {
        const queueItem& queued = pos.value();
        const bool duplicate = queued.command == item.command
                            && queued.recurring == item.recurring
                            && queued.sub == item.sub
                            && queued.param.isValid() == item.param.isValid();
        if (duplicate)
        {
            //qInfo() << "deleting" << it.value().id << funcString[it.value().command] << "sub" << it.value().sub << "recurring" << it.value().recurring ;
            pos = queue.erase(pos);
        }
        else
        {
            pos++;
        }
    }

    if (item.recurring)
    {
        // also insert a one-shot command to get the current value "now" (removes the need to get initial rigstate)
        queueItem oneShot = item;
        oneShot.recurring = false;
        queue.insert(queue.cend(), priorityHighest, oneShot);
        qInfo() << "adding unique" << funcString[item.command] << "recurring" << item.recurring << "priority" << prio << "sub" << item.sub;
    }

    queue.insert(prio, item);
}
|
|
|
|
/**
 * Removes queued entries for a command.
 *
 * The scan starts at the first *recurring* entry that matches func/sub. From
 * that position onwards every entry matching func/sub is erased, recurring or
 * not. If no recurring match exists, a message is logged and nothing is removed.
 * A func of funcNone is ignored.
 *
 * @param func Command to remove.
 * @param sub  Only entries with this `sub` value are considered.
 */
void cachingQueue::del(funcs func, bool sub)
{
    if (func == funcNone)
    {
        return;
    }

    QMutexLocker locker(&mutex);

    auto pos = std::find_if(queue.begin(), queue.end(),
                            [func,sub](const queueItem& c) { return (c.command == func && c.sub == sub && c.recurring); });

    if (pos == queue.end())
    {
        qInfo() << "recurring command" << funcString[func] << "sub" << sub << "not found in queue";
        return;
    }

    while (pos != queue.end())
    {
        const bool matches = (pos.value().command == func && pos.value().sub == sub);
        if (matches)
        {
            //qInfo() << "deleting" << funcString[it.value().command] << "sub" << it.value().sub << "recurring" << it.value().recurring;
            pos = queue.erase(pos);
        }
        else
        {
            pos++;
        }
    }
}
|
|
|
|
/**
 * Looks for a recurring entry for the given command and sub flag.
 *
 * Does NOT lock the mutex.
 *
 * @param func Command to look for.
 * @param sub  `sub` value the entry must have.
 * @return The priority (map key) of the first matching recurring entry, or
 *         queuePriority::priorityNone when there is none.
 */
queuePriority cachingQueue::isRecurring(funcs func, bool sub)
{
    for (auto pos = queue.begin(); pos != queue.end(); ++pos)
    {
        const queueItem& queued = pos.value();
        if (queued.command == func && queued.sub == sub && queued.recurring)
        {
            return pos.key();
        }
    }
    return queuePriority::priorityNone;
}
|
|
|
|
void cachingQueue::clear()
|
|
{
|
|
QMutexLocker locker(&mutex);
|
|
queue.clear();
|
|
}
|
|
/**
 * Logs the received text and wakes the worker thread.
 *
 * @param msg Text to log.
 */
void cachingQueue::message(QString msg)
{
    qInfo() << "Received:" << msg;
    waiting.wakeOne();
}
|
|
|
|
/**
 * Records a received value.
 *
 * With the mutex held, the value is appended to `items` (emitted later by
 * run() through sendValue()), the cache is updated as a reply, and the worker
 * thread is woken.
 *
 * @param func  Command the value belongs to.
 * @param value Received value.
 * @param sub   `sub` flag stored with the value.
 */
void cachingQueue::receiveValue(funcs func, QVariant value, bool sub)
{
    QMutexLocker guard(&mutex);

    items.enqueue(cacheItem(func, value, sub));
    updateCache(true, func, value, sub);

    waiting.wakeOne();
}
|
|
|
|
|
|
void cachingQueue::updateCache(bool reply, queueItem item)
|
|
{
|
|
// Mutex MUST be locked by the calling function.
|
|
auto cv = cache.find(item.command);
|
|
while (cv != cache.end() && cv->command == item.command) {
|
|
if (cv->sub == item.sub) {
|
|
if (reply) {
|
|
cv->reply = QDateTime::currentDateTime();
|
|
} else {
|
|
cv->req = QDateTime::currentDateTime();
|
|
}
|
|
// If we are sending an actual value, update the cache with it
|
|
// Value will be replaced if invalid on next get()
|
|
|
|
if (compare(item.param,cv.value().value))
|
|
{
|
|
cv->value = item.param;
|
|
emit cacheUpdated(cv.value());
|
|
}
|
|
return;
|
|
// We have found (and updated) a matching item so return
|
|
}
|
|
++cv;
|
|
}
|
|
|
|
cacheItem c;
|
|
c.command = item.command;
|
|
c.sub = item.sub;
|
|
if (reply) {
|
|
c.reply = QDateTime::currentDateTime();
|
|
} else {
|
|
c.req = QDateTime::currentDateTime();
|
|
}
|
|
// If we are sending an actual value, update the cache with it
|
|
// Value will be replaced if invalid on next get()
|
|
if (item.param.isValid())
|
|
c.value = item.param;
|
|
cache.insert(item.command,c);
|
|
}
|
|
|
|
|
|
void cachingQueue::updateCache(bool reply, funcs func, QVariant value, bool sub)
|
|
{
|
|
queueItem q(func,value,false,sub);
|
|
updateCache(reply,q);
|
|
}
|
|
|
|
|
|
/**
 * Returns a copy of the cached entry for func/sub.
 *
 * When several entries match, the last one found is returned. When none
 * matches, a default-constructed cacheItem is returned. If the returned value
 * is invalid, or its reply time plus a random 5 to 19 seconds is not later
 * than now, a one-shot request for the command is queued at priorityHighest.
 * A func of funcNone returns a default-constructed cacheItem and queues nothing.
 *
 * @param func Command to look up.
 * @param sub  `sub` flag the entry must have.
 * @return Copy of the matching cache entry, or a default-constructed one.
 */
cacheItem cachingQueue::getCache(funcs func, bool sub)
{
    cacheItem ret;
    if (func == funcNone)
    {
        return ret;
    }

    {
        // Keep the lock scoped to the lookup: add() below takes the same mutex.
        QMutexLocker locker(&mutex);
        for (auto entry = cache.find(func); entry != cache.end() && entry->command == func; ++entry)
        {
            if (entry->sub == sub)
            {
                ret = cacheItem(*entry);
            }
        }
    }

    // If the cache is more than 5-20 seconds old, re-request it as it may be stale (maybe make this a config option?)
    // Using priorityhighest WILL slow down the S-Meter when a command intensive client is connected to rigctl
    bool needsRefresh = !ret.value.isValid();
    if (!needsRefresh)
    {
        const int maxAgeSecs = QRandomGenerator::global()->bounded(5,20);
        needsRefresh = (ret.reply.addSecs(maxAgeSecs) <= QDateTime::currentDateTime());
    }

    if (needsRefresh)
    {
        //qInfo() << "No (or expired) cache found for" << funcString[func] << "requesting";
        add(priorityHighest, func, false, sub);
    }

    return ret;
}
|
|
|
|
//Calling function MUST call unlockMutex() once finished with data
|
|
QMultiMap<funcs,cacheItem> cachingQueue::getCacheItems()
|
|
{
|
|
mutex.lock();
|
|
return cache;
|
|
}
|
|
|
|
//Calling function MUST call unlockMutex() once finished with data
|
|
QMultiMap <queuePriority,queueItem> cachingQueue::getQueueItems()
|
|
{
|
|
mutex.lock();
|
|
return queue;
|
|
}
|
|
|
|
/**
 * Decides whether a new value differs from the cached one.
 *
 * The comparison is driven by the type name of `a`; `b` is converted to the
 * same type for the comparison.
 *
 * @param a New value.
 * @param b Currently cached value.
 * @return false when `a` is invalid; true when `a` is valid and `b` is not;
 *         otherwise true when the values differ. QString, scopeData,
 *         memoryType and bandStackType are always reported as changed, and
 *         unsupported types are logged and reported as unchanged.
 */
bool cachingQueue::compare(QVariant a, QVariant b)
{
    if (!a.isValid())
    {
        return false;
    }
    if (!b.isValid())
    {
        // There is a new value but nothing cached to compare it with.
        return true;
    }

    const char* const type = a.typeName();
    const auto isType = [type](const char* name) { return strcmp(type, name) == 0; };

    // Compare the details
    if (isType("bool"))
    {
        return a.value<bool>() != b.value<bool>();
    }
    if (isType("QString"))
    {
        return true; // Strings are always treated as changed
    }
    if (isType("uchar"))
    {
        return a.value<uchar>() != b.value<uchar>();
    }
    if (isType("ushort"))
    {
        return a.value<ushort>() != b.value<ushort>();
    }
    if (isType("short"))
    {
        // Fixed: this used to compare `a` with itself, so a changed short was never detected.
        return a.value<short>() != b.value<short>();
    }
    if (isType("uint"))
    {
        return a.value<uint>() != b.value<uint>();
    }
    if (isType("int"))
    {
        return a.value<int>() != b.value<int>();
    }
    if (isType("modeInfo"))
    {
        const modeInfo ma = a.value<modeInfo>();
        const modeInfo mb = b.value<modeInfo>();
        return ma.mk != mb.mk || ma.reg != mb.reg || ma.filter != mb.filter;
    }
    if (isType("freqt"))
    {
        return a.value<freqt>().Hz != b.value<freqt>().Hz;
    }
    if (isType("antennaInfo"))
    {
        const antennaInfo aa = a.value<antennaInfo>();
        const antennaInfo ab = b.value<antennaInfo>();
        return aa.antenna != ab.antenna || aa.rx != ab.rx;
    }
    if (isType("rigInput"))
    {
        return a.value<rigInput>().type != b.value<rigInput>().type;
    }
    if (isType("duplexMode_t"))
    {
        return a.value<duplexMode_t>() != b.value<duplexMode_t>();
    }
    if (isType("spectrumMode_t"))
    {
        return a.value<spectrumMode_t>() != b.value<spectrumMode_t>();
    }
    if (isType("centerSpanData"))
    {
        const centerSpanData ca = a.value<centerSpanData>();
        const centerSpanData cb = b.value<centerSpanData>();
        return ca.cstype != cb.cstype || ca.freq != cb.freq;
    }
    if (isType("scopeData") || isType("memoryType") || isType("bandStackType"))
    {
        return true; // Always different
    }

    // Maybe Try simple comparison?
    qInfo () << "Unsupported cache value:" << a.typeName();
    return false;
}
|
|
|