Merge pull request #53 from philcrump/upload-lifo-queue

Use LIFO circular buffer for Telemetry Upload
pull/55/head
Pi In The Sky Project 2018-05-07 19:28:43 +01:00 zatwierdzone przez GitHub
commit a202e8788f
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
4 zmienionych plików z 272 dodań i 99 usunięć

Wyświetl plik

@ -38,6 +38,7 @@
#include "listener.h"
#include "habpack.h"
#include "udpclient.h"
#include "lifo_buffer.h"
#define VERSION "V1.8.19"
bool run = TRUE;
@ -187,15 +188,12 @@ struct TBinaryPacket {
#pragma pack(pop)
lifo_buffer_t Habitat_Upload_Buffer;
// Create pipes for inter proces communication
// GLOBAL AS CALLED FROM INTERRRUPT
int telem_pipe_fd[2];
int ssdv_pipe_fd[2];
// Create a structure to share some variables with the habitat child process
// GLOBAL AS CALLED FROM INTERRRUPT
thread_shared_vars_t htsv;
// Create a structure to share some variables with the ssdv child process
// GLOBAL AS CALLED FROM INTERRRUPT
thread_shared_vars_t stsv;
@ -1047,19 +1045,16 @@ void ProcessTelemetryMessage(int Channel, received_t *Received)
if ( Config.EnableHabitat )
{
// Add the telemetry packet to the pipe
int result = write( telem_pipe_fd[1], Received, sizeof( *Received ) );
if ( result == -1 )
// Add to Habitat upload queue
received_t *queueReceived = malloc(sizeof(received_t));
if(queueReceived != NULL)
{
exit_error("Error writing to the telemetry pipe\n");
}
if ( result == 0 )
{
LogMessage( "Nothing written to telemetry pipe \n" );
}
if ( result > 1 )
{
htsv.packet_count++;
memcpy(queueReceived, Received, sizeof(received_t));
/* We haven't copied the linked list, this'll be free()ed later, so remove pointer */
queueReceived->Telemetry.habpack_extra = NULL;
/* Push pointer onto upload queue */
lifo_buffer_push(&Habitat_Upload_Buffer, (void *)queueReceived);
}
}
@ -2348,12 +2343,6 @@ int main( int argc, char **argv )
int result;
result = pipe( telem_pipe_fd );
if ( result < 0 )
{
exit_error("Error creating telemetry pipe\n");
}
result = pipe( ssdv_pipe_fd );
if ( result < 0 )
{
@ -2403,15 +2392,11 @@ int main( int argc, char **argv )
return 1;
}
// Initialise the vars
htsv.parent_status = RUNNING;
htsv.packet_count = 0;
if (Config.EnableHabitat)
{
if ( pthread_create (&HabitatThread, NULL, HabitatLoop, ( void * ) &htsv))
lifo_buffer_init(&Habitat_Upload_Buffer, 1024);
if ( pthread_create (&HabitatThread, NULL, HabitatLoop, NULL))
{
fprintf( stderr, "Error creating Habitat thread\n" );
return 1;
@ -2595,14 +2580,11 @@ int main( int argc, char **argv )
LogMessage( "Closing SSDV pipe\n" );
close( ssdv_pipe_fd[1] );
LogMessage( "Closing Habitat pipe\n" );
close( telem_pipe_fd[1] );
LogMessage( "Stopping SSDV thread\n" );
stsv.parent_status = STOPPED;
LogMessage( "Stopping Habitat thread\n" );
htsv.parent_status = STOPPED;
lifo_buffer_quitwait(&Habitat_Upload_Buffer);
if (Config.EnableSSDV)
{

124
habitat.c
Wyświetl plik

@ -23,6 +23,9 @@
#include "sha256.h"
#include "wiringPi.h"
#include "gateway.h"
#include "lifo_buffer.h"
extern lifo_buffer_t Habitat_Upload_Buffer;
extern int telem_pipe_fd[2];
extern pthread_mutex_t var;
@ -46,7 +49,7 @@ void hash_to_hex( unsigned char *hash, char *line )
line[64] = '\0';
}
void UploadTelemetryPacket( received_t * t )
bool UploadTelemetryPacket( received_t * t )
{
CURL *curl;
CURLcode res;
@ -56,6 +59,7 @@ void UploadTelemetryPacket( received_t * t )
curl = curl_easy_init( );
if ( curl )
{
bool result;
char url[200];
char base64_data[1000];
size_t base64_length;
@ -79,19 +83,6 @@ void UploadTelemetryPacket( received_t * t )
doc_tm = gmtime( &t->Metadata.Timestamp );
strftime( doc_time, sizeof( doc_time ), "%Y-%0m-%0dT%H:%M:%SZ", doc_tm );
// So that the response to the curl PUT doesn't mess up my finely crafted display!
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data );
// Set the timeout
curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 );
// RJH capture http errors and report
// curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 );
curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error );
// Avoid curl library bug that happens if above timeout occurs (sigh)
curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 );
// Grab current telemetry string and append a linefeed
sprintf( Sentence, "%s\n", t->UKHASstring );
@ -111,12 +102,22 @@ void UploadTelemetryPacket( received_t * t )
"{\"data\": {\"_raw\": \"%s\"},\"receivers\": {\"%s\": {\"time_created\": \"%s\",\"time_uploaded\": \"%s\",\"rig_info\": {\"frequency\":%.0f}}}}",
base64_data, Config.Tracker, doc_time, now, (t->Metadata.Frequency + t->Metadata.FrequencyError) * 1000000 );
// LogTelemetryPacket(json);
// Set the URL that is about to receive our PUT
sprintf( url, "http://habitat.habhub.org/habitat/_design/payload_telemetry/_update/add_listener/%s", doc_id);
// So that the response to the curl PUT doesn't mess up my finely crafted display!
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data );
// Set the timeout
curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 );
// RJH capture http errors and report
// curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 );
curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error );
// Avoid curl library bug that happens if above timeout occurs (sigh)
curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 );
// Set the headers
headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
@ -142,7 +143,13 @@ void UploadTelemetryPacket( received_t * t )
if (http_resp != 201 && http_resp != 403 && http_resp != 409)
{
LogMessage("Unexpected HTTP response %ld for URL '%s'\n", http_resp, url);
result = false;
}
else
{
/* Everything performing nominally (even if we didn't successfully insert this time) */
result = true;
}
}
else
{
@ -150,6 +157,8 @@ void UploadTelemetryPacket( received_t * t )
LogMessage("Failed for URL '%s'\n", url);
LogMessage("curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
LogMessage("error: %s\n", curl_error);
/* Likely a network error, so return false to requeue */
result = false;
}
if (http_resp == 409)
@ -163,72 +172,57 @@ void UploadTelemetryPacket( received_t * t )
// always cleanup
curl_slist_free_all( headers );
curl_easy_cleanup( curl );
return result;
}
else
{
/* CURL error, return false so we requeue */
return false;
}
}
void *HabitatLoop( void *vars )
{
if ( Config.EnableHabitat )
{
thread_shared_vars_t *htsv;
htsv = vars;
received_t t;
int packets = 0;
unsigned long total_packets = 0;
received_t *dequeued_telemetry_ptr;
int i = 1;
// Keep looping until the parent quits and there are no more packets to
// send to habitat.
while ( ( htsv->parent_status == RUNNING ) || ( packets > 0 ) )
// Keep looping until the parent quits
while ( true )
{
dequeued_telemetry_ptr = lifo_buffer_waitpop(&Habitat_Upload_Buffer);
//THis is neded for some reason habitat thread has a pthread_mutex_lock set
// and this removes it
if ( i )
if(dequeued_telemetry_ptr != NULL)
{
// pthread_mutex_lock(&var);
pthread_mutex_unlock( &var );
i = 0;
}
if ( htsv->packet_count > total_packets )
{
packets = read( telem_pipe_fd[0], &t, sizeof( t ) );
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat (%d queued)", lifo_buffer_queued(&Habitat_Upload_Buffer) );
if(UploadTelemetryPacket( dequeued_telemetry_ptr ))
{
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " );
free(dequeued_telemetry_ptr);
}
else
{
/* Network / CURL Error, requeue packet */
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat Net Error! " );
if(!lifo_buffer_requeue(&Habitat_Upload_Buffer, dequeued_telemetry_ptr))
{
/* Requeue failed, drop packet */
free(dequeued_telemetry_ptr);
}
}
}
else
{
packets = 0;
// pthread_mutex_unlock(&var);
// If we have have a rollover after processing 4294967295 packets
if ( htsv->packet_count < total_packets )
total_packets = 0;
/* NULL returned: We've been asked to quit */
/* Don't bother free()ing stuff, as application is quitting */
break;
}
if ( packets )
{
// LogMessage ("%s\n", t.Telemetry);
ChannelPrintf( t.Metadata.Channel, 6, 1, "Habitat" );
UploadTelemetryPacket( &t );
ChannelPrintf( t.Metadata.Channel, 6, 1, " " );
total_packets++;
}
delay(100); // Don't eat too much CPU
}
}
close( telem_pipe_fd[0] );
close( telem_pipe_fd[1] );
LogMessage( "Habitat thread closing\n" );
return NULL;
}

165
lifo_buffer.c 100644
Wyświetl plik

@ -0,0 +1,165 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "lifo_buffer.h"
void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length)
{
pthread_mutex_init(&buf->Mutex, NULL);
pthread_cond_init(&buf->Signal, NULL);
pthread_mutex_lock(&buf->Mutex);
buf->Head = 0;
buf->Tail = 0;
buf->Length = length;
buf->Data = malloc(length * sizeof(void *));
buf->Quit = false;
pthread_mutex_unlock(&buf->Mutex);
}
uint32_t lifo_buffer_queued(lifo_buffer_t *buf)
{
uint32_t result;
pthread_mutex_lock(&buf->Mutex);
if(buf->Head >= buf->Tail)
{
result = buf->Head - buf->Tail;
}
else
{
result = buf->Head + (buf->Length - buf->Tail);
}
pthread_mutex_unlock(&buf->Mutex);
return result;
}
/* Lossy when buffer is full */
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr)
{
pthread_mutex_lock(&buf->Mutex);
/* If no space, remove oldest from bottom of the queue by advancing Tail */
if(buf->Head==(buf->Tail-1) || (buf->Head==(buf->Length-1) && buf->Tail==0))
{
if(buf->Tail==(buf->Length-1))
buf->Tail=0;
else
buf->Tail++;
}
if(buf->Head==(buf->Length-1))
buf->Head=0;
else
buf->Head++;
buf->Data[buf->Head] = data_ptr;
pthread_cond_signal(&buf->Signal);
pthread_mutex_unlock(&buf->Mutex);
}
/* Returns NULL when unsuccessful */
void *lifo_buffer_pop(lifo_buffer_t *buf)
{
void *result;
pthread_mutex_lock(&buf->Mutex);
if(buf->Head!=buf->Tail)
{
result = buf->Data[buf->Head];
if(buf->Head==0)
buf->Head=(buf->Length-1);
else
buf->Head--;
pthread_mutex_unlock(&buf->Mutex);
}
else
{
pthread_mutex_unlock(&buf->Mutex);
result = NULL;
}
return result;
}
void *lifo_buffer_waitpop(lifo_buffer_t *buf)
{
void *result;
pthread_mutex_lock(&buf->Mutex);
while(buf->Head==buf->Tail && !buf->Quit) /* If buffer is empty */
{
/* Mutex is atomically unlocked on beginning waiting for signal */
pthread_cond_wait(&buf->Signal, &buf->Mutex);
/* and locked again on resumption */
}
if(buf->Quit)
{
return NULL;
}
result = buf->Data[buf->Head];
if(buf->Head==0)
buf->Head=(buf->Length-1);
else
buf->Head--;
pthread_mutex_unlock(&buf->Mutex);
return result;
}
void lifo_buffer_quitwait(lifo_buffer_t *buf)
{
pthread_mutex_lock(&buf->Mutex);
buf->Quit = true;
pthread_cond_signal(&buf->Signal);
pthread_mutex_unlock(&buf->Mutex);
}
/* Puts it on the bottom to avoid clogging by packet data problems, lossy when buffer is full */
bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr)
{
bool result;
pthread_mutex_lock(&buf->Mutex);
/* If no space, ignore */
if(buf->Head!=(buf->Tail-1) && !(buf->Head==(buf->Length-1) && buf->Tail==0))
{
buf->Data[buf->Tail] = data_ptr;
if(buf->Tail==0)
buf->Tail=(buf->Length-1);
else
buf->Tail--;
pthread_cond_signal(&buf->Signal);
result = true;
}
else
{
result = false;
}
pthread_mutex_unlock(&buf->Mutex);
return result;
}

32
lifo_buffer.h 100644
Wyświetl plik

@ -0,0 +1,32 @@
#ifndef __LIFO_BUFFER_H__
#define __LIFO_BUFFER_H__
#include <stdbool.h>
#include <pthread.h>
typedef struct
{
/* Buffer Access Lock */
pthread_mutex_t Mutex;
/* New Data Signal */
pthread_cond_t Signal;
/* Whether the waiting thread should quit */
bool Quit;
/* Head and Tail Indexes */
uint32_t Head, Tail;
/* Data */
void **Data;
/* Data Length */
uint32_t Length;
} lifo_buffer_t;
/** Common functions **/
void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length);
uint32_t lifo_buffer_queued(lifo_buffer_t *buf);
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr);
void *lifo_buffer_pop(lifo_buffer_t *buf);
void *lifo_buffer_waitpop(lifo_buffer_t *buf);
void lifo_buffer_quitwait(lifo_buffer_t *buf);
bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr);
#endif /* __LIFO_BUFFER_H__*/