V1.8.46: Flexible MQTT topic path

pull/72/head
Dave Akerman 2022-02-16 16:54:14 +00:00
rodzic 3ce242c7dd
commit cc8dacc030
5 zmienionych plików z 63 dodań i 8 usunięć

Wyświetl plik

@ -277,7 +277,11 @@ Many thanks to David Brooke for coding this feature and the AFC.
Change History
==============
## 14/02/2022 - V1.8.45
16/02/2022 - V1.8.46
Added flexible MQTT topic - $GATEWAY$ gets replaced by gateway callsign; $PAYLOAD$ gets replaced by payload callsign
14/02/2022 - V1.8.45
Added MQTT support (coded by David Johnson G4DPZ)

Wyświetl plik

@ -68,6 +68,7 @@ AFC_1=Y
#MQTTPass=mqtt_password
#MQTTClient=mqtt_client_name
#MQTTTopic=topic_name
# example MQTTTopic=incoming/payloads/$PAYLOAD$/$GATEWAY$/sentence
# Dump config
#DumpBuffer=Y

Wyświetl plik

@ -47,7 +47,7 @@
#include "udpclient.h"
#include "lifo_buffer.h"
#define VERSION "V1.8.45"
#define VERSION "V1.8.46"
bool run = TRUE;
// RFM98

Wyświetl plik

@ -151,7 +151,7 @@ struct TConfig
char MQTTUser[16];
char MQTTPass[32];
char MQTTClient[16];
char MQTTTopic[32];
char MQTTTopic[128];
};
typedef struct {
@ -236,7 +236,7 @@ typedef struct {
char user[16];
char pass[32];
char clientId[16];
char topic[32];
char topic[128];
} mqtt_connect_t;
struct TServerInfo

58
mqtt.c
Wyświetl plik

@ -49,6 +49,54 @@ void connlost(void *context, char *cause)
LogMessage(" cause: %s\n", cause);
}
void InsertSubstring(char *topic, char *field, char *value)
{
char *position;
char temp[256];
strcpy(temp, topic);
position = strstr(temp, field);
if (position != NULL)
{
strcpy(position, value);
position = strstr(topic, field);
strcat(temp, position + strlen(field));
// LogMessage("topic is now <%s>\n", temp);
strcpy(topic, temp);
}
}
void BuildMQTTPath(char *topic, mqtt_connect_t * mqttConnection, received_t * t)
{
int i;
char GatewayID[32], Payload[32];
// Get gateway callsign and remove any slashes as they would mess up the MQTT topic path
strcpy(GatewayID, Config.Tracker);
for (i=0; i<strlen(GatewayID); i++)
{
if (GatewayID[i] == '/')
{
GatewayID[i] = '_';
}
}
// Get the payload ID
sscanf(t->Message + 2, "%31[^,]", Payload);
// Example: MQTTTopic=incoming/payloads/$PAYLOAD$/$GATEWAY$/sentence
strcpy(topic, mqttConnection->topic);
// Insert the payload ID if required by the config
InsertSubstring(topic, "$PAYLOAD$", Payload);
// Insert the callsign if required by the config
InsertSubstring(topic, "$GATEWAY$", GatewayID);
// sprintf(topic, "%s/%s/%s/sentence", mqttConnection->topic, Payload, GatewayID);
}
bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t )
{
MQTTClient client;
@ -56,9 +104,11 @@ bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t )
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
char address[256];
char address[256], topic[256];
sprintf(address, "tcp://%s:%s", mqttConnection->host,mqttConnection->port);
BuildMQTTPath(topic, mqttConnection, t);
MQTTClient_create(&client, address, mqttConnection->clientId,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
@ -70,7 +120,7 @@ bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t )
// LogMessage("Attempting publication on host: %s\n",
// address);
//"on topic %s for client with ClientID: %s\n",
//t->Message, address, mqttConnection->topic, mqttConnection->clientId);
//t->Message, address, topic, mqttConnection->clientId);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
LogMessage("MQTT: Failed to connect, return code %d\n", rc);
@ -81,7 +131,7 @@ bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t )
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
MQTTClient_publishMessage(client, mqttConnection->topic, &pubmsg, &token);
MQTTClient_publishMessage(client, topic, &pubmsg, &token);
while(deliveredtoken != token);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);