Merge pull request #160 from KeyOz/dd6zj

Adding MQTT for local APRS packet storage
pull/161/head
Peter Buchegger 2022-02-24 16:28:13 +01:00 zatwierdzone przez GitHub
commit 4f0b843cf6
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
10 zmienionych plików z 154 dodań i 5 usunięć

Wyświetl plik

@ -65,5 +65,13 @@
}
]
},
"mqtt": {
"active": false,
"server": "",
"port": 1883,
"name": "",
"password": "",
"topic": "LoraAPRS/Data"
},
"ntp_server": "pool.ntp.org"
}

Wyświetl plik

@ -13,6 +13,7 @@ lib_deps =
peterus/APRS-Decoder-Lib @ 0.0.6
peterus/esp-logger @ 0.0.1
peterus/ESP-FTP-Server-Lib @ 0.9.5
knolleary/PubSubClient@^2.8
check_tool = cppcheck
check_flags =
cppcheck: --suppress=*:*.pio\* --inline-suppr -DCPPCHECK --force lib -ilib/TimeLib -ilib/LoRa -ilib/NTPClient

Wyświetl plik

@ -11,6 +11,7 @@
#include "TaskDisplay.h"
#include "TaskEth.h"
#include "TaskFTP.h"
#include "TaskMQTT.h"
#include "TaskModem.h"
#include "TaskNTP.h"
#include "TaskOTA.h"
@ -18,7 +19,7 @@
#include "TaskWifi.h"
#include "project_configuration.h"
#define VERSION "22.7.0"
#define VERSION "22.8.0"
String create_lat_aprs(double lat);
String create_long_aprs(double lng);
@ -26,6 +27,7 @@ String create_long_aprs(double lng);
TaskQueue<std::shared_ptr<APRSMessage>> toAprsIs;
TaskQueue<std::shared_ptr<APRSMessage>> fromModem;
TaskQueue<std::shared_ptr<APRSMessage>> toModem;
TaskQueue<std::shared_ptr<APRSMessage>> toMQTT;
System LoRaSystem;
Configuration userConfig;
@ -37,8 +39,9 @@ WifiTask wifiTask;
OTATask otaTask;
NTPTask ntpTask;
FTPTask ftpTask;
MQTTTask mqttTask(toMQTT);
AprsIsTask aprsIsTask(toAprsIs);
RouterTask routerTask(fromModem, toModem, toAprsIs);
RouterTask routerTask(fromModem, toModem, toAprsIs, toMQTT);
void setup() {
Serial.begin(115200);
@ -114,6 +117,10 @@ void setup() {
LoRaSystem.getTaskManager().addTask(&aprsIsTask);
}
if (userConfig.mqtt.active) {
LoRaSystem.getTaskManager().addTask(&mqttTask);
}
LoRaSystem.getTaskManager().setup(LoRaSystem);
LoRaSystem.getDisplay().showSpashScreen("LoRa APRS iGate", VERSION);

Wyświetl plik

@ -12,6 +12,7 @@ enum TaskNames
TaskWifi,
TaskRouter,
TaskSize,
TaskMQTT,
};
#define TASK_APRS_IS "AprsIsTask"
@ -22,5 +23,6 @@ enum TaskNames
#define TASK_OTA "OTATask"
#define TASK_WIFI "WifiTask"
#define TASK_ROUTER "RouterTask"
#define TASK_MQTT "MQTTTask"
#endif

74
src/TaskMQTT.cpp 100644
Wyświetl plik

@ -0,0 +1,74 @@
#include <logger.h>
#include "Task.h"
#include "TaskMQTT.h"
#include "project_configuration.h"
#include <ArduinoJson.h>
MQTTTask::MQTTTask(TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _toMQTT(toMQTT), _MQTT(_client) {
}
MQTTTask::~MQTTTask() {
}
bool MQTTTask::setup(System &system) {
_MQTT.setServer(system.getUserConfig()->mqtt.server.c_str(), system.getUserConfig()->mqtt.port);
return true;
}
bool MQTTTask::loop(System &system) {
if (!system.isWifiEthConnected()) {
return false;
}
if (!_MQTT.connected()) {
connect(system);
}
if (!_toMQTT.empty()) {
std::shared_ptr<APRSMessage> msg = _toMQTT.getElement();
DynamicJsonDocument data(1024);
data["Source"] = msg->getSource();
data["Destination"] = msg->getDestination();
data["Path"] = msg->getPath();
data["Type"] = msg->getType().toString();
String body = msg->getBody()->encode();
body.replace("\n", "");
data["Data"] = body;
String r;
serializeJson(data, r);
String topic = String(system.getUserConfig()->mqtt.topic);
if (!topic.endsWith("/")) {
topic = topic + "/";
}
topic = topic + system.getUserConfig()->callsign;
logPrintD("Send MQTT with topic: \"");
logPrintD(topic);
logPrintD("\", data: ");
logPrintlnD(r);
_MQTT.publish(topic.c_str(), r.c_str());
}
_MQTT.loop();
return true;
}
bool MQTTTask::connect(const System &system) {
logPrintI("Connecting to MQTT broker: ");
logPrintI(system.getUserConfig()->mqtt.server);
logPrintI(" on port ");
logPrintlnI(String(system.getUserConfig()->mqtt.port));
if (_MQTT.connect(system.getUserConfig()->callsign.c_str(), system.getUserConfig()->mqtt.name.c_str(), system.getUserConfig()->mqtt.password.c_str())) {
logPrintI("Connected to MQTT broker as: ");
logPrintlnI(system.getUserConfig()->callsign);
return true;
}
logPrintlnI("Connecting to MQTT broker faild. Try again later.");
return false;
}

26
src/TaskMQTT.h 100644
Wyświetl plik

@ -0,0 +1,26 @@
#ifndef TASK_MQTT_H_
#define TASK_MQTT_H_
#include <APRSMessage.h>
#include <PubSubClient.h>
#include <TaskManager.h>
#include <WiFi.h>
class MQTTTask : public Task {
public:
MQTTTask(TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT);
virtual ~MQTTTask();
virtual bool setup(System &system) override;
virtual bool loop(System &system) override;
private:
TaskQueue<std::shared_ptr<APRSMessage>> &_toMQTT;
WiFiClient _client;
PubSubClient _MQTT;
bool connect(const System &system);
};
#endif

Wyświetl plik

@ -9,7 +9,7 @@
String create_lat_aprs(double lat);
String create_long_aprs(double lng);
RouterTask::RouterTask(TaskQueue<std::shared_ptr<APRSMessage>> &fromModem, TaskQueue<std::shared_ptr<APRSMessage>> &toModem, TaskQueue<std::shared_ptr<APRSMessage>> &toAprsIs) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs) {
RouterTask::RouterTask(TaskQueue<std::shared_ptr<APRSMessage>> &fromModem, TaskQueue<std::shared_ptr<APRSMessage>> &toModem, TaskQueue<std::shared_ptr<APRSMessage>> &toAprsIs, TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs), _toMQTT(toMQTT) {
}
RouterTask::~RouterTask() {
@ -34,6 +34,10 @@ bool RouterTask::loop(System &system) {
if (!_fromModem.empty()) {
std::shared_ptr<APRSMessage> modemMsg = _fromModem.getElement();
if (system.getUserConfig()->mqtt.active) {
_toMQTT.addElement(modemMsg);
}
if (system.getUserConfig()->aprs_is.active && modemMsg->getSource() != system.getUserConfig()->callsign) {
std::shared_ptr<APRSMessage> aprsIsMsg = std::make_shared<APRSMessage>(*modemMsg);
String path = aprsIsMsg->getPath();

Wyświetl plik

@ -2,11 +2,12 @@
#define TASK_ROUTER_H_
#include <APRSMessage.h>
#include <TaskMQTT.h>
#include <TaskManager.h>
class RouterTask : public Task {
public:
RouterTask(TaskQueue<std::shared_ptr<APRSMessage>> &fromModem, TaskQueue<std::shared_ptr<APRSMessage>> &toModem, TaskQueue<std::shared_ptr<APRSMessage>> &toAprsIs);
RouterTask(TaskQueue<std::shared_ptr<APRSMessage>> &fromModem, TaskQueue<std::shared_ptr<APRSMessage>> &toModem, TaskQueue<std::shared_ptr<APRSMessage>> &toAprsIs, TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT);
virtual ~RouterTask();
virtual bool setup(System &system) override;
@ -16,6 +17,7 @@ private:
TaskQueue<std::shared_ptr<APRSMessage>> &_fromModem;
TaskQueue<std::shared_ptr<APRSMessage>> &_toModem;
TaskQueue<std::shared_ptr<APRSMessage>> &_toAprsIs;
TaskQueue<std::shared_ptr<APRSMessage>> &_toMQTT;
std::shared_ptr<APRSMessage> _beaconMsg;
Timer _beacon_timer;

Wyświetl plik

@ -77,6 +77,14 @@ void ProjectConfigurationManagement::readProjectConfiguration(DynamicJsonDocumen
us.password = "ftp";
conf.ftp.users.push_back(us);
}
if (data.containsKey("mqtt")) {
conf.mqtt.active = data["mqtt"]["active"] | false;
conf.mqtt.server = data["mqtt"]["server"].as<String>();
conf.mqtt.port = data["mqtt"]["port"].as<uint16_t>();
conf.mqtt.name = data["mqtt"]["name"].as<String>();
conf.mqtt.password = data["mqtt"]["password"].as<String>();
conf.mqtt.topic = data["mqtt"]["topic"].as<String>();
}
if (data.containsKey("ntp_server"))
conf.ntpServer = data["ntp_server"].as<String>();
@ -133,7 +141,13 @@ void ProjectConfigurationManagement::writeProjectConfiguration(Configuration &co
v["name"] = u.name;
v["password"] = u.password;
}
data["ntp_server"] = conf.ntpServer;
data["mqtt"]["active"] = conf.mqtt.active;
data["mqtt"]["server"] = conf.mqtt.server;
data["mqtt"]["port"] = conf.mqtt.port;
data["mqtt"]["name"] = conf.mqtt.name;
data["mqtt"]["password"] = conf.mqtt.password;
data["mqtt"]["topic"] = conf.mqtt.topic;
data["ntp_server"] = conf.ntpServer;
data["board"] = conf.board;
}

Wyświetl plik

@ -117,6 +117,16 @@ public:
std::list<User> users;
};
class MQTT {
public:
bool active;
String server;
uint16_t port;
String name;
String password;
String topic;
};
Configuration() : callsign("NOCALL-10"), board(""), ntpServer("pool.ntp.org"){};
String callsign;
@ -128,6 +138,7 @@ public:
LoRa lora;
Display display;
Ftp ftp;
MQTT mqtt;
String board;
String ntpServer;
};