From 5be4c31726a9973cbd102d7d9643ffdd247b9bd3 Mon Sep 17 00:00:00 2001 From: Phil Crump Date: Sun, 6 May 2018 16:16:00 +0000 Subject: [PATCH] Habitat queue: Requeue after network issue. --- habitat.c | 44 ++++++++++++++++++++++++++++++++++--------- lifo_buffer.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++- lifo_buffer.h | 2 ++ 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/habitat.c b/habitat.c index 10951df..830f390 100644 --- a/habitat.c +++ b/habitat.c @@ -49,7 +49,7 @@ void hash_to_hex( unsigned char *hash, char *line ) line[64] = '\0'; } -void UploadTelemetryPacket( received_t * t ) +bool UploadTelemetryPacket( received_t * t ) { CURL *curl; CURLcode res; @@ -59,6 +59,7 @@ void UploadTelemetryPacket( received_t * t ) curl = curl_easy_init( ); if ( curl ) { + bool result; char url[200]; char base64_data[1000]; size_t base64_length; @@ -142,7 +143,13 @@ void UploadTelemetryPacket( received_t * t ) if (http_resp != 201 && http_resp != 403 && http_resp != 409) { LogMessage("Unexpected HTTP response %ld for URL '%s'\n", http_resp, url); + result = false; } + else + { + /* Everything performing nominally (even if we didn't successfully insert this time) */ + result = true; + } } else { @@ -150,6 +157,8 @@ void UploadTelemetryPacket( received_t * t ) LogMessage("Failed for URL '%s'\n", url); LogMessage("curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); LogMessage("error: %s\n", curl_error); + /* Likely a network error, so return false to requeue */ + result = false; } if (http_resp == 409) @@ -163,6 +172,13 @@ void UploadTelemetryPacket( received_t * t ) // always cleanup curl_slist_free_all( headers ); curl_easy_cleanup( curl ); + + return result; + } + else + { + /* CURL error, return false so we requeue */ + return false; } } @@ -180,23 +196,33 @@ void *HabitatLoop( void *vars ) if(dequeued_telemetry_ptr != NULL) { - ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat" ); + ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat (%d queued)", lifo_buffer_queued(&Habitat_Upload_Buffer) ); - UploadTelemetryPacket( dequeued_telemetry_ptr ); + if(UploadTelemetryPacket( dequeued_telemetry_ptr )) + { + ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " ); + free(dequeued_telemetry_ptr); + } + else + { + /* Network / CURL Error, requeue packet */ + ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat Net Error! " ); - ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " ); - - free(dequeued_telemetry_ptr); + if(!lifo_buffer_requeue(&Habitat_Upload_Buffer, dequeued_telemetry_ptr)) + { + /* Requeue failed, drop packet */ + free(dequeued_telemetry_ptr); + } + } } else { - /* We've been asked to quit */ + /* NULL returned: We've been asked to quit */ + /* Don't bother free()ing stuff, as application is quitting */ break; } } } - LogMessage( "Habitat thread closing\n" ); - return NULL; } diff --git a/lifo_buffer.c b/lifo_buffer.c index 0fc3315..fe6236d 100644 --- a/lifo_buffer.c +++ b/lifo_buffer.c @@ -20,6 +20,26 @@ void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length) pthread_mutex_unlock(&buf->Mutex); } +uint32_t lifo_buffer_queued(lifo_buffer_t *buf) +{ + uint32_t result; + + pthread_mutex_lock(&buf->Mutex); + + if(buf->Head >= buf->Tail) + { + result = buf->Head - buf->Tail; + } + else + { + result = buf->Head + (buf->Length - buf->Tail); + } + + pthread_mutex_unlock(&buf->Mutex); + + return result; +} + /* Lossy when buffer is full */ void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr) { @@ -112,4 +132,34 @@ void lifo_buffer_quitwait(lifo_buffer_t *buf) pthread_cond_signal(&buf->Signal); pthread_mutex_unlock(&buf->Mutex); -} \ No newline at end of file +} +/* Puts it on the bottom to avoid clogging by packet data problems, lossy when buffer is full */ +bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr) +{ + bool result; + + pthread_mutex_lock(&buf->Mutex); + + /* If no space, ignore */ + if(buf->Head!=(buf->Tail-1) && !(buf->Head==(buf->Length-1) && buf->Tail==0)) + { + buf->Data[buf->Tail] = data_ptr; + + if(buf->Tail==0) + buf->Tail=(buf->Length-1); + else + buf->Tail--; + + pthread_cond_signal(&buf->Signal); + + result = true; + } + else + { + result = false; + } + + pthread_mutex_unlock(&buf->Mutex); + + return result; +} diff --git a/lifo_buffer.h b/lifo_buffer.h index c4c58a7..82ff646 100644 --- a/lifo_buffer.h +++ b/lifo_buffer.h @@ -22,9 +22,11 @@ typedef struct /** Common functions **/ void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length); +uint32_t lifo_buffer_queued(lifo_buffer_t *buf); void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr); void *lifo_buffer_pop(lifo_buffer_t *buf); void *lifo_buffer_waitpop(lifo_buffer_t *buf); void lifo_buffer_quitwait(lifo_buffer_t *buf); +bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr); #endif /* __LIFO_BUFFER_H__*/ \ No newline at end of file