From 23d0e022cc2146d34fee3cc160b2753699fed457 Mon Sep 17 00:00:00 2001 From: David Johnson Date: Sat, 12 Feb 2022 15:27:06 +0000 Subject: [PATCH 1/4] Added MQTT and Datadump example config --- .gitignore | 2 ++ gateway-sample.txt | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/.gitignore b/.gitignore index 2cfc018..981dcf5 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ ssdv gateway-mode0.txt test.txt uplink.txt +LORA_dump +LORA_Dump diff --git a/gateway-sample.txt b/gateway-sample.txt index f5779aa..04c6dfa 100644 --- a/gateway-sample.txt +++ b/gateway-sample.txt @@ -59,3 +59,16 @@ AFC_1=Y #DIO5_1=5 #UplinkTime_1=5 #UplinkCycle_1=60 + +# MQTT Config +EnableMQTT=Y +MQTTHost=mqtt_host +MQTTPort=1883 +MQTTUser=mqtt_user +MQTTPass=mqtt_password +MQTTClient=mqtt_client_name +MQTTTopic=topic_name + +# Dump config +#DumpBuffer=Y +#DumpFile=./LORA_dump From 2765c91854277b2d674a0a094553934bb8c82a98 Mon Sep 17 00:00:00 2001 From: David Johnson Date: Sat, 12 Feb 2022 15:55:05 +0000 Subject: [PATCH 2/4] Added MQTT config to header and reading config into gateway --- gateway.c | 10 ++++++++++ global.h | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/gateway.c b/gateway.c index 96aef32..11b129e 100644 --- a/gateway.c +++ b/gateway.c @@ -2113,6 +2113,16 @@ void LoadConfigFile(void) RegisterConfigBoolean(MainSection, -1, "DumpBuffer", &Config.DumpBuffer, NULL); RegisterConfigString(MainSection, -1, "DumpFile", Config.DumpFile, sizeof(Config.DumpFile), NULL); + // MQTT + RegisterConfigBoolean(MainSection, -1, "EnableMQTT", &Config.EnableMQTT, NULL); + RegisterConfigString(MainSection, -1, "MQTTHost", Config.MQTTHost, sizeof(Config.MQTTHost), NULL); + RegisterConfigString(MainSection, -1, "MQTTPort", Config.MQTTPort, sizeof(Config.MQTTPort), NULL); + RegisterConfigString(MainSection, -1, "MQTTUser", Config.MQTTUser, sizeof(Config.MQTTUser), NULL); + RegisterConfigString(MainSection, -1, "MQTTPass", Config.MQTTPass, sizeof(Config.MQTTPass), NULL); + RegisterConfigString(MainSection, -1, "MQTTClient", Config.MQTTClient, sizeof(Config.MQTTClient), NULL); + RegisterConfigString(MainSection, -1, "MQTTTopic", Config.MQTTTopic, sizeof(Config.MQTTTopic), NULL); + + for (Channel = 0; Channel <= 1; Channel++) { RegisterConfigDouble(MainSection, Channel, "frequency", &Config.LoRaDevices[Channel].Frequency, LoRaCallback); diff --git a/global.h b/global.h index 63579a4..abddd64 100644 --- a/global.h +++ b/global.h @@ -145,6 +145,13 @@ struct TConfig char Version[16]; int DumpBuffer; char DumpFile[64]; + int EnableMQTT; + char MQTTHost; + char MQTTPort; + char MQTTUser; + char MQTTPass; + char MQTTClient; + char MQTTTopic; }; typedef struct { From 1849dafbfbd563c955bc5e995d1ad007ae7f0461 Mon Sep 17 00:00:00 2001 From: David Johnson Date: Sun, 13 Feb 2022 14:49:28 +0000 Subject: [PATCH 3/4] Added MQTT publish function --- Makefile | 2 +- gateway.c | 36 ++++++++++++++- global.h | 24 +++++++--- mqtt.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ mqtt.h | 1 + 5 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 mqtt.c create mode 100644 mqtt.h diff --git a/Makefile b/Makefile index 1425faa..41f0cc3 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ INDOPT= -bap -bl -blf -bli0 -brs -cbi0 -cdw -cs -ci4 -cli4 -i4 -ip0 -nbc -nce -l CC=gcc CFLAGS=-Wall -O3 #-std=c99 -LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread +LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread -lpaho-mqtt3c RM=rm %.o: %.c *.h # combined w/ next line will compile recently changed .c files diff --git a/gateway.c b/gateway.c index 11b129e..f385754 100644 --- a/gateway.c +++ b/gateway.c @@ -33,6 +33,7 @@ #include "ssdv.h" #include "ftp.h" #include "habitat.h" +#include "mqtt.h" #include "hablink.h" #include "network.h" #include "network.h" @@ -196,6 +197,7 @@ struct TBinaryPacket { #pragma pack(pop) lifo_buffer_t Habitat_Upload_Buffer; +lifo_buffer_t MQTT_Upload_Buffer; // Create pipes for inter proces communication // GLOBAL AS CALLED FROM INTERRRUPT @@ -1135,6 +1137,18 @@ int ProcessTelemetryMessage(int Channel, received_t *Received) lifo_buffer_push(&Habitat_Upload_Buffer, (void *)queueReceived); } } + + if (Config.EnableMQTT) + { + // Add to MQTT upload queue + received_t *queueReceived = malloc(sizeof(received_t)); + if(queueReceived != NULL) + { + memcpy(queueReceived, Received, sizeof(received_t)); + /* Push pointer onto upload queue */ + lifo_buffer_push(&MQTT_Upload_Buffer, (void *)queueReceived); + } + } if (Config.EnableHablink && Config.HablinkAddress[0]) { @@ -2621,7 +2635,7 @@ int main( int argc, char **argv ) int ch; int LoopPeriod, MSPerLoop; int Channel; - pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread; + pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread, MQTTThread; struct TServerInfo JSONInfo, TelnetInfo, DataportInfo, ChatportInfo; atexit(bye); @@ -2717,6 +2731,26 @@ int main( int argc, char **argv ) return 1; } } + + if (Config.EnableMQTT) + { + lifo_buffer_init(&MQTT_Upload_Buffer, 1024); + mqtt_connect_t *mqttConnection = malloc(sizeof *mqttConnection); + + strcpy(mqttConnection->host, Config.MQTTHost); + strcpy(mqttConnection->port, Config.MQTTPort); + strcpy(mqttConnection->user, Config.MQTTUser); + strcpy(mqttConnection->pass, Config.MQTTPass); + strcpy(mqttConnection->topic, Config.MQTTTopic); + strcpy(mqttConnection->clientId, Config.MQTTClient); + + if ( pthread_create (&MQTTThread, NULL, MQTTLoop, mqttConnection)) + { + fprintf( stderr, "Error creating MQTT thread\n" ); + free(mqttConnection); + return 1; + } + } if (Config.EnableHablink && Config.HablinkAddress[0]) { diff --git a/global.h b/global.h index abddd64..51b3823 100644 --- a/global.h +++ b/global.h @@ -146,14 +146,14 @@ struct TConfig int DumpBuffer; char DumpFile[64]; int EnableMQTT; - char MQTTHost; - char MQTTPort; - char MQTTUser; - char MQTTPass; - char MQTTClient; - char MQTTTopic; + char MQTTHost[128]; + char MQTTPort[8]; + char MQTTUser[16]; + char MQTTPass[32]; + char MQTTClient[16]; + char MQTTTopic[32]; }; - + typedef struct { int parent_status; unsigned long packet_count; @@ -230,6 +230,15 @@ typedef struct { int Packet_Number; } ssdv_t; +typedef struct { + char host[128]; + char port[8]; + char user[16]; + char pass[32]; + char clientId[16]; + char topic[32]; +} mqtt_connect_t; + struct TServerInfo { int Port; @@ -238,6 +247,7 @@ struct TServerInfo int sockfd; }; + extern struct TConfig Config; extern int SSDVSendArrayIndex; extern pthread_mutex_t ssdv_mutex; diff --git a/mqtt.c b/mqtt.c new file mode 100644 index 0000000..d5f38e1 --- /dev/null +++ b/mqtt.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include // Standard input/output definitions +#include // String function definitions +#include // String function definitions + +#include "MQTTClient.h" + +#include "global.h" +#include "lifo_buffer.h" + +extern lifo_buffer_t MQTT_Upload_Buffer; + +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTClient_deliveryToken deliveredtoken; + + +void delivered(void *context, MQTTClient_deliveryToken dt) +{ + LogMessage("Message with token value %d delivery confirmed\n", dt); + deliveredtoken = dt; +} + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{ + int i; + char* payloadptr; + LogMessage("Message arrived\n"); + LogMessage(" topic: %s\n", topicName); + LogMessage(" message: "); + payloadptr = message->payload; + for(i=0; ipayloadlen; i++) + { + putchar(*payloadptr++); + } + putchar('\n'); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +void connlost(void *context, char *cause) +{ + LogMessage("\nConnection lost\n"); + LogMessage(" cause: %s\n", cause); +} + +bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t ) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + MQTTClient_deliveryToken token; + int rc; + + char address[256]; + sprintf(address, "tcp://%s:%s", mqttConnection->host,mqttConnection->port); + + MQTTClient_create(&client, address, mqttConnection->clientId, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.username = mqttConnection->user; + conn_opts.password = mqttConnection->pass; + MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); + LogMessage("Attempting publication on host: %s\n", + address); + //"on topic %s for client with ClientID: %s\n", + //t->Message, address, mqttConnection->topic, mqttConnection->clientId); + if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + LogMessage("Failed to connect, return code %d\n", rc); + return false; + } + pubmsg.payload = t->Message; + pubmsg.payloadlen = strlen(t->Message); + pubmsg.qos = QOS; + pubmsg.retained = 0; + deliveredtoken = 0; + MQTTClient_publishMessage(client, mqttConnection->topic, &pubmsg, &token); + while(deliveredtoken != token); + MQTTClient_disconnect(client, 10000); + MQTTClient_destroy(&client); + return true; +} + +void *MQTTLoop( mqtt_connect_t *mqttConnection ) +{ + if ( Config.EnableMQTT ) + { + received_t *dequeued_telemetry_ptr; + + // Keep looping until the parent quits + while ( true ) + { + dequeued_telemetry_ptr = lifo_buffer_waitpop(&MQTT_Upload_Buffer); + + if(dequeued_telemetry_ptr != NULL) + { + if(UploadMQTTPacket(mqttConnection, dequeued_telemetry_ptr )) + { + free(dequeued_telemetry_ptr); + } + else + { + if(!lifo_buffer_requeue(&MQTT_Upload_Buffer, dequeued_telemetry_ptr)) + { + /* Requeue failed, drop packet */ + free(dequeued_telemetry_ptr); + } + } + } + else + { + /* NULL returned: We've been asked to quit */ + /* Don't bother free()ing stuff, as application is quitting */ + break; + } + } + } + + return NULL; +} + diff --git a/mqtt.h b/mqtt.h new file mode 100644 index 0000000..699ae43 --- /dev/null +++ b/mqtt.h @@ -0,0 +1 @@ +void *MQTTLoop( void *some_void_ptr ); From 72d5758bffa4a3be216ee179b9439ebb5df483c9 Mon Sep 17 00:00:00 2001 From: David Johnson Date: Sun, 13 Feb 2022 15:33:10 +0000 Subject: [PATCH 4/4] Commented out the MQTT config in the sample gateway.txt file --- gateway-sample.txt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/gateway-sample.txt b/gateway-sample.txt index 04c6dfa..ae931ab 100644 --- a/gateway-sample.txt +++ b/gateway-sample.txt @@ -61,13 +61,13 @@ AFC_1=Y #UplinkCycle_1=60 # MQTT Config -EnableMQTT=Y -MQTTHost=mqtt_host -MQTTPort=1883 -MQTTUser=mqtt_user -MQTTPass=mqtt_password -MQTTClient=mqtt_client_name -MQTTTopic=topic_name +#EnableMQTT=Y +#MQTTHost=mqtt_host +#MQTTPort=1883 +#MQTTUser=mqtt_user +#MQTTPass=mqtt_password +#MQTTClient=mqtt_client_name +#MQTTTopic=topic_name # Dump config #DumpBuffer=Y