From 9c0b57a0364f3a21fae74adb715db922b723fa30 Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Wed, 19 Apr 2023 02:25:44 +0200 Subject: [PATCH] Work on the spectran HTTP source --- core/src/utils/new_event.h | 51 ++++++++++++++++++ .../spectran_http_source/src/main.cpp | 29 ++++++++-- .../src/spectran_http_client.cpp | 53 ++++++++++++++++++- .../src/spectran_http_client.h | 12 +++++ 4 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 core/src/utils/new_event.h diff --git a/core/src/utils/new_event.h b/core/src/utils/new_event.h new file mode 100644 index 00000000..128eff99 --- /dev/null +++ b/core/src/utils/new_event.h @@ -0,0 +1,51 @@ +#pragma once +#include +#include +#include +#include + +typedef int HandlerID; + +template +class NewEvent { + using Handler = std::function; +public: + HandlerID bind(const Handler& handler) { + std::lock_guard lck(mtx); + HandlerID id = genID(); + handlers[id] = handler; + return id; + } + + template + HandlerID bind(MHandler handler, T* ctx) { + return bind([=](Args... args){ + (ctx->*handler)(args...); + }); + } + + void unbind(HandlerID id) { + std::lock_guard lck(mtx); + if (handlers.find(id) == handlers.end()) { + throw std::runtime_error("Could not unbind handler, unknown ID"); + } + handlers.erase(id); + } + + void operator()(Args... args) { + std::lock_guard lck(mtx); + for (const auto& [desc, handler] : handlers) { + handler(args...); + } + } + +private: + HandlerID genID() { + int id; + for (id = 1; handlers.find(id) != handlers.end(); id++); + return id; + } + + std::map handlers; + std::mutex mtx; +}; \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/main.cpp b/source_modules/spectran_http_source/src/main.cpp index 14e49109..b221281c 100644 --- a/source_modules/spectran_http_source/src/main.cpp +++ b/source_modules/spectran_http_source/src/main.cpp @@ -28,7 +28,7 @@ public: this->name = name; strcpy(hostname, "localhost"); - sampleRate = 41000000.0; + sampleRate = 5750000.0; handler.ctx = this; handler.selectHandler = menuSelected; @@ -103,8 +103,14 @@ private: static void tune(double freq, void* ctx) { SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; - if (_this->running) { - // TODO + bool connected = (_this->client && _this->client->isOpen()); + if (connected) { + int64_t newfreq = round(freq); + if (newfreq != _this->lastReportedFreq && _this->gotReport) { + flog::debug("Sending tuning command"); + _this->lastReportedFreq = newfreq; + _this->client->setCenterFrequency(newfreq); + } } _this->freq = freq; flog::info("SpectranHTTPSourceModule '{0}': Tune: {1}!", _this->name, freq); @@ -138,6 +144,7 @@ private: _this->tryConnect(); } else if (connected && SmGui::Button("Disconnect##spectran_http_source")) { + _this->client->onCenterFrequencyChanged.unbind(_this->onFreqChangedId); _this->client->close(); } if (_this->running) { style::endDisabled(); } @@ -154,13 +161,23 @@ private: void tryConnect() { try { + gotReport = false; client = std::make_shared(hostname, port, &stream); + onFreqChangedId = client->onCenterFrequencyChanged.bind(&SpectranHTTPSourceModule::onFreqChanged, this); + client->startWorker(); } catch (std::runtime_error e) { flog::error("Could not connect: {0}", e.what()); } } + void onFreqChanged(double newFreq) { + if (lastReportedFreq == newFreq) { return; } + lastReportedFreq = newFreq; + tuner::tune(tuner::TUNER_MODE_IQ_ONLY, "", newFreq); + gotReport = true; + } + std::string name; bool enabled = true; double sampleRate; @@ -168,11 +185,15 @@ private: bool running = false; std::shared_ptr client; + HandlerID onFreqChangedId; double freq; + int64_t lastReportedFreq = 0; + bool gotReport; + char hostname[1024]; - int port = 80; + int port = 54664; dsp::stream stream; }; diff --git a/source_modules/spectran_http_source/src/spectran_http_client.cpp b/source_modules/spectran_http_source/src/spectran_http_client.cpp index f5959a96..958b6798 100644 --- a/source_modules/spectran_http_source/src/spectran_http_client.cpp +++ b/source_modules/spectran_http_source/src/spectran_http_client.cpp @@ -5,6 +5,8 @@ SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::streamstream = stream; // Connect to server + this->host = host; + this->port = port; sock = net::connect(host, port); http = net::http::Client(sock); @@ -14,6 +16,13 @@ SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::streamclearWriteStop(); } +void SpectranHTTPClient::setCenterFrequency(uint64_t freq) { + // Connect to control endpoint (TODO: Switch to an always connected endpoint) + auto controlSock = net::connect(host, port); + auto controlHttp = net::http::Client(controlSock); + + // Make request + net::http::RequestHeader rqhdr(net::http::METHOD_PUT, "/control", host); + char buf[1024]; + sprintf(buf, "{\"frequencyCenter\":%d,\"frequencySpan\":%d,\"type\":\"capture\"}", freq, _span); + std::string data = buf; + char lenBuf[16]; + sprintf(lenBuf, "%d", data.size()); + rqhdr.setField("Content-Length", lenBuf); + controlHttp.sendRequestHeader(rqhdr); + controlSock->sendstr(data); + net::http::ResponseHeader rshdr; + controlHttp.recvResponseHeader(rshdr, 5000); + + flog::debug("Response: {}", rshdr.getStatusString()); +} + void SpectranHTTPClient::worker() { while (sock->isOpen()) { // Get chunk header @@ -52,6 +82,26 @@ void SpectranHTTPClient::worker() { return; } + // Decode JSON (yes, this is hacky, but it must be extremely fast) + auto startFreqBegin = jsonData.find("\"startFrequency\":"); + auto startFreqEnd = jsonData.find(',', startFreqBegin); + std::string startFreqStr = jsonData.substr(startFreqBegin + 17, startFreqEnd - startFreqBegin - 17); + int64_t startFreq = std::stoll(startFreqStr); + + auto endFreqBegin = jsonData.find("\"endFrequency\":"); + auto endFreqEnd = jsonData.find(',', endFreqBegin); + std::string endFreqStr = jsonData.substr(endFreqBegin + 15, endFreqEnd - endFreqBegin - 15); + int64_t endFreq = std::stoll(endFreqStr); + + // Calculate and update center freq + _span = endFreq - startFreq; + int64_t centerFreq = round(((double)endFreq + (double)startFreq) / 2.0); + if (centerFreq != _centerFreq) { + flog::debug("{}, {}, {}", _span, centerFreq); + _centerFreq = centerFreq; + onCenterFrequencyChanged(centerFreq); + } + // Read (and check for) record separator uint8_t rs; int rslen = sock->recv(&rs, 1, true, 5000); @@ -72,10 +122,11 @@ void SpectranHTTPClient::worker() { i += read; sampLen += read; } + int sampCount = sampLen / 8; // Swap to stream if (streamingEnabled) { - if (!stream->swap(sampLen / 8)) { return; } + if (!stream->swap(sampCount)) { return; } } // Read trailing CRLF diff --git a/source_modules/spectran_http_source/src/spectran_http_client.h b/source_modules/spectran_http_source/src/spectran_http_client.h index 96e5cbae..214af854 100644 --- a/source_modules/spectran_http_source/src/spectran_http_client.h +++ b/source_modules/spectran_http_source/src/spectran_http_client.h @@ -4,22 +4,34 @@ #include #include #include +#include +#include class SpectranHTTPClient { public: SpectranHTTPClient(std::string host, int port, dsp::stream* stream); + void startWorker(); void streaming(bool enabled); bool isOpen(); void close(); + void setCenterFrequency(uint64_t freq); + + NewEvent onCenterFrequencyChanged; + private: void worker(); + std::string host; + int port; + std::shared_ptr sock; net::http::Client http; dsp::stream* stream; std::thread workerThread; bool streamingEnabled = false; + int64_t _centerFreq = 0; + uint64_t _span = 5000000; }; \ No newline at end of file