From 1849dafbfbd563c955bc5e995d1ad007ae7f0461 Mon Sep 17 00:00:00 2001 From: David Johnson Date: Sun, 13 Feb 2022 14:49:28 +0000 Subject: [PATCH] Added MQTT publish function --- Makefile | 2 +- gateway.c | 36 ++++++++++++++- global.h | 24 +++++++--- mqtt.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ mqtt.h | 1 + 5 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 mqtt.c create mode 100644 mqtt.h diff --git a/Makefile b/Makefile index 1425faa..41f0cc3 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ INDOPT= -bap -bl -blf -bli0 -brs -cbi0 -cdw -cs -ci4 -cli4 -i4 -ip0 -nbc -nce -l CC=gcc CFLAGS=-Wall -O3 #-std=c99 -LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread +LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread -lpaho-mqtt3c RM=rm %.o: %.c *.h # combined w/ next line will compile recently changed .c files diff --git a/gateway.c b/gateway.c index 11b129e..f385754 100644 --- a/gateway.c +++ b/gateway.c @@ -33,6 +33,7 @@ #include "ssdv.h" #include "ftp.h" #include "habitat.h" +#include "mqtt.h" #include "hablink.h" #include "network.h" #include "network.h" @@ -196,6 +197,7 @@ struct TBinaryPacket { #pragma pack(pop) lifo_buffer_t Habitat_Upload_Buffer; +lifo_buffer_t MQTT_Upload_Buffer; // Create pipes for inter proces communication // GLOBAL AS CALLED FROM INTERRRUPT @@ -1135,6 +1137,18 @@ int ProcessTelemetryMessage(int Channel, received_t *Received) lifo_buffer_push(&Habitat_Upload_Buffer, (void *)queueReceived); } } + + if (Config.EnableMQTT) + { + // Add to MQTT upload queue + received_t *queueReceived = malloc(sizeof(received_t)); + if(queueReceived != NULL) + { + memcpy(queueReceived, Received, sizeof(received_t)); + /* Push pointer onto upload queue */ + lifo_buffer_push(&MQTT_Upload_Buffer, (void *)queueReceived); + } + } if (Config.EnableHablink && Config.HablinkAddress[0]) { @@ -2621,7 +2635,7 @@ int main( int argc, char **argv ) int ch; int LoopPeriod, MSPerLoop; int Channel; - pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread; + pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread, MQTTThread; struct TServerInfo JSONInfo, TelnetInfo, DataportInfo, ChatportInfo; atexit(bye); @@ -2717,6 +2731,26 @@ int main( int argc, char **argv ) return 1; } } + + if (Config.EnableMQTT) + { + lifo_buffer_init(&MQTT_Upload_Buffer, 1024); + mqtt_connect_t *mqttConnection = malloc(sizeof *mqttConnection); + + strcpy(mqttConnection->host, Config.MQTTHost); + strcpy(mqttConnection->port, Config.MQTTPort); + strcpy(mqttConnection->user, Config.MQTTUser); + strcpy(mqttConnection->pass, Config.MQTTPass); + strcpy(mqttConnection->topic, Config.MQTTTopic); + strcpy(mqttConnection->clientId, Config.MQTTClient); + + if ( pthread_create (&MQTTThread, NULL, MQTTLoop, mqttConnection)) + { + fprintf( stderr, "Error creating MQTT thread\n" ); + free(mqttConnection); + return 1; + } + } if (Config.EnableHablink && Config.HablinkAddress[0]) { diff --git a/global.h b/global.h index abddd64..51b3823 100644 --- a/global.h +++ b/global.h @@ -146,14 +146,14 @@ struct TConfig int DumpBuffer; char DumpFile[64]; int EnableMQTT; - char MQTTHost; - char MQTTPort; - char MQTTUser; - char MQTTPass; - char MQTTClient; - char MQTTTopic; + char MQTTHost[128]; + char MQTTPort[8]; + char MQTTUser[16]; + char MQTTPass[32]; + char MQTTClient[16]; + char MQTTTopic[32]; }; - + typedef struct { int parent_status; unsigned long packet_count; @@ -230,6 +230,15 @@ typedef struct { int Packet_Number; } ssdv_t; +typedef struct { + char host[128]; + char port[8]; + char user[16]; + char pass[32]; + char clientId[16]; + char topic[32]; +} mqtt_connect_t; + struct TServerInfo { int Port; @@ -238,6 +247,7 @@ struct TServerInfo int sockfd; }; + extern struct TConfig Config; extern int SSDVSendArrayIndex; extern pthread_mutex_t ssdv_mutex; diff --git a/mqtt.c b/mqtt.c new file mode 100644 index 0000000..d5f38e1 --- /dev/null +++ b/mqtt.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include // Standard input/output definitions +#include // String function definitions +#include // String function definitions + +#include "MQTTClient.h" + +#include "global.h" +#include "lifo_buffer.h" + +extern lifo_buffer_t MQTT_Upload_Buffer; + +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTClient_deliveryToken deliveredtoken; + + +void delivered(void *context, MQTTClient_deliveryToken dt) +{ + LogMessage("Message with token value %d delivery confirmed\n", dt); + deliveredtoken = dt; +} + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{ + int i; + char* payloadptr; + LogMessage("Message arrived\n"); + LogMessage(" topic: %s\n", topicName); + LogMessage(" message: "); + payloadptr = message->payload; + for(i=0; ipayloadlen; i++) + { + putchar(*payloadptr++); + } + putchar('\n'); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +void connlost(void *context, char *cause) +{ + LogMessage("\nConnection lost\n"); + LogMessage(" cause: %s\n", cause); +} + +bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t ) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + MQTTClient_deliveryToken token; + int rc; + + char address[256]; + sprintf(address, "tcp://%s:%s", mqttConnection->host,mqttConnection->port); + + MQTTClient_create(&client, address, mqttConnection->clientId, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.username = mqttConnection->user; + conn_opts.password = mqttConnection->pass; + MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); + LogMessage("Attempting publication on host: %s\n", + address); + //"on topic %s for client with ClientID: %s\n", + //t->Message, address, mqttConnection->topic, mqttConnection->clientId); + if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + LogMessage("Failed to connect, return code %d\n", rc); + return false; + } + pubmsg.payload = t->Message; + pubmsg.payloadlen = strlen(t->Message); + pubmsg.qos = QOS; + pubmsg.retained = 0; + deliveredtoken = 0; + MQTTClient_publishMessage(client, mqttConnection->topic, &pubmsg, &token); + while(deliveredtoken != token); + MQTTClient_disconnect(client, 10000); + MQTTClient_destroy(&client); + return true; +} + +void *MQTTLoop( mqtt_connect_t *mqttConnection ) +{ + if ( Config.EnableMQTT ) + { + received_t *dequeued_telemetry_ptr; + + // Keep looping until the parent quits + while ( true ) + { + dequeued_telemetry_ptr = lifo_buffer_waitpop(&MQTT_Upload_Buffer); + + if(dequeued_telemetry_ptr != NULL) + { + if(UploadMQTTPacket(mqttConnection, dequeued_telemetry_ptr )) + { + free(dequeued_telemetry_ptr); + } + else + { + if(!lifo_buffer_requeue(&MQTT_Upload_Buffer, dequeued_telemetry_ptr)) + { + /* Requeue failed, drop packet */ + free(dequeued_telemetry_ptr); + } + } + } + else + { + /* NULL returned: We've been asked to quit */ + /* Don't bother free()ing stuff, as application is quitting */ + break; + } + } + } + + return NULL; +} + diff --git a/mqtt.h b/mqtt.h new file mode 100644 index 0000000..699ae43 --- /dev/null +++ b/mqtt.h @@ -0,0 +1 @@ +void *MQTTLoop( void *some_void_ptr );