kopia lustrzana https://github.com/meshtastic/firmware
				
				
				
			Mark packets received via MQTT and add option to ignore them (#3117)
* Mark packets received via MQTT and add option to ignore them * Don't send packets received via MQTT back into MQTT Generate implicit ACK for packets we as an MQTT gateway sent --------- Co-authored-by: Ben Meadors <benmmeadors@gmail.com>pull/3119/head v2.2.19.8f6a283
							rodzic
							
								
									4f76239d48
								
							
						
					
					
						commit
						8f6a2836b8
					
				|  | @ -173,6 +173,7 @@ void NodeDB::installDefaultConfig() | |||
|     config.lora.region = meshtastic_Config_LoRaConfig_RegionCode_UNSET; | ||||
|     config.lora.modem_preset = meshtastic_Config_LoRaConfig_ModemPreset_LONG_FAST; | ||||
|     config.lora.hop_limit = HOP_RELIABLE; | ||||
|     config.lora.ignore_mqtt = false; | ||||
| #ifdef PIN_GPS_EN | ||||
|     config.position.gps_en_gpio = PIN_GPS_EN; | ||||
| #endif | ||||
|  |  | |||
|  | @ -287,15 +287,14 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p) | |||
|         out += " encrypted"; | ||||
|     } | ||||
| 
 | ||||
|     if (p->rx_time != 0) { | ||||
|     if (p->rx_time != 0) | ||||
|         out += DEBUG_PORT.mt_sprintf(" rxtime=%u", p->rx_time); | ||||
|     } | ||||
|     if (p->rx_snr != 0.0) { | ||||
|     if (p->rx_snr != 0.0) | ||||
|         out += DEBUG_PORT.mt_sprintf(" rxSNR=%g", p->rx_snr); | ||||
|     } | ||||
|     if (p->rx_rssi != 0) { | ||||
|     if (p->rx_rssi != 0) | ||||
|         out += DEBUG_PORT.mt_sprintf(" rxRSSI=%i", p->rx_rssi); | ||||
|     } | ||||
|     if (p->via_mqtt != 0) | ||||
|         out += DEBUG_PORT.mt_sprintf(" via MQTT"); | ||||
|     if (p->priority != 0) | ||||
|         out += DEBUG_PORT.mt_sprintf(" priority=%d", p->priority); | ||||
| 
 | ||||
|  | @ -554,7 +553,7 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p) | |||
|         LOG_WARN("hop limit %d is too high, setting to %d\n", p->hop_limit, HOP_RELIABLE); | ||||
|         p->hop_limit = HOP_RELIABLE; | ||||
|     } | ||||
|     h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0); | ||||
|     h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0) | (p->via_mqtt ? PACKET_FLAGS_VIA_MQTT_MASK : 0); | ||||
| 
 | ||||
|     // if the sender nodenum is zero, that means uninitialized
 | ||||
|     assert(h->from); | ||||
|  | @ -563,4 +562,4 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p) | |||
| 
 | ||||
|     sendingPacket = p; | ||||
|     return p->encrypted.size + sizeof(PacketHeader); | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -12,6 +12,7 @@ | |||
| 
 | ||||
| #define PACKET_FLAGS_HOP_MASK 0x07 | ||||
| #define PACKET_FLAGS_WANT_ACK_MASK 0x08 | ||||
| #define PACKET_FLAGS_VIA_MQTT_MASK 0x10 | ||||
| 
 | ||||
| /**
 | ||||
|  * This structure has to exactly match the wire layout when sent over the radio link.  Used to keep compatibility | ||||
|  | @ -223,4 +224,4 @@ class RadioInterface | |||
| }; | ||||
| 
 | ||||
| /// Debug printing for packets
 | ||||
| void printPacket(const char *prefix, const meshtastic_MeshPacket *p); | ||||
| void printPacket(const char *prefix, const meshtastic_MeshPacket *p); | ||||
|  |  | |||
|  | @ -362,6 +362,7 @@ void RadioLibInterface::handleReceiveInterrupt() | |||
|             assert(HOP_MAX <= PACKET_FLAGS_HOP_MASK); // If hopmax changes, carefully check this code
 | ||||
|             mp->hop_limit = h->flags & PACKET_FLAGS_HOP_MASK; | ||||
|             mp->want_ack = !!(h->flags & PACKET_FLAGS_WANT_ACK_MASK); | ||||
|             mp->via_mqtt = !!(h->flags & PACKET_FLAGS_VIA_MQTT_MASK); | ||||
| 
 | ||||
|             addReceiveMetadata(mp); | ||||
| 
 | ||||
|  | @ -406,4 +407,4 @@ void RadioLibInterface::startSend(meshtastic_MeshPacket *txp) | |||
|         // bits
 | ||||
|         enableInterrupt(isrTxLevel0); | ||||
|     } | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -467,10 +467,10 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src) | |||
| void Router::perhapsHandleReceived(meshtastic_MeshPacket *p) | ||||
| { | ||||
|     // assert(radioConfig.has_preferences);
 | ||||
|     bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from); | ||||
|     bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from) || (config.lora.ignore_mqtt && p->via_mqtt); | ||||
| 
 | ||||
|     if (ignore) { | ||||
|         LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list\n", p->from); | ||||
|         LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list or came via MQTT\n", p->from); | ||||
|     } else if (ignore |= shouldFilterReceived(p)) { | ||||
|         LOG_DEBUG("Incoming message was filtered 0x%x\n", p->from); | ||||
|     } | ||||
|  | @ -481,4 +481,4 @@ void Router::perhapsHandleReceived(meshtastic_MeshPacket *p) | |||
|         handleReceived(p); | ||||
| 
 | ||||
|     packetPool.release(p); | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -85,4 +85,4 @@ TraceRouteModule::TraceRouteModule() | |||
|     : ProtobufModule("traceroute", meshtastic_PortNum_TRACEROUTE_APP, &meshtastic_RouteDiscovery_msg) | ||||
| { | ||||
|     ourPortNum = meshtastic_PortNum_TRACEROUTE_APP; | ||||
| } | ||||
| } | ||||
|  | @ -7,6 +7,7 @@ | |||
| #include "mesh/Router.h" | ||||
| #include "mesh/generated/meshtastic/mqtt.pb.h" | ||||
| #include "mesh/generated/meshtastic/telemetry.pb.h" | ||||
| #include "modules/RoutingModule.h" | ||||
| #if defined(ARCH_ESP32) | ||||
| #include "../mesh/generated/meshtastic/paxcount.pb.h" | ||||
| #endif | ||||
|  | @ -142,6 +143,7 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length) | |||
|                 if (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled) { | ||||
|                     LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length); | ||||
|                     meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet); | ||||
|                     p->via_mqtt = true; // Mark that the packet was received via MQTT
 | ||||
| 
 | ||||
|                     if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { | ||||
|                         p->channel = ch.index; | ||||
|  | @ -463,6 +465,9 @@ void MQTT::publishQueuedMessages() | |||
| 
 | ||||
| void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) | ||||
| { | ||||
|     if (mp.via_mqtt) | ||||
|         return; // Don't send messages that came from MQTT back into MQTT
 | ||||
| 
 | ||||
|     auto &ch = channels.getByIndex(chIndex); | ||||
| 
 | ||||
|     if (&mp.decoded && strcmp(moduleConfig.mqtt.address, default_mqtt_address) == 0 && | ||||
|  | @ -501,6 +506,12 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) | |||
|                     publish(topicJson.c_str(), jsonString.c_str(), false); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message.
 | ||||
|             // We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node
 | ||||
|             // receives it when we're connected to the broker. Then we'll stop our retransmissions.
 | ||||
|             if (getFrom(&mp) == nodeDB.getNodeNum()) | ||||
|                 routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(&mp), mp.id, chIndex); | ||||
|         } else { | ||||
|             LOG_INFO("MQTT not connected, queueing packet\n"); | ||||
|             if (mqttQueue.numFree() == 0) { | ||||
|  |  | |||
		Ładowanie…
	
		Reference in New Issue
	
	 GUVWAF
						GUVWAF