Habitat queue: Requeue after network issue.

pull/53/head
Phil Crump 2018-05-06 16:16:00 +00:00
rodzic e142c9f7e2
commit 5be4c31726
3 zmienionych plików z 88 dodań i 10 usunięć

Wyświetl plik

@ -49,7 +49,7 @@ void hash_to_hex( unsigned char *hash, char *line )
line[64] = '\0'; line[64] = '\0';
} }
void UploadTelemetryPacket( received_t * t ) bool UploadTelemetryPacket( received_t * t )
{ {
CURL *curl; CURL *curl;
CURLcode res; CURLcode res;
@ -59,6 +59,7 @@ void UploadTelemetryPacket( received_t * t )
curl = curl_easy_init( ); curl = curl_easy_init( );
if ( curl ) if ( curl )
{ {
bool result;
char url[200]; char url[200];
char base64_data[1000]; char base64_data[1000];
size_t base64_length; size_t base64_length;
@ -142,7 +143,13 @@ void UploadTelemetryPacket( received_t * t )
if (http_resp != 201 && http_resp != 403 && http_resp != 409) if (http_resp != 201 && http_resp != 403 && http_resp != 409)
{ {
LogMessage("Unexpected HTTP response %ld for URL '%s'\n", http_resp, url); LogMessage("Unexpected HTTP response %ld for URL '%s'\n", http_resp, url);
result = false;
} }
else
{
/* Everything performing nominally (even if we didn't successfully insert this time) */
result = true;
}
} }
else else
{ {
@ -150,6 +157,8 @@ void UploadTelemetryPacket( received_t * t )
LogMessage("Failed for URL '%s'\n", url); LogMessage("Failed for URL '%s'\n", url);
LogMessage("curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); LogMessage("curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
LogMessage("error: %s\n", curl_error); LogMessage("error: %s\n", curl_error);
/* Likely a network error, so return false to requeue */
result = false;
} }
if (http_resp == 409) if (http_resp == 409)
@ -163,6 +172,13 @@ void UploadTelemetryPacket( received_t * t )
// always cleanup // always cleanup
curl_slist_free_all( headers ); curl_slist_free_all( headers );
curl_easy_cleanup( curl ); curl_easy_cleanup( curl );
return result;
}
else
{
/* CURL error, return false so we requeue */
return false;
} }
} }
@ -180,23 +196,33 @@ void *HabitatLoop( void *vars )
if(dequeued_telemetry_ptr != NULL) if(dequeued_telemetry_ptr != NULL)
{ {
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat" ); ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat (%d queued)", lifo_buffer_queued(&Habitat_Upload_Buffer) );
UploadTelemetryPacket( dequeued_telemetry_ptr ); if(UploadTelemetryPacket( dequeued_telemetry_ptr ))
{
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " );
free(dequeued_telemetry_ptr);
}
else
{
/* Network / CURL Error, requeue packet */
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat Net Error! " );
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " ); if(!lifo_buffer_requeue(&Habitat_Upload_Buffer, dequeued_telemetry_ptr))
{
free(dequeued_telemetry_ptr); /* Requeue failed, drop packet */
free(dequeued_telemetry_ptr);
}
}
} }
else else
{ {
/* We've been asked to quit */ /* NULL returned: We've been asked to quit */
/* Don't bother free()ing stuff, as application is quitting */
break; break;
} }
} }
} }
LogMessage( "Habitat thread closing\n" );
return NULL; return NULL;
} }

Wyświetl plik

@ -20,6 +20,26 @@ void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length)
pthread_mutex_unlock(&buf->Mutex); pthread_mutex_unlock(&buf->Mutex);
} }
uint32_t lifo_buffer_queued(lifo_buffer_t *buf)
{
uint32_t result;
pthread_mutex_lock(&buf->Mutex);
if(buf->Head >= buf->Tail)
{
result = buf->Head - buf->Tail;
}
else
{
result = buf->Head + (buf->Length - buf->Tail);
}
pthread_mutex_unlock(&buf->Mutex);
return result;
}
/* Lossy when buffer is full */ /* Lossy when buffer is full */
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr) void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr)
{ {
@ -112,4 +132,34 @@ void lifo_buffer_quitwait(lifo_buffer_t *buf)
pthread_cond_signal(&buf->Signal); pthread_cond_signal(&buf->Signal);
pthread_mutex_unlock(&buf->Mutex); pthread_mutex_unlock(&buf->Mutex);
} }
/* Puts it on the bottom to avoid clogging by packet data problems, lossy when buffer is full */
bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr)
{
bool result;
pthread_mutex_lock(&buf->Mutex);
/* If no space, ignore */
if(buf->Head!=(buf->Tail-1) && !(buf->Head==(buf->Length-1) && buf->Tail==0))
{
buf->Data[buf->Tail] = data_ptr;
if(buf->Tail==0)
buf->Tail=(buf->Length-1);
else
buf->Tail--;
pthread_cond_signal(&buf->Signal);
result = true;
}
else
{
result = false;
}
pthread_mutex_unlock(&buf->Mutex);
return result;
}

Wyświetl plik

@ -22,9 +22,11 @@ typedef struct
/** Common functions **/ /** Common functions **/
void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length); void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length);
uint32_t lifo_buffer_queued(lifo_buffer_t *buf);
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr); void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr);
void *lifo_buffer_pop(lifo_buffer_t *buf); void *lifo_buffer_pop(lifo_buffer_t *buf);
void *lifo_buffer_waitpop(lifo_buffer_t *buf); void *lifo_buffer_waitpop(lifo_buffer_t *buf);
void lifo_buffer_quitwait(lifo_buffer_t *buf); void lifo_buffer_quitwait(lifo_buffer_t *buf);
bool lifo_buffer_requeue(lifo_buffer_t *buf, void *data_ptr);
#endif /* __LIFO_BUFFER_H__*/ #endif /* __LIFO_BUFFER_H__*/