From e142c9f7e2b1c15083d2d99df4470c4142983bbb Mon Sep 17 00:00:00 2001 From: Phil Crump Date: Sun, 6 May 2018 15:33:33 +0000 Subject: [PATCH] Use lifo circular buffer for habitat telemetry upload. --- gateway.c | 49 +++++++-------------- habitat.c | 92 +++++++++++++--------------------------- lifo_buffer.c | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++ lifo_buffer.h | 30 +++++++++++++ 4 files changed, 190 insertions(+), 96 deletions(-) create mode 100644 lifo_buffer.c create mode 100644 lifo_buffer.h diff --git a/gateway.c b/gateway.c index d62ed9e..db81228 100644 --- a/gateway.c +++ b/gateway.c @@ -38,6 +38,7 @@ #include "listener.h" #include "habpack.h" #include "udpclient.h" +#include "lifo_buffer.h" #define VERSION "V1.8.19" bool run = TRUE; @@ -187,15 +188,12 @@ struct TBinaryPacket { #pragma pack(pop) +lifo_buffer_t Habitat_Upload_Buffer; + // Create pipes for inter proces communication // GLOBAL AS CALLED FROM INTERRRUPT -int telem_pipe_fd[2]; int ssdv_pipe_fd[2]; -// Create a structure to share some variables with the habitat child process -// GLOBAL AS CALLED FROM INTERRRUPT -thread_shared_vars_t htsv; - // Create a structure to share some variables with the ssdv child process // GLOBAL AS CALLED FROM INTERRRUPT thread_shared_vars_t stsv; @@ -1019,19 +1017,15 @@ void ProcessTelemetryMessage(int Channel, received_t *Received) if ( Config.EnableHabitat ) { - // Add the telemetry packet to the pipe - int result = write( telem_pipe_fd[1], Received, sizeof( *Received ) ); - if ( result == -1 ) + // Add to Habitat upload queue + received_t *queueReceived = malloc(sizeof(received_t)); + if(queueReceived != NULL) { - exit_error("Error writing to the telemetry pipe\n"); - } - if ( result == 0 ) - { - LogMessage( "Nothing written to telemetry pipe \n" ); - } - if ( result > 1 ) - { - htsv.packet_count++; + /* WARNING: Doesn't copy linked-list :/ */ + memcpy(queueReceived, Received, sizeof(received_t)); + + /* Push pointer onto upload queue */ + lifo_buffer_push(&Habitat_Upload_Buffer, (void *)queueReceived); } } @@ -2318,12 +2312,6 @@ int main( int argc, char **argv ) int result; - result = pipe( telem_pipe_fd ); - if ( result < 0 ) - { - exit_error("Error creating telemetry pipe\n"); - } - result = pipe( ssdv_pipe_fd ); if ( result < 0 ) { @@ -2373,15 +2361,11 @@ int main( int argc, char **argv ) return 1; } - - // Initialise the vars - htsv.parent_status = RUNNING; - htsv.packet_count = 0; - - if (Config.EnableHabitat) { - if ( pthread_create (&HabitatThread, NULL, HabitatLoop, ( void * ) &htsv)) + lifo_buffer_init(&Habitat_Upload_Buffer, 1024); + + if ( pthread_create (&HabitatThread, NULL, HabitatLoop, NULL)) { fprintf( stderr, "Error creating Habitat thread\n" ); return 1; @@ -2565,14 +2549,11 @@ int main( int argc, char **argv ) LogMessage( "Closing SSDV pipe\n" ); close( ssdv_pipe_fd[1] ); - LogMessage( "Closing Habitat pipe\n" ); - close( telem_pipe_fd[1] ); - LogMessage( "Stopping SSDV thread\n" ); stsv.parent_status = STOPPED; LogMessage( "Stopping Habitat thread\n" ); - htsv.parent_status = STOPPED; + lifo_buffer_quitwait(&Habitat_Upload_Buffer); if (Config.EnableSSDV) { diff --git a/habitat.c b/habitat.c index c519499..10951df 100644 --- a/habitat.c +++ b/habitat.c @@ -23,6 +23,9 @@ #include "sha256.h" #include "wiringPi.h" #include "gateway.h" +#include "lifo_buffer.h" + +extern lifo_buffer_t Habitat_Upload_Buffer; extern int telem_pipe_fd[2]; extern pthread_mutex_t var; @@ -79,19 +82,6 @@ void UploadTelemetryPacket( received_t * t ) doc_tm = gmtime( &t->Metadata.Timestamp ); strftime( doc_time, sizeof( doc_time ), "%Y-%0m-%0dT%H:%M:%SZ", doc_tm ); - // So that the response to the curl PUT doesn't mess up my finely crafted display! - curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data ); - - // Set the timeout - curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 ); - - // RJH capture http errors and report - // curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 ); - curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error ); - - // Avoid curl library bug that happens if above timeout occurs (sigh) - curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 ); - // Grab current telemetry string and append a linefeed sprintf( Sentence, "%s\n", t->UKHASstring ); @@ -111,12 +101,22 @@ void UploadTelemetryPacket( received_t * t ) "{\"data\": {\"_raw\": \"%s\"},\"receivers\": {\"%s\": {\"time_created\": \"%s\",\"time_uploaded\": \"%s\",\"rig_info\": {\"frequency\":%.0f}}}}", base64_data, Config.Tracker, doc_time, now, (t->Metadata.Frequency + t->Metadata.FrequencyError) * 1000000 ); - // LogTelemetryPacket(json); - - // Set the URL that is about to receive our PUT sprintf( url, "http://habitat.habhub.org/habitat/_design/payload_telemetry/_update/add_listener/%s", doc_id); + // So that the response to the curl PUT doesn't mess up my finely crafted display! + curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data ); + + // Set the timeout + curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 ); + + // RJH capture http errors and report + // curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 ); + curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error ); + + // Avoid curl library bug that happens if above timeout occurs (sigh) + curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 ); + // Set the headers headers = NULL; headers = curl_slist_append(headers, "Accept: application/json"); @@ -169,65 +169,33 @@ void UploadTelemetryPacket( received_t * t ) void *HabitatLoop( void *vars ) { - if ( Config.EnableHabitat ) { - thread_shared_vars_t *htsv; - htsv = vars; - received_t t; - int packets = 0; - unsigned long total_packets = 0; + received_t *dequeued_telemetry_ptr; - int i = 1; - - // Keep looping until the parent quits and there are no more packets to - // send to habitat. - while ( ( htsv->parent_status == RUNNING ) || ( packets > 0 ) ) + // Keep looping until the parent quits + while ( true ) { + dequeued_telemetry_ptr = lifo_buffer_waitpop(&Habitat_Upload_Buffer); - //THis is neded for some reason habitat thread has a pthread_mutex_lock set - // and this removes it - if ( i ) + if(dequeued_telemetry_ptr != NULL) { - // pthread_mutex_lock(&var); - pthread_mutex_unlock( &var ); - i = 0; - } - if ( htsv->packet_count > total_packets ) - { - packets = read( telem_pipe_fd[0], &t, sizeof( t ) ); + ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat" ); + + UploadTelemetryPacket( dequeued_telemetry_ptr ); + + ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " ); + + free(dequeued_telemetry_ptr); } else { - packets = 0; - // pthread_mutex_unlock(&var); - - // If we have have a rollover after processing 4294967295 packets - if ( htsv->packet_count < total_packets ) - total_packets = 0; - + /* We've been asked to quit */ + break; } - - if ( packets ) - { - // LogMessage ("%s\n", t.Telemetry); - - ChannelPrintf( t.Metadata.Channel, 6, 1, "Habitat" ); - - UploadTelemetryPacket( &t ); - - ChannelPrintf( t.Metadata.Channel, 6, 1, " " ); - - total_packets++; - - } - delay(100); // Don't eat too much CPU } } - close( telem_pipe_fd[0] ); - close( telem_pipe_fd[1] ); - LogMessage( "Habitat thread closing\n" ); return NULL; diff --git a/lifo_buffer.c b/lifo_buffer.c new file mode 100644 index 0000000..0fc3315 --- /dev/null +++ b/lifo_buffer.c @@ -0,0 +1,115 @@ +#include +#include +#include +#include +#include + +#include "lifo_buffer.h" + +void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length) +{ + pthread_mutex_init(&buf->Mutex, NULL); + pthread_cond_init(&buf->Signal, NULL); + + pthread_mutex_lock(&buf->Mutex); + buf->Head = 0; + buf->Tail = 0; + buf->Length = length; + buf->Data = malloc(length * sizeof(void *)); + buf->Quit = false; + pthread_mutex_unlock(&buf->Mutex); +} + +/* Lossy when buffer is full */ +void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr) +{ + pthread_mutex_lock(&buf->Mutex); + + /* If no space, remove oldest from bottom of the queue by advancing Tail */ + if(buf->Head==(buf->Tail-1) || (buf->Head==(buf->Length-1) && buf->Tail==0)) + { + if(buf->Tail==(buf->Length-1)) + buf->Tail=0; + else + buf->Tail++; + } + + if(buf->Head==(buf->Length-1)) + buf->Head=0; + else + buf->Head++; + + buf->Data[buf->Head] = data_ptr; + + pthread_cond_signal(&buf->Signal); + + pthread_mutex_unlock(&buf->Mutex); +} + +/* Returns NULL when unsuccessful */ +void *lifo_buffer_pop(lifo_buffer_t *buf) +{ + void *result; + + pthread_mutex_lock(&buf->Mutex); + if(buf->Head!=buf->Tail) + { + result = buf->Data[buf->Head]; + + if(buf->Head==0) + buf->Head=(buf->Length-1); + else + buf->Head--; + + pthread_mutex_unlock(&buf->Mutex); + } + else + { + pthread_mutex_unlock(&buf->Mutex); + + result = NULL; + } + + return result; +} + +void *lifo_buffer_waitpop(lifo_buffer_t *buf) +{ + void *result; + + pthread_mutex_lock(&buf->Mutex); + + while(buf->Head==buf->Tail && !buf->Quit) /* If buffer is empty */ + { + /* Mutex is atomically unlocked on beginning waiting for signal */ + pthread_cond_wait(&buf->Signal, &buf->Mutex); + /* and locked again on resumption */ + } + + if(buf->Quit) + { + return NULL; + } + + result = buf->Data[buf->Head]; + + if(buf->Head==0) + buf->Head=(buf->Length-1); + else + buf->Head--; + + pthread_mutex_unlock(&buf->Mutex); + + return result; +} + +void lifo_buffer_quitwait(lifo_buffer_t *buf) +{ + pthread_mutex_lock(&buf->Mutex); + + buf->Quit = true; + + pthread_cond_signal(&buf->Signal); + + pthread_mutex_unlock(&buf->Mutex); +} \ No newline at end of file diff --git a/lifo_buffer.h b/lifo_buffer.h new file mode 100644 index 0000000..c4c58a7 --- /dev/null +++ b/lifo_buffer.h @@ -0,0 +1,30 @@ +#ifndef __LIFO_BUFFER_H__ +#define __LIFO_BUFFER_H__ + +#include +#include + +typedef struct +{ + /* Buffer Access Lock */ + pthread_mutex_t Mutex; + /* New Data Signal */ + pthread_cond_t Signal; + /* Whether the waiting thread should quit */ + bool Quit; + /* Head and Tail Indexes */ + uint32_t Head, Tail; + /* Data */ + void **Data; + /* Data Length */ + uint32_t Length; +} lifo_buffer_t; + +/** Common functions **/ +void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length); +void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr); +void *lifo_buffer_pop(lifo_buffer_t *buf); +void *lifo_buffer_waitpop(lifo_buffer_t *buf); +void lifo_buffer_quitwait(lifo_buffer_t *buf); + +#endif /* __LIFO_BUFFER_H__*/ \ No newline at end of file