From 019f39c22020f66473e71ef9cec80ffd84954b2f Mon Sep 17 00:00:00 2001 From: Marcin Kondej Date: Sun, 21 May 2023 01:21:38 +0200 Subject: [PATCH] Conditional variable support added --- fm_transmitter.cpp | 9 +-- transmitter.cpp | 135 ++++++++++++++++++++++++++------------------- transmitter.hpp | 7 ++- wave_reader.cpp | 38 ++++++++----- wave_reader.hpp | 7 ++- 5 files changed, 117 insertions(+), 79 deletions(-) diff --git a/fm_transmitter.cpp b/fm_transmitter.cpp index b5b158a..32946e4 100644 --- a/fm_transmitter.cpp +++ b/fm_transmitter.cpp @@ -36,7 +36,8 @@ #include #include -bool stop = false; +std::mutex mtx; +bool enable = true; Transmitter *transmitter = nullptr; void sigIntHandler(int sigNum) @@ -44,7 +45,7 @@ void sigIntHandler(int sigNum) if (transmitter != nullptr) { std::cout << "Stopping..." << std::endl; transmitter->Stop(); - stop = true; + enable = false; } } @@ -97,14 +98,14 @@ int main(int argc, char** argv) if ((optind == argc) && loop) { optind = filesOffset; } - WaveReader reader(filename != "-" ? filename : std::string(), stop); + WaveReader reader(filename != "-" ? filename : std::string(), enable, mtx); WaveHeader header = reader.GetHeader(); std::cout << "Playing: " << reader.GetFilename() << ", " << header.sampleRate << " Hz, " << header.bitsPerSample << " bits, " << ((header.channels > 0x01) ? "stereo" : "mono") << std::endl; transmitter->Transmit(reader, frequency, bandwidth, dmaChannel, optind < argc); - } while (!stop && (optind < argc)); + } while (enable && (optind < argc)); } catch (std::exception &catched) { std::cout << "Error: " << catched.what() << std::endl; result = EXIT_FAILURE; diff --git a/transmitter.cpp b/transmitter.cpp index ca1aeff..d7f7497 100644 --- a/transmitter.cpp +++ b/transmitter.cpp @@ -32,7 +32,7 @@ */ #include "transmitter.hpp" -#include "mailbox.h" +#include "mailbox.hpp" #include #include #include @@ -354,7 +354,7 @@ class DMAController : public Device }; Transmitter::Transmitter() - : output(nullptr), stop(true) + : output(nullptr), enable(false) { } @@ -366,7 +366,10 @@ Transmitter::~Transmitter() { void Transmitter::Transmit(WaveReader &reader, float frequency, float bandwidth, unsigned dmaChannel, bool preserveCarrier) { - stop = false; + { + std::lock_guard lock(mtx); + enable = true; + } WaveHeader header = reader.GetHeader(); unsigned bufferSize = static_cast(static_cast(header.sampleRate) * BUFFER_TIME / 1000000); @@ -399,50 +402,64 @@ void Transmitter::Transmit(WaveReader &reader, float frequency, float bandwidth, void Transmitter::Stop() { - stop = true; + std::unique_lock lock(mtx); + enable = false; + lock.unlock(); + cv.notify_all(); } void Transmitter::TransmitViaCpu(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange) { - std::vector samples = reader.GetSamples(bufferSize, stop); - if (samples.empty()) { - return; - } - + std::vector samples; unsigned sampleOffset = 0; - bool eof = samples.size() < bufferSize, txStop = false; - std::thread transmitterThread(Transmitter::TransmitterThread, this, &output, sampleRate, clockDivisor, divisorRange, &sampleOffset, &samples, &txStop); - std::this_thread::sleep_for(std::chrono::microseconds(BUFFER_TIME / 2)); + bool eof = false, stop = false, start = true; + + std::thread transmitterThread(Transmitter::TransmitterThread, this, &output, sampleRate, clockDivisor, divisorRange, &sampleOffset, &samples, &stop); auto finally = [&]() { { - std::lock_guard lock(access); - txStop = true; + std::lock_guard lock(mtx); + stop = true; + cv.notify_one(); } transmitterThread.join(); samples.clear(); - stop = true; + enable = false; }; + try { - while (!eof && !stop) { - { - std::lock_guard lock(access); - if (txStop) { - throw std::runtime_error("Transmitter thread has unexpectedly exited"); - } - if (samples.empty()) { - if (!reader.SetSampleOffset(sampleOffset + bufferSize)) { - break; - } - samples = reader.GetSamples(bufferSize, stop); - if (samples.empty()) { - break; - } - eof = samples.size() < bufferSize; - } + while (!eof) { + std::unique_lock lock(mtx); + if (!start) { + cv.wait(lock, [&]() -> bool { + return samples.empty() || !enable || stop; + }); + } else { + start = false; + } + if (!enable) { + break; + } + if (stop) { + throw std::runtime_error("Transmitter thread has unexpectedly exited"); + } + if (samples.empty()) { + if (!reader.SetSampleOffset(sampleOffset + bufferSize)) { + break; + } + lock.unlock(); + samples = reader.GetSamples(bufferSize, enable, mtx); + lock.lock(); + if (samples.empty()) { + break; + } + eof = samples.size() < bufferSize; + lock.unlock(); + cv.notify_one(); + } else { + lock.unlock(); } - std::this_thread::sleep_for(std::chrono::microseconds(BUFFER_TIME / 2)); } } catch (...) { finally(); @@ -459,7 +476,7 @@ void Transmitter::TransmitViaDma(WaveReader &reader, ClockOutput &output, unsign AllocatedMemory allocated(sizeof(uint32_t) * bufferSize + sizeof(DMAControllBlock) * (2 * bufferSize) + sizeof(uint32_t)); - std::vector samples = reader.GetSamples(bufferSize, stop); + std::vector samples = reader.GetSamples(bufferSize, enable, mtx); if (samples.empty()) { return; } @@ -509,11 +526,18 @@ void Transmitter::TransmitViaDma(WaveReader &reader, ClockOutput &output, unsign std::this_thread::sleep_for(std::chrono::milliseconds(1)); } samples.clear(); - stop = true; + std::lock_guard lock(mtx); + enable = false; }; try { - while (!eof && !stop) { - samples = reader.GetSamples(bufferSize, stop); + while (!eof) { + { + std::lock_guard lock(mtx); + if (!enable) { + break; + } + } + samples = reader.GetSamples(bufferSize, enable, mtx); if (!samples.size()) { break; } @@ -542,28 +566,25 @@ void Transmitter::TransmitterThread(Transmitter *instance, ClockOutput *output, volatile TimerRegisters *timer = reinterpret_cast(peripherals.GetVirtualAddress(TIMER_BASE_OFFSET)); uint64_t current = *(reinterpret_cast(&timer->low)); - uint64_t playbackStart = current; + uint64_t playbackStart = current, start = current; while (true) { std::vector loadedSamples; - while (true) { - { - std::lock_guard lock(instance->access); - if (*stop) { - return; - } - loadedSamples = std::move(*samples); - current = *(reinterpret_cast(&timer->low)); - if (!loadedSamples.empty()) { - *sampleOffset = (current - playbackStart) * sampleRate / 1000000; - break; - } - } - std::this_thread::sleep_for(std::chrono::microseconds(1)); - }; - uint64_t start = current; - unsigned offset = (current - start) * sampleRate / 1000000; + std::unique_lock lock(instance->mtx); + instance->cv.wait(lock, [&]() -> bool { + return !samples->empty() || *stop; + }); + if (*stop) { + break; + } + start = current = *(reinterpret_cast(&timer->low)); + *sampleOffset = (current - playbackStart) * sampleRate / 1000000; + loadedSamples = std::move(*samples); + lock.unlock(); + instance->cv.notify_one(); + + unsigned offset = 0; while (true) { if (offset >= loadedSamples.size()) { @@ -573,14 +594,16 @@ void Transmitter::TransmitterThread(Transmitter *instance, ClockOutput *output, float value = loadedSamples[offset].GetMonoValue(); instance->output->SetDivisor(clockDivisor - static_cast(round(value * divisorRange))); while (offset == prevOffset) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); // asm("nop"); + std::this_thread::yield(); // asm("nop"); current = *(reinterpret_cast(&timer->low));; offset = (current - start) * sampleRate / 1000000; } } } } catch (...) { - std::lock_guard lock(instance->access); + std::unique_lock lock(instance->mtx); *stop = true; + lock.unlock(); + instance->cv.notify_one(); } } diff --git a/transmitter.hpp b/transmitter.hpp index 74a2e77..e5649bc 100644 --- a/transmitter.hpp +++ b/transmitter.hpp @@ -34,7 +34,7 @@ #pragma once #include "wave_reader.hpp" -#include +#include class ClockOutput; @@ -53,7 +53,8 @@ class Transmitter void TransmitViaDma(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange, unsigned dmaChannel); static void TransmitterThread(Transmitter *instance, ClockOutput *output, unsigned sampleRate, unsigned clockDivisor, unsigned divisorRange, unsigned *sampleOffset, std::vector *samples, bool *stop); + std::condition_variable cv; ClockOutput *output; - std::mutex access; - bool stop; + std::mutex mtx; + bool enable; }; diff --git a/wave_reader.cpp b/wave_reader.cpp index 72eca11..0ac441f 100644 --- a/wave_reader.cpp +++ b/wave_reader.cpp @@ -65,7 +65,7 @@ float Sample::GetMonoValue() const return value; } -WaveReader::WaveReader(const std::string &filename, bool &stop) : +WaveReader::WaveReader(const std::string &filename, bool &enable, std::mutex &mtx) : filename(filename), headerOffset(0), currentDataOffset(0) { if (!filename.empty()) { @@ -80,12 +80,12 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) : } try { - ReadData(sizeof(WaveHeader::chunkID) + sizeof(WaveHeader::chunkSize) + sizeof(WaveHeader::format), true, stop); + ReadData(sizeof(WaveHeader::chunkID) + sizeof(WaveHeader::chunkSize) + sizeof(WaveHeader::format), true, enable, mtx); if ((std::string(reinterpret_cast(header.chunkID), 4) != std::string("RIFF")) || (std::string(reinterpret_cast(header.format), 4) != std::string("WAVE"))) { throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", WAVE file expected")); } - ReadData(sizeof(WaveHeader::subchunk1ID) + sizeof(WaveHeader::subchunk1Size), true, stop); + ReadData(sizeof(WaveHeader::subchunk1ID) + sizeof(WaveHeader::subchunk1Size), true, enable, mtx); unsigned subchunk1MinSize = sizeof(WaveHeader::audioFormat) + sizeof(WaveHeader::channels) + sizeof(WaveHeader::sampleRate) + sizeof(WaveHeader::byteRate) + sizeof(WaveHeader::blockAlign) + sizeof(WaveHeader::bitsPerSample); @@ -93,7 +93,7 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) : throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted")); } - ReadData(header.subchunk1Size, true, stop); + ReadData(header.subchunk1Size, true, enable, mtx); if ((header.audioFormat != WAVE_FORMAT_PCM) || (header.byteRate != (header.bitsPerSample >> 3) * header.channels * header.sampleRate) || (header.blockAlign != (header.bitsPerSample >> 3) * header.channels) || @@ -101,7 +101,7 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) : throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", unsupported WAVE format")); } - ReadData(sizeof(WaveHeader::subchunk2ID) + sizeof(WaveHeader::subchunk2Size), true, stop); + ReadData(sizeof(WaveHeader::subchunk2ID) + sizeof(WaveHeader::subchunk2Size), true, enable, mtx); if (std::string(reinterpret_cast(header.subchunk2ID), 4) != std::string("data")) { throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted")); } @@ -134,7 +134,7 @@ const WaveHeader &WaveReader::GetHeader() const return header; } -std::vector WaveReader::GetSamples(unsigned quantity, bool &stop) { +std::vector WaveReader::GetSamples(unsigned quantity, bool &enable, std::mutex &mtx) { unsigned bytesPerSample = (header.bitsPerSample >> 3) * header.channels; unsigned bytesToRead = quantity * bytesPerSample; unsigned bytesLeft = header.subchunk2Size - currentDataOffset; @@ -143,7 +143,7 @@ std::vector WaveReader::GetSamples(unsigned quantity, bool &stop) { quantity = bytesToRead / bytesPerSample; } - std::vector data = std::move(ReadData(bytesToRead, false, stop)); + std::vector data = std::move(ReadData(bytesToRead, false, enable, mtx)); if (data.size() < bytesToRead) { quantity = data.size() / bytesPerSample; } @@ -166,12 +166,18 @@ bool WaveReader::SetSampleOffset(unsigned offset) { return true; } -std::vector WaveReader::ReadData(unsigned bytesToRead, bool headerBytes, bool &stop) +std::vector WaveReader::ReadData(unsigned bytesToRead, bool headerBytes, bool &enable, std::mutex &mtx) { unsigned bytesRead = 0; std::vector data; data.resize(bytesToRead); - while ((bytesRead < bytesToRead) && !stop) { + while (bytesRead < bytesToRead) { + { + std::lock_guard lock(mtx); + if (!enable) { + break; + } + } int bytes = read(fileDescriptor, &data[bytesRead], bytesToRead - bytesRead); if (((bytes == -1) && ((fileDescriptor != STDIN_FILENO) || (errno != EAGAIN))) || ((static_cast(bytes) < bytesToRead) && headerBytes && (fileDescriptor != STDIN_FILENO))) { @@ -191,14 +197,20 @@ std::vector WaveReader::ReadData(unsigned bytesToRead, bool headerBytes } if (headerBytes) { - if (stop) { - throw std::runtime_error("Cannot obtain header, program interrupted"); + { + std::lock_guard lock(mtx); + if (!enable) { + throw std::runtime_error("Cannot obtain header, program interrupted"); + } } std::memcpy(&(reinterpret_cast(&header))[headerOffset], data.data(), bytesRead); headerOffset += bytesRead; } else { - if (stop) { - data.resize(bytesRead); + { + std::lock_guard lock(mtx); + if (!enable) { + data.resize(bytesRead); + } } currentDataOffset += bytesRead; } diff --git a/wave_reader.hpp b/wave_reader.hpp index 2e2a54c..060f10a 100644 --- a/wave_reader.hpp +++ b/wave_reader.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #define WAVE_FORMAT_PCM 0x0001 @@ -68,17 +69,17 @@ protected: class WaveReader { public: - WaveReader(const std::string &filename, bool &stop); + WaveReader(const std::string &filename, bool &enable, std::mutex &mtx); virtual ~WaveReader(); WaveReader(const WaveReader &) = delete; WaveReader(WaveReader &&) = delete; WaveReader &operator=(const WaveReader &) = delete; std::string GetFilename() const; const WaveHeader &GetHeader() const; - std::vector GetSamples(unsigned quantity, bool &stop); + std::vector GetSamples(unsigned quantity, bool &enable, std::mutex &mtx); bool SetSampleOffset(unsigned offset); private: - std::vector ReadData(unsigned bytesToRead, bool headerBytes, bool &stop); + std::vector ReadData(unsigned bytesToRead, bool headerBytes, bool &enable, std::mutex &mtx); std::string filename; WaveHeader header;