From 4c2ca8fa20da11f9f06a6ba240f9b110f181e528 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sun, 20 Feb 2022 22:08:49 +0100 Subject: [PATCH] Data pipes redesign --- plugins/channelrx/demodais/aisdemodsink.cpp | 16 +- plugins/channelrx/demodam/amdemodsink.cpp | 16 +- plugins/channelrx/demoddab/dabdemodsink.cpp | 16 +- plugins/channelrx/demoddsd/dsddemodsink.cpp | 16 +- plugins/channelrx/demodnfm/nfmdemodsink.cpp | 16 +- .../channelrx/demodpacket/packetdemodsink.cpp | 16 +- .../channelrx/demodpager/pagerdemodsink.cpp | 16 +- plugins/channelrx/demodssb/ssbdemodsink.cpp | 24 +- plugins/channelrx/demodwfm/wfmdemodsink.cpp | 16 +- plugins/channeltx/modais/aismodsource.cpp | 16 +- plugins/channeltx/modam/ammodsource.cpp | 16 +- plugins/channeltx/modnfm/nfmmodsource.cpp | 16 +- .../channeltx/modpacket/packetmodsource.cpp | 16 +- plugins/channeltx/modssb/ssbmodsource.cpp | 24 +- plugins/channeltx/modwfm/wfmmodsource.cpp | 16 +- .../feature/demodanalyzer/demodanalyzer.cpp | 51 ++-- plugins/feature/demodanalyzer/demodanalyzer.h | 4 +- sdrbase/CMakeLists.txt | 10 + sdrbase/maincore.h | 6 +- sdrbase/pipes/datafifostore.cpp | 56 +++++ sdrbase/pipes/datafifostore.h | 42 ++++ sdrbase/pipes/datapipes2.cpp | 68 +++++ sdrbase/pipes/datapipes2.h | 55 ++++ sdrbase/pipes/datapipes2gcworker.cpp | 46 ++++ sdrbase/pipes/datapipes2gcworker.h | 49 ++++ sdrbase/pipes/objectpipe.cpp | 47 ++++ sdrbase/pipes/objectpipe.h | 51 ++++ sdrbase/pipes/objectpipeelementsstore.h | 30 +++ sdrbase/pipes/objectpipesregistrations.cpp | 236 ++++++++++++++++++ sdrbase/pipes/objectpipesregistrations.h | 75 ++++++ 30 files changed, 979 insertions(+), 103 deletions(-) create mode 100644 sdrbase/pipes/datafifostore.cpp create mode 100644 sdrbase/pipes/datafifostore.h create mode 100644 sdrbase/pipes/datapipes2.cpp create mode 100644 sdrbase/pipes/datapipes2.h create mode 100644 sdrbase/pipes/datapipes2gcworker.cpp create mode 100644 sdrbase/pipes/datapipes2gcworker.h create mode 100644 sdrbase/pipes/objectpipe.cpp create mode 100644 sdrbase/pipes/objectpipe.h create mode 100644 sdrbase/pipes/objectpipeelementsstore.h create mode 100644 sdrbase/pipes/objectpipesregistrations.cpp create mode 100644 sdrbase/pipes/objectpipesregistrations.h diff --git a/plugins/channelrx/demodais/aisdemodsink.cpp b/plugins/channelrx/demodais/aisdemodsink.cpp index a4fce9ade..a46d6ea8c 100644 --- a/plugins/channelrx/demodais/aisdemodsink.cpp +++ b/plugins/channelrx/demodais/aisdemodsink.cpp @@ -383,14 +383,20 @@ void AISDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demodam/amdemodsink.cpp b/plugins/channelrx/demodam/amdemodsink.cpp index 5b9aea881..9dee01d79 100644 --- a/plugins/channelrx/demodam/amdemodsink.cpp +++ b/plugins/channelrx/demodam/amdemodsink.cpp @@ -219,14 +219,20 @@ void AMDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demoddab/dabdemodsink.cpp b/plugins/channelrx/demoddab/dabdemodsink.cpp index b97b345dc..9d181a495 100644 --- a/plugins/channelrx/demoddab/dabdemodsink.cpp +++ b/plugins/channelrx/demoddab/dabdemodsink.cpp @@ -447,14 +447,20 @@ void DABDemodSink::processOneAudioSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeCI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeCI16); + } } } diff --git a/plugins/channelrx/demoddsd/dsddemodsink.cpp b/plugins/channelrx/demoddsd/dsddemodsink.cpp index 74297a753..6ba7cdf30 100644 --- a/plugins/channelrx/demoddsd/dsddemodsink.cpp +++ b/plugins/channelrx/demoddsd/dsddemodsink.cpp @@ -180,14 +180,20 @@ void DSDDemodSink::feed(const SampleVector::const_iterator& begin, const SampleV if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demodnfm/nfmdemodsink.cpp b/plugins/channelrx/demodnfm/nfmdemodsink.cpp index 17e9cc7ef..88639f467 100644 --- a/plugins/channelrx/demodnfm/nfmdemodsink.cpp +++ b/plugins/channelrx/demodnfm/nfmdemodsink.cpp @@ -256,14 +256,20 @@ void NFMDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demodpacket/packetdemodsink.cpp b/plugins/channelrx/demodpacket/packetdemodsink.cpp index ce8f08101..9741bad68 100644 --- a/plugins/channelrx/demodpacket/packetdemodsink.cpp +++ b/plugins/channelrx/demodpacket/packetdemodsink.cpp @@ -249,14 +249,20 @@ void PacketDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demodpager/pagerdemodsink.cpp b/plugins/channelrx/demodpager/pagerdemodsink.cpp index 200aeb461..c1e662bbf 100644 --- a/plugins/channelrx/demodpager/pagerdemodsink.cpp +++ b/plugins/channelrx/demodpager/pagerdemodsink.cpp @@ -594,14 +594,20 @@ void PagerDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channelrx/demodssb/ssbdemodsink.cpp b/plugins/channelrx/demodssb/ssbdemodsink.cpp index 6ca13b44c..dc03063f4 100644 --- a/plugins/channelrx/demodssb/ssbdemodsink.cpp +++ b/plugins/channelrx/demodssb/ssbdemodsink.cpp @@ -207,19 +207,25 @@ void SSBDemodSink::processOneSample(Complex &ci) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) + for (; it != dataPipes.end(); ++it) { - (*it)->write( - (quint8*) &m_demodBuffer[0], - m_demodBuffer.size() * sizeof(qint16), - m_audioBinaual ? DataFifo::DataTypeCI16 : DataFifo::DataTypeI16 - ); + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) + { + fifo->write( + (quint8*) &m_demodBuffer[0], + m_demodBuffer.size() * sizeof(qint16), + m_audioBinaual ? DataFifo::DataTypeCI16 : DataFifo::DataTypeI16 + ); + } } } diff --git a/plugins/channelrx/demodwfm/wfmdemodsink.cpp b/plugins/channelrx/demodwfm/wfmdemodsink.cpp index d81e83683..7b086e9e5 100644 --- a/plugins/channelrx/demodwfm/wfmdemodsink.cpp +++ b/plugins/channelrx/demodwfm/wfmdemodsink.cpp @@ -141,14 +141,20 @@ void WFMDemodSink::feed(const SampleVector::const_iterator& begin, const SampleV if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channeltx/modais/aismodsource.cpp b/plugins/channeltx/modais/aismodsource.cpp index b21c6bf64..1cc908a08 100644 --- a/plugins/channeltx/modais/aismodsource.cpp +++ b/plugins/channeltx/modais/aismodsource.cpp @@ -272,14 +272,20 @@ void AISModSource::modulateSample() if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channeltx/modam/ammodsource.cpp b/plugins/channeltx/modam/ammodsource.cpp index ea51f3605..d5d29720f 100644 --- a/plugins/channeltx/modam/ammodsource.cpp +++ b/plugins/channeltx/modam/ammodsource.cpp @@ -112,14 +112,20 @@ void AMModSource::pullOne(Sample& sample) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channeltx/modnfm/nfmmodsource.cpp b/plugins/channeltx/modnfm/nfmmodsource.cpp index 6a0168061..88ba75580 100644 --- a/plugins/channeltx/modnfm/nfmmodsource.cpp +++ b/plugins/channeltx/modnfm/nfmmodsource.cpp @@ -173,14 +173,20 @@ void NFMModSource::modulateSample() if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channeltx/modpacket/packetmodsource.cpp b/plugins/channeltx/modpacket/packetmodsource.cpp index 6dee2ce04..426d98cc4 100644 --- a/plugins/channeltx/modpacket/packetmodsource.cpp +++ b/plugins/channeltx/modpacket/packetmodsource.cpp @@ -280,14 +280,20 @@ void PacketModSource::modulateSample() if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/channeltx/modssb/ssbmodsource.cpp b/plugins/channeltx/modssb/ssbmodsource.cpp index 6a696a8d5..a0b150454 100644 --- a/plugins/channeltx/modssb/ssbmodsource.cpp +++ b/plugins/channeltx/modssb/ssbmodsource.cpp @@ -184,19 +184,25 @@ void SSBModSource::modulateSample() if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) + for (; it != dataPipes.end(); ++it) { - (*it)->write( - (quint8*) &m_demodBuffer[0], - m_demodBuffer.size() * sizeof(qint16), - m_settings.m_audioBinaural ? DataFifo::DataTypeCI16 : DataFifo::DataTypeI16 - ); + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) + { + fifo->write( + (quint8*) &m_demodBuffer[0], + m_demodBuffer.size() * sizeof(qint16), + m_settings.m_audioBinaural ? DataFifo::DataTypeCI16 : DataFifo::DataTypeI16 + ); + } } } diff --git a/plugins/channeltx/modwfm/wfmmodsource.cpp b/plugins/channeltx/modwfm/wfmmodsource.cpp index 3d9400b12..c39b9c13d 100644 --- a/plugins/channeltx/modwfm/wfmmodsource.cpp +++ b/plugins/channeltx/modwfm/wfmmodsource.cpp @@ -152,14 +152,20 @@ void WFMModSource::pullOne(Sample& sample) if (m_demodBufferFill >= m_demodBuffer.size()) { - QList *dataFifos = MainCore::instance()->getDataPipes().getFifos(m_channel, "demod"); + QList dataPipes; + MainCore::instance()->getDataPipes().getDataPipes(m_channel, "demod", dataPipes); - if (dataFifos) + if (dataPipes.size() > 0) { - QList::iterator it = dataFifos->begin(); + QList::iterator it = dataPipes.begin(); - for (; it != dataFifos->end(); ++it) { - (*it)->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + for (; it != dataPipes.end(); ++it) + { + DataFifo *fifo = qobject_cast((*it)->m_element); + + if (fifo) { + fifo->write((quint8*) &m_demodBuffer[0], m_demodBuffer.size() * sizeof(qint16), DataFifo::DataTypeI16); + } } } diff --git a/plugins/feature/demodanalyzer/demodanalyzer.cpp b/plugins/feature/demodanalyzer/demodanalyzer.cpp index cbd591023..02146a4cb 100644 --- a/plugins/feature/demodanalyzer/demodanalyzer.cpp +++ b/plugins/feature/demodanalyzer/demodanalyzer.cpp @@ -50,7 +50,7 @@ DemodAnalyzer::DemodAnalyzer(WebAPIAdapterInterface *webAPIAdapterInterface) : Feature(m_featureIdURI, webAPIAdapterInterface), m_spectrumVis(SDR_RX_SCALEF), m_selectedChannel(nullptr), - m_dataFifo(nullptr) + m_dataPipe(nullptr) { qDebug("DemodAnalyzer::DemodAnalyzer: webAPIAdapterInterface: %p", webAPIAdapterInterface); setObjectName(m_featureId); @@ -88,10 +88,15 @@ void DemodAnalyzer::start() = DemodAnalyzerWorker::MsgConfigureDemodAnalyzerWorker::create(m_settings, true); m_worker->getInputMessageQueue()->push(msg); - if (m_dataFifo) + if (m_dataPipe) { - DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(m_dataFifo, true); - m_worker->getInputMessageQueue()->push(msg); + DataFifo *fifo = qobject_cast(m_dataPipe->m_element); + + if (fifo) + { + DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(fifo, true); + m_worker->getInputMessageQueue()->push(msg); + } } } @@ -99,10 +104,15 @@ void DemodAnalyzer::stop() { qDebug("DemodAnalyzer::stop"); - if (m_dataFifo) + if (m_dataPipe) { - DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(m_dataFifo, false); - m_worker->getInputMessageQueue()->push(msg); + DataFifo *fifo = qobject_cast(m_dataPipe->m_element); + + if (fifo) + { + DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(fifo, false); + m_worker->getInputMessageQueue()->push(msg); + } } m_worker->stopWork(); @@ -169,8 +179,13 @@ bool DemodAnalyzer::handleMessage(const Message& cmd) DSPSignalNotification *msg = new DSPSignalNotification(0, m_sampleRate); m_spectrumVis.getInputMessageQueue()->push(msg); - if (m_dataFifo) { - m_dataFifo->setSize(2*m_sampleRate); + if (m_dataPipe) + { + DataFifo *fifo = qobject_cast(m_dataPipe->m_element); + + if (fifo) { + fifo->setSize(2*m_sampleRate); + } } if (getMessageQueueToGUI()) @@ -319,7 +334,8 @@ void DemodAnalyzer::setChannel(ChannelAPI *selectedChannel) if (m_selectedChannel) { - DataFifo *fifo = mainCore->getDataPipes().unregisterChannelToFeature(m_selectedChannel, this, "demod"); + ObjectPipe *pipe = mainCore->getDataPipes().unregisterProducerToConsumer(m_selectedChannel, this, "demod"); + DataFifo *fifo = qobject_cast(pipe->m_element); if ((fifo) && m_worker->isRunning()) { @@ -331,13 +347,18 @@ void DemodAnalyzer::setChannel(ChannelAPI *selectedChannel) disconnect(messageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleChannelMessageQueue(MessageQueue*))); } - m_dataFifo = mainCore->getDataPipes().registerChannelToFeature(selectedChannel, this, "demod"); - m_dataFifo->setSize(96000); + m_dataPipe = mainCore->getDataPipes().registerProducerToConsumer(selectedChannel, this, "demod"); + DataFifo *fifo = qobject_cast(m_dataPipe->m_element); - if (m_worker->isRunning()) + if (fifo) { - DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(m_dataFifo, true); - m_worker->getInputMessageQueue()->push(msg); + fifo->setSize(96000); + + if (m_worker->isRunning()) + { + DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(fifo, true); + m_worker->getInputMessageQueue()->push(msg); + } } MessageQueue *messageQueue = mainCore->getMessagePipes().registerChannelToFeature(selectedChannel, this, "reportdemod"); diff --git a/plugins/feature/demodanalyzer/demodanalyzer.h b/plugins/feature/demodanalyzer/demodanalyzer.h index d69a10ec0..1d88478ac 100644 --- a/plugins/feature/demodanalyzer/demodanalyzer.h +++ b/plugins/feature/demodanalyzer/demodanalyzer.h @@ -33,7 +33,7 @@ class WebAPIAdapterInterface; class DemodAnalyzerWorker; class QNetworkAccessManager; class QNetworkReply; -class DataFifo; +class ObjectPipe; namespace SWGSDRangel { class SWGDeviceState; @@ -201,7 +201,7 @@ private: ScopeVis m_scopeVis; QHash m_availableChannels; ChannelAPI *m_selectedChannel; - DataFifo *m_dataFifo; + ObjectPipe *m_dataPipe; int m_sampleRate; QNetworkAccessManager *m_networkManager; diff --git a/sdrbase/CMakeLists.txt b/sdrbase/CMakeLists.txt index f0580042e..a74b7da38 100644 --- a/sdrbase/CMakeLists.txt +++ b/sdrbase/CMakeLists.txt @@ -170,10 +170,15 @@ set(sdrbase_SOURCES pipes/datapipes.cpp pipes/datapipescommon.cpp pipes/datapipesgcworker.cpp + pipes/datafifostore.cpp + pipes/datapipes2.cpp + pipes/datapipes2gcworker.cpp pipes/messagepipes.cpp pipes/messagepipescommon.cpp pipes/messagepipesgcworker.cpp pipes/pipeendpoint.cpp + pipes/objectpipe.cpp + pipes/objectpipesregistrations.cpp settings/featuresetpreset.cpp settings/preferences.cpp @@ -374,12 +379,17 @@ set(sdrbase_HEADERS pipes/datapipes.h pipes/datapipescommon.h pipes/datapipesgcworker.h + pipes/datafifostore.h + pipes/datapipes2.h + pipes/datapipes2gcworker.h pipes/elementpipescommon.h pipes/elementpipesgc.h pipes/messagepipes.h pipes/messagepipescommon.h pipes/messagepipesgcworker.h pipes/pipeendpoint.h + pipes/objectpipe.h + pipes/objectpipesregistrations.h plugin/plugininterface.h plugin/pluginapi.h diff --git a/sdrbase/maincore.h b/sdrbase/maincore.h index 6cbc0d0d0..80470bb3c 100644 --- a/sdrbase/maincore.h +++ b/sdrbase/maincore.h @@ -28,7 +28,7 @@ #include "settings/mainsettings.h" #include "util/message.h" #include "pipes/messagepipes.h" -#include "pipes/datapipes.h" +#include "pipes/datapipes2.h" #include "channel/channelapi.h" class DeviceSet; @@ -731,7 +731,7 @@ public: void clearFeatures(FeatureSet *featureSet); // pipes MessagePipes& getMessagePipes() { return m_messagePipes; } - DataPipes& getDataPipes() { return m_dataPipes; } + DataPipes2& getDataPipes() { return m_dataPipes; } friend class MainServer; friend class MainWindow; @@ -751,7 +751,7 @@ private: QMap m_featuresMap; //!< Feature to feature set map PluginManager* m_pluginManager; MessagePipes m_messagePipes; - DataPipes m_dataPipes; + DataPipes2 m_dataPipes; void debugMaps(); }; diff --git a/sdrbase/pipes/datafifostore.cpp b/sdrbase/pipes/datafifostore.cpp new file mode 100644 index 000000000..68139fc88 --- /dev/null +++ b/sdrbase/pipes/datafifostore.cpp @@ -0,0 +1,56 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "dsp/datafifo.h" +#include "datafifostore.h" + +DataFifoStore::DataFifoStore() +{} + +DataFifoStore::~DataFifoStore() +{ + deleteAllElements(); +} + +QObject *DataFifoStore::createElement() +{ + DataFifo *fifo = new DataFifo(); + m_dataFifos.push_back(fifo); + qDebug("DataFifoStore::createElement: %d added", m_dataFifos.size() - 1); + return fifo; +} + +void DataFifoStore::deleteElement(QObject *element) +{ + int i = m_dataFifos.indexOf((DataFifo*) element); + + if (i >= 0) + { + qDebug("DataFifoStore::deleteElement: delte element at %d", i); + delete m_dataFifos[i]; + m_dataFifos.removeAt(i); + } +} + +void DataFifoStore::deleteAllElements() +{ + for (auto& fifo : m_dataFifos) { + delete fifo; + } + + m_dataFifos.clear(); +} diff --git a/sdrbase/pipes/datafifostore.h b/sdrbase/pipes/datafifostore.h new file mode 100644 index 000000000..4c7ad5228 --- /dev/null +++ b/sdrbase/pipes/datafifostore.h @@ -0,0 +1,42 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAFIFOSTORE_H_ +#define SDRBASE_PIPES_DATAFIFOSTORE_H_ + +#include + +#include "export.h" +#include "objectpipeelementsstore.h" + +class DataFifo; + +class SDRBASE_API DataFifoStore : public ObjectPipeElementsStore +{ +public: + DataFifoStore(); + virtual ~DataFifoStore(); + + virtual QObject *createElement(); + virtual void deleteElement(QObject*); + +private: + void deleteAllElements(); + QList m_dataFifos; +}; + +#endif // SDRBASE_PIPES_DATAFIFOSTORE_H_ diff --git a/sdrbase/pipes/datapipes2.cpp b/sdrbase/pipes/datapipes2.cpp new file mode 100644 index 000000000..e48a1489b --- /dev/null +++ b/sdrbase/pipes/datapipes2.cpp @@ -0,0 +1,68 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "dsp/datafifo.h" +#include "datapipes2.h" +#include "datapipes2gcworker.h" + +DataPipes2::DataPipes2() : + m_registrations(&m_dataFifoStore) +{ + m_gcWorker = new DataPipes2GCWorker(m_registrations); + m_gcWorker->moveToThread(&m_gcThread); + startGC(); +} + +DataPipes2::~DataPipes2() +{ + if (m_gcWorker->isRunning()) { + stopGC(); + } + + m_gcWorker->deleteLater(); +} + +ObjectPipe *DataPipes2::registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + return m_registrations.registerProducerToConsumer(producer, consumer, type); +} + +ObjectPipe *DataPipes2::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + return m_registrations.unregisterProducerToConsumer(producer, consumer, type); +} + +void DataPipes2::getDataPipes(const QObject *producer, const QString& type, QList& pipes) +{ + return m_registrations.getPipes(producer, type, pipes); +} + +void DataPipes2::startGC() +{ + qDebug("DataPipes2::startGC"); + + m_gcWorker->startWork(); + m_gcThread.start(); +} + +void DataPipes2::stopGC() +{ + qDebug("DataPipes2::stopGC"); + m_gcWorker->stopWork(); + m_gcThread.quit(); + m_gcThread.wait(); +} diff --git a/sdrbase/pipes/datapipes2.h b/sdrbase/pipes/datapipes2.h new file mode 100644 index 000000000..5438f0013 --- /dev/null +++ b/sdrbase/pipes/datapipes2.h @@ -0,0 +1,55 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAPIPES2_H_ +#define SDRBASE_PIPES_DATAPIPES2_H_ + +#include +#include + +#include "export.h" +#include "objectpipesregistrations.h" +#include "datafifostore.h" + +class DataFifo; +class DataPipes2GCWorker; + +class SDRBASE_API DataPipes2 : public QObject +{ + Q_OBJECT +public: + DataPipes2(); + DataPipes2(const DataPipes2&) = delete; + DataPipes2& operator=(const DataPipes2&) = delete; + ~DataPipes2(); + + ObjectPipe *registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + ObjectPipe *unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + void getDataPipes(const QObject *producer, const QString& type, QList& pipes); + +private: + DataFifoStore m_dataFifoStore; + ObjectPipesRegistrations m_registrations; + QThread m_gcThread; //!< Garbage collector thread + DataPipes2GCWorker *m_gcWorker; //!< Garbage collector + + void startGC(); //!< Start garbage collector + void stopGC(); //!< Stop garbage collector +}; + + +#endif // SDRBASE_PIPES_DATAPIPES2_H_ diff --git a/sdrbase/pipes/datapipes2gcworker.cpp b/sdrbase/pipes/datapipes2gcworker.cpp new file mode 100644 index 000000000..08f20c173 --- /dev/null +++ b/sdrbase/pipes/datapipes2gcworker.cpp @@ -0,0 +1,46 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "dsp/datafifo.h" +#include "datapipes2gcworker.h" + +DataPipes2GCWorker::DataPipes2GCWorker(ObjectPipesRegistrations& objectPipesRegistrations) : + m_running(false), + m_objectPipesRegistrations(objectPipesRegistrations) +{} + +DataPipes2GCWorker::~DataPipes2GCWorker() +{} + +void DataPipes2GCWorker::startWork() +{ + connect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); + m_gcTimer.start(10000); // collect garbage every 10s + m_running = true; +} + +void DataPipes2GCWorker::stopWork() +{ + m_running = false; + m_gcTimer.stop(); + disconnect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); +} + +void DataPipes2GCWorker::processGC() +{ + m_objectPipesRegistrations.processGC(); +} diff --git a/sdrbase/pipes/datapipes2gcworker.h b/sdrbase/pipes/datapipes2gcworker.h new file mode 100644 index 000000000..03a9dcd06 --- /dev/null +++ b/sdrbase/pipes/datapipes2gcworker.h @@ -0,0 +1,49 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAPIPES2GCWORKER_H_ +#define SDRBASE_PIPES_DATAPIPES2GCWORKER_H_ + +#include +#include + +#include "export.h" +#include "objectpipesregistrations.h" + +class DataFifo; + +class SDRBASE_API DataPipes2GCWorker : public QObject +{ + Q_OBJECT +public: + DataPipes2GCWorker(ObjectPipesRegistrations& objectPipesRegistrations); + ~DataPipes2GCWorker(); + + void startWork(); + void stopWork(); + bool isRunning() const { return m_running; } + +private: + bool m_running; + QTimer m_gcTimer; + ObjectPipesRegistrations& m_objectPipesRegistrations; + +private slots: + void processGC(); //!< Collect garbage +}; + +#endif // SDRBASE_PIPES_DATAPIPES2GCWORKER_H_ diff --git a/sdrbase/pipes/objectpipe.cpp b/sdrbase/pipes/objectpipe.cpp new file mode 100644 index 000000000..2d7cf220a --- /dev/null +++ b/sdrbase/pipes/objectpipe.cpp @@ -0,0 +1,47 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "objectpipe.h" + +ObjectPipe::ObjectPipe() : + m_pipeId(0), + m_typeId(0), + m_producer(nullptr), + m_consumer(nullptr), + m_element(nullptr), + m_gcCount(0) +{} + +void ObjectPipe::setToBeDeleted(int reason) +{ + m_gcCount = 2; // will defer actual deletion by one GC pass + emit toBeDeleted(reason); +} + +int ObjectPipe::getGCCount() const { + return m_gcCount; +} + +int ObjectPipe::decreaseGCCount() +{ + if (m_gcCount > 0) { + return m_gcCount--; + } else { + return m_gcCount; + } +} + diff --git a/sdrbase/pipes/objectpipe.h b/sdrbase/pipes/objectpipe.h new file mode 100644 index 000000000..94a9f15cb --- /dev/null +++ b/sdrbase/pipes/objectpipe.h @@ -0,0 +1,51 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_OBJECTPIPE_H_ +#define SDRBASE_PIPES_OBJECTPIPE_H_ + +#include + +#include "export.h" + +class SDRBASE_API ObjectPipe : public QObject +{ + Q_OBJECT +public: + ObjectPipe(); + ObjectPipe(const ObjectPipe&) = default; + ObjectPipe& operator=(const ObjectPipe&) = default; + + void setToBeDeleted(int reason); + int getGCCount() const; + int decreaseGCCount(); + + unsigned int m_pipeId; + int m_typeId; + const QObject *m_producer; + const QObject *m_consumer; + QObject *m_element; + +signals: + void toBeDeleted(int reason); + +private: + int m_gcCount; +}; + + +#endif // SDRBASE_PIPES_OBJECTPIPE_H_ diff --git a/sdrbase/pipes/objectpipeelementsstore.h b/sdrbase/pipes/objectpipeelementsstore.h new file mode 100644 index 000000000..0dac5d294 --- /dev/null +++ b/sdrbase/pipes/objectpipeelementsstore.h @@ -0,0 +1,30 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_OBJECTPIPESELEMENT_H_ +#define SDRBASE_PIPES_OBJECTPIPESELEMENT_H_ + +class QObject; + +class ObjectPipeElementsStore +{ +public: + virtual QObject *createElement() = 0; + virtual void deleteElement(QObject*) = 0; +}; + +#endif // SDRBASE_PIPES_OBJECTPIPESELEMENT_H_ diff --git a/sdrbase/pipes/objectpipesregistrations.cpp b/sdrbase/pipes/objectpipesregistrations.cpp new file mode 100644 index 000000000..685e32185 --- /dev/null +++ b/sdrbase/pipes/objectpipesregistrations.cpp @@ -0,0 +1,236 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "objectpipesregistrations.h" + +ObjectPipesRegistrations::ObjectPipesRegistrations(ObjectPipeElementsStore *objectPipeElementsStore) : + m_typeCount(0), + m_pipeId(0), + m_objectPipeElementsStore(objectPipeElementsStore), + m_mutex(QMutex::Recursive) +{} + +ObjectPipesRegistrations::~ObjectPipesRegistrations() +{} + +ObjectPipe *ObjectPipesRegistrations::registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + int typeId; + QMutexLocker mlock(&m_mutex); + + if (m_typeIds.contains(type)) + { + typeId = m_typeIds.value(type); + } + else + { + typeId = m_typeCount++; + m_typeIds.insert(type, typeId); + m_types.insert(typeId, type); + } + + for (auto& pipe : m_pipes) // check if pipe exists already - there is a unique pipe per producer, consumer and type + { + if ((producer == pipe->m_producer) && (consumer == pipe->m_consumer) && (typeId == pipe->m_typeId)) { + return pipe; + } + } + + QObject *element = m_objectPipeElementsStore->createElement(); + m_pipes.push_back(new ObjectPipe()); + m_pipes.back()->m_pipeId = ++m_pipeId; + m_pipes.back()->m_typeId = typeId; + m_pipes.back()->m_producer = producer; + m_pipes.back()->m_consumer = consumer; + m_pipes.back()->m_element = element; + + m_producerPipes[producer].push_back(m_pipes.back()); + m_consumerPipes[consumer].push_back(m_pipes.back()); + m_typeIdPipes[typeId].push_back(m_pipes.back()); + m_producerAndTypeIdPipes[std::make_tuple(producer, typeId)].push_back(m_pipes.back()); + m_pipeMap[std::make_tuple(producer, consumer, typeId)] = m_pipes.back(); + + connect(producer, SIGNAL(destroyed(QObject*)), this, SLOT(removeProducer(QObject*))); + connect(consumer, SIGNAL(destroyed(QObject*)), this, SLOT(removeConsumer(QObject*))); + + return m_pipes.back(); +} + +ObjectPipe *ObjectPipesRegistrations::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + ObjectPipe *pipe = nullptr; + + if (m_typeIds.contains(type)) + { + int typeId = m_typeIds.value(type); + + if (m_pipeMap.contains(std::make_tuple(producer, consumer, typeId))) + { + pipe = m_pipeMap[std::make_tuple(producer, consumer, typeId)]; + m_producerPipes[producer].removeAll(pipe); + m_consumerPipes[consumer].removeAll(pipe); + m_typeIdPipes[typeId].removeAll(pipe); + m_producerAndTypeIdPipes[std::make_tuple(producer, typeId)].removeAll(pipe); + + if (m_producerPipes[producer].size() == 0) { + m_producerPipes.remove(producer); + } + + if (m_consumerPipes[consumer].size() == 0) { + m_consumerPipes.remove(consumer); + } + + if (m_typeIdPipes[typeId].size() == 0) { + m_typeIdPipes.remove(typeId); + } + + if (m_producerAndTypeIdPipes[std::make_tuple(producer, typeId)].size() == 0) { + m_producerAndTypeIdPipes.remove(std::make_tuple(producer, typeId)); + } + + pipe->setToBeDeleted(PipeDeletionReason::PipeDeleted); + } + } + + return pipe; +} + +void ObjectPipesRegistrations::getPipes(const QObject *producer, const QString& type, QList& pipes) +{ + QMutexLocker mlock(&m_mutex); + + if (m_typeIds.contains(type)) + { + if (m_producerAndTypeIdPipes.contains(std::make_tuple(producer, m_typeIds[type]))) { + pipes = m_producerAndTypeIdPipes[std::make_tuple(producer, m_typeIds[type])]; + } + } +} + +void ObjectPipesRegistrations::processGC() +{ + qDebug("ObjectPipesRegistrations::processGC"); + QMutexLocker mlock(&m_mutex); + + typename QList::iterator itPipe = m_pipes.begin(); + + while (itPipe != m_pipes.end()) + { + if ((*itPipe)->getGCCount() > 0) // scheduled for deletion + { + if ((*itPipe)->decreaseGCCount() == 0) // delete on this pass + { + m_objectPipeElementsStore->deleteElement((*itPipe)->m_element); + delete *itPipe; + itPipe = m_pipes.erase(itPipe); + } + else + { + ++itPipe; + } + } + } +} + +void ObjectPipesRegistrations::removeProducer(QObject *producer) +{ + qDebug("ObjectPipesRegistrations::removeProducer"); + QMutexLocker mlock(&m_mutex); + + if (m_producerPipes.contains(producer) && (m_producerPipes[producer].size() != 0)) + { + const QList& pipeList = m_producerPipes[producer]; + + for (auto& pipe : pipeList) + { + for (const auto& consumer : m_consumerPipes.keys()) { + m_consumerPipes[consumer].removeAll(pipe); + } + + for (const auto& typeId : m_typeIdPipes.keys()) { + m_typeIdPipes[typeId].removeAll(pipe); + } + + pipe->setToBeDeleted(PipeDeletionReason::PipeProducerDeleted); + } + + m_producerPipes.remove(producer); + } + + typename QMap, ObjectPipe*>::iterator itP = m_pipeMap.begin(); + + while (itP != m_pipeMap.end()) + { + if (std::get<0>(itP.key())) { + itP = m_pipeMap.erase(itP); + } else { + ++itP; + } + } + + typename QMap, QList>::iterator itPT = m_producerAndTypeIdPipes.begin(); + + while (itPT != m_producerAndTypeIdPipes.end()) + { + if (std::get<0>(itPT.key()) == producer) { + itPT = m_producerAndTypeIdPipes.erase(itPT); + } else { + ++itPT; + } + } +} + +void ObjectPipesRegistrations::removeConsumer(QObject *consumer) +{ + qDebug("ObjectPipesRegistrations::removeConsumer"); + QMutexLocker mlock(&m_mutex); + + if (m_consumerPipes.contains(consumer) && (m_consumerPipes[consumer].size() != 0)) + { + QList& pipeList = m_consumerPipes[consumer]; + + for (auto& pipe : pipeList) + { + for (const auto& producer : m_producerPipes.keys()) { + m_producerPipes[producer].removeAll(pipe); + } + + for (const auto& typeId : m_typeIdPipes.keys()) { + m_typeIdPipes[typeId].removeAll(pipe); + } + + for (const auto& producerAndTypeId : m_producerAndTypeIdPipes.keys()) { + m_producerAndTypeIdPipes[producerAndTypeId].removeAll(pipe); + } + + pipe->setToBeDeleted(PipeDeletionReason::PipeConsumerDeleted); + } + + m_consumerPipes.remove(consumer); + } + + typename QMap, ObjectPipe*>::iterator it = m_pipeMap.begin(); + + while (it != m_pipeMap.end()) + { + if (std::get<1>(it.key()) == consumer) { + it = m_pipeMap.erase(it); + } else { + ++it; + } + } +} diff --git a/sdrbase/pipes/objectpipesregistrations.h b/sdrbase/pipes/objectpipesregistrations.h new file mode 100644 index 000000000..d8ae0ab53 --- /dev/null +++ b/sdrbase/pipes/objectpipesregistrations.h @@ -0,0 +1,75 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_OBJECTPIPESREGISTRATION_H_ +#define SDRBASE_PIPES_OBJECTPIPESREGISTRATION_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "export.h" +#include "objectpipe.h" +#include "objectpipeelementsstore.h" + +class SDRBASE_API ObjectPipesRegistrations : public QObject +{ + Q_OBJECT +public: + enum PipeDeletionReason + { + PipeProducerDeleted, + PipeConsumerDeleted, + PipeDeleted + }; + + ObjectPipesRegistrations(ObjectPipeElementsStore *objectPipeElementsStore); + ~ObjectPipesRegistrations(); + + ObjectPipe *registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + ObjectPipe *unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + void getPipes(const QObject *producer, const QString& type, QList& pipes); + void processGC(); + +private slots: + void removeProducer(QObject *producer); + void removeConsumer(QObject *consumer); + +private: + QHash m_typeIds; + QMap m_types; + int m_typeCount; + unsigned int m_pipeId; + ObjectPipeElementsStore *m_objectPipeElementsStore; + QList m_pipes; + + QMap> m_producerPipes; + QMap> m_consumerPipes; + QMap> m_typeIdPipes; + QMap, QList> m_producerAndTypeIdPipes; + QMap, ObjectPipe*> m_pipeMap; + + QMutex m_mutex; +}; + +#endif // SDRBASE_PIPES_OBJECTPIPESREGISTRATION_H_