Initial refactor

custom-fields-additions
Mark Jessop 2025-09-07 11:08:09 +09:30
rodzic 18b0f662eb
commit 6eea97e7bf
3 zmienionych plików z 122 dodań i 124 usunięć

Wyświetl plik

@ -1 +1 @@
__version__ = "0.3.15"
__version__ = "0.4.0"

Wyświetl plik

@ -12,6 +12,10 @@ HORUS_PAYLOAD_LIST = {0:'4FSKTEST', 1:'HORUSBINARY', 256: '4FSKTEST-V2'}
# URL for payload list
PAYLOAD_ID_LIST_URL = "https://raw.githubusercontent.com/projecthorus/horusdemodlib/master/payload_id_list.txt"
# Custom Field JSON URL
HORUS_CUSTOM_FIELD_URL = "https://raw.githubusercontent.com/projecthorus/horusdemodlib/master/custom_field_list.json"
# Custom field data.
HORUS_CUSTOM_FIELD_LENGTH = 9
HORUS_CUSTOM_FIELDS = {
@ -46,73 +50,16 @@ HORUS_CUSTOM_FIELDS = {
}
# Custom Field JSON URL
HORUS_CUSTOM_FIELD_URL = "https://raw.githubusercontent.com/projecthorus/horusdemodlib/master/custom_field_list.json"
def read_payload_list(filename="payload_id_list.txt"):
""" Read a payload ID list from a file, and return the parsed data as a dictionary """
# Dummy payload list.
payload_list = HORUS_PAYLOAD_LIST.copy()
try:
with open(filename,'r') as file:
for line in file:
# Skip comment lines.
if line[0] == '#':
continue
else:
# Attempt to split the line with a comma.
_params = line.split(',')
if len(_params) != 2:
# Invalid line.
logging.error("Could not parse line: %s" % line)
else:
try:
_id = int(_params[0])
_callsign = _params[1].strip()
# Check to see if a payload ID is already in use and print a warning
if _id in payload_list:
if _id not in HORUS_PAYLOAD_LIST:
logging.warning(f"Payload ID {_id} already in use by {payload_list[_id]}")
payload_list[_id] = _callsign
except Exception as e:
logging.error(f"Error parsing line: {line}: {str(e)}")
except Exception as e:
logging.error("Error reading Payload ID list, does it exist? - %s" % str(e))
logging.debug("Known Payload IDs:")
for _payload in payload_list:
logging.debug("\t%s - %s" % (_payload, payload_list[_payload]))
return payload_list
def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None, timeout=5):
def parse_payload_list(text_data):
"""
Attempt to download the latest payload ID list from Github, and parse into a dictionary.
Optionally, save it to a file.
Parse payload list text data
"""
# Download the list.
try:
logging.info("Attempting to download latest payload ID list from GitHub...")
_r = requests.get(url, timeout=timeout)
except Exception as e:
logging.error("Unable to get latest payload ID list: %s" % str(e))
return None
# Check it is what we think it is..
if "HORUS BINARY PAYLOAD ID LIST" not in _r.text:
logging.error("Downloaded payload ID list is invalid.")
return None
_text = _r.text
_payload_list = {}
_payload_list = HORUS_PAYLOAD_LIST.copy()
try:
for line in _r.text.split('\n'):
for line in text_data.split('\n'):
if line == "":
continue
# Skip comment lines.
@ -123,7 +70,7 @@ def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None, time
_params = line.split(',')
if len(_params) != 2:
# Invalid line.
logging.error("Could not parse line: %s" % line)
logging.error(f"Could not parse line, incorrect number of fields: {line}")
else:
try:
_id = int(_params[0])
@ -136,11 +83,59 @@ def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None, time
_payload_list[_id] = _callsign
except:
logging.error("Error parsing line: %s" % line)
logging.error(f"Error parsing line, skipping: {line}")
except Exception as e:
logging.error("Error reading Payload ID list - %s" % str(e))
logging.error(f"Error reading Payload ID list, using defaults: {str(e)}")
return _payload_list
return _payload_list
def read_payload_list(filename="payload_id_list.txt"):
""" Read a payload ID list from a file, and return the parsed data as a dictionary """
# Dummy payload list.
payload_list = HORUS_PAYLOAD_LIST.copy()
try:
with open(filename,'r') as file:
_text = file.read()
payload_list = parse_payload_list(_text)
except Exception as e:
logging.error(f"Error reading Payload ID list, does it exist? Error: {str(e)}")
logging.debug("Known Payload IDs:")
for _payload in payload_list:
logging.debug("\t%s - %s" % (_payload, payload_list[_payload]))
return payload_list
def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None, timeout=10):
"""
Attempt to download the latest payload ID list from Github, and parse into a dictionary.
Optionally, save it to a file.
If the file fails to download, or is invalid, return None.
"""
# Download the list.
try:
logging.info("Attempting to download latest payload ID list from GitHub...")
_r = requests.get(url, timeout=timeout)
except Exception as e:
logging.error("Unable to download latest payload ID list: %s" % str(e))
return None
# Check it is what we think it is..
if "HORUS BINARY PAYLOAD ID LIST" not in _r.text:
logging.error("Downloaded payload ID list is invalid.")
return None
# Now parse what we have downloaded
_payload_list = parse_payload_list(_r.text)
# Write out to file, if we have been given a filename.
if filename != None:
try:
with open(filename, 'w') as f:
@ -155,7 +150,6 @@ def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None, time
return _payload_list
def init_payload_id_list(filename="payload_id_list.txt", nodownload=False):
""" Initialise and update the local payload ID list. """
@ -173,26 +167,21 @@ def init_payload_id_list(filename="payload_id_list.txt", nodownload=False):
return HORUS_PAYLOAD_LIST
def read_custom_field_list(filename="custom_field_list.json"):
"""
Read in a JSON file containing descriptions of custom payload fields,
def parse_custom_field_list(text_data):
"""
Parse JSON containing descriptions of custom payload fields,
for use with the Horus Binary v2 32-byte payload format.
"""
# Get default custom field list to pass back in case of error
_custom_field_list = HORUS_CUSTOM_FIELDS
try:
# Read in entirity of file contents.
_f = open(filename, 'r')
_raw_data = _f.read()
_f.close()
# Attempt to parse JSON
_field_data = json.loads(_raw_data)
_field_data = json.loads(text_data)
if type(_field_data) != dict:
logging.error("Error reading custom field list, using defaults.")
logging.error("Custom field list data does not contain a JSON object, using defaults.")
return _custom_field_list
# Iterate through fields in the file we just read in
@ -210,6 +199,21 @@ def read_custom_field_list(filename="custom_field_list.json"):
"fields": _data["fields"]
}
logging.debug(f"Loaded custom field data for {_payload}.")
# Look for other_payloads field, which means this custom field spec applies to
# other payload IDs too.
if "other_payloads" in _data:
for _other_payload in _data["other_payloads"]:
if _other_payload in _custom_field_list:
logging.warning(f"Custom field data for {_other_payload} is already loaded, overwriting.")
_custom_field_list[_other_payload] = {
"struct": _data["struct"],
"fields": _data["fields"]
}
logging.debug(f"Applied {_payload} custom field data to additional payloads: {_data['other_payloads']}.")
else:
logging.error(f"Struct field for {_payload} has incorrect length ({_structsize}).")
@ -219,11 +223,33 @@ def read_custom_field_list(filename="custom_field_list.json"):
return _custom_field_list
except Exception as e:
logging.error(f"Error parsing custom field list file ({filename}): {str(e)}")
logging.error(f"Error parsing custom field list data, using defaults: {str(e)}")
return _custom_field_list
def read_custom_field_list(filename="custom_field_list.json"):
"""
Read in a JSON file containing descriptions of custom payload fields,
then parse it.
"""
_custom_field_list = HORUS_CUSTOM_FIELDS
try:
# Read in entirity of file contents.
_f = open(filename, 'r')
_raw_data = _f.read()
_f.close()
except Exception as e:
logging.error(f"Error loading custom field list file ({filename}), using defaults: {str(e)}")
return _custom_field_list
return parse_custom_field_list(_raw_data)
# Function does not appear to be used anywhere, can probably remove this.
def grab_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, local_file="custom_field_list.json"):
""" Attempt to download the latest custom field list from Github """
@ -249,15 +275,21 @@ def grab_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, local_file="custom
return True
def download_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, filename=None, timeout=5):
""" Attempt to download the latest custom field list from Github """
def download_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, filename=None, timeout=10):
"""
Attempt to download the latest custom field list from Github,
then parse it.
If the file cannot be downloaded, or is invalid, this will return None rather than returning the default custom fields.
"""
_custom_field_list = HORUS_CUSTOM_FIELDS
# Download the list.
try:
logging.info("Attempting to download latest custom field list from GitHub...")
_r = requests.get(url, timeout=timeout)
except Exception as e:
logging.error("Unable to get latest custom field list: %s" % str(e))
logging.error("Unable to download latest custom field list: %s" % str(e))
return None
# Check it is what we think it is..
@ -266,55 +298,20 @@ def download_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, filename=None,
logging.error("Downloaded custom field list is invalid.")
return None
_text = _r.text
_custom_field_list = {}
try:
# Attempt to parse JSON
_field_data = json.loads(_r.text)
if type(_field_data) != dict:
logging.error("Error reading custom field list - Incorrect input format.")
return None
# Iterate through fields in the file we just read in
for _payload in _field_data:
_data = _field_data[_payload]
if ("struct" in _data) and ("fields" in _data):
# Check the struct value has the right length
try:
_structsize = struct.calcsize(_data["struct"])
if _structsize == HORUS_CUSTOM_FIELD_LENGTH:
_custom_field_list[_payload] = {
"struct": _data["struct"],
"fields": _data["fields"]
}
logging.debug(f"Loaded custom field data for {_payload}.")
else:
logging.error(f"Struct field for {_payload} has incorrect length ({_structsize}).")
except Exception as e:
logging.error(f"Could not parse custom field data for {_payload}: {str(e)}")
except Exception as e:
logging.error(f"Could not parse downloaded custom field list - {str(e)}")
return None
_custom_field_list = parse_custom_field_list(_r.text)
if filename != None:
try:
with open(filename, 'w') as f:
f.write(_r.text)
logging.info(f"Wrote latest custom field list data out to {filename}.")
except Exception as e:
logging.error(f"Error writing custom field list to file {filename} - {str(e)}")
return _custom_field_list
def init_custom_field_list(filename="custom_field_list.json", nodownload=False):
""" Initialise and update the local custom field list """
@ -340,6 +337,7 @@ def update_payload_lists(payload_list, custom_field_list):
if __name__ == "__main__":
import argparse
import pprint
from unittest.mock import Mock
# Read command-line arguments
@ -356,8 +354,8 @@ if __name__ == "__main__":
logging.error = Mock()
logging.warning = Mock()
init_payload_id_list(nodownload=args.nodownload)
init_custom_field_list(nodownload=args.nodownload)
_new_payload_list = init_payload_id_list(nodownload=args.nodownload)
_new_custom_field_list = init_custom_field_list(nodownload=args.nodownload)
if args.test:
try:
@ -371,5 +369,5 @@ if __name__ == "__main__":
raise AssertionError(f"Warnings when parsing payloads: {logging.warning.call_args_list}") from None
if args.print:
print(HORUS_PAYLOAD_LIST)
print(HORUS_CUSTOM_FIELDS)
pprint.pprint(_new_payload_list)
pprint.pprint(_new_custom_field_list)

Wyświetl plik

@ -1,6 +1,6 @@
[tool.poetry]
name = "horusdemodlib"
version = "0.3.15"
version = "0.4.0"
description = "Project Horus HAB Telemetry Demodulators"
authors = ["Mark Jessop"]
license = "LGPL-2.1-or-later"