From 68f6cfde0c75ee35ba930d67a8369467f9cdcfcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20G=C3=B6ttgens?= Date: Fri, 16 Dec 2022 20:28:28 +0100 Subject: [PATCH] Improve Wifi Reconnect handling and add outgoing queue for MQTT packets to bridge short connection issues. --- src/mesh/http/WiFiAPClient.cpp | 134 +++++++++++++++++++++------------ src/mesh/http/WiFiAPClient.h | 4 + src/mqtt/MQTT.cpp | 104 +++++++++++++++++++------ src/mqtt/MQTT.h | 7 ++ 4 files changed, 175 insertions(+), 74 deletions(-) diff --git a/src/mesh/http/WiFiAPClient.cpp b/src/mesh/http/WiFiAPClient.cpp index bb454246..731197e6 100644 --- a/src/mesh/http/WiFiAPClient.cpp +++ b/src/mesh/http/WiFiAPClient.cpp @@ -1,7 +1,7 @@ -#include "mesh/http/WiFiAPClient.h" #include "NodeDB.h" #include "RTC.h" #include "concurrency/Periodic.h" +#include "mesh/http/WiFiAPClient.h" #include "configuration.h" #include "main.h" #include "mesh/http/WebServer.h" @@ -37,9 +37,9 @@ bool APStartupComplete = 0; unsigned long lastrun_ntp = 0; -static bool needReconnect = true; // If we create our reconnector, run it once at the beginning +bool needReconnect = true; // If we create our reconnector, run it once at the beginning -static Periodic *wifiReconnect; +Periodic *wifiReconnect; static int32_t reconnectWiFi() { @@ -56,29 +56,15 @@ static int32_t reconnectWiFi() // Make sure we clear old connection credentials WiFi.disconnect(false, true); - DEBUG_MSG("... Reconnecting to WiFi access point %s\n",wifiName); - - int n = WiFi.scanNetworks(); - - if (n > 0) { - for (int i = 0; i < n; ++i) { - DEBUG_MSG("Found WiFi network %s, signal strength %d\n", WiFi.SSID(i).c_str(), WiFi.RSSI(i)); - yield(); - } - WiFi.mode(WIFI_MODE_STA); - WiFi.begin(wifiName, wifiPsw); - } else { - DEBUG_MSG("No networks found during site survey. Rebooting MCU...\n"); - screen->startRebootScreen(); - rebootAtMsec = millis() + 5000; - } - + DEBUG_MSG("Reconnecting to WiFi access point %s\n",wifiName); + WiFi.mode(WIFI_MODE_STA); + WiFi.begin(wifiName, wifiPsw); } #ifndef DISABLE_NTP if (WiFi.isConnected() && (((millis() - lastrun_ntp) > 43200000) || (lastrun_ntp == 0))) { // every 12 hours - DEBUG_MSG("Updating NTP time\n"); + DEBUG_MSG("Updating NTP time from %s\n",config.network.ntp_server); if (timeClient.update()) { DEBUG_MSG("NTP Request Success - Setting RTCQualityNTP if needed\n"); @@ -129,7 +115,7 @@ static void onNetworkConnected() { if (!APStartupComplete) { // Start web server - DEBUG_MSG("... Starting network services\n"); + DEBUG_MSG("Starting network services\n"); // start mdns if (!MDNS.begin("Meshtastic")) { @@ -168,6 +154,8 @@ bool initWifi() createSSLCert(); + esp_wifi_set_storage(WIFI_STORAGE_RAM); // Disable flash storage for WiFi credentials + if (!*wifiPsw) // Treat empty password as no password wifiPsw = NULL; @@ -194,7 +182,7 @@ bool initWifi() WiFi.onEvent( [](WiFiEvent_t event, WiFiEventInfo_t info) { - Serial.print("\nWiFi lost connection. Reason: "); + Serial.print("WiFi lost connection. Reason: "); Serial.println(info.wifi_sta_disconnected.reason); /* @@ -221,91 +209,137 @@ bool initWifi() // Called by the Espressif SDK to static void WiFiEvent(WiFiEvent_t event) { - DEBUG_MSG("************ [WiFi-event] event: %d ************\n", event); + DEBUG_MSG("WiFi-Event %d: ", event); switch (event) { - case SYSTEM_EVENT_WIFI_READY: + case ARDUINO_EVENT_WIFI_READY: DEBUG_MSG("WiFi interface ready\n"); break; - case SYSTEM_EVENT_SCAN_DONE: + case ARDUINO_EVENT_WIFI_SCAN_DONE: DEBUG_MSG("Completed scan for access points\n"); break; - case SYSTEM_EVENT_STA_START: + case ARDUINO_EVENT_WIFI_STA_START: DEBUG_MSG("WiFi station started\n"); break; - case SYSTEM_EVENT_STA_STOP: + case ARDUINO_EVENT_WIFI_STA_STOP: DEBUG_MSG("WiFi station stopped\n"); break; - case SYSTEM_EVENT_STA_CONNECTED: + case ARDUINO_EVENT_WIFI_STA_CONNECTED: DEBUG_MSG("Connected to access point\n"); break; - case SYSTEM_EVENT_STA_DISCONNECTED: + case ARDUINO_EVENT_WIFI_STA_DISCONNECTED: DEBUG_MSG("Disconnected from WiFi access point\n"); WiFi.disconnect(false, true); needReconnect = true; wifiReconnect->setIntervalFromNow(1000); break; - case SYSTEM_EVENT_STA_AUTHMODE_CHANGE: + case ARDUINO_EVENT_WIFI_STA_AUTHMODE_CHANGE: DEBUG_MSG("Authentication mode of access point has changed\n"); break; - case SYSTEM_EVENT_STA_GOT_IP: + case ARDUINO_EVENT_WIFI_STA_GOT_IP: DEBUG_MSG("Obtained IP address: "); Serial.println(WiFi.localIP()); onNetworkConnected(); break; - case SYSTEM_EVENT_STA_LOST_IP: + case ARDUINO_EVENT_WIFI_STA_GOT_IP6: + DEBUG_MSG("Obtained IP6 address: "); + Serial.println(WiFi.localIPv6()); + break; + case ARDUINO_EVENT_WIFI_STA_LOST_IP: DEBUG_MSG("Lost IP address and IP address is reset to 0\n"); WiFi.disconnect(false, true); needReconnect = true; wifiReconnect->setIntervalFromNow(1000); break; - case SYSTEM_EVENT_STA_WPS_ER_SUCCESS: + case ARDUINO_EVENT_WPS_ER_SUCCESS: DEBUG_MSG("WiFi Protected Setup (WPS): succeeded in enrollee mode\n"); break; - case SYSTEM_EVENT_STA_WPS_ER_FAILED: + case ARDUINO_EVENT_WPS_ER_FAILED: DEBUG_MSG("WiFi Protected Setup (WPS): failed in enrollee mode\n"); break; - case SYSTEM_EVENT_STA_WPS_ER_TIMEOUT: + case ARDUINO_EVENT_WPS_ER_TIMEOUT: DEBUG_MSG("WiFi Protected Setup (WPS): timeout in enrollee mode\n"); break; - case SYSTEM_EVENT_STA_WPS_ER_PIN: + case ARDUINO_EVENT_WPS_ER_PIN: DEBUG_MSG("WiFi Protected Setup (WPS): pin code in enrollee mode\n"); break; - case SYSTEM_EVENT_AP_START: + case ARDUINO_EVENT_WPS_ER_PBC_OVERLAP: + DEBUG_MSG("WiFi Protected Setup (WPS): push button overlap in enrollee mode\n"); + break; + case ARDUINO_EVENT_WIFI_AP_START: DEBUG_MSG("WiFi access point started\n"); break; - case SYSTEM_EVENT_AP_STOP: + case ARDUINO_EVENT_WIFI_AP_STOP: DEBUG_MSG("WiFi access point stopped\n"); break; - case SYSTEM_EVENT_AP_STACONNECTED: + case ARDUINO_EVENT_WIFI_AP_STACONNECTED: DEBUG_MSG("Client connected\n"); break; - case SYSTEM_EVENT_AP_STADISCONNECTED: + case ARDUINO_EVENT_WIFI_AP_STADISCONNECTED: DEBUG_MSG("Client disconnected\n"); break; - case SYSTEM_EVENT_AP_STAIPASSIGNED: + case ARDUINO_EVENT_WIFI_AP_STAIPASSIGNED: DEBUG_MSG("Assigned IP address to client\n"); break; - case SYSTEM_EVENT_AP_PROBEREQRECVED: + case ARDUINO_EVENT_WIFI_AP_PROBEREQRECVED: DEBUG_MSG("Received probe request\n"); break; - case SYSTEM_EVENT_GOT_IP6: + case ARDUINO_EVENT_WIFI_AP_GOT_IP6: DEBUG_MSG("IPv6 is preferred\n"); break; - case SYSTEM_EVENT_ETH_START: + case ARDUINO_EVENT_WIFI_FTM_REPORT: + DEBUG_MSG("Fast Transition Management report\n"); + break; + case ARDUINO_EVENT_ETH_START: DEBUG_MSG("Ethernet started\n"); break; - case SYSTEM_EVENT_ETH_STOP: + case ARDUINO_EVENT_ETH_STOP: DEBUG_MSG("Ethernet stopped\n"); break; - case SYSTEM_EVENT_ETH_CONNECTED: + case ARDUINO_EVENT_ETH_CONNECTED: DEBUG_MSG("Ethernet connected\n"); break; - case SYSTEM_EVENT_ETH_DISCONNECTED: + case ARDUINO_EVENT_ETH_DISCONNECTED: DEBUG_MSG("Ethernet disconnected\n"); break; - case SYSTEM_EVENT_ETH_GOT_IP: - DEBUG_MSG("Obtained IP address (SYSTEM_EVENT_ETH_GOT_IP)\n"); + case ARDUINO_EVENT_ETH_GOT_IP: + DEBUG_MSG("Obtained IP address (ARDUINO_EVENT_ETH_GOT_IP)\n"); + break; + case ARDUINO_EVENT_ETH_GOT_IP6: + DEBUG_MSG("Obtained IP6 address (ARDUINO_EVENT_ETH_GOT_IP6)\n"); + break; + case ARDUINO_EVENT_SC_SCAN_DONE: + DEBUG_MSG("SmartConfig: Scan done\n"); + break; + case ARDUINO_EVENT_SC_FOUND_CHANNEL: + DEBUG_MSG("SmartConfig: Found channel\n"); + break; + case ARDUINO_EVENT_SC_GOT_SSID_PSWD: + DEBUG_MSG("SmartConfig: Got SSID and password\n"); + break; + case ARDUINO_EVENT_SC_SEND_ACK_DONE: + DEBUG_MSG("SmartConfig: Send ACK done\n"); + break; + case ARDUINO_EVENT_PROV_INIT: + DEBUG_MSG("Provisioning: Init\n"); + break; + case ARDUINO_EVENT_PROV_DEINIT: + DEBUG_MSG("Provisioning: Stopped\n"); + break; + case ARDUINO_EVENT_PROV_START: + DEBUG_MSG("Provisioning: Started\n"); + break; + case ARDUINO_EVENT_PROV_END: + DEBUG_MSG("Provisioning: End\n"); + break; + case ARDUINO_EVENT_PROV_CRED_RECV: + DEBUG_MSG("Provisioning: Credentials received\n"); + break; + case ARDUINO_EVENT_PROV_CRED_FAIL: + DEBUG_MSG("Provisioning: Credentials failed\n"); + break; + case ARDUINO_EVENT_PROV_CRED_SUCCESS: + DEBUG_MSG("Provisioning: Credentials success\n"); break; default: break; diff --git a/src/mesh/http/WiFiAPClient.h b/src/mesh/http/WiFiAPClient.h index 9729c24b..a11330ad 100644 --- a/src/mesh/http/WiFiAPClient.h +++ b/src/mesh/http/WiFiAPClient.h @@ -1,6 +1,7 @@ #pragma once #include "configuration.h" +#include "concurrency/Periodic.h" #include #include @@ -8,6 +9,9 @@ #include #endif +extern bool needReconnect; +extern concurrency::Periodic *wifiReconnect; + /// @return true if wifi is now in use bool initWifi(); diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 96135bc3..428f8ed8 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -7,6 +7,7 @@ #include "mesh/Router.h" #include "mesh/generated/mqtt.pb.h" #include "mesh/generated/telemetry.pb.h" +#include "mesh/http/WiFiAPClient.h" #include "sleep.h" #if HAS_WIFI #include @@ -20,6 +21,10 @@ String statusTopic = "msh/2/stat/"; String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID +static MemoryDynamic staticMqttPool; + +Allocator &mqttPool = staticMqttPool; + void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onPublish(topic, payload, length); @@ -121,7 +126,7 @@ void mqttInit() new MQTT(); } -MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) +MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) { assert(!mqtt); mqtt = this; @@ -168,14 +173,23 @@ void MQTT::reconnect() DEBUG_MSG("MQTT connected\n"); enabled = true; // Start running background process again runASAP = true; + reconnectCount = 0; /// FIXME, include more information in the status text bool ok = pubSub.publish(myStatus.c_str(), "online", true); DEBUG_MSG("published %d\n", ok); sendSubscriptions(); - } else - DEBUG_MSG("Failed to contact MQTT server...\n"); + } else { + DEBUG_MSG("Failed to contact MQTT server (%d/10)...\n",reconnectCount); +#if HAS_WIFI + if (reconnectCount > 9) { + needReconnect = true; + wifiReconnect->setIntervalFromNow(1000); + } +#endif + reconnectCount++; + } } } @@ -231,8 +245,35 @@ int32_t MQTT::runOnce() if (wantConnection) { reconnect(); - // If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) - return pubSub.connected() ? 20 : 30000; + // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) + if (pubSub.connected()) { + if (!mqttQueue.isEmpty()) { + // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets + ServiceEnvelope *env = mqttQueue.dequeuePtr(0); + static uint8_t bytes[MeshPacket_size + 64]; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, env); + + String topic = cryptTopic + env->channel_id + "/" + owner.id; + DEBUG_MSG("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); + + + pubSub.publish(topic.c_str(), bytes, numBytes, false); + + if (moduleConfig.mqtt.json_enabled) { + // handle json topic + auto jsonString = this->downstreamPacketToJson(env->packet); + if (jsonString.length() != 0) { + String topicJson = jsonTopic + env->channel_id + "/" + owner.id; + DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); + } + } + mqttPool.release(env); + } + return 20; + } else { + return 30000; + } } else return 5000; // If we don't want connection now, check again in 5 secs } else { @@ -251,33 +292,48 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) { auto &ch = channels.getByIndex(chIndex); - // don't bother sending if not connected... - if (pubSub.connected() && ch.settings.uplink_enabled) { + if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel - ServiceEnvelope env = ServiceEnvelope_init_default; - env.channel_id = (char *)channelId; - env.gateway_id = owner.id; - env.packet = (MeshPacket *)∓ + ServiceEnvelope *env = mqttPool.allocZeroed(); + env->channel_id = (char *)channelId; + env->gateway_id = owner.id; + env->packet = (MeshPacket *)∓ - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets - static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env); + // don't bother sending if not connected... + if (pubSub.connected()) { - String topic = cryptTopic + channelId + "/" + owner.id; - DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes); + // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets + static uint8_t bytes[MeshPacket_size + 64]; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, env); - pubSub.publish(topic.c_str(), bytes, numBytes, false); + String topic = cryptTopic + channelId + "/" + owner.id; + DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes); - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp); - if (jsonString.length() != 0) { - String topicJson = jsonTopic + channelId + "/" + owner.id; - DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); + pubSub.publish(topic.c_str(), bytes, numBytes, false); + + if (moduleConfig.mqtt.json_enabled) { + // handle json topic + auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp); + if (jsonString.length() != 0) { + String topicJson = jsonTopic + channelId + "/" + owner.id; + DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); + } } + } else { + DEBUG_MSG("MQTT not connected, queueing packet\n"); + if (mqttQueue.numFree() == 0) { + DEBUG_MSG("NOTE: MQTT queue is full, discarding oldest\n"); + ServiceEnvelope *d = mqttQueue.dequeuePtr(0); + if (d) + mqttPool.release(d); + } + // make a copy of serviceEnvelope and queue it + ServiceEnvelope *copied = mqttPool.allocCopy(*env); + assert(mqttQueue.enqueue(copied, 0)); } + mqttPool.release(env); } } diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index c8381574..ddbacbcc 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -4,6 +4,7 @@ #include "concurrency/OSThread.h" #include "mesh/Channels.h" +#include "mesh/generated/mqtt.pb.h" #include #if HAS_WIFI #include @@ -12,6 +13,8 @@ #include #endif +#define MAX_MQTT_QUEUE 32 + /** * Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from * the two components that use it: MQTTPlugin and MQTTSimInterface. @@ -52,6 +55,10 @@ class MQTT : private concurrency::OSThread bool connected(); protected: + PointerQueue mqttQueue; + + int reconnectCount = 0; + virtual int32_t runOnce() override; private: