diff --git a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt index 28c8fc386..07a3c4fe3 100644 --- a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt +++ b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt @@ -7,6 +7,7 @@ set(sdrdaemonsink_SOURCES sdrdaemonsinksettings.cpp sdrdaemonsinkthread.cpp udpsinkfec.cpp + UDPSocket.cpp ) set(sdrdaemonsink_HEADERS @@ -16,6 +17,7 @@ set(sdrdaemonsink_HEADERS sdrdaemonsinksettings.h sdrdaemonsinkthread.h udpsinkfec.h + UDPSocket.h ) set(sdrdaemonsink_FORMS diff --git a/plugins/samplesink/sdrdaemonsink/UDPSocket.cpp b/plugins/samplesink/sdrdaemonsink/UDPSocket.cpp new file mode 100644 index 000000000..f2b02d207 --- /dev/null +++ b/plugins/samplesink/sdrdaemonsink/UDPSocket.cpp @@ -0,0 +1,428 @@ +/////////////////////////////////////////////////////////////////////////////////// +// SDRdaemon - send I/Q samples read from a SDR device over the network via UDP. // +// // +// Copyright (C) 2015 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +// Original code is posted at: https://cppcodetips.wordpress.com/2014/01/29/udp-socket-class-in-c/ + +#include "UDPSocket.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +CSocketException::CSocketException( const string &sMessage, bool blSysMsg /*= false*/ ) throw() :m_sMsg(sMessage) +{ + if (blSysMsg) { + m_sMsg.append(": "); + m_sMsg.append(strerror(errno)); + } +} + +CSocketException::~CSocketException() throw () +{ + +} + +CSocket::~CSocket() +{ + ::close(m_sockDesc); + m_sockDesc = -1; +} + +CSocket::CSocket( SocketType type, NetworkLayerProtocol protocol ) throw(CSocketException):m_sockDesc(-1) +{ + m_sockDesc = socket(protocol, type, 0); + if (m_sockDesc < 0) + { + throw CSocketException("Socket creation failed (socket())", true); + } +} + +CSocket::CSocket( int sockDesc ) +{ + m_sockDesc = sockDesc; +} + +CSocket::CSocket( const CSocket &sock ) +{ + +} + +void CSocket::operator=( const CSocket &sock ) +{ + +} + +std::string CSocket::GetLocalAddress() throw(CSocketException) +{ + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getsockname(m_sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw CSocketException("Fetch of local address failed (getsockname())", true); + } + return inet_ntoa(addr.sin_addr); +} + +unsigned short CSocket::GetLocalPort() throw(CSocketException) +{ + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getsockname(m_sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw CSocketException("Fetch of local port failed (getsockname())", true); + } + return ntohs(addr.sin_port); +} + +void CSocket::BindLocalPort( unsigned short localPort ) throw(CSocketException) +{ + // Bind the socket to its port + sockaddr_in localAddr; + memset(&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = htonl(INADDR_ANY); + localAddr.sin_port = htons(localPort); + + if (bind(m_sockDesc, (sockaddr *) &localAddr, sizeof(sockaddr_in)) < 0) { + throw CSocketException("Set of local port failed (bind())", true); + } +} + +void CSocket::BindLocalAddressAndPort( const string &localAddress, unsigned short localPort /*= 0*/ ) + throw(CSocketException) +{ + // Get the address of the requested host + sockaddr_in localAddr; + FillAddr(localAddress, localPort, localAddr); + + if (bind(m_sockDesc, (sockaddr *) &localAddr, sizeof(sockaddr_in)) < 0) { + throw CSocketException("Set of local address and port failed (bind())", true); + } +} + +void CSocket::FillAddr( const string & localAddress, unsigned short localPort, sockaddr_in& localAddr ) +{ + ////cout<<"\n Inside Fille addr:"<h_addr_list[0]); + + localAddr.sin_port = htons(localPort); // Assign port in network byte order + ////cout<<"\n returning from Fille addr"; +} + +unsigned long int CSocket::GetReadBufferSize() +{ + unsigned long int nSize; + socklen_t n = sizeof(nSize); + getsockopt(m_sockDesc,SOL_SOCKET,SO_RCVBUF,(void *)&nSize, (&n)); + // now the variable nSize will have the socket size + return nSize; +} + +void CSocket::SetReadBufferSize( unsigned int nSize ) throw(CSocketException) +{ + if (setsockopt(m_sockDesc, SOL_SOCKET, SO_RCVBUF, &nSize, sizeof(nSize)) == -1) + { + throw CSocketException("Error in setting socket buffer size ", true); + } +} + +void CSocket::SetNonBlocking( bool bBlocking ) throw(CSocketException) +{ + int opts; + + opts = fcntl ( m_sockDesc, F_GETFL ); + + if ( opts < 0 ) + { + return; + } + + if ( bBlocking ) + opts = ( opts | O_NONBLOCK ); + else + opts = ( opts & ~O_NONBLOCK ); + + fcntl ( m_sockDesc, F_SETFL,opts ); +} + +void CSocket::ConnectToHost( const string &foreignAddress, unsigned short foreignPort ) throw(CSocketException) +{ + //cout<<"\nstart Connect to host"; + // Get the address of the requested host + sockaddr_in destAddr; + //cout<<"\ninside Connect to host"; + FillAddr(foreignAddress, foreignPort, destAddr); + + //cout<<"trying to connect to host"; + // Try to connect to the given port + if (::connect(m_sockDesc, (sockaddr *) &destAddr, sizeof(destAddr)) < 0) { + throw CSocketException("Connect failed (connect())", true); + } + //cout<<"\n after connecting"; + +} + +void CSocket::Send( const void *buffer, int bufferLen ) throw(CSocketException) +{ + if (::send(m_sockDesc, (void *) buffer, bufferLen, 0) < 0) { + throw CSocketException("Send failed (send())", true); + } +} + +int CSocket::Recv( void *buffer, int bufferLen ) throw(CSocketException) +{ + int nBytes; + if ((nBytes = ::recv(m_sockDesc, (void *) buffer, bufferLen, 0)) < 0) { + throw CSocketException("Received failed (recv())", true); + } + char* sData = static_cast(buffer); + sData[nBytes] = '\0'; + return nBytes; +} + +std::string CSocket::GetPeerAddress() throw(CSocketException) +{ + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getpeername(m_sockDesc, (sockaddr *) &addr,(socklen_t *) &addr_len) < 0) { + throw CSocketException("Fetch of foreign address failed (getpeername())", true); + } + return inet_ntoa(addr.sin_addr); +} + +unsigned short CSocket::GetPeerPort() throw(CSocketException) +{ + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getpeername(m_sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw CSocketException("Fetch of foreign port failed (getpeername())", true); + } + return ntohs(addr.sin_port); +} + +CSocket& CSocket::operator<<(const string& sStr ) +{ + Send(sStr.c_str(), sStr.length()); + return *this; +} + +CSocket& CSocket::operator>>( string& sStr ) +{ + char *buff = new char[GetReadBufferSize()]; + Recv(buff, GetReadBufferSize()); + sStr.append(buff); + delete [] buff; + return *this; +} + +int CSocket::OnDataRead(unsigned long timeToWait) +{ + /* master file descriptor list */ + fd_set master; + //struct timeval *ptimeout = NULL; + + /* temp file descriptor list for select() */ + fd_set read_fds; + + /* maximum file descriptor number */ + int fdmax; + + /* clear the master and temp sets */ + FD_ZERO(&master); + FD_ZERO(&read_fds); + + /* add the listener to the master set */ + FD_SET(m_sockDesc, &master); + /* keep track of the biggest file descriptor */ + fdmax = m_sockDesc; /* so far, it's this one*/ + + /* copy it */ + read_fds = master; + //cout<<"Waiting for select"; + int nRet; + if (timeToWait == ULONG_MAX) + { + nRet = select(fdmax+1, &read_fds, NULL, NULL, NULL); + if (nRet == -1) + nRet = DATA_EXCEPTION; + else if (nRet > 0) + nRet = DATA_ARRIVED; + } + else + { + struct timeval timeout; + timeout.tv_sec = timeToWait; + timeout.tv_usec = 0; + nRet = select(fdmax+1, &read_fds, NULL, NULL, &timeout); + if (nRet == -1) + nRet = DATA_EXCEPTION; + else if (nRet > 0) + nRet = DATA_ARRIVED; + else if(nRet == 0) + nRet = DATA_TIMED_OUT; + } + + return nRet; +} + +void CSocket::SetBindToDevice( const string& sInterface ) throw(CSocketException) +{ + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", sInterface.c_str()); + //Todo:SO_BINDTODEVICE not declared error comes in CygWin, need to compile in Linux. + /*int nRet = ::setsockopt(m_sockDesc, SOL_SOCKET, SO_BINDTODEVICE, (void*)&ifr, sizeof(ifr)); + + if (nRet < 0) + { + throw CSocketException("Error in binding to device ", true); + }*/ +} + +UDPSocket::UDPSocket() throw(CSocketException):CSocket(UdpSocket,IPv4Protocol) +{ + SetBroadcast(); +} + +UDPSocket::UDPSocket( unsigned short localPort ) throw(CSocketException): +CSocket(UdpSocket,IPv4Protocol) +{ + BindLocalPort(localPort); + SetBroadcast(); +} + +UDPSocket::UDPSocket( const string &localAddress, unsigned short localPort ) throw(CSocketException): +CSocket(UdpSocket,IPv4Protocol) +{ + BindLocalAddressAndPort(localAddress, localPort); + SetBroadcast(); +} + +void UDPSocket::DisconnectFromHost() throw(CSocketException) +{ + sockaddr_in nullAddr; + memset(&nullAddr, 0, sizeof(nullAddr)); + nullAddr.sin_family = AF_UNSPEC; + // Try to disconnect + if (::connect(m_sockDesc, (sockaddr *) &nullAddr, sizeof(nullAddr)) < 0) + { + if (errno != EAFNOSUPPORT) + { + throw CSocketException("Disconnect failed (connect())", true); + } + } +} + +void UDPSocket::SendDataGram( const void *buffer, int bufferLen, const string &foreignAddress, + unsigned short foreignPort ) throw(CSocketException) +{ + //cout<<"Befor Fill addr"; + sockaddr_in destAddr; + FillAddr(foreignAddress, foreignPort, destAddr); + //cout<<"Befor socket send"; + // Write out the whole buffer as a single message. + if (sendto(m_sockDesc, (void *) buffer, bufferLen, 0,(sockaddr *) &destAddr, sizeof(destAddr)) != bufferLen) + { + throw CSocketException("Send failed (sendto())", true); + } + +} + +int UDPSocket::RecvDataGram( void *buffer, int bufferLen, string &sourceAddress, unsigned short &sourcePort ) + throw(CSocketException) +{ + sockaddr_in clntAddr; + socklen_t addrLen = sizeof(clntAddr); + int nBytes; + if ((nBytes = recvfrom(m_sockDesc, (void *) buffer, bufferLen, 0, (sockaddr *) &clntAddr, + (socklen_t *) &addrLen)) < 0) + { + throw CSocketException("Receive failed (recvfrom())", true); + } + sourceAddress = inet_ntoa(clntAddr.sin_addr); + sourcePort = ntohs(clntAddr.sin_port); + char* sData = static_cast(buffer); + sData[nBytes] = '\0'; + return nBytes; +} + +void UDPSocket::SetMulticastTTL( unsigned char multicastTTL ) throw(CSocketException) +{ + if (setsockopt(m_sockDesc, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &multicastTTL, sizeof(multicastTTL)) < 0) + { + throw CSocketException("Multicast TTL set failed (setsockopt())", true); + } +} + +void UDPSocket::JoinGroup( const string &multicastGroup ) throw(CSocketException) +{ + struct ip_mreq multicastRequest; + + multicastRequest.imr_multiaddr.s_addr = inet_addr(multicastGroup.c_str()); + multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(m_sockDesc, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (void *) &multicastRequest, + sizeof(multicastRequest)) < 0) + { + throw CSocketException("Multicast group join failed (setsockopt())", true); + } + +} + +void UDPSocket::LeaveGroup( const string &multicastGroup ) throw(CSocketException) +{ + struct ip_mreq multicastRequest; + + multicastRequest.imr_multiaddr.s_addr = inet_addr(multicastGroup.c_str()); + multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(m_sockDesc, IPPROTO_IP, IP_DROP_MEMBERSHIP, + (void *) &multicastRequest, + sizeof(multicastRequest)) < 0) + { + throw CSocketException("Multicast group leave failed (setsockopt())", true); + } + +} + +void UDPSocket::SetBroadcast() +{ + // If this fails, we'll hear about it when we try to send. This will allow + // system that cannot broadcast to continue if they don't plan to broadcast + int broadcastPermission = 1; + setsockopt(m_sockDesc, SOL_SOCKET, SO_BROADCAST, + (void *) &broadcastPermission, sizeof(broadcastPermission)); + +} + + diff --git a/plugins/samplesink/sdrdaemonsink/UDPSocket.h b/plugins/samplesink/sdrdaemonsink/UDPSocket.h new file mode 100644 index 000000000..b389db792 --- /dev/null +++ b/plugins/samplesink/sdrdaemonsink/UDPSocket.h @@ -0,0 +1,339 @@ +/////////////////////////////////////////////////////////////////////////////////// +// SDRdaemon - send I/Q samples read from a SDR device over the network via UDP. // +// // +// Copyright (C) 2015 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +// Original code is posted at: https://cppcodetips.wordpress.com/2014/01/29/udp-socket-class-in-c/ + +#ifndef INCLUDE_UDPSOCKET_H_ +#define INCLUDE_UDPSOCKET_H_ + +#include // For string +#include // For exception class +#include +#include // For data types +#include // For socket(), connect(), send(), and recv() +#include // For gethostbyname() +#include // For inet_addr() +#include // For close() +#include // For sockaddr_in +#include +#include + +using namespace std; + +/** + * Signals a problem with the execution of a socket call. + */ + +class CSocketException: public std::exception +{ +public: + /** + * Construct a SocketException with a explanatory message. + * @param message explanatory message + * @param bSysMsg true if system message (from strerror(errno)) + * should be postfixed to the user provided message + */ + CSocketException(const string &message, bool bSysMsg = false) throw(); + + + /** Destructor. + * Virtual to allow for subclassing. + */ + virtual ~CSocketException() throw (); + + /** Returns a pointer to the (constant) error description. + * @return A pointer to a \c const \c char*. The underlying memory + * is in posession of the \c Exception object. Callers \a must + * not attempt to free the memory. + */ + virtual const char* what() const throw (){ return m_sMsg.c_str(); } + +protected: + /** Error message. + */ + std::string m_sMsg; +}; + +/** + * Base class representing basic communication endpoint. + */ + +class CSocket +{ +public: + virtual ~CSocket(); + + /** + * Enum to represent type of socket(UDP or TCP) + */ + enum SocketType + { + TcpSocket = SOCK_STREAM, + UdpSocket = SOCK_DGRAM, + UnknownSocketType =-1 + }; + + /** + * Enum to represent type network layer protocol used for socket + */ + enum NetworkLayerProtocol + { + IPv4Protocol = AF_INET, + IPv6Protocol = AF_INET6, + UnknownNetworkLayerProtocol = -1 + }; + + /** + * Enum to represent Wait Result when reading data from a socket + */ + enum ReadResult + { + DATA_ARRIVED = 0, + DATA_TIMED_OUT = ETIMEDOUT, + DATA_EXCEPTION = 255 + }; + + /** + * Get the local address + * @return local address of socket + * @exception CSocketException thrown if fetch fails + */ + + string GetLocalAddress() throw(CSocketException); + + /** + * Get the local port + * @return local port of socket + * @exception CSocketException thrown if fetch fails + */ + + unsigned short GetLocalPort() throw(CSocketException); + + + /** + * Set the local port to the specified port and the local address + * to any interface + * @param localPort local port + * @exception CSocketException thrown if setting local port fails + */ + + void BindLocalPort(unsigned short localPort) throw(CSocketException); + + /** + * Set the local port to the specified port and the local address + * to the specified address. If you omit the port, a random port + * will be selected. + * @param localAddress local address + * @param localPort local port + * @exception CSocketException thrown if setting local port or address fails + */ + + void BindLocalAddressAndPort(const string &localAddress, unsigned short localPort = 0) throw(CSocketException); + + /** + * Returns the size of the internal read buffer. This limits the amount of data that the client + * can receive before you call + */ + unsigned long int GetReadBufferSize (); + + /** + * Sets the read buffer size of the socket. + * @param Size of the buffer. + */ + void SetReadBufferSize(unsigned int nSize) throw(CSocketException); + + /** + * Sets the socket to Blocking/Non blocking state. + * @param Bool flag for Non blocking status. + */ + void SetNonBlocking(bool bBlocking) throw(CSocketException); + + /** + * Establish a socket connection with the given foreign + * address and port + * @param foreignAddress foreign address (IP address or name) + * @param foreignPort foreign port + * @exception SocketException thrown if unable to establish connection + */ + void ConnectToHost(const string &foreignAddress, unsigned short foreignPort) throw(CSocketException); + + /** + * Write the given buffer to this socket. Call connect() before + * calling send() + * @param buffer buffer to be written + * @param bufferLen number of bytes from buffer to be written + * @exception SocketException thrown if unable to send data + */ + void Send(const void *buffer, int bufferLen) throw(CSocketException); + + /** + * Read into the given buffer up to bufferLen bytes data from this + * socket. Call connect() before calling recv() + * @param buffer buffer to receive the data + * @param bufferLen maximum number of bytes to read into buffer + * @return number of bytes read, 0 for EOF, and -1 for error + * @exception SocketException thrown if unable to receive data + */ + int Recv(void *buffer, int bufferLen) throw(CSocketException); + + /** + * Get the foreign address. Call connect() before calling recv() + * @return foreign address + * @exception SocketException thrown if unable to fetch foreign address + */ + string GetPeerAddress() throw(CSocketException); + + /** + * Get the foreign port. Call connect() before calling recv() + * @return foreign port + * @exception SocketException thrown if unable to fetch foreign port + */ + unsigned short GetPeerPort() throw(CSocketException); + + /** + * Writing sStr to socket + */ + CSocket& operator<<(const string& sStr ); + + /** + * Reading data to sStr from socket + */ + CSocket& operator>>(string& sStr); + + /** + * Blocking function to check whether data arrived in socket for reading. + * @param timeToWait waits for 'timeToWait' seconds. + */ + virtual int OnDataRead(unsigned long timeToWait = ULONG_MAX); + + /** + * To Bind socket to a symbolic device name like eth0 + * @param sInterface NIC device name + */ + void SetBindToDevice(const string& sInterface) throw(CSocketException); + +protected: + /** + * Internal Socket descriptor + **/ + int m_sockDesc; + + CSocket(SocketType type, NetworkLayerProtocol protocol) throw(CSocketException); + CSocket(int sockDesc); + static void FillAddr( const string & localAddress, unsigned short localPort, sockaddr_in& localAddr ); + +private: + // Prevent the user from trying to use Exact copy of this object + CSocket(const CSocket &sock); + void operator=(const CSocket &sock); +}; + +/** + * UDP Socket class. + */ + +class UDPSocket : public CSocket +{ +public: +/** + * Construct a UDP socket + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket() throw(CSocketException); + /** + * Construct a UDP socket with the given local port + * @param localPort local port + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket(unsigned short localPort) throw(CSocketException); + + /** + * Construct a UDP socket with the given local port and address + * @param localAddress local address + * @param localPort local port + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket(const string &localAddress, unsigned short localPort) throw(CSocketException); + + /** + * Unset foreign address and port + * @return true if disassociation is successful + * @exception SocketException thrown if unable to disconnect UDP socket + */ + + /** + * Unset foreign address and port + * @return true if disassociation is successful + * @exception SocketException thrown if unable to disconnect UDP socket + */ + void DisconnectFromHost() throw(CSocketException); + + /** + * Send the given buffer as a UDP datagram to the + * specified address/port + * @param buffer buffer to be written + * @param bufferLen number of bytes to write + * @param foreignAddress address (IP address or name) to send to + * @param foreignPort port number to send to + * @return true if send is successful + * @exception SocketException thrown if unable to send datagram + */ + void SendDataGram(const void *buffer, int bufferLen, const string &foreignAddress, + unsigned short foreignPort) throw(CSocketException); + + /** + * Read read up to bufferLen bytes data from this socket. The given buffer + * is where the data will be placed + * @param buffer buffer to receive data + * @param bufferLen maximum number of bytes to receive + * @param sourceAddress address of datagram source + * @param sourcePort port of data source + * @return number of bytes received and -1 for error + * @exception SocketException thrown if unable to receive datagram + */ + int RecvDataGram(void *buffer, int bufferLen, string &sourceAddress, + unsigned short &sourcePort) throw(CSocketException); + + /** + * Set the multicast TTL + * @param multicastTTL multicast TTL + * @exception SocketException thrown if unable to set TTL + */ + void SetMulticastTTL(unsigned char multicastTTL) throw(CSocketException); + + /** + * Join the specified multicast group + * @param multicastGroup multicast group address to join + * @exception SocketException thrown if unable to join group + */ + void JoinGroup(const string &multicastGroup) throw(CSocketException); + + /** + * Leave the specified multicast group + * @param multicastGroup multicast group address to leave + * @exception SocketException thrown if unable to leave group + */ + void LeaveGroup(const string &multicastGroup) throw(CSocketException); + +private: + void SetBroadcast(); + +}; + + +#endif /* INCLUDE_UDPSOCKET_H_ */ diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp index b5840ba2c..4128009da 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp @@ -47,7 +47,8 @@ SDRdaemonSinkGui::SDRdaemonSinkGui(DeviceSinkAPI *deviceAPI, QWidget* parent) : m_samplesCount(0), m_tickCount(0), m_lastEngineState((DSPDeviceSinkEngine::State)-1), - m_doApplySettings(true) + m_doApplySettings(true), + m_forceSettings(true) { m_nnSender = nn_socket(AF_SP, NN_PAIR); assert(m_nnSender != -1); @@ -140,6 +141,7 @@ bool SDRdaemonSinkGui::deserialize(const QByteArray& data) displaySettings(); blockApplySettings(false); sendControl(true); + m_forceSettings = true; sendSettings(); return true; } @@ -328,8 +330,9 @@ void SDRdaemonSinkGui::sendSettings() void SDRdaemonSinkGui::updateHardware() { qDebug() << "SDRdaemonSinkGui::updateHardware"; - SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink::create(m_settings); + SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink::create(m_settings, m_forceSettings); m_deviceSampleSink->getInputMessageQueue()->push(message); + m_forceSettings = false; m_updateTimer.stop(); } diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.h b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.h index b60f38d1f..d46e5babb 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.h +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.h @@ -64,6 +64,7 @@ private: std::size_t m_tickCount; int m_lastEngineState; bool m_doApplySettings; + bool m_forceSettings; int m_nnSender; diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp index f98f4708f..86baa58f2 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp @@ -60,8 +60,11 @@ bool SDRdaemonSinkOutput::start() stop(); return false; } - + m_sdrDaemonSinkThread->setRemoteAddress(m_settings.m_address, m_settings.m_dataPort); + m_sdrDaemonSinkThread->setCenterFrequency(m_settings.m_centerFrequency); m_sdrDaemonSinkThread->setSamplerate(m_settings.m_sampleRate); + m_sdrDaemonSinkThread->setTxDelay(m_settings.m_txDelay); + m_sdrDaemonSinkThread->setNbBlocksFEC(m_settings.m_nbFECBlocks); m_sdrDaemonSinkThread->connectTimer(m_masterTimer); m_sdrDaemonSinkThread->startWork(); @@ -107,11 +110,12 @@ std::time_t SDRdaemonSinkOutput::getStartingTimeStamp() const bool SDRdaemonSinkOutput::handleMessage(const Message& message) { - if (MsgConfigureSDRdaemonSink::match(message)) + + if (MsgConfigureSDRdaemonSink::match(message)) { - qDebug() << "SDRdaemonSinkOutput::handleMessage: MsgConfigureFileSink"; + qDebug() << "SDRdaemonSinkOutput::handleMessage:" << message.getIdentifier(); MsgConfigureSDRdaemonSink& conf = (MsgConfigureSDRdaemonSink&) message; - applySettings(conf.getSettings(), false); + applySettings(conf.getSettings(), conf.getForce()); return true; } else if (MsgConfigureSDRdaemonSinkWork::match(message)) diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h index 548c2a6ad..6a271f4d3 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h @@ -37,18 +37,21 @@ public: public: const SDRdaemonSinkSettings& getSettings() const { return m_settings; } + bool getForce() const { return m_force; } - static MsgConfigureSDRdaemonSink* create(const SDRdaemonSinkSettings& settings) + static MsgConfigureSDRdaemonSink* create(const SDRdaemonSinkSettings& settings, bool force = false) { - return new MsgConfigureSDRdaemonSink(settings); + return new MsgConfigureSDRdaemonSink(settings, force); } private: SDRdaemonSinkSettings m_settings; + bool m_force; - MsgConfigureSDRdaemonSink(const SDRdaemonSinkSettings& settings) : + MsgConfigureSDRdaemonSink(const SDRdaemonSinkSettings& settings, bool force) : Message(), - m_settings(settings) + m_settings(settings), + m_force(force) { } }; diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp index 5124ae9a2..ac8dc7be9 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp @@ -43,22 +43,26 @@ UDPSinkFEC::UDPSinkFEC() : m_currentMetaFEC.init(); m_bufMeta = new uint8_t[m_udpSize]; m_buf = new uint8_t[m_udpSize]; + m_udpThread = new QThread(); m_udpWorker = new UDPSinkFECWorker(); - m_udpWorker->moveToThread(&m_udpThread); - connect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages())); - m_udpThread.start(); + m_udpWorker->moveToThread(m_udpThread); + + connect(m_udpThread, SIGNAL(started()), m_udpWorker, SLOT(process())); + connect(m_udpWorker, SIGNAL(finished()), m_udpThread, SLOT(quit())); + + m_udpThread->start(); } UDPSinkFEC::~UDPSinkFEC() { - disconnect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages())); - m_udpThread.exit(); - m_udpThread.wait(); + m_udpWorker->stop(); + m_udpThread->wait(); delete[] m_buf; delete[] m_bufMeta; delete m_udpWorker; + delete m_udpThread; } void UDPSinkFEC::setTxDelay(uint32_t txDelay) @@ -81,6 +85,7 @@ void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port) void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize) { + //qDebug("UDPSinkFEC::write(: %u samples", sampleChunkSize); const SampleVector::iterator end = begin + sampleChunkSize; SampleVector::iterator it = begin; @@ -163,6 +168,7 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk int txDelay = m_txDelay; // TODO: send blocks + //qDebug("UDPSinkFEC::write: push frame to worker: %u", m_frameCount); m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount); //m_txThread = new std::thread(transmitUDP, this, m_txBlocks[m_txBlocksIndex], m_frameCount, nbBlocksFEC, txDelay, m_cm256Valid); //transmitUDP(this, m_txBlocks[m_txBlocksIndex], m_frameCount, m_nbBlocksFEC, m_txDelay, m_cm256Valid); @@ -179,26 +185,26 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk } } -UDPSinkFECWorker::UDPSinkFECWorker() : m_remotePort(9090) +UDPSinkFECWorker::UDPSinkFECWorker() : + m_running(false), + m_remotePort(9090) { m_cm256Valid = m_cm256.isInitialized(); + connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::DirectConnection); } UDPSinkFECWorker::~UDPSinkFECWorker() { - Message* message; - - while ((message = m_inputMessageQueue.pop()) != 0) - { - delete message; - } + disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + m_inputMessageQueue.clear(); } -void UDPSinkFECWorker::pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks, +void UDPSinkFECWorker::pushTxFrame(UDPSinkFEC::SuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex) { + //qDebug("UDPSinkFECWorker::pushTxFrame. %d", m_inputMessageQueue.size()); m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex)); } @@ -207,6 +213,26 @@ void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); } +void UDPSinkFECWorker::process() +{ + m_running = true; + + qDebug("UDPSinkFECWorker::process: started"); + + while (m_running) + { + usleep(250000); + } + + qDebug("UDPSinkFECWorker::process: stopped"); + emit finished(); +} + +void UDPSinkFECWorker::stop() +{ + m_running = false; +} + void UDPSinkFECWorker::handleInputMessages() { Message* message; @@ -216,11 +242,13 @@ void UDPSinkFECWorker::handleInputMessages() if (MsgUDPFECEncodeAndSend::match(*message)) { MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message; + encodeAndTransmit(sendMsg->getTxBlocks(), sendMsg->getFrameIndex(), sendMsg->getNbBlocsFEC(), sendMsg->getTxDelay()); } else if (MsgConfigureRemoteAddress::match(*message)) { + qDebug("UDPSinkFECWorker::handleInputMessages: %s", message->getIdentifier()); MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message; - m_remoteAddress.setAddress(addressMsg->getAddress()); + m_remoteAddress = addressMsg->getAddress(); m_remotePort = addressMsg->getPort(); } @@ -228,7 +256,7 @@ void UDPSinkFECWorker::handleInputMessages() } } -void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay) +void UDPSinkFECWorker::encodeAndTransmit(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay) { CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder @@ -236,9 +264,12 @@ void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t fr if ((nbBlocksFEC == 0) || !m_cm256Valid) { + qDebug("UDPSinkFECWorker::encodeAndTransmit: transmit frame without FEC to %s:%d", m_remoteAddress.toStdString().c_str(), m_remotePort); + for (int i = 0; i < UDPSinkFEC::m_nbOriginalBlocks; i++) { - m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); + m_socket.SendDataGram((const void *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress.toStdString(), (uint32_t) m_remotePort); + //m_udpSocket->writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); usleep(txDelay); } } @@ -265,7 +296,7 @@ void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t fr // Encode FEC blocks if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) { - qDebug() << "UDPSinkFECWorker::transmitUDP: CM256 encode failed. No transmission."; + qDebug("UDPSinkFECWorker::encodeAndTransmit: CM256 encode failed. No transmission."); return; } @@ -276,6 +307,9 @@ void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t fr } // Transmit all blocks + + qDebug("UDPSinkFECWorker::encodeAndTransmit: transmit frame with FEC to %s:%d", m_remoteAddress.toStdString().c_str(), m_remotePort); + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { #ifdef SDRDAEMON_PUNCTURE @@ -297,7 +331,8 @@ void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t fr // // std::cerr << std::endl; - m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); + m_socket.SendDataGram((const void *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, "127.0.0.1", (uint32_t) m_remotePort); + //m_udpSocket->writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); usleep(txDelay); } } diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h index 56133b897..a0a374e24 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -33,6 +32,8 @@ #include "util/messagequeue.h" #include "util/message.h" +#include "UDPSocket.h" + class UDPSinkFECWorker; class UDPSinkFEC : public QObject @@ -152,7 +153,7 @@ private: uint16_t m_frameCount; //!< transmission frame count int m_sampleIndex; //!< Current sample index in protected block data - QThread m_udpThread; + QThread *m_udpThread; UDPSinkFECWorker *m_udpWorker; }; @@ -165,13 +166,13 @@ public: { MESSAGE_CLASS_DECLARATION public: - const UDPSinkFEC::SuperBlock *getTxBlocks() const { return m_txBlockx; } + UDPSinkFEC::SuperBlock *getTxBlocks() const { return m_txBlockx; } uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; } uint32_t getTxDelay() const { return m_txDelay; } uint16_t getFrameIndex() const { return m_frameIndex; } static MsgUDPFECEncodeAndSend* create( - const UDPSinkFEC::SuperBlock *txBlocks, + UDPSinkFEC::SuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex) @@ -180,13 +181,13 @@ public: } private: - const UDPSinkFEC::SuperBlock *m_txBlockx; + UDPSinkFEC::SuperBlock *m_txBlockx; uint32_t m_nbBlocksFEC; uint32_t m_txDelay; uint16_t m_frameIndex; MsgUDPFECEncodeAndSend( - const UDPSinkFEC::SuperBlock *txBlocks, + UDPSinkFEC::SuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex) : @@ -222,24 +223,32 @@ public: UDPSinkFECWorker(); ~UDPSinkFECWorker(); - void pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks, + void pushTxFrame(UDPSinkFEC::SuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex); void setRemoteAddress(const QString& address, uint16_t port); + void stop(); MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication +signals: + void finished(); + public slots: + void process(); + +private slots: void handleInputMessages(); private: - void transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay); + void encodeAndTransmit(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay); - QUdpSocket m_udpSocket; + bool m_running; CM256 m_cm256; //!< CM256 library object bool m_cm256Valid; //!< true if CM256 library is initialized correctly - QHostAddress m_remoteAddress; + UDPSocket m_socket; + QString m_remoteAddress; uint16_t m_remotePort; };