From 596f2739b947ecd842e2ec66c38ca6c6fbf023cd Mon Sep 17 00:00:00 2001 From: Phil Taylor Date: Thu, 27 May 2021 11:41:08 +0100 Subject: [PATCH] Use ring buffer with rtaudio to eliminate mutexes --- audiohandler.cpp | 220 ++++++++++++--------- audiohandler.h | 14 +- ring/LICENSE | 21 ++ ring/README.md | 10 + ring/ring.cpp | 290 +++++++++++++++++++++++++++ ring/ring.h | 440 +++++++++++++++++++++++++++++++++++++++++ udphandler.cpp | 17 +- udphandler.h | 2 + udpserver.cpp | 12 +- wfview.vcxproj | 2 + wfview.vcxproj.filters | 7 + 11 files changed, 923 insertions(+), 112 deletions(-) create mode 100644 ring/LICENSE create mode 100644 ring/README.md create mode 100644 ring/ring.cpp create mode 100644 ring/ring.h diff --git a/audiohandler.cpp b/audiohandler.cpp index fff49c0..39265f1 100644 --- a/audiohandler.cpp +++ b/audiohandler.cpp @@ -28,6 +28,8 @@ audioHandler::~audioHandler() audio.stopStream(); audio.closeStream(); } + if (ringBuf != Q_NULLPTR) + delete ringBuf; } bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16 samplerate, const quint16 latency, const bool ulaw, const bool isinput, int port, quint8 resampleQuality) @@ -46,6 +48,9 @@ bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16 // chunk size is always relative to Internal Sample Rate. this->chunkSize = (INTERNAL_SAMPLE_RATE / 25) * radioChannels; + ringBuf = new wilt::Ring(100); // Should be customizable. 
+ tempBuf.sent = 0; + if (port > 0) { aParams.deviceId = port; } @@ -55,7 +60,7 @@ bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16 else { aParams.deviceId = audio.getDefaultOutputDevice(); } - aParams.nChannels = channels; + aParams.nChannels = 2; // Internally this is always 2 channels aParams.firstChannel = 0; try { @@ -140,67 +145,70 @@ int audioHandler::readData(void* outputBuffer, void* inputBuffer, unsigned int n { // Calculate output length, always full samples int sentlen = 0; - qint16* buffer = (qint16*)outputBuffer; - //qDebug(logAudio()) << "looking for: " << nFrames << this->audioBuffer.size(); + + quint8* buffer = (quint8*)outputBuffer; if (status == RTAUDIO_OUTPUT_UNDERFLOW) qDebug(logAudio()) << "Underflow detected"; + unsigned int nBytes = nFrames * 2 * 2; // This is ALWAYS 2 bytes per sample and 2 channels - if (!audioBuffer.isEmpty()) + if (ringBuf->size()>0) { - mutex.lock(); // Output buffer is ALWAYS 16 bit. - auto packet = audioBuffer.begin(); - while (packet != audioBuffer.end() && sentlen < nFrames/2) + while (sentlen < nBytes) { - int timediff = packet->time.msecsTo(QTime::currentTime()); - - if (timediff > (int)audioLatency * 2) { - qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Packet " << hex << packet->seq << - " arrived too late (increase output latency!) 
" << - dec << packet->time.msecsTo(QTime::currentTime()) << "ms"; - while (packet != audioBuffer.end() && timediff > (int)audioLatency) { - timediff = packet->time.msecsTo(QTime::currentTime()); - lastSeq = packet->seq; - packet = audioBuffer.erase(packet); // returns next packet - } - if (packet == audioBuffer.end()) { - break; - } - } - - // If we got here then packet time must be within latency threshold - - if (packet->seq == lastSeq + 1 || packet->seq <= lastSeq) + audioPacket packet; + if (!ringBuf->try_read(packet)) { - int send = qMin((int)nFrames*2 - sentlen, packet->dataout.length() - packet->sent); - lastSeq = packet->seq; - //qInfo(logAudio()) << "Packet " << hex << packet->seq << " arrived on time " << Qt::dec << packet->time.msecsTo(QTime::currentTime()) << "ms"; - - memcpy(buffer + sentlen, packet->dataout.constData() + packet->sent, send); + qDebug() << "No more data available but buffer is not full! sentlen:" << sentlen << " nBytes:" << nBytes ; + return 0; + } + currentLatency = packet.time.msecsTo(QTime::currentTime()); + // This shouldn't be required but if we did output a partial packet + // This will add the remaining packet data to the output buffer. + if (tempBuf.sent != tempBuf.data.length()) + { + int send = qMin((int)nBytes - sentlen, tempBuf.data.length() - tempBuf.sent); + memcpy(buffer + sentlen, tempBuf.data.constData() + tempBuf.sent, send); + tempBuf.sent = tempBuf.sent + send; sentlen = sentlen + send; + qDebug(logAudio()) << "Adding partial:" << send; + } - if (send == packet->dataout.length() - packet->sent) - { - //qInfo(logAudio()) << "Get next packet"; - packet = audioBuffer.erase(packet); // returns next packet - } - else - { - // Store sent amount (could be zero if audioOutput buffer full) then break. - packet->sent = send; - break; - } + while (currentLatency > (int)audioLatency) { + qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Packet " << hex << packet.seq << + " arrived too late (increase output latency!) 
" << + dec << packet.time.msecsTo(QTime::currentTime()) << "ms"; + lastSeq = packet.seq; + if (!ringBuf->try_read(packet)) + return sentlen; + currentLatency = packet.time.msecsTo(QTime::currentTime()); } - else { - qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Missing audio packet(s) from: " << hex << lastSeq + 1 << " to " << hex << packet->seq - 1; - lastSeq = packet->seq; + + int send = qMin((int)nBytes - sentlen, packet.data.length()); + memcpy(buffer + sentlen, packet.data.constData(), send); + sentlen = sentlen + send; + if (send < packet.data.length()) + { + qDebug(logAudio()) << "Asking for partial, sent:" << send << "packet length" << packet.data.length(); + tempBuf = packet; + tempBuf.sent = tempBuf.sent + send; + //lastSeq = packet.seq; + //break; } + + if (packet.seq <= lastSeq) { + qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Duplicate/early audio packet: " << hex << lastSeq << " got " << hex << packet.seq; + } + else if (packet.seq != lastSeq + 1) { + qInfo(logAudio()) << (isInput ? 
"Input" : "Output") << "Missing audio packet(s) from: " << hex << lastSeq + 1 << " to " << hex << packet.seq - 1; + } + lastSeq = packet.seq; } - mutex.unlock(); } + //qDebug(logAudio()) << "looking for: " << nBytes << " got: " << sentlen; return 0; } @@ -231,13 +239,13 @@ int audioHandler::writeData(void* outputBuffer, void* inputBuffer, unsigned int int send = qMin((int)(len - sentlen), (int)chunkSize - current->sent); - current->datain.append(QByteArray::fromRawData(data + sentlen, send)); + current->data.append(QByteArray::fromRawData(data + sentlen, send)); sentlen = sentlen + send; current->seq = 0; // Not used in TX current->time = QTime::currentTime(); - current->sent = current->datain.length(); + current->sent = current->data.length(); if (current->sent == chunkSize) { @@ -274,64 +282,82 @@ void audioHandler::stateChanged(QAudio::State state) -void audioHandler::incomingAudio(audioPacket data) +int audioHandler::incomingAudio(audioPacket data) { // No point buffering audio until stream is actually running. if (!audio.isStreamRunning()) { - return; + qDebug(logAudio()) << "Packet received before stream was started"; + return currentLatency; } - // Incoming data is 8bits? - if (radioSampleBits == 8) + // Incoming data is 8bits? 
+ if (radioSampleBits == 8) + { + QByteArray outPacket((int)data.data.length() * 2, (char)0xff); + qint16* out = (qint16*)outPacket.data(); + for (int f = 0; f < data.data.length(); f++) { - QByteArray inPacket((int)data.datain.length() * 2, (char)0xff); - qint16* in = (qint16*)inPacket.data(); - for (int f = 0; f < data.datain.length(); f++) + if (isUlaw) { - if (isUlaw) - { - in[f] = ulaw_decode[(quint8)data.datain[f]]; - } - else - { - // Convert 8-bit sample to 16-bit - in[f] = (qint16)(((quint8)data.datain[f] << 8) - 32640); - } + out[f] = ulaw_decode[(quint8)data.data[f]]; + } + else + { + // Convert 8-bit sample to 16-bit + out[f] = (qint16)(((quint8)data.data[f] << 8) - 32640); } - data.datain = inPacket; // Replace incoming data with converted. } + data.data.clear(); + data.data = outPacket; // Replace incoming data with converted. + } - //qInfo(logAudio()) << "Adding packet to buffer:" << data.seq << ": " << data.datain.length(); + //qInfo(logAudio()) << "Adding packet to buffer:" << data.seq << ": " << data.data.length(); - /* We now have an array of 16bit samples in the NATIVE samplerate of the radio - If the radio sample rate is below 48000, we need to resample. - */ + /* We now have an array of 16bit samples in the NATIVE samplerate of the radio + If the radio sample rate is below 48000, we need to resample. + */ - if (ratioDen != 1) { + if (ratioDen != 1) { - // We need to resample - quint32 outFrames = ((data.datain.length() / 2) * ratioDen) / radioChannels; - quint32 inFrames = (data.datain.length() / 2) / radioChannels; - data.dataout.resize(outFrames * 2 * radioChannels); // Preset the output buffer size. + // We need to resample + quint32 outFrames = ((data.data.length() / 2) * ratioDen) / radioChannels; + quint32 inFrames = (data.data.length() / 2) / radioChannels; + QByteArray outPacket(outFrames * 2 * radioChannels, 0xff); // Preset the output buffer size. 
- int err = 0; - if (this->radioChannels == 1) { - err = wf_resampler_process_int(resampler, 0, (const qint16*)data.datain.constData(), &inFrames, (qint16*)data.dataout.data(), &outFrames); - } - else { - err = wf_resampler_process_interleaved_int(resampler, (const qint16*)data.datain.constData(), &inFrames, (qint16*)data.dataout.data(), &outFrames); - } - if (err) { - qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames; - } + int err = 0; + if (this->radioChannels == 1) { + err = wf_resampler_process_int(resampler, 0, (const qint16*)data.data.constData(), &inFrames, (qint16*)outPacket.data(), &outFrames); } else { - data.dataout = data.datain; + err = wf_resampler_process_interleaved_int(resampler, (const qint16*)data.data.constData(), &inFrames, (qint16*)outPacket.data(), &outFrames); } + if (err) { + qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames; + } + data.data.clear(); + data.data = outPacket; // Replace incoming data with converted. + } - mutex.lock(); - audioBuffer.insert( data.seq, data ); - mutex.unlock(); + if (radioChannels == 1) + { + // Convert to stereo + QByteArray outPacket(data.data.length()*2, 0xff); // Preset the output buffer size. + qint16* in = (qint16*)data.data.data(); + qint16* out = (qint16*)outPacket.data(); + for (int f = 0; f < data.data.length()/2; f++) + { + *out++ = *in; + *out++ = *in++; + } + data.data.clear(); + data.data = outPacket; // Replace incoming data with converted. + } + + if (!ringBuf->try_write(data)) + { + qDebug(logAudio()) << "Buffer full! 
capacity:" << ringBuf->capacity() << "length" << ringBuf->size(); + } + return currentLatency; } void audioHandler::changeLatency(const quint16 newSize) @@ -367,7 +393,7 @@ void audioHandler::getNextAudioChunk(QByteArray& ret) packet = audioBuffer.erase(packet); // returns next packet } else { - if (packet->datain.length() == chunkSize && ret.length() == 0) + if (packet->data.length() == chunkSize && ret.length() == 0) { // We now have an array of samples in the computer native format (48000) // If the radio sample rate is below 48000, we need to resample. @@ -376,11 +402,11 @@ void audioHandler::getNextAudioChunk(QByteArray& ret) if (ratioNum != 1) { // We need to resample (we are STILL 16 bit!) - quint32 outFrames = ((packet->datain.length() / 2) / ratioNum) / radioChannels; - quint32 inFrames = (packet->datain.length() / 2) / radioChannels; + quint32 outFrames = ((packet->data.length() / 2) / ratioNum) / radioChannels; + quint32 inFrames = (packet->data.length() / 2) / radioChannels; packet->dataout.resize(outFrames * 2 * radioChannels); // Preset the output buffer size. - const qint16* in = (qint16*)packet->datain.constData(); + const qint16* in = (qint16*)packet->data.constData(); qint16* out = (qint16*)packet->dataout.data(); int err = 0; if (this->radioChannels == 1) { @@ -393,22 +419,22 @@ void audioHandler::getNextAudioChunk(QByteArray& ret) qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames; } //qInfo(logAudio()) << "Resampler run " << err << " inFrames:" << inFrames << " outFrames:" << outFrames; - //qInfo(logAudio()) << "Resampler run inLen:" << packet->datain.length() << " outLen:" << packet->dataout.length(); + //qInfo(logAudio()) << "Resampler run inLen:" << packet->data.length() << " outLen:" << packet->dataout.length(); if (radioSampleBits == 8) { - packet->datain = packet->dataout; // Copy output packet back to input buffer. 
+ packet->data = packet->dataout; // Copy output packet back to input buffer. packet->dataout.clear(); // Buffer MUST be cleared ready to be re-filled by the upsampling below. } } else if (radioSampleBits == 16) { // Only copy buffer if radioSampleBits is 16, as it will be handled below otherwise. - packet->dataout = packet->datain; + packet->dataout = packet->data; } // Do we need to convert 16-bit to 8-bit? if (radioSampleBits == 8) { - packet->dataout.resize(packet->datain.length() / 2); - qint16* in = (qint16*)packet->datain.data(); + packet->dataout.resize(packet->data.length() / 2); + qint16* in = (qint16*)packet->data.data(); for (int f = 0; f < packet->dataout.length(); f++) { quint8 outdata = 0; diff --git a/audiohandler.h b/audiohandler.h index 539c991..dda7c7c 100644 --- a/audiohandler.h +++ b/audiohandler.h @@ -18,6 +18,7 @@ typedef signed short MY_TYPE; #include #include #include "resampler/speex_resampler.h" +#include "ring/ring.h" #include @@ -33,8 +34,7 @@ struct audioPacket { quint32 seq; QTime time; quint16 sent; - QByteArray datain; - QByteArray dataout; + QByteArray data; }; class audioHandler : public QObject @@ -57,10 +57,10 @@ public: bool isSequential() const; void getNextAudioChunk(QByteArray &data); bool isChunkAvailable(); + int incomingAudio(const audioPacket data); private slots: bool init(const quint8 bits, const quint8 channels, const quint16 samplerate, const quint16 latency, const bool isulaw, const bool isinput, int port, quint8 resampleQuality); - void incomingAudio(const audioPacket data); void changeLatency(const quint16 newSize); void notified(); void stateChanged(QAudio::State state); @@ -111,8 +111,12 @@ private: unsigned int ratioNum; unsigned int ratioDen; - QMutex mutex; - volatile bool lock=false; + + wilt::Ring *ringBuf=Q_NULLPTR; + volatile bool ready = false; + audioPacket tempBuf; + quint16 currentLatency; + }; #endif // AUDIOHANDLER_H diff --git a/ring/LICENSE b/ring/LICENSE new file mode 100644 index 
0000000..b9a4d39 --- /dev/null +++ b/ring/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Trevor Wilson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/ring/README.md b/ring/README.md new file mode 100644 index 0000000..2cdbbe9 --- /dev/null +++ b/ring/README.md @@ -0,0 +1,10 @@ +# Ring Library + +## Overview + +This library provides source for a multi-producer multi-consumer lock-free ring buffer. It provides a very simple interface for writing and reading from the buffer. The source includes a `Ring_` class, that provides the raw implementation and C-like facilities, as well as a templated `Ring` class for typed reads and writes. + + +## Contact + +If you have any questions, concerns, or recommendations please feel free to e-mail me at kmdreko@gmail.com. If you notice a bug or defect, create an issue to report it. 
diff --git a/ring/ring.cpp b/ring/ring.cpp new file mode 100644 index 0000000..d50555d --- /dev/null +++ b/ring/ring.cpp @@ -0,0 +1,290 @@ +//////////////////////////////////////////////////////////////////////////////// +// FILE: ring.cpp +// DATE: 2016-02-25 +// AUTH: Trevor Wilson +// DESC: Implements a lock-free, multi-consumer, multi-producer ring buffer +// class + +//////////////////////////////////////////////////////////////////////////////// +// Copyright (c) 2016 Trevor Wilson +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files(the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions : +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +#include "ring.h" +using namespace wilt; + +#include +// - std::memcpy + +Ring_::Ring_() + : beg_(nullptr) + , end_(nullptr) +{ + std::atomic_init(&used_, static_cast(0)); + std::atomic_init(&free_, static_cast(0)); + std::atomic_init(&rbuf_, static_cast(0)); + std::atomic_init(&rptr_, static_cast(0)); + std::atomic_init(&wptr_, static_cast(0)); + std::atomic_init(&wbuf_, static_cast(0)); +} + +Ring_::Ring_(std::size_t size) + : beg_(new char[size]) + , end_(beg_ + size) +{ + std::atomic_init(&used_, static_cast(0)); + std::atomic_init(&free_, static_cast(size)); + std::atomic_init(&rbuf_, beg_); + std::atomic_init(&rptr_, beg_); + std::atomic_init(&wptr_, beg_); + std::atomic_init(&wbuf_, beg_); +} + +Ring_::Ring_(Ring_&& ring) + : beg_(ring.beg_) + , end_(ring.end_) +{ + std::atomic_init(&used_, ring.used_.load()); + std::atomic_init(&free_, ring.free_.load()); + std::atomic_init(&rbuf_, ring.rbuf_.load()); + std::atomic_init(&rptr_, ring.rptr_.load()); + std::atomic_init(&wptr_, ring.wptr_.load()); + std::atomic_init(&wbuf_, ring.wbuf_.load()); + + ring.beg_ = nullptr; + ring.end_ = nullptr; + + ring.used_.store(0); + ring.free_.store(0); + ring.rbuf_.store(nullptr); + ring.rptr_.store(nullptr); + ring.wptr_.store(nullptr); + ring.wbuf_.store(nullptr); +} + +Ring_& Ring_::operator= (Ring_&& ring) +{ + delete[] beg_; + + beg_ = ring.beg_; + end_ = ring.end_; + + used_.store(ring.used_.load()); + free_.store(ring.free_.load()); + rbuf_.store(ring.rbuf_.load()); + rptr_.store(ring.rptr_.load()); + wptr_.store(ring.wptr_.load()); + wbuf_.store(ring.wbuf_.load()); + + ring.beg_ = nullptr; + ring.end_ = nullptr; + + ring.used_.store(0); + ring.free_.store(0); + ring.rbuf_.store(nullptr); + ring.rptr_.store(nullptr); + ring.wptr_.store(nullptr); + ring.wbuf_.store(nullptr); + + return *this; +} + +Ring_::~Ring_() +{ + delete[] beg_; +} + +std::size_t Ring_::size() const +{ + // The 'used' space can be negative in an over-reserved case, but it can be + // clamped to 
0 for simplicity. + + auto s = used_.load(); + return s < 0 ? 0 : static_cast(s); +} + +std::size_t Ring_::capacity() const +{ + return static_cast(end_ - beg_); +} + +void Ring_::read(void* data, std::size_t length) noexcept +{ + auto block = acquire_read_block_(length); + + copy_read_block_(block, (char*)data, length); + release_read_block_(block, length); +} + +void Ring_::write(const void* data, std::size_t length) noexcept +{ + auto block = acquire_write_block_(length); + + copy_write_block_(block, (const char*)data, length); + release_write_block_(block, length); +} + +bool Ring_::try_read(void* data, std::size_t length) noexcept +{ + auto block = try_acquire_read_block_(length); + if (block == nullptr) + return false; + + copy_read_block_(block, (char*)data, length); + release_read_block_(block, length); + + return true; +} + +bool Ring_::try_write(const void* data, std::size_t length) noexcept +{ + auto block = try_acquire_write_block_(length); + if (block == nullptr) + return false; + + copy_write_block_(block, (const char*)data, length); + release_write_block_(block, length); + + return true; +} + +char* Ring_::normalize_(char* ptr) +{ + return ptr < end_ ? 
ptr : ptr - capacity(); +} + +char* Ring_::acquire_read_block_(std::size_t length) +{ + auto size = static_cast(length); + while (true) // loop while conflict + { + auto old_rptr = rptr_.load(std::memory_order_consume); // read rptr + while (used_.load(std::memory_order_consume) < size) // check for data + ; // spin until success + + auto new_rptr = normalize_(old_rptr + size); // get block end + used_.fetch_sub(size); // reserve + if (rptr_.compare_exchange_strong(old_rptr, new_rptr)) // try commit + return old_rptr; // committed + + used_.fetch_add(size, std::memory_order_relaxed); // un-reserve + } +} + +char* Ring_::try_acquire_read_block_(std::size_t length) +{ + auto size = static_cast(length); + while (true) // loop while conflict + { + auto old_rptr = rptr_.load(std::memory_order_consume); // read rptr + if (used_.load(std::memory_order_consume) < size) // check for data + return nullptr; // return failure + + auto new_rptr = normalize_(old_rptr + size); // get block end + used_.fetch_sub(size); // reserve + if (rptr_.compare_exchange_strong(old_rptr, new_rptr)) // try commit + return old_rptr; // committed + + used_.fetch_add(size, std::memory_order_relaxed); // un-reserve + } +} + +void Ring_::copy_read_block_(const char* block, char* data, std::size_t length) +{ + if (block + length < end_) + { + std::memcpy(data, block, length); + } + else + { + auto first = end_ - block; + std::memcpy(data, block, first); + std::memcpy(data + first, beg_, length - first); + } +} + +void Ring_::release_read_block_(char* old_rptr, std::size_t length) +{ + auto new_rptr = normalize_(old_rptr + length); // get block end + while (rbuf_.load() != old_rptr) // check for earlier reads + ; // spin until reads complete + + rbuf_.store(new_rptr); // finish commit + free_.fetch_add(length, std::memory_order_relaxed); // add to free space +} + +char* Ring_::acquire_write_block_(std::size_t length) +{ + auto size = static_cast(length); + while (true) // loop while conflict + { + 
auto old_wbuf = wbuf_.load(std::memory_order_consume); // read wbuf + while (free_.load(std::memory_order_consume) < size) // check for space + ; // spin until success + + auto new_wbuf = normalize_(old_wbuf + size); // get block end + free_.fetch_sub(size); // reserve + if (wbuf_.compare_exchange_strong(old_wbuf, new_wbuf)) // try commit + return old_wbuf; // committed + + free_.fetch_add(size, std::memory_order_relaxed); // un-reserve + } +} + +char* Ring_::try_acquire_write_block_(std::size_t length) +{ + auto size = static_cast(length); + while (true) // loop while conflict + { + auto old_wbuf = wbuf_.load(std::memory_order_consume); // read wbuf + if (free_.load(std::memory_order_consume) < size) // check for space + return nullptr; // return failure + + auto new_wbuf = normalize_(old_wbuf + size); // get block end + free_.fetch_sub(size); // reserve + if (wbuf_.compare_exchange_strong(old_wbuf, new_wbuf)) // try commit + return old_wbuf; // committed + + free_.fetch_add(size, std::memory_order_relaxed); // un-reserve + } +} + +void Ring_::copy_write_block_(char* block, const char* data, std::size_t length) +{ + if (block + length < end_) + { + std::memcpy(block, data, length); + } + else + { + auto first = end_ - block; + std::memcpy(block, data, first); + std::memcpy(beg_, data + first, length - first); + } +} + +void Ring_::release_write_block_(char* old_wbuf, std::size_t length) +{ + auto new_wbuf = normalize_(old_wbuf + length); // get block end + while (wptr_.load() != old_wbuf) // wait for earlier writes + ; // spin until writes complete + + wptr_.store(new_wbuf); // finish commit + used_.fetch_add(length, std::memory_order_relaxed); // add to used space +} diff --git a/ring/ring.h b/ring/ring.h new file mode 100644 index 0000000..cec5852 --- /dev/null +++ b/ring/ring.h @@ -0,0 +1,440 @@ +//////////////////////////////////////////////////////////////////////////////// +// FILE: ring.h +// DATE: 2016-02-25 +// AUTH: Trevor Wilson +// DESC: Defines a 
lock-free, multi-consumer, multi-producer ring buffer class + +//////////////////////////////////////////////////////////////////////////////// +// Copyright (c) 2016 Trevor Wilson +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files(the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions : +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#ifndef WILT_RING_H +#define WILT_RING_H + +#include +// - std::atomic +#include +// - std::size_t +// - std::ptrdiff_t +#include +// - ::new(ptr) +#include +// - std::is_nothrow_copy_constructible +// - std::is_nothrow_move_constructible +// - std::is_nothrow_move_assignable +// - std::is_nothrow_destructible +#include +// - std::move + +namespace wilt +{ + ////////////////////////////////////////////////////////////////////////////// + // This structure aims to access elements in a ring buffer from multiple + // concurrent readers and writers in a lock-free manner. + // + // The class works by allocating the array and storing two pointers (for the + // beginning and end of the allocated space). 
Two atomic pointers are used to + // track the beginning and end of the currently used storage space. To + // facilitate concurrent reads and writes, there's a read buffer pointer before + // the read pointer for data currently being read, and a corresponding write + // buffer pointer beyond the write pointer for data currently being written. + // These buffer pointers cannot overlap. Just using these pointers suffers from + // some minute inefficiencies and a few ABA problems. Therefore, atomic + // integers are used to store the currently used and currently free sizes. + // + // It allows multiple readers and multiple writers by implementing a reserve- + // commit system. A thread wanting to read will check the used size to see if + // there's enough data. If there is, it subtracts from the used size to + // 'reserve' the read. It then does a compare-exchange to 'commit' by + // increasing the read pointer. If that fails, then it backs out ('un- + // reserves') by adding back to the used size and tries again. If it + // succeeds, then it proceeds to read the data. In order to complete, the + // reader must update the read buffer pointer to where it just finished + // reading from. However, because other readers that started before may not be + // done yet, the reader must wait until the read buffer pointer points to + // where the read started. Only then is the read buffer pointer updated, and + // the free size increased. So while this implementation is lock-free, it is + // not wait-free. This same principle works the same when writing (amended + // for the appropriate pointers). + // + // If two readers try to read at the same time and there is only enough data + // for one of them, the used size MAY be negative because they both 'reserve' + // the data. This is an over-reserved state. But the compare-exchange will + // only allow one reader to 'commit' to the read and the other will 'un- + // reserve' the read. 
+ // + // |beg |rptr used=5 |wbuf - unused + // |----|----|++++|====|====|====|====|====|++++|----| + modifying + // free=3 |rbuf |wptr |end = used + // + // The diagram above shows a buffer of size 10 storing 5 bytes with a reader + // reading one byte and one writer writing one byte. + // + // Out of the box, the class works by reading and writing raw bytes from POD + // data types and arrays. A wrapper could allow for a nicer interface for + // pushing and popping elements. As it stands, this structure cannot be easily + // modified to store types of variable size. + + class Ring_ + { + private: + //////////////////////////////////////////////////////////////////////////// + // TYPE DEFINITIONS + //////////////////////////////////////////////////////////////////////////// + + typedef char* data_ptr; + typedef std::atomic size_type; + typedef std::atomic atom_ptr; + + private: + //////////////////////////////////////////////////////////////////////////// + // PRIVATE MEMBERS + //////////////////////////////////////////////////////////////////////////// + // Beginning and end pointers don't need to be atomic because they don't + // change. used_ and free_ can be negative in certain cases (and that's ok). 
+ + data_ptr beg_; // pointer to beginning of data block + data_ptr end_; // pointer to end of data block + + alignas(64) + size_type used_; // size of unreserved used space + + alignas(64) + size_type free_; // size of unreserved free space + + alignas(64) + atom_ptr rbuf_; // pointer to beginning of data being read + atom_ptr rptr_; // pointer to beginning of data + + alignas(64) + atom_ptr wptr_; // pointer to end of data + atom_ptr wbuf_; // pointer to end of data being written + + public: + //////////////////////////////////////////////////////////////////////////// + // CONSTRUCTORS AND DESTRUCTORS + //////////////////////////////////////////////////////////////////////////// + + // Constructs a ring without a buffer (capacity() == 0) + Ring_(); + + // Constructs a ring with a buffer with a size + Ring_(std::size_t size); + + // Moves the buffer between rings, assumes no concurrent operations + Ring_(Ring_&& ring); + + // Moves the buffer between rings, assumes no concurrent operations on + // either ring. Deallocates the buffer + Ring_& operator= (Ring_&& ring); + + // No copying + Ring_(const Ring_&) = delete; + Ring_& operator= (const Ring_&) = delete; + + // Deallocates the buffer + ~Ring_(); + + public: + //////////////////////////////////////////////////////////////////////////// + // QUERY FUNCTIONS + //////////////////////////////////////////////////////////////////////////// + // Functions only report on the state of the ring + + // Returns the current amount of non-reserved used space (amount of written + // data that a read hasn't yet reserved). Over-reserved scenarios mean this + // number is not the ultimate source of truth with concurrent operations, + // but its the closest safe approximation. This, of course, doesn't report + // writes that have not completed. 
+ std::size_t size() const; + + // Maximum amount of data that can be held + std::size_t capacity() const; + + public: + //////////////////////////////////////////////////////////////////////////// + // ACCESSORS AND MODIFIERS + //////////////////////////////////////////////////////////////////////////// + // All operations assume object has not been moved. Blocking operations run + // until operation is completed. Non-blocking operations fail if there is + // not enough space + + void read(void* data, std::size_t length) noexcept; + void write(const void* data, std::size_t length) noexcept; + bool try_read(void* data, std::size_t length) noexcept; + bool try_write(const void* data, std::size_t length) noexcept; + + protected: + //////////////////////////////////////////////////////////////////////////// + // PROTECTED FUNCTIONS + //////////////////////////////////////////////////////////////////////////// + // Helper functions + + // Wraps a pointer within the array. Assumes 'beg_ <= ptr < end_+capacity()' + char* normalize_(char*); + + char* acquire_read_block_(std::size_t length); + char* try_acquire_read_block_(std::size_t length); + void copy_read_block_(const char* block, char* data, std::size_t length); + void release_read_block_(char* block, std::size_t length); + + char* acquire_write_block_(std::size_t length); + char* try_acquire_write_block_(std::size_t length); + void copy_write_block_(char* block, const char* data, std::size_t length); + void release_write_block_(char* block, std::size_t length); + + char* begin_alloc_() { return beg_; } + const char* begin_alloc_() const { return beg_; } + char* end_alloc_() { return end_; } + const char* end_alloc_() const { return end_; } + char* begin_data_() { return rptr_; } + const char* begin_data_() const { return rptr_; } + char* end_data_() { return wptr_; } + const char* end_data_() const { return wptr_; } + + }; // class Ring_ + + template + class Ring : protected Ring_ + { + public: + 
//////////////////////////////////////////////////////////////////////////// + // CONSTRUCTORS AND DESTRUCTORS + //////////////////////////////////////////////////////////////////////////// + + // Constructs a ring without a buffer (capacity() == 0) + Ring(); + + // Constructs a ring with a buffer with a size + Ring(std::size_t size); + + // Moves the buffer between rings, assumes no concurrent operations + Ring(Ring&& ring); + + // Moves the buffer between rings, assumes no concurrent operations on + // either ring. Deallocates the buffer + Ring& operator= (Ring&& ring); + + // No copying (the deleted operations name Ring itself, not the Ring_ base) + Ring(const Ring&) = delete; + Ring& operator= (const Ring&) = delete; + + // Deallocates the buffer, destructs stored data. + ~Ring(); + + public: + //////////////////////////////////////////////////////////////////////////// + // QUERY FUNCTIONS + //////////////////////////////////////////////////////////////////////////// + // Functions only report on the state of the ring + + // Returns the current amount of non-reserved used space (amount of written + // data that a read hasn't yet reserved). Over-reserved scenarios mean this + // number is not the ultimate source of truth with concurrent operations, + // but it's the closest safe approximation. This, of course, doesn't report + // writes that have not completed. + std::size_t size() const; + + // Maximum amount of data that can be held + std::size_t capacity() const; + + public: + //////////////////////////////////////////////////////////////////////////// + // ACCESSORS AND MODIFIERS + //////////////////////////////////////////////////////////////////////////// + // All operations assume object has not been moved. Blocking operations run + // until operation is completed.
Non-blocking operations fail if there is + // not enough space + + void read(T& data) noexcept; // blocking read + void write(const T& data) noexcept; // blocking write + void write(T&& data) noexcept; // blocking write + bool try_read(T& data) noexcept; // non-blocking read + bool try_write(const T& data) noexcept; // non-blocking write + bool try_write(T&& data) noexcept; // non-blocking write + + private: + //////////////////////////////////////////////////////////////////////////// + // PRIVATE HELPER FUNCTIONS + //////////////////////////////////////////////////////////////////////////// + + void destruct_(); + + }; // class Ring + + template + Ring::Ring() + : Ring_() + { } + + template + Ring::Ring(std::size_t size) + : Ring_(size * sizeof(T)) + { } + + template + Ring::Ring(Ring&& ring) + : Ring_(std::move(ring)) + { } + + template + Ring& Ring::operator= (Ring&& ring) + { + destruct_(); // destroy the elements currently stored before the buffer is replaced + + Ring_::operator= (std::move(ring)); // 'ring' is a named lvalue here; without std::move the deleted Ring_ copy assignment is selected + + return *this; + } + + template + Ring::~Ring() + { + destruct_(); + } + + template + void Ring::destruct_() + { + if (size() == 0) + return; + + auto itr = begin_data_(); + auto end = end_data_(); + do + { + auto t = reinterpret_cast(itr); + t->~T(); + + itr = normalize_(itr + sizeof(T)); + } while (itr != end); + } + + template + std::size_t Ring::size() const + { + return Ring_::size() / sizeof(T); + } + + template + std::size_t Ring::capacity() const + { + return Ring_::capacity() / sizeof(T); + } + + template + void Ring::read(T& data) noexcept + { + static_assert(std::is_nothrow_move_assignable::value, "T move assignment must not throw"); + static_assert(std::is_nothrow_destructible::value, "T destructor must not throw"); + + auto block = acquire_read_block_(sizeof(T)); + + // critical section + auto t = reinterpret_cast(block); + data = std::move(*t); + t->~T(); + + release_read_block_(block, sizeof(T)); + } + + template + void Ring::write(const T& data) noexcept + { + static_assert(std::is_nothrow_copy_constructible::value, "T copy
constructor must not throw"); + + auto block = acquire_write_block_(sizeof(T)); + + // critical section + new(block) T(data); + + release_write_block_(block, sizeof(T)); + } + + template + void Ring::write(T&& data) noexcept + { + static_assert(std::is_nothrow_move_constructible::value, "T move constructor must not throw"); + + auto block = acquire_write_block_(sizeof(T)); + + // critical section + new(block) T(std::move(data)); + + release_write_block_(block, sizeof(T)); + } + + template + bool Ring::try_read(T& data) noexcept + { + static_assert(std::is_nothrow_move_assignable::value, "T move assignment must not throw"); + static_assert(std::is_nothrow_destructible::value, "T destructor must not throw"); + + auto block = try_acquire_read_block_(sizeof(T)); + if (block == nullptr) + return false; + + // critical section + auto t = reinterpret_cast(block); + data = std::move(*t); + t->~T(); + + release_read_block_(block, sizeof(T)); + + return true; + } + + template + bool Ring::try_write(const T& data) noexcept + { + static_assert(std::is_nothrow_copy_constructible::value, "T copy constructor must not throw"); + + auto block = try_acquire_write_block_(sizeof(T)); + if (block == nullptr) + return false; + + // critical section + new(block) T(data); + + release_write_block_(block, sizeof(T)); + + return true; + } + + template + bool Ring::try_write(T&& data) noexcept + { + static_assert(std::is_nothrow_move_constructible::value, "T move constructor must not throw"); + + auto block = try_acquire_write_block_(sizeof(T)); + if (block == nullptr) + return false; + + // critical section + new(block) T(std::move(data)); + + release_write_block_(block, sizeof(T)); + + return true; + } + +} // namespace wilt + +#endif // !WILT_RING_H \ No newline at end of file diff --git a/udphandler.cpp b/udphandler.cpp index 6220225..22b248b 100644 --- a/udphandler.cpp +++ b/udphandler.cpp @@ -189,7 +189,16 @@ void udpHandler::dataReceived() totallost = totallost + civ->packetsLost; } - 
emit haveNetworkStatus(" rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(totallost) + "/" + QString::number(totalsent) + ")"); + QString tempLatency; + if (rxLatency > audio->audioLatency) + { + tempLatency = QString::number(audio->audioLatency) + "ms"; + } + else { + tempLatency = "" + QString::number(audio->audioLatency) + "ms"; + } + + emit haveNetworkStatus("rx latency: " + tempLatency + " / rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(totallost) + "/" + QString::number(totalsent) + ")"); // tempLatency already carries its "ms" unit, so none is appended here } break; } @@ -936,11 +945,11 @@ void udpAudio::dataReceived() tempAudio.seq = (quint32)seqPrefix << 16 | in->seq; tempAudio.time = lastReceived; tempAudio.sent = 0; - tempAudio.datain = r.mid(0x18); + tempAudio.data = r.mid(0x18); // Prefer signal/slot to forward audio as it is thread/safe // Need to do more testing but latency appears fine. - emit haveAudioData(tempAudio); - //rxaudio->incomingAudio(tempAudio); + //emit haveAudioData(tempAudio); + audioLatency=rxaudio->incomingAudio(tempAudio); } break; } diff --git a/udphandler.h b/udphandler.h index 59e7328..a835d95 100644 --- a/udphandler.h +++ b/udphandler.h @@ -178,6 +178,8 @@ public: udpAudio(QHostAddress local, QHostAddress ip, quint16 aport, quint16 rxlatency, quint16 txlatency, quint16 rxsample, quint8 rxcodec, quint16 txsample, quint8 txcodec, int outputPort, int inputPort, quint8 resampleQuality); ~udpAudio(); + int audioLatency = 0; + signals: void haveAudioData(audioPacket data); diff --git a/udpserver.cpp b/udpserver.cpp index d392c7e..23a1dff 100644 --- a/udpserver.cpp +++ b/udpserver.cpp @@ -695,7 +695,7 @@ void udpServer::audioReceived() tempAudio.seq = (quint32)current->seqPrefix << 16 | in->seq; tempAudio.time = QTime::currentTime();; tempAudio.sent = 0; - tempAudio.datain = r.mid(0x18); + tempAudio.data = r.mid(0x18); //qInfo(logUdpServer()) << "sending tx audio " << in->seq; emit haveAudioData(tempAudio); } @@ -1305,9 +1305,9 @@ void
udpServer::sendRxAudio() while (len < audio.length()) { audioPacket partial; - partial.datain = audio.mid(len, 1364); + partial.data = audio.mid(len, 1364); receiveAudioData(partial); - len = len + partial.datain.length(); + len = len + partial.data.length(); } } } @@ -1322,15 +1322,15 @@ void udpServer::receiveAudioData(const audioPacket &d) if (client != Q_NULLPTR && client->connected) { audio_packet p; memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00! - p.len = sizeof(p) + d.datain.length(); + p.len = sizeof(p) + d.data.length(); p.sentid = client->myId; p.rcvdid = client->remoteId; p.ident = 0x0080; // audio is always this? - p.datalen = (quint16)qToBigEndian((quint16)d.datain.length()); + p.datalen = (quint16)qToBigEndian((quint16)d.data.length()); p.sendseq = (quint16)qToBigEndian((quint16)client->sendAudioSeq); // THIS IS BIG ENDIAN! p.seq = client->txSeq; QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); - t.append(d.datain); + t.append(d.data); client->txMutex.lock(); client->txSeqBuf.append(SEQBUFENTRY()); client->txSeqBuf.last().seqNum = p.seq; diff --git a/wfview.vcxproj b/wfview.vcxproj index 1dc80f4..d977fb5 100644 --- a/wfview.vcxproj +++ b/wfview.vcxproj @@ -199,6 +199,7 @@ + @@ -222,6 +223,7 @@ + diff --git a/wfview.vcxproj.filters b/wfview.vcxproj.filters index e09d8db..06a9b64 100644 --- a/wfview.vcxproj.filters +++ b/wfview.vcxproj.filters @@ -120,6 +120,9 @@ Source Files + + Source Files + @@ -197,6 +200,9 @@ Header Files + + Header Files + @@ -354,6 +360,7 @@ Resource Files +