refactor MQTT task

pull/160/head
Peter Buchegger 2022-02-24 16:02:02 +01:00
rodzic e9158a44fd
commit c66d7d7d8f
2 zmienionych plików z 38 dodań i 46 usunięć

Wyświetl plik

@ -9,31 +9,22 @@
#include <ArduinoJson.h>
WiFiClient wiFiClient;
PubSubClient _MQTT(wiFiClient);
MQTTTask::MQTTTask(TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _beginCalled(false), _toMQTT(toMQTT) {
MQTTTask::MQTTTask(TaskQueue<std::shared_ptr<APRSMessage>> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _toMQTT(toMQTT), _MQTT(_client) {
}
MQTTTask::~MQTTTask() {
}
bool MQTTTask::setup(System &system) {
bool MQTTTask::setup(System &system) {
_MQTT.setServer(system.getUserConfig()->mqtt.server.c_str(), system.getUserConfig()->mqtt.port);
return true;
}
bool MQTTTask::loop(System &system) {
if (!_beginCalled) {
_beginCalled = true;
}
if (!system.isWifiEthConnected()) {
return false;
}
if (!_MQTT.connected()) {
connect(system);
}
@ -41,48 +32,46 @@ bool MQTTTask::loop(System &system) {
if (!_toMQTT.empty()) {
std::shared_ptr<APRSMessage> msg = _toMQTT.getElement();
DynamicJsonDocument _Data(1024);
String _r;
DynamicJsonDocument data(1024);
data["Source"] = msg->getSource();
data["Destination"] = msg->getDestination();
data["Path"] = msg->getPath();
data["Type"] = msg->getType().toString();
String body = msg->getBody()->encode();
body.replace("\n", "");
data["Data"] = body;
_Data["Source"] = msg->getSource();
_Data["Destination"] = msg->getDestination();
_Data["Path"] = msg->getPath();
_Data["Type"] = msg->getType().toString();
String _body = msg->getBody()->encode();
_body.replace("\n", "");
_Data["Data"] = _body;
String r;
serializeJson(data, r);
serializeJson(_Data, _r);
logPrintD("Send MQTT: ");
logPrintlnD(_r);
String _topic = String(system.getUserConfig()->mqtt.topic);
if (!_topic.endsWith("/")) {
_topic = _topic + "/";
String topic = String(system.getUserConfig()->mqtt.topic);
if (!topic.endsWith("/")) {
topic = topic + "/";
}
_topic = _topic + system.getUserConfig()->callsign;
topic = topic + system.getUserConfig()->callsign;
_MQTT.publish(_topic.c_str(), _r.c_str());
logPrintD("Send MQTT with topic: \"");
logPrintD(topic);
logPrintD("\", data: ");
logPrintlnD(r);
_MQTT.publish(topic.c_str(), r.c_str());
}
_MQTT.loop();
return true;
}
bool MQTTTask::connect(const System &system) {
logPrintI("Connecting to MQTT broker: ");
logPrintI("Connecting to MQTT broker: ");
logPrintI(system.getUserConfig()->mqtt.server);
logPrintI(" on port ");
logPrintI(" on port ");
logPrintlnI(String(system.getUserConfig()->mqtt.port));
if (_MQTT.connect(system.getUserConfig()->callsign.c_str(),system.getUserConfig()->mqtt.name.c_str() ,system.getUserConfig()->mqtt.password.c_str() )) {
logPrintI("Connected to MQTT broker as: ");
logPrintlnI(system.getUserConfig()->callsign);
if (_MQTT.connect(system.getUserConfig()->callsign.c_str(), system.getUserConfig()->mqtt.name.c_str(), system.getUserConfig()->mqtt.password.c_str())) {
logPrintI("Connected to MQTT broker as: ");
logPrintlnI(system.getUserConfig()->callsign);
return true;
} else {
logPrintlnI("Connecting to MQTT broker faild. Try again later.");
}
}
logPrintlnI("Connecting to MQTT broker faild. Try again later.");
return false;
}

Wyświetl plik

@ -2,8 +2,8 @@
#define TASK_MQTT_H_
#include <APRSMessage.h>
#include <TaskManager.h>
#include <PubSubClient.h>
#include <TaskManager.h>
class MQTTTask : public Task {
public:
@ -12,11 +12,14 @@ public:
virtual bool setup(System &system) override;
virtual bool loop(System &system) override;
private:
bool _beginCalled;
TaskQueue<std::shared_ptr<APRSMessage>> &_toMQTT;
bool connect(const System &system);
TaskQueue<std::shared_ptr<APRSMessage>> &_toMQTT;
WiFiClient _client;
PubSubClient _MQTT;
bool connect(const System &system);
};
#endif