kopia lustrzana https://github.com/PiInTheSky/lora-gateway
commit
5fa0c1ee87
|
@ -16,3 +16,5 @@ ssdv
|
|||
gateway-mode0.txt
|
||||
test.txt
|
||||
uplink.txt
|
||||
LORA_dump
|
||||
LORA_Dump
|
||||
|
|
2
Makefile
2
Makefile
|
@ -9,7 +9,7 @@ INDOPT= -bap -bl -blf -bli0 -brs -cbi0 -cdw -cs -ci4 -cli4 -i4 -ip0 -nbc -nce -l
|
|||
|
||||
CC=gcc
|
||||
CFLAGS=-Wall -O3 #-std=c99
|
||||
LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread
|
||||
LDFLAGS= -lm -lwiringPi -lwiringPiDev -lcurl -lncurses -lpthread -lpaho-mqtt3c
|
||||
RM=rm
|
||||
|
||||
%.o: %.c *.h # combined w/ next line will compile recently changed .c files
|
||||
|
|
|
@ -59,3 +59,16 @@ AFC_1=Y
|
|||
#DIO5_1=5
|
||||
#UplinkTime_1=5
|
||||
#UplinkCycle_1=60
|
||||
|
||||
# MQTT Config
|
||||
#EnableMQTT=Y
|
||||
#MQTTHost=mqtt_host
|
||||
#MQTTPort=1883
|
||||
#MQTTUser=mqtt_user
|
||||
#MQTTPass=mqtt_password
|
||||
#MQTTClient=mqtt_client_name
|
||||
#MQTTTopic=topic_name
|
||||
|
||||
# Dump config
|
||||
#DumpBuffer=Y
|
||||
#DumpFile=./LORA_dump
|
||||
|
|
46
gateway.c
46
gateway.c
|
@ -33,6 +33,7 @@
|
|||
#include "ssdv.h"
|
||||
#include "ftp.h"
|
||||
#include "habitat.h"
|
||||
#include "mqtt.h"
|
||||
#include "hablink.h"
|
||||
#include "network.h"
|
||||
#include "network.h"
|
||||
|
@ -196,6 +197,7 @@ struct TBinaryPacket {
|
|||
#pragma pack(pop)
|
||||
|
||||
lifo_buffer_t Habitat_Upload_Buffer;
|
||||
lifo_buffer_t MQTT_Upload_Buffer;
|
||||
|
||||
// Create pipes for inter proces communication
|
||||
// GLOBAL AS CALLED FROM INTERRRUPT
|
||||
|
@ -1136,6 +1138,18 @@ int ProcessTelemetryMessage(int Channel, received_t *Received)
|
|||
}
|
||||
}
|
||||
|
||||
if (Config.EnableMQTT)
|
||||
{
|
||||
// Add to MQTT upload queue
|
||||
received_t *queueReceived = malloc(sizeof(received_t));
|
||||
if(queueReceived != NULL)
|
||||
{
|
||||
memcpy(queueReceived, Received, sizeof(received_t));
|
||||
/* Push pointer onto upload queue */
|
||||
lifo_buffer_push(&MQTT_Upload_Buffer, (void *)queueReceived);
|
||||
}
|
||||
}
|
||||
|
||||
if (Config.EnableHablink && Config.HablinkAddress[0])
|
||||
{
|
||||
SetHablinkSentence(startmessage);
|
||||
|
@ -2113,6 +2127,16 @@ void LoadConfigFile(void)
|
|||
RegisterConfigBoolean(MainSection, -1, "DumpBuffer", &Config.DumpBuffer, NULL);
|
||||
RegisterConfigString(MainSection, -1, "DumpFile", Config.DumpFile, sizeof(Config.DumpFile), NULL);
|
||||
|
||||
// MQTT
|
||||
RegisterConfigBoolean(MainSection, -1, "EnableMQTT", &Config.EnableMQTT, NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTHost", Config.MQTTHost, sizeof(Config.MQTTHost), NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTPort", Config.MQTTPort, sizeof(Config.MQTTPort), NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTUser", Config.MQTTUser, sizeof(Config.MQTTUser), NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTPass", Config.MQTTPass, sizeof(Config.MQTTPass), NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTClient", Config.MQTTClient, sizeof(Config.MQTTClient), NULL);
|
||||
RegisterConfigString(MainSection, -1, "MQTTTopic", Config.MQTTTopic, sizeof(Config.MQTTTopic), NULL);
|
||||
|
||||
|
||||
for (Channel = 0; Channel <= 1; Channel++)
|
||||
{
|
||||
RegisterConfigDouble(MainSection, Channel, "frequency", &Config.LoRaDevices[Channel].Frequency, LoRaCallback);
|
||||
|
@ -2611,7 +2635,7 @@ int main( int argc, char **argv )
|
|||
int ch;
|
||||
int LoopPeriod, MSPerLoop;
|
||||
int Channel;
|
||||
pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread;
|
||||
pthread_t SSDVThread, FTPThread, NetworkThread, HabitatThread, HablinkThread, ServerThread, TelnetThread, ListenerThread, DataportThread, ChatportThread, MQTTThread;
|
||||
struct TServerInfo JSONInfo, TelnetInfo, DataportInfo, ChatportInfo;
|
||||
|
||||
atexit(bye);
|
||||
|
@ -2708,6 +2732,26 @@ int main( int argc, char **argv )
|
|||
}
|
||||
}
|
||||
|
||||
if (Config.EnableMQTT)
|
||||
{
|
||||
lifo_buffer_init(&MQTT_Upload_Buffer, 1024);
|
||||
mqtt_connect_t *mqttConnection = malloc(sizeof *mqttConnection);
|
||||
|
||||
strcpy(mqttConnection->host, Config.MQTTHost);
|
||||
strcpy(mqttConnection->port, Config.MQTTPort);
|
||||
strcpy(mqttConnection->user, Config.MQTTUser);
|
||||
strcpy(mqttConnection->pass, Config.MQTTPass);
|
||||
strcpy(mqttConnection->topic, Config.MQTTTopic);
|
||||
strcpy(mqttConnection->clientId, Config.MQTTClient);
|
||||
|
||||
if ( pthread_create (&MQTTThread, NULL, MQTTLoop, mqttConnection))
|
||||
{
|
||||
fprintf( stderr, "Error creating MQTT thread\n" );
|
||||
free(mqttConnection);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (Config.EnableHablink && Config.HablinkAddress[0])
|
||||
{
|
||||
if (pthread_create (&HablinkThread, NULL, HablinkLoop, NULL))
|
||||
|
|
17
global.h
17
global.h
|
@ -145,6 +145,13 @@ struct TConfig
|
|||
char Version[16];
|
||||
int DumpBuffer;
|
||||
char DumpFile[64];
|
||||
int EnableMQTT;
|
||||
char MQTTHost[128];
|
||||
char MQTTPort[8];
|
||||
char MQTTUser[16];
|
||||
char MQTTPass[32];
|
||||
char MQTTClient[16];
|
||||
char MQTTTopic[32];
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
@ -223,6 +230,15 @@ typedef struct {
|
|||
int Packet_Number;
|
||||
} ssdv_t;
|
||||
|
||||
typedef struct {
|
||||
char host[128];
|
||||
char port[8];
|
||||
char user[16];
|
||||
char pass[32];
|
||||
char clientId[16];
|
||||
char topic[32];
|
||||
} mqtt_connect_t;
|
||||
|
||||
struct TServerInfo
|
||||
{
|
||||
int Port;
|
||||
|
@ -231,6 +247,7 @@ struct TServerInfo
|
|||
int sockfd;
|
||||
};
|
||||
|
||||
|
||||
extern struct TConfig Config;
|
||||
extern int SSDVSendArrayIndex;
|
||||
extern pthread_mutex_t ssdv_mutex;
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <ctype.h>
|
||||
#include <stdio.h> // Standard input/output definitions
|
||||
#include <string.h> // String function definitions
|
||||
#include <stdlib.h> // String function definitions
|
||||
|
||||
#include "MQTTClient.h"
|
||||
|
||||
#include "global.h"
|
||||
#include "lifo_buffer.h"
|
||||
|
||||
extern lifo_buffer_t MQTT_Upload_Buffer;
|
||||
|
||||
#define QOS 1
|
||||
#define TIMEOUT 10000L
|
||||
|
||||
volatile MQTTClient_deliveryToken deliveredtoken;
|
||||
|
||||
|
||||
void delivered(void *context, MQTTClient_deliveryToken dt)
|
||||
{
|
||||
LogMessage("Message with token value %d delivery confirmed\n", dt);
|
||||
deliveredtoken = dt;
|
||||
}
|
||||
|
||||
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
|
||||
{
|
||||
int i;
|
||||
char* payloadptr;
|
||||
LogMessage("Message arrived\n");
|
||||
LogMessage(" topic: %s\n", topicName);
|
||||
LogMessage(" message: ");
|
||||
payloadptr = message->payload;
|
||||
for(i=0; i<message->payloadlen; i++)
|
||||
{
|
||||
putchar(*payloadptr++);
|
||||
}
|
||||
putchar('\n');
|
||||
MQTTClient_freeMessage(&message);
|
||||
MQTTClient_free(topicName);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void connlost(void *context, char *cause)
|
||||
{
|
||||
LogMessage("\nConnection lost\n");
|
||||
LogMessage(" cause: %s\n", cause);
|
||||
}
|
||||
|
||||
bool UploadMQTTPacket(mqtt_connect_t * mqttConnection, received_t * t )
|
||||
{
|
||||
MQTTClient client;
|
||||
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
|
||||
MQTTClient_message pubmsg = MQTTClient_message_initializer;
|
||||
MQTTClient_deliveryToken token;
|
||||
int rc;
|
||||
|
||||
char address[256];
|
||||
sprintf(address, "tcp://%s:%s", mqttConnection->host,mqttConnection->port);
|
||||
|
||||
MQTTClient_create(&client, address, mqttConnection->clientId,
|
||||
MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||||
conn_opts.keepAliveInterval = 20;
|
||||
conn_opts.cleansession = 1;
|
||||
conn_opts.username = mqttConnection->user;
|
||||
conn_opts.password = mqttConnection->pass;
|
||||
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
|
||||
LogMessage("Attempting publication on host: %s\n",
|
||||
address);
|
||||
//"on topic %s for client with ClientID: %s\n",
|
||||
//t->Message, address, mqttConnection->topic, mqttConnection->clientId);
|
||||
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
|
||||
{
|
||||
LogMessage("Failed to connect, return code %d\n", rc);
|
||||
return false;
|
||||
}
|
||||
pubmsg.payload = t->Message;
|
||||
pubmsg.payloadlen = strlen(t->Message);
|
||||
pubmsg.qos = QOS;
|
||||
pubmsg.retained = 0;
|
||||
deliveredtoken = 0;
|
||||
MQTTClient_publishMessage(client, mqttConnection->topic, &pubmsg, &token);
|
||||
while(deliveredtoken != token);
|
||||
MQTTClient_disconnect(client, 10000);
|
||||
MQTTClient_destroy(&client);
|
||||
return true;
|
||||
}
|
||||
|
||||
void *MQTTLoop( mqtt_connect_t *mqttConnection )
|
||||
{
|
||||
if ( Config.EnableMQTT )
|
||||
{
|
||||
received_t *dequeued_telemetry_ptr;
|
||||
|
||||
// Keep looping until the parent quits
|
||||
while ( true )
|
||||
{
|
||||
dequeued_telemetry_ptr = lifo_buffer_waitpop(&MQTT_Upload_Buffer);
|
||||
|
||||
if(dequeued_telemetry_ptr != NULL)
|
||||
{
|
||||
if(UploadMQTTPacket(mqttConnection, dequeued_telemetry_ptr ))
|
||||
{
|
||||
free(dequeued_telemetry_ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
if(!lifo_buffer_requeue(&MQTT_Upload_Buffer, dequeued_telemetry_ptr))
|
||||
{
|
||||
/* Requeue failed, drop packet */
|
||||
free(dequeued_telemetry_ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* NULL returned: We've been asked to quit */
|
||||
/* Don't bother free()ing stuff, as application is quitting */
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
void *MQTTLoop( void *some_void_ptr );
|
Ładowanie…
Reference in New Issue