diff --git a/sdrdaemon/channel/sdrdaemonchannelsink.cpp b/sdrdaemon/channel/sdrdaemonchannelsink.cpp index 79a54e0de..149817708 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsink.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsink.cpp @@ -113,7 +113,7 @@ void SDRDaemonChannelSink::feed(const SampleVector::const_iterator& begin, const if (!m_dataBlock) { // on the very first cycle there is no data block allocated m_dataBlock = new SDRDaemonDataBlock(); } - + boost::crc_32_type crc32; crc32.process_bytes(&metaData, 20); metaData.m_crc32 = crc32.checksum(); @@ -182,7 +182,7 @@ void SDRDaemonChannelSink::feed(const SampleVector::const_iterator& begin, const { m_txBlockIndex++; } - } + } } } @@ -191,27 +191,27 @@ void SDRDaemonChannelSink::start() qDebug("SDRDaemonChannelSink::start"); memset((void *) &m_currentMetaFEC, 0, sizeof(SDRDaemonMetaDataFEC)); - - if (m_running) { - stop(); + + if (m_running) { + stop(); } - + m_sinkThread = new SDRDaemonChannelSinkThread(&m_dataQueue, m_cm256p); - m_sinkThread->startWork(); + m_sinkThread->startStop(true); m_running = true; } void SDRDaemonChannelSink::stop() { qDebug("SDRDaemonChannelSink::stop"); - + if (m_sinkThread != 0) { - m_sinkThread->stopWork(); - delete m_sinkThread; + m_sinkThread->startStop(false); + m_sinkThread->deleteLater(); m_sinkThread = 0; } - + m_running = false; } @@ -224,7 +224,7 @@ bool SDRDaemonChannelSink::handleMessage(const Message& cmd __attribute__((unuse qDebug() << "SDRDaemonChannelSink::handleMessage: MsgChannelizerNotification:" << " channelSampleRate: " << notif.getSampleRate() << " offsetFrequency: " << notif.getFrequencyOffset(); - + if (notif.getSampleRate() > 0) { setSampleRate(notif.getSampleRate()); } @@ -238,7 +238,7 @@ bool SDRDaemonChannelSink::handleMessage(const Message& cmd __attribute__((unuse qDebug() << "SDRDaemonChannelSink::handleMessage: DSPSignalNotification:" << " inputSampleRate: " << notif.getSampleRate() << " centerFrequency: " << notif.getCenterFrequency(); - + setCenterFrequency(notif.getCenterFrequency()); return true; @@ -246,7 +246,7 @@ bool SDRDaemonChannelSink::handleMessage(const Message& cmd __attribute__((unuse else { return false; - } + } } QByteArray SDRDaemonChannelSink::serialize() const diff --git a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp index 8241d68f6..4333cd96b 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp @@ -28,6 +28,8 @@ #include "cm256.h" +MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSinkThread::MsgStartStop, Message) + SDRDaemonChannelSinkThread::SDRDaemonChannelSinkThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) : QThread(parent), m_running(false), @@ -36,20 +38,26 @@ SDRDaemonChannelSinkThread::SDRDaemonChannelSinkThread(SDRDaemonDataQueue *dataQ m_address(QHostAddress::LocalHost), m_port(9090) { - m_socket = new QUdpSocket(this); + connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); connect(m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); } SDRDaemonChannelSinkThread::~SDRDaemonChannelSinkThread() { - stopWork(); - delete m_socket; + qDebug("SDRDaemonChannelSinkThread::~SDRDaemonChannelSinkThread"); +} + +void SDRDaemonChannelSinkThread::startStop(bool start) +{ + MsgStartStop *msg = MsgStartStop::create(start); + m_inputMessageQueue.push(msg); } void SDRDaemonChannelSinkThread::startWork() { qDebug("SDRDaemonChannelSinkThread::startWork"); m_startWaitMutex.lock(); + m_socket = new QUdpSocket(this); start(); while(!m_running) m_startWaiter.wait(&m_startWaitMutex, 100); @@ -59,6 +67,7 @@ void SDRDaemonChannelSinkThread::startWork() void SDRDaemonChannelSinkThread::stopWork() { qDebug("SDRDaemonChannelSinkThread::stopWork"); + delete m_socket; m_running = false; wait(); } @@ -156,3 +165,25 @@ void SDRDaemonChannelSinkThread::handleData() } } } + +void SDRDaemonChannelSinkThread::handleInputMessages() +{ + Message* message; + + while ((message = m_inputMessageQueue.pop()) != 0) + { + if (MsgStartStop::match(*message)) + { + MsgStartStop* notif = (MsgStartStop*) message; + qDebug("SDRDaemonChannelSinkThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); + + if (notif->getStartStop()) { + startWork(); + } else { + stopWork(); + } + + delete message; + } + } +} diff --git a/sdrdaemon/channel/sdrdaemonchannelsinkthread.h b/sdrdaemon/channel/sdrdaemonchannelsinkthread.h index a64115363..375dda954 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsinkthread.h +++ b/sdrdaemon/channel/sdrdaemonchannelsinkthread.h @@ -25,6 +25,9 @@ #include #include +#include "util/message.h" +#include "util/messagequeue.h" + class SDRDaemonDataQueue; class SDRDaemonDataBlock; class CM256; @@ -34,11 +37,29 @@ class SDRDaemonChannelSinkThread : public QThread { Q_OBJECT public: + class MsgStartStop : public Message { + MESSAGE_CLASS_DECLARATION + + public: + bool getStartStop() const { return m_startStop; } + + static MsgStartStop* create(bool startStop) { + return new MsgStartStop(startStop); + } + + protected: + bool m_startStop; + + MsgStartStop(bool startStop) : + Message(), + m_startStop(startStop) + { } + }; + SDRDaemonChannelSinkThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0); ~SDRDaemonChannelSinkThread(); - void startWork(); - void stopWork(); + void startStop(bool start); void setAddress(QString& address) { m_address.setAddress(address); } void setPort(unsigned int port) { m_port = port; } @@ -55,9 +76,15 @@ private: unsigned int m_port; QUdpSocket *m_socket; + MessageQueue m_inputMessageQueue; + + void startWork(); + void stopWork(); + void run(); bool handleDataBlock(SDRDaemonDataBlock& dataBlock); private slots: void handleData(); + void handleInputMessages(); };