kopia lustrzana https://github.com/meshtastic/firmware
Use encoded ServiceEnvelope in mqttQueue (#5619)
rodzic
e1de439a7f
commit
658459aaf3
|
@ -24,6 +24,7 @@
|
|||
#include <Throttle.h>
|
||||
#include <assert.h>
|
||||
#include <pb_decode.h>
|
||||
#include <utility>
|
||||
|
||||
MQTT *mqtt;
|
||||
|
||||
|
@ -31,10 +32,6 @@ namespace
|
|||
{
|
||||
constexpr int reconnectMax = 5;
|
||||
|
||||
static MemoryDynamic<meshtastic_ServiceEnvelope> staticMqttPool;
|
||||
|
||||
Allocator<meshtastic_ServiceEnvelope> &mqttPool = staticMqttPool;
|
||||
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
static uint8_t bytes[meshtastic_MqttClientProxyMessage_size + 30]; // 12 for channel name and 16 for nodeid
|
||||
|
||||
|
@ -528,39 +525,37 @@ void MQTT::publishNodeInfo()
|
|||
}
|
||||
void MQTT::publishQueuedMessages()
|
||||
{
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
LOG_DEBUG("Publish enqueued MQTT message");
|
||||
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
std::string topic;
|
||||
if (env->packet->pki_encrypted) {
|
||||
topic = cryptTopic + "PKI/" + owner.id;
|
||||
} else {
|
||||
topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
}
|
||||
LOG_INFO("publish %s, %u bytes from queue", topic.c_str(), numBytes);
|
||||
if (mqttQueue.isEmpty())
|
||||
return;
|
||||
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
LOG_DEBUG("Publish enqueued MQTT message");
|
||||
const std::unique_ptr<QueueEntry> entry(mqttQueue.dequeuePtr(0));
|
||||
LOG_INFO("publish %s, %u bytes from queue", entry->topic.c_str(), entry->envBytes.size());
|
||||
publish(entry->topic.c_str(), entry->envBytes.data(), entry->envBytes.size(), false);
|
||||
|
||||
#if !defined(ARCH_NRF52) || \
|
||||
defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize(env->packet);
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson;
|
||||
if (env->packet->pki_encrypted) {
|
||||
topicJson = jsonTopic + "PKI/" + owner.id;
|
||||
} else {
|
||||
topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
}
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
#endif // ARCH_NRF52 NRF52_USE_JSON
|
||||
mqttPool.release(env);
|
||||
if (!moduleConfig.mqtt.json_enabled)
|
||||
return;
|
||||
|
||||
// handle json topic
|
||||
const DecodedServiceEnvelope env(entry->envBytes.data(), entry->envBytes.size());
|
||||
if (!env.validDecode || env.packet == NULL || env.channel_id == NULL)
|
||||
return;
|
||||
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize(env.packet);
|
||||
if (jsonString.length() == 0)
|
||||
return;
|
||||
|
||||
std::string topicJson;
|
||||
if (env.packet->pki_encrypted) {
|
||||
topicJson = jsonTopic + "PKI/" + owner.id;
|
||||
} else {
|
||||
topicJson = jsonTopic + env.channel_id + "/" + owner.id;
|
||||
}
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
#endif // ARCH_NRF52 NRF52_USE_JSON
|
||||
}
|
||||
|
||||
void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_MeshPacket &mp_decoded, ChannelIndex chIndex)
|
||||
|
@ -599,59 +594,56 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_Me
|
|||
// Either encrypted packet (we couldn't decrypt) is marked as pki_encrypted, or we could decode the PKI encrypted packet
|
||||
bool isPKIEncrypted = mp_encrypted.pki_encrypted || mp_decoded.pki_encrypted;
|
||||
// If it was to a channel, check uplink enabled, else must be pki_encrypted
|
||||
if ((ch.settings.uplink_enabled && !isPKIEncrypted) || isPKIEncrypted) {
|
||||
const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex);
|
||||
if (!(ch.settings.uplink_enabled || isPKIEncrypted))
|
||||
return;
|
||||
const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex);
|
||||
|
||||
meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed();
|
||||
env->channel_id = (char *)channelId;
|
||||
env->gateway_id = owner.id;
|
||||
LOG_DEBUG("MQTT onSend - Publish ");
|
||||
const meshtastic_MeshPacket *p;
|
||||
if (moduleConfig.mqtt.encryption_enabled) {
|
||||
p = &mp_encrypted;
|
||||
LOG_DEBUG("encrypted message");
|
||||
} else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
p = &mp_decoded;
|
||||
LOG_DEBUG("portnum %i message", mp_decoded.decoded.portnum);
|
||||
} else {
|
||||
LOG_DEBUG("nothing, pkt not decrypted");
|
||||
return; // Don't upload a still-encrypted PKI packet if not encryption_enabled
|
||||
}
|
||||
|
||||
LOG_DEBUG("MQTT onSend - Publish ");
|
||||
if (moduleConfig.mqtt.encryption_enabled) {
|
||||
env->packet = (meshtastic_MeshPacket *)&mp_encrypted;
|
||||
LOG_DEBUG("encrypted message");
|
||||
} else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
env->packet = (meshtastic_MeshPacket *)&mp_decoded;
|
||||
LOG_DEBUG("portnum %i message", env->packet->decoded.portnum);
|
||||
} else {
|
||||
LOG_DEBUG("nothing, pkt not decrypted");
|
||||
mqttPool.release(env);
|
||||
return; // Don't upload a still-encrypted PKI packet if not encryption_enabled
|
||||
}
|
||||
const meshtastic_ServiceEnvelope env = {
|
||||
.packet = const_cast<meshtastic_MeshPacket *>(p), .channel_id = const_cast<char *>(channelId), .gateway_id = owner.id};
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &env);
|
||||
std::string topic = cryptTopic + channelId + "/" + owner.id;
|
||||
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
std::string topic = cryptTopic + channelId + "/" + owner.id;
|
||||
LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes);
|
||||
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
|
||||
LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes);
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
#if !defined(ARCH_NRF52) || \
|
||||
defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize((meshtastic_MeshPacket *)&mp_decoded);
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(),
|
||||
jsonString.c_str());
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
if (!moduleConfig.mqtt.json_enabled)
|
||||
return;
|
||||
// handle json topic
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize(&mp_decoded);
|
||||
if (jsonString.length() == 0)
|
||||
return;
|
||||
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
#endif // ARCH_NRF52 NRF52_USE_JSON
|
||||
} else {
|
||||
LOG_INFO("MQTT not connected, queue packet");
|
||||
QueueEntry *entry;
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
LOG_WARN("MQTT queue is full, discard oldest");
|
||||
entry = mqttQueue.dequeuePtr(0);
|
||||
} else {
|
||||
LOG_INFO("MQTT not connected, queue packet");
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
LOG_WARN("MQTT queue is full, discard oldest");
|
||||
meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
if (d)
|
||||
mqttPool.release(d);
|
||||
}
|
||||
// make a copy of serviceEnvelope and queue it
|
||||
meshtastic_ServiceEnvelope *copied = mqttPool.allocCopy(*env);
|
||||
assert(mqttQueue.enqueue(copied, 0));
|
||||
entry = new QueueEntry;
|
||||
}
|
||||
mqttPool.release(env);
|
||||
entry->topic = std::move(topic);
|
||||
entry->envBytes.assign(bytes, numBytes);
|
||||
assert(mqttQueue.enqueue(entry, 0));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -660,73 +652,70 @@ void MQTT::perhapsReportToMap()
|
|||
if (!moduleConfig.mqtt.map_reporting_enabled || !(moduleConfig.mqtt.proxy_to_client_enabled || isConnectedDirectly()))
|
||||
return;
|
||||
|
||||
if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs)) {
|
||||
if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs))
|
||||
return;
|
||||
} else {
|
||||
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) {
|
||||
last_report_to_map = millis();
|
||||
if (map_position_precision == 0)
|
||||
LOG_WARN("MQTT Map report enabled, but precision is 0");
|
||||
if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)
|
||||
LOG_WARN("MQTT Map report enabled, but no position available");
|
||||
return;
|
||||
}
|
||||
|
||||
// Allocate ServiceEnvelope and fill it
|
||||
meshtastic_ServiceEnvelope *se = mqttPool.allocZeroed();
|
||||
se->channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()); // Use primary channel as the channel_id
|
||||
se->gateway_id = owner.id;
|
||||
|
||||
// Allocate MeshPacket and fill it
|
||||
meshtastic_MeshPacket *mp = packetPool.allocZeroed();
|
||||
mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag;
|
||||
mp->from = nodeDB->getNodeNum();
|
||||
mp->to = NODENUM_BROADCAST;
|
||||
mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP;
|
||||
|
||||
// Fill MapReport message
|
||||
meshtastic_MapReport mapReport = meshtastic_MapReport_init_default;
|
||||
memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name));
|
||||
memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name));
|
||||
mapReport.role = config.device.role;
|
||||
mapReport.hw_model = owner.hw_model;
|
||||
strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version));
|
||||
mapReport.region = config.lora.region;
|
||||
mapReport.modem_preset = config.lora.modem_preset;
|
||||
mapReport.has_default_channel = channels.hasDefaultChannel();
|
||||
|
||||
// Set position with precision (same as in PositionModule)
|
||||
if (map_position_precision < 32 && map_position_precision > 0) {
|
||||
mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.latitude_i += (1 << (31 - map_position_precision));
|
||||
mapReport.longitude_i += (1 << (31 - map_position_precision));
|
||||
} else {
|
||||
mapReport.latitude_i = localPosition.latitude_i;
|
||||
mapReport.longitude_i = localPosition.longitude_i;
|
||||
}
|
||||
mapReport.altitude = localPosition.altitude;
|
||||
mapReport.position_precision = map_position_precision;
|
||||
|
||||
mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true);
|
||||
|
||||
// Encode MapReport message and set it to MeshPacket in ServiceEnvelope
|
||||
mp->decoded.payload.size = pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes),
|
||||
&meshtastic_MapReport_msg, &mapReport);
|
||||
se->packet = mp;
|
||||
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, se);
|
||||
|
||||
LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str());
|
||||
publish(mapTopic.c_str(), bytes, numBytes, false);
|
||||
|
||||
// Release the allocated memory for ServiceEnvelope and MeshPacket
|
||||
mqttPool.release(se);
|
||||
packetPool.release(mp);
|
||||
|
||||
// Update the last report time
|
||||
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) {
|
||||
last_report_to_map = millis();
|
||||
if (map_position_precision == 0)
|
||||
LOG_WARN("MQTT Map report enabled, but precision is 0");
|
||||
if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)
|
||||
LOG_WARN("MQTT Map report enabled, but no position available");
|
||||
return;
|
||||
}
|
||||
|
||||
// Allocate MeshPacket and fill it
|
||||
meshtastic_MeshPacket *mp = packetPool.allocZeroed();
|
||||
mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag;
|
||||
mp->from = nodeDB->getNodeNum();
|
||||
mp->to = NODENUM_BROADCAST;
|
||||
mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP;
|
||||
|
||||
// Fill MapReport message
|
||||
meshtastic_MapReport mapReport = meshtastic_MapReport_init_default;
|
||||
memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name));
|
||||
memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name));
|
||||
mapReport.role = config.device.role;
|
||||
mapReport.hw_model = owner.hw_model;
|
||||
strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version));
|
||||
mapReport.region = config.lora.region;
|
||||
mapReport.modem_preset = config.lora.modem_preset;
|
||||
mapReport.has_default_channel = channels.hasDefaultChannel();
|
||||
|
||||
// Set position with precision (same as in PositionModule)
|
||||
if (map_position_precision < 32 && map_position_precision > 0) {
|
||||
mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.latitude_i += (1 << (31 - map_position_precision));
|
||||
mapReport.longitude_i += (1 << (31 - map_position_precision));
|
||||
} else {
|
||||
mapReport.latitude_i = localPosition.latitude_i;
|
||||
mapReport.longitude_i = localPosition.longitude_i;
|
||||
}
|
||||
mapReport.altitude = localPosition.altitude;
|
||||
mapReport.position_precision = map_position_precision;
|
||||
|
||||
mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true);
|
||||
|
||||
// Encode MapReport message into the MeshPacket
|
||||
mp->decoded.payload.size =
|
||||
pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes), &meshtastic_MapReport_msg, &mapReport);
|
||||
|
||||
// Encode the MeshPacket into a binary ServiceEnvelope and publish
|
||||
const meshtastic_ServiceEnvelope se = {
|
||||
.packet = mp,
|
||||
.channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()), // Use primary channel as the channel_id
|
||||
.gateway_id = owner.id};
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &se);
|
||||
|
||||
LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str());
|
||||
publish(mapTopic.c_str(), bytes, numBytes, false);
|
||||
|
||||
// Release the allocated memory for MeshPacket
|
||||
packetPool.release(mp);
|
||||
|
||||
// Update the last report time
|
||||
last_report_to_map = millis();
|
||||
}
|
||||
|
||||
bool MQTT::isPrivateIpAddress(const char address[])
|
||||
|
|
|
@ -78,7 +78,11 @@ class MQTT : private concurrency::OSThread
|
|||
void start() { setIntervalFromNow(0); };
|
||||
|
||||
protected:
|
||||
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;
|
||||
struct QueueEntry {
|
||||
std::string topic;
|
||||
std::basic_string<uint8_t> envBytes; // binary/pb_encode_to_bytes ServiceEnvelope
|
||||
};
|
||||
PointerQueue<QueueEntry> mqttQueue;
|
||||
|
||||
int reconnectCount = 0;
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue