From 50a69d77e6d6ae7653e6c0af83799d2ee8b6ec28 Mon Sep 17 00:00:00 2001 From: Kevin Hester Date: Mon, 5 Apr 2021 08:42:52 +0800 Subject: [PATCH] mqtt: begin subscription support --- src/mqtt/MQTT.cpp | 56 +++++++++++++++++++++++++++++++++-------------- src/mqtt/MQTT.h | 12 ++++++++-- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index e6f38f8e..04a0e8a7 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -9,13 +9,29 @@ MQTT *mqtt; String statusTopic = "mesh/stat/"; +String cryptTopic = "mesh/crypt/"; // mesh/crypt/CHANNELID/NODEID -void mqttCallback(char *topic, byte *payload, unsigned int length) +void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { - DEBUG_MSG("MQTT topic %s\n", topic); + mqtt->onPublish(topic, payload, length); +} - // After parsing ServiceEnvelope - // FIXME - make sure to free both strings and the MeshPacket +void MQTT::onPublish(char *topic, byte *payload, unsigned int length) +{ + // parsing ServiceEnvelope + ServiceEnvelope e = ServiceEnvelope_init_default; + if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) { + DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length); + } else { + DEBUG_MSG("Received MQTT topic %s, len=%u\n", topic, length); + + // FIXME, ignore messages sent by us (requires decryption) or if we don't have the channel key + + // make sure to free both strings and the MeshPacket (passing in NULL is acceptable) + free(e.channel_id); + free(e.gateway_id); + free(e.packet); + } } void mqttInit() @@ -53,15 +69,30 @@ void MQTT::reconnect() static char subsStr[64]; /* We keep this static because the mqtt lib might not be copying it */ // snprintf(subsStr, sizeof(subsStr), "/ezd/todev/%s/#", clientId); - // mqtt.subscribe(subsStr, 1); // we use qos 1 because we don't want to miss messages + // pubSub.subscribe(subsStr, 1); // we use qos 1 because we don't want to miss messages /// FIXME, include more information in the status text bool ok = pubSub.publish(myStatus.c_str(), "online", true); DEBUG_MSG("published %d\n", ok); + + sendSubscriptions(); } else DEBUG_MSG("Failed to contact MQTT server...\n"); } +void MQTT::sendSubscriptions() +{ + size_t numChan = channels.getNumChannels(); + for (size_t i = 0; i < numChan; i++) { + auto &ch = channels.getByIndex(i); + if (ch.settings.uplink_enabled) { + String topic = cryptTopic + channels.getGlobalId(i) + "/#"; + DEBUG_MSG("Subscribing to %s\n", topic.c_str()); + pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right? + } + } +} + bool MQTT::wantsLink() const { bool hasChannel = false; @@ -124,18 +155,9 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) static uint8_t bytes[MeshPacket_size + 64]; size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env); - const char *topic = getCryptTopic(channelId); - DEBUG_MSG("publish %s, %u bytes\n", topic, numBytes); + String topic = cryptTopic + channelId + "/" + owner.id; + DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes); - pubSub.publish(topic, bytes, numBytes, false); + pubSub.publish(topic.c_str(), bytes, numBytes, false); } } - -const char *MQTT::getCryptTopic(const char *channelId) -{ - static char buf[128]; - - // "mesh/crypt/CHANNELID/NODEID/PORTID" - snprintf(buf, sizeof(buf), "mesh/crypt/%s/%s", channelId, owner.id); - return buf; -} \ No newline at end of file diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index 54fcf822..2f98c311 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -36,8 +36,6 @@ class MQTT : private concurrency::OSThread virtual int32_t runOnce(); private: - const char *getCryptTopic(const char *channelId); - /** return true if we have a channel that wants uplink/downlink */ bool wantsLink() const; @@ -45,6 +43,16 @@ class MQTT : private concurrency::OSThread /** Attempt to connect to server if necessary */ void reconnect(); + + /** Tell the server what subscriptions we want (based on channels.downlink_enabled) + */ + void sendSubscriptions(); + + /// Just C glue to call onPublish + static void mqttCallback(char *topic, byte *payload, unsigned int length); + + /// Called when a new publish arrives from the MQTT server + void onPublish(char *topic, byte *payload, unsigned int length); }; void mqttInit();