V1.10.2: Changes to Sondehub upload to make it more robust

master
Dave Akerman 2023-08-24 11:54:02 +01:00
rodzic 9fc3719bbb
commit 648b73fb94
4 zmienionych plików z 107 dodań i 64 usunięć

Wyświetl plik

@ -296,6 +296,12 @@ Many thanks to David Brooke for coding this feature and the AFC.
Change History
==============
## 24/8/2023 - V1.10.2
UKHAS Telemetry is CRC16 checked before being used/uploaded, in addition to the existing LoRa CRC check
Separation of new incoming telemetry and that already being uploaded to Sondehub
Added parsing check on UKHAS telemetry before uploading to Sondehub
## 13/8/2023 - V1.10.1
Sondehub upload: For errors, log return code, return text and uploaded JSON to errors.txt

Wyświetl plik

@ -45,7 +45,7 @@
#include "udpclient.h"
#include "lifo_buffer.h"
#define VERSION "V1.10.1"
#define VERSION "V1.10.2"
bool run = TRUE;
// RFM98
@ -1126,39 +1126,47 @@ int ProcessTelemetryMessage(int Channel, received_t *Received)
{
struct tm *tm;
tm = localtime( &Received->Metadata.Timestamp );
*endmessage = '\0';
LogTelemetryPacket(Channel, startmessage);
ProcessLineUKHAS(Channel, startmessage);
if ((Repeated = (*startmessage == '%')))
if (ValidCRC16(startmessage))
{
*startmessage = '$';
}
LogTelemetryPacket(Channel, startmessage);
strcpy(Config.LoRaDevices[Channel].Telemetry, startmessage);
Config.LoRaDevices[Channel].TelemetryCount++;
ProcessLineUKHAS(Channel, startmessage);
if ((Repeated = (*startmessage == '%')))
{
*startmessage = '$';
}
if (Config.EnableMQTT)
{
// Add to MQTT upload queue
received_t *queueReceived = malloc(sizeof(received_t));
if(queueReceived != NULL)
{
memcpy(queueReceived, Received, sizeof(received_t));
/* Push pointer onto upload queue */
lifo_buffer_push(&MQTT_Upload_Buffer, (void *)queueReceived);
}
}
if (Config.EnableSondehub)
{
SetSondehubSentence(Channel, startmessage);
strcpy(Config.LoRaDevices[Channel].Telemetry, startmessage);
Config.LoRaDevices[Channel].TelemetryCount++;
if (Config.EnableMQTT)
{
// Add to MQTT upload queue
received_t *queueReceived = malloc(sizeof(received_t));
if(queueReceived != NULL)
{
memcpy(queueReceived, Received, sizeof(received_t));
/* Push pointer onto upload queue */
lifo_buffer_push(&MQTT_Upload_Buffer, (void *)queueReceived);
}
}
if (Config.EnableSondehub)
{
SetSondehubSentence(Channel, startmessage);
}
LogMessage("%02d:%02d:%02d Ch%d: %s%s\n", tm->tm_hour, tm->tm_min, tm->tm_sec, Channel, startmessage, Repeated ? " (repeated)" : "");
}
else
{
LogMessage("%02d:%02d:%02d Ch%d: %s\n", tm->tm_hour, tm->tm_min, tm->tm_sec, Channel, "Bad UKHAS CRC");
}
tm = localtime( &Received->Metadata.Timestamp );
LogMessage("%02d:%02d:%02d Ch%d: %s%s\n", tm->tm_hour, tm->tm_min, tm->tm_sec, Channel, startmessage, Repeated ? " (repeated)" : "");
startmessage = endmessage + 1;
endmessage = strchr( startmessage, '\n' );
@ -2325,15 +2333,20 @@ WINDOW *InitDisplay(void)
}
uint16_t
CRC16( unsigned char *ptr )
int ValidCRC16(char *ptr)
{
uint16_t CRC;
int j;
int Valid;
char CRCString[5];
Valid = 0;
CRC = 0xffff; // Seed
// Skip $'s
while (*++ptr == '$');
for ( ; *ptr; ptr++ )
for ( ; *ptr && (*ptr != '*'); ptr++)
{ // For speed, repeat calculation instead of looping for each bit
CRC ^= ( ( ( unsigned int ) *ptr ) << 8 );
for ( j = 0; j < 8; j++ )
@ -2345,7 +2358,15 @@ CRC16( unsigned char *ptr )
}
}
return CRC;
if (*ptr == '*')
{
// CRC follows
sprintf(CRCString, "%04X", CRC);
Valid = memcmp(ptr+1, CRCString, 4) == 0;
}
return Valid;
}
void

Wyświetl plik

@ -12,6 +12,7 @@ void LogError(int ErrorCode, char *Message1, char *Message2);
void LogMessage( const char *format, ... );
void ChannelPrintf( int Channel, int row, int column, const char *format,
... );
int ValidCRC16(char *ptr);
void displayChannel (int Channel);
#endif

Wyświetl plik

@ -24,25 +24,33 @@
#include "gateway.h"
#include "sondehub.h"
struct TPayload SondehubPayloads[2];
pthread_mutex_t crit = PTHREAD_MUTEX_INITIALIZER; // To protect IncomingSondehubPayloads
struct TPayload IncomingSondehubPayloads[2];
struct TPayload ActiveSondehubPayloads[2];
void SetSondehubSentence(int Channel, char *tmp)
{
sscanf(tmp + 2, "%31[^,],%u,%8[^,],%lf,%lf,%d",
SondehubPayloads[Channel].Payload,
&SondehubPayloads[Channel].Counter,
SondehubPayloads[Channel].Time,
&SondehubPayloads[Channel].Latitude,
&SondehubPayloads[Channel].Longitude,
&SondehubPayloads[Channel].Altitude);
pthread_mutex_lock(&crit); // lock the critical section
SondehubPayloads[Channel].PacketSNR = Config.LoRaDevices[Channel].PacketSNR;
SondehubPayloads[Channel].PacketRSSI = Config.LoRaDevices[Channel].PacketRSSI;
SondehubPayloads[Channel].Frequency = Config.LoRaDevices[Channel].Frequency + Config.LoRaDevices[Channel].FrequencyOffset;
if (sscanf(tmp + 2, "%31[^,],%u,%8[^,],%lf,%lf,%d",
IncomingSondehubPayloads[Channel].Payload,
&IncomingSondehubPayloads[Channel].Counter,
IncomingSondehubPayloads[Channel].Time,
&IncomingSondehubPayloads[Channel].Latitude,
&IncomingSondehubPayloads[Channel].Longitude,
&IncomingSondehubPayloads[Channel].Altitude) == 6)
{
IncomingSondehubPayloads[Channel].PacketSNR = Config.LoRaDevices[Channel].PacketSNR;
IncomingSondehubPayloads[Channel].PacketRSSI = Config.LoRaDevices[Channel].PacketRSSI;
IncomingSondehubPayloads[Channel].Frequency = Config.LoRaDevices[Channel].Frequency + Config.LoRaDevices[Channel].FrequencyOffset;
strcpy(SondehubPayloads[Channel].Telemetry, tmp);
strcpy(IncomingSondehubPayloads[Channel].Telemetry, tmp);
SondehubPayloads[Channel].InUse = (SondehubPayloads[Channel].Latitude != 0) && (SondehubPayloads[Channel].Longitude != 0);
IncomingSondehubPayloads[Channel].InUse = (IncomingSondehubPayloads[Channel].Latitude != 0) && (IncomingSondehubPayloads[Channel].Longitude != 0);
}
pthread_mutex_unlock(&crit); // unlock once you are done
}
size_t sondehub_write_data( void *buffer, size_t size, size_t nmemb, void *userp )
@ -108,7 +116,9 @@ int UploadJSONToServer(char *url, char *json)
else if (http_resp == 400)
{
LogMessage("400 response to %s\n", json);
result = true;
LogError(http_resp, "JSON: ", json);
LogError(http_resp, "RESP: ", curl_error);
result = true; // Don't retry - this failed due to the uploaded JSON containing something that the server rejected
}
else
{
@ -260,7 +270,6 @@ void BuildPayloadTime(char *Result, char *TimeInSentence, struct tm *tm)
int UploadSondehubPosition(int Channel)
{
char url[200];
char json[1000], now[32], payload_time[32], ExtractedFields[256], uploader_position[256];
time_t rawtime;
struct tm *tm;
@ -270,10 +279,10 @@ int UploadSondehubPosition(int Channel)
tm = gmtime(&rawtime);
strftime(now, sizeof(now), "%Y-%0m-%0dT%H:%M:%SZ", tm);
BuildPayloadTime(payload_time, SondehubPayloads[Channel].Time, tm);
BuildPayloadTime(payload_time, ActiveSondehubPayloads[Channel].Time, tm);
// Find field list and extract fields
ExtractFields(SondehubPayloads[Channel].Telemetry, ExtractedFields);
ExtractFields(ActiveSondehubPayloads[Channel].Telemetry, ExtractedFields);
if ((Config.latitude >= -90) && (Config.latitude <= 90) && (Config.longitude >= -180) && (Config.longitude <= 180))
{
@ -310,20 +319,17 @@ int UploadSondehubPosition(int Channel)
"\"uploader_antenna\": \"%s\""
"}]",
Config.Version, Config.Tracker, now,
SondehubPayloads[Channel].Payload, payload_time,
SondehubPayloads[Channel].Latitude, SondehubPayloads[Channel].Longitude, SondehubPayloads[Channel].Altitude,
SondehubPayloads[Channel].Frequency,
ActiveSondehubPayloads[Channel].Payload, payload_time,
ActiveSondehubPayloads[Channel].Latitude, ActiveSondehubPayloads[Channel].Longitude, ActiveSondehubPayloads[Channel].Altitude,
ActiveSondehubPayloads[Channel].Frequency,
Config.LoRaDevices[Channel].SpeedMode,
SondehubPayloads[Channel].PacketSNR, SondehubPayloads[Channel].PacketRSSI,
SondehubPayloads[Channel].Telemetry,
ActiveSondehubPayloads[Channel].PacketSNR, ActiveSondehubPayloads[Channel].PacketRSSI,
ActiveSondehubPayloads[Channel].Telemetry,
ExtractedFields,
uploader_position,
Config.antenna);
// Set the URL that is about to receive our PUT
strcpy(url, "https://api.v2.sondehub.org/amateur/telemetry");
return UploadJSONToServer(url, json);
return UploadJSONToServer("https://api.v2.sondehub.org/amateur/telemetry", json);
}
char *SanitiseCallsignForMQTT(char *Callsign)
@ -346,7 +352,6 @@ char *SanitiseCallsignForMQTT(char *Callsign)
int UploadListenerToSondehub(void)
{
char url[200];
char json[1000], now[32], doc_time[32];
time_t rawtime;
struct tm *tm, *doc_tm;
@ -376,9 +381,7 @@ int UploadListenerToSondehub(void)
Config.latitude, Config.longitude, Config.altitude,
Config.radio, Config.antenna);
strcpy(url, "https://api.v2.sondehub.org/amateur/listeners");
return UploadJSONToServer(url, json);
return UploadJSONToServer("https://api.v2.sondehub.org/amateur/listeners", json);
}
@ -391,11 +394,23 @@ void *SondehubLoop( void *vars )
for (Channel=0; Channel<=1; Channel++)
{
if (SondehubPayloads[Channel].InUse)
// Copy new incoming payload, if there is one
if (IncomingSondehubPayloads[Channel].InUse)
{
pthread_mutex_lock(&crit); // lock the critical section
// copy from incoming to active
memcpy(&ActiveSondehubPayloads[Channel], &IncomingSondehubPayloads[Channel], sizeof(struct TPayload));
pthread_mutex_unlock(&crit); // unlock once you are done
}
// Try to upload active payload, if there is one
if (ActiveSondehubPayloads[Channel].InUse)
{
if (UploadSondehubPosition(Channel))
{
SondehubPayloads[Channel].InUse = 0;
ActiveSondehubPayloads[Channel].InUse = 0;
}
}
}