radiosonde_auto_rx/auto_rx/autorx/aprs.py

914 wiersze
34 KiB
Python

#!/usr/bin/env python
#
# radiosonde_auto_rx - APRS Exporter
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import datetime
import logging
import random
import time
import traceback
import socket
from queue import Queue
from threading import Thread, Lock
from . import __version__ as auto_rx_version
from .utils import strip_sonde_serial
def telemetry_to_aprs_position(
sonde_data, object_name="<id>", aprs_comment="BOM Balloon", position_report=False
):
""" Convert a dictionary containing Sonde telemetry into an APRS packet.
Args:
sonde_data (dict): Sonde telemetry dictionary. (refer autorx.decoder)
object_name (str): APRS Object Name. If <id>, the sonde's serial number will be used.
aprs_comment (str): Comment to use in the packet.
position_report (bool): if True, generate a position report instead of an APRS object packet.
"""
# Generate the APRS 'callsign' for the sonde.
if object_name == "<id>":
_object_name = None
# We should have been provided a APRS ID in the telemetry.
# This is added in in decode.py, and generated in utils.py
if 'aprsid' in sonde_data:
if sonde_data['aprsid'] is not None:
_object_name = sonde_data['aprsid']
# However, it may be 'None', which indicates we don't know how to handle this sonde ID yet.
if _object_name is None:
# Unknown sonde type, don't know how to handle this yet.
logging.error(
"No APRS ID conversion available for sonde type: %s"
% sonde_data["type"]
)
return (None, None)
else:
_object_name = object_name
# Use the actual sonde frequency, if we have it.
if "f_centre" in sonde_data:
# We have an estimate of the sonde's centre frequency from the modem, use this in place of
# the RX frequency.
# Round to 1 kHz
_freq = round(sonde_data["f_centre"] / 1000.0)
# Convert to MHz.
_freq = "%.3f MHz" % (_freq / 1e3)
else:
# Otherwise, use the normal frequency.
_freq = sonde_data["freq"]
# Generate the comment field.
_aprs_comment = aprs_comment
_aprs_comment = _aprs_comment.replace("<freq>", _freq)
_aprs_comment = _aprs_comment.replace("<id>", strip_sonde_serial(sonde_data["id"]))
_aprs_comment = _aprs_comment.replace("<temp>", "%.1fC" % sonde_data["temp"])
_aprs_comment = _aprs_comment.replace(
"<pressure>", "%.1fhPa" % sonde_data["pressure"]
)
_aprs_comment = _aprs_comment.replace(
"<humidity>", "%.1f" % sonde_data["humidity"] + "%"
)
_aprs_comment = _aprs_comment.replace("<batt>", "%.1fV" % sonde_data["batt"])
_aprs_comment = _aprs_comment.replace("<vel_v>", "%.1fm/s" % sonde_data["vel_v"])
_aprs_comment = _aprs_comment.replace("<type>", sonde_data["type"])
# TODO: RS41 Burst Timer
# Convert float latitude to APRS format (DDMM.MM)
lat = float(sonde_data["lat"])
lat_degree = abs(int(lat))
lat_minute = abs(lat - int(lat)) * 60.0
lat_min_str = ("%02.4f" % lat_minute).zfill(7)[:5]
lat_dir = "S"
if lat > 0.0:
lat_dir = "N"
lat_str = "%02d%s" % (lat_degree, lat_min_str) + lat_dir
# Convert float longitude to APRS format (DDDMM.MM)
lon = float(sonde_data["lon"])
lon_degree = abs(int(lon))
lon_minute = abs(lon - int(lon)) * 60.0
lon_min_str = ("%02.4f" % lon_minute).zfill(7)[:5]
lon_dir = "E"
if lon < 0.0:
lon_dir = "W"
lon_str = "%03d%s" % (lon_degree, lon_min_str) + lon_dir
# Generate the added digits of precision, as per http://www.aprs.org/datum.txt
# Base-91 can only encode decimal integers between 0 and 93 (otherwise we end up with non-printable characters)
# So, we have to scale the range 00-99 down to 0-90, being careful to avoid errors due to floating point math.
_lat_prec = int(round(float(("%02.4f" % lat_minute)[-2:]) / 1.10))
_lon_prec = int(round(float(("%02.4f" % lon_minute)[-2:]) / 1.10))
# Now we can add 33 to the 0-90 value to produce the Base-91 character.
_lat_prec = chr(_lat_prec + 33)
_lon_prec = chr(_lon_prec + 33)
# Produce Datum + Added precision string
# We currently assume all position data is using the WGS84 datum,
# which I believe is true for most (if not all?) radiosondes.
_datum = "!w%s%s!" % (_lat_prec, _lon_prec)
# Convert Alt (in metres) to feet
alt = int(float(sonde_data["alt"]) / 0.3048)
# Produce the timestamp
_aprs_timestamp = sonde_data["datetime_dt"].strftime("%H%M%S")
# Generate course/speed data, if provided in the telemetry dictionary
if ("heading" in sonde_data.keys()) and ("vel_h" in sonde_data.keys()):
course_speed = "%03d/%03d" % (
int(sonde_data["heading"]),
int(sonde_data["vel_h"] * 1.944),
)
else:
course_speed = "000/000"
if position_report:
# Produce an APRS position report string
# Note, we are using the 'position with timestamp' data type, as per http://www.aprs.org/doc/APRS101.PDF
out_str = "/%sh%s/%sO%s/A=%06d %s %s" % (
_aprs_timestamp,
lat_str,
lon_str,
course_speed,
alt,
_aprs_comment,
_datum,
)
else:
# Produce an APRS Object
out_str = ";%s*%sh%s/%sO%s/A=%06d %s %s" % (
_object_name,
_aprs_timestamp,
lat_str,
lon_str,
course_speed,
alt,
_aprs_comment,
_datum,
)
# Return both the packet, and the 'callsign'.
return (out_str, _object_name.strip())
def generate_station_object(
callsign,
lat,
lon,
comment="radiosonde_auto_rx SondeGate v<version>",
icon="/r",
position_report=False,
):
""" Generate a station object """
# Pad or limit the station callsign to 9 characters, if it is to long or short.
if len(callsign) > 9:
callsign = callsign[:9]
elif len(callsign) < 9:
callsign = callsign + " " * (9 - len(callsign))
# Convert float latitude to APRS format (DDMM.MM)
lat = float(lat)
lat_degree = abs(int(lat))
lat_minute = abs(lat - int(lat)) * 60.0
lat_min_str = ("%02.4f" % lat_minute).zfill(7)[:5]
lat_dir = "S"
if lat > 0.0:
lat_dir = "N"
lat_str = "%02d%s" % (lat_degree, lat_min_str) + lat_dir
# Convert float longitude to APRS format (DDDMM.MM)
lon = float(lon)
lon_degree = abs(int(lon))
lon_minute = abs(lon - int(lon)) * 60.0
lon_min_str = ("%02.4f" % lon_minute).zfill(7)[:5]
lon_dir = "E"
if lon < 0.0:
lon_dir = "W"
lon_str = "%03d%s" % (lon_degree, lon_min_str) + lon_dir
# Generate the added digits of precision, as per http://www.aprs.org/datum.txt
# Base-91 can only encode decimal integers between 0 and 93 (otherwise we end up with non-printable characters)
# So, we have to scale the range 00-99 down to 0-90, being careful to avoid errors due to floating point math.
_lat_prec = int(round(float(("%02.4f" % lat_minute)[-2:]) / 1.10))
_lon_prec = int(round(float(("%02.4f" % lon_minute)[-2:]) / 1.10))
# Now we can add 33 to the 0-90 value to produce the Base-91 character.
_lat_prec = chr(_lat_prec + 33)
_lon_prec = chr(_lon_prec + 33)
# Produce Datum + Added precision string
# We currently assume all position data is using the WGS84 datum.
_datum = "!w%s%s!" % (_lat_prec, _lon_prec)
# Generate timestamp using current UTC time
_aprs_timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%H%M%S")
# Add version string to position comment, if requested.
_aprs_comment = comment
_aprs_comment = _aprs_comment.replace("<version>", auto_rx_version)
# Generate output string
if position_report:
# Produce a position report with no timestamp, as per page 32 of http://www.aprs.org/doc/APRS101.PDF
out_str = "!%s%s%s%s%s %s" % (
lat_str,
icon[0],
lon_str,
icon[1],
_aprs_comment,
_datum,
)
else:
# Produce an object string
out_str = ";%s*%sh%s%s%s%s%s %s" % (
callsign,
_aprs_timestamp,
lat_str,
icon[0],
lon_str,
icon[1],
_aprs_comment,
_datum,
)
return out_str
#
# APRS Uploader Class
#
class APRSUploader(object):
"""
Queued APRS Telemetry Uploader class
This performs uploads to an APRS-IS server.
Incoming telemetry packets are fed into queue, which is checked regularly.
At a regular interval, the most recent telemetry packet is extracted, and converted to an
APRS object format, and then uploaded into APRS-IS.
If an upload attempt times out, the packet is discarded.
If the queue fills up (probably indicating no network connection, and a fast packet downlink rate),
it is immediately emptied, to avoid upload of out-of-date packets.
Note that this uploader object is intended to handle telemetry from multiple sondes
"""
# We require the following fields to be present in the incoming telemetry dictionary data
REQUIRED_FIELDS = [
"frame",
"id",
"datetime",
"lat",
"lon",
"alt",
"temp",
"type",
"freq",
"freq_float",
"datetime_dt",
]
def __init__(
self,
aprs_callsign="N0CALL",
aprs_passcode="00000",
object_name_override=None,
object_comment="RadioSonde",
position_report=False,
aprsis_host="rotate.aprs2.net",
aprsis_port=14580,
aprsis_reconnect=300,
station_beacon=False,
station_beacon_rate=30,
station_beacon_position=(0.0, 0.0, 0.0),
station_beacon_comment="radiosonde_auto_rx SondeGate v<version>",
station_beacon_icon="/r",
upload_time=60,
callsign_validity_threshold=5,
upload_queue_size=16,
upload_timeout=5,
inhibit=False,
):
""" Initialise an APRS Uploader object.
Args:
aprs_callsign (str): Callsign of the uploader, used when logging into APRS-IS.
aprs_passcode (tuple): Optional - a tuple consisting of (lat, lon, alt), which if populated,
is used to plot the listener's position on the Habitat map, both when this class is initialised, and
when a new sonde ID is observed.
object_name_override (str): Override the object name in the uploaded sentence with this value.
WARNING: This will horribly break the aprs.fi map if multiple sondes are uploaded simultaneously under the same callsign.
USE WITH CAUTION!!!
object_comment (str): A comment to go with the object. Various fields will be replaced with telmetry data.
position_report (bool): If True, upload positions as APRS position reports, otherwise, upload as an Object.
aprsis_host (str): APRS-IS Server to upload packets to.
aprsis_port (int): APRS-IS TCP port number.
aprsis_reconnect (int): Reconnect to the APRS-IS server at least every X minutes. Reconnections will occur when telemetry needs to be sent.
station_beacon (bool): Enable beaconing of station position.
station_beacon_rate (int): Time delay between beacon uploads (minutes)
station_beacon_position (tuple): (lat, lon, alt), in decimal degrees, of the station position.
station_beacon_comment (str): Comment field for the station beacon. <version> will be replaced with the current auto_rx version.
station_beacon_icon (str): The APRS icon to be used, as the two characters (symbol table, symbol index), as per http://www.aprs.org/symbols.html
upload_time (int): Upload the most recent telemetry after this time is up.
callsign_validity_threshold (int): Only upload telemetry data if the callsign has been observed more than N times. Default = 5
upload_queue_size (int): Maximum number of sentences to keep in the upload queue. If the queue is filled,
it will be emptied (discarding the queue contents).
upload_timeout (int): Timeout (Seconds) when performing uploads to APRS-IS. Default: 10 seconds.
inhibit (bool): Inhibit all uploads. Mainly intended for debugging.
"""
self.aprs_callsign = aprs_callsign
self.aprs_passcode = aprs_passcode
self.object_comment = object_comment
self.position_report = position_report
self.aprsis_host = aprsis_host
self.aprsis_port = aprsis_port
self.aprsis_reconnect = aprsis_reconnect
self.upload_timeout = upload_timeout
self.upload_queue_size = upload_queue_size
self.upload_time = upload_time
self.next_upload = time.monotonic() + upload_time
self.callsign_validity_threshold = callsign_validity_threshold
self.inhibit = inhibit
self.station_beacon = {
"enabled": station_beacon,
"position": station_beacon_position,
"rate": station_beacon_rate,
"comment": station_beacon_comment,
"icon": station_beacon_icon,
}
if object_name_override is None:
self.object_name_override = "<id>"
else:
self.object_name_override = object_name_override
self.log_info(
"Using APRS Object Name Override: %s" % self.object_name_override
)
# Our two Queues - one to hold sentences to be upload, the other to temporarily hold
# input telemetry dictionaries before they are converted and processed.
self.aprs_upload_queue = Queue(upload_queue_size)
self.input_queue = Queue()
# Dictionary where we store sorted telemetry data for upload when required.
# Elements will be named after payload IDs, and will contain:
# 'count' (int): Number of times this callsign has been observed. Uploads will only occur when
# this number rises above callsign_validity_threshold.
# 'data' (Queue): A queue of telemetry sentences to be uploaded. When the upload timer fires,
# this queue will be dumped, and the most recent telemetry uploaded.
self.observed_payloads = {}
# Record of when we last uploaded a user station position to Habitat.
self.last_user_position_upload = 0
# APRS-IS Socket Object
self.aprsis_socket = None
self.aprsis_lastconnect = 0
self.aprsis_upload_lock = Lock()
# Attempt to connect to the APRS-IS server.
# If this fails, we will attempt to re-connect when a packet needs to be uploaded.
self.connect()
# Start the uploader thread.
self.upload_thread_running = True
self.upload_thread = Thread(target=self.aprs_upload_thread)
self.upload_thread.start()
# Start the input queue processing thread.
self.input_processing_running = True
self.input_thread = Thread(target=self.process_queue)
self.input_thread.start()
self.timer_thread_running = True
self.timer_thread = Thread(target=self.upload_timer)
self.timer_thread.start()
self.log_info("APRS Uploader Started.")
def connect(self):
""" Connect to an APRS-IS Server """
# create socket & connect to server
self.aprsis_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.aprsis_socket.settimeout(self.upload_timeout)
try:
self.aprsis_socket.connect((self.aprsis_host, self.aprsis_port))
# Send logon string
# _logon = 'user %s pass %s vers VK5QI-AutoRX filter b/%s \r\n' % (self.aprs_callsign, self.aprs_passcode, self.aprs_callsign)
_logon = "user %s pass %s vers VK5QI-AutoRX\r\n" % (
self.aprs_callsign,
self.aprs_passcode,
)
self.log_debug("Logging in: %s" % _logon)
self.aprsis_socket.sendall(_logon.encode("ascii"))
# Set packet filters to limit inbound bandwidth.
_filter = "#filter p/ZZ\r\n"
self.log_debug("Setting Filter: %s" % _filter)
self.aprsis_socket.sendall(_filter.encode("ascii"))
_filter = "#filter -t/po\r\n"
self.log_debug("Setting Filter: %s" % _filter)
self.aprsis_socket.sendall(_filter.encode("ascii"))
# Wait for login to complete.
time.sleep(1)
# Check response
_resp = self.aprsis_socket.recv(1024)
try:
_resp = _resp.decode("ascii").strip()
except:
print(_resp)
if _resp[0] != "#":
raise IOError("Invalid response from APRS-IS Server: %s" % _resp)
else:
self.log_debug("Server Logon Response: %s" % str(_resp))
self.log_info(
"Connected to APRS-IS server %s:%d"
% (self.aprsis_host, self.aprsis_port)
)
self.aprsis_lastconnect = time.time()
return True
except Exception as e:
self.log_error("Connection to APRS-IS Failed - %s" % str(e))
self.aprsis_socket = None
return False
def flush_rx(self):
""" Flush the APRS-IS RX buffer """
try:
_start = time.time()
_data = self.aprsis_socket.recv(32768)
_dur = time.time() - _start
self.log_debug("Incoming data from APRS-IS: %s" % (_data.decode()))
except:
# Ignore any exceptions from attempting to read the buffer.
pass
def aprsis_upload(self, source, packet, igate=False, retries=5):
""" Upload a packet to APRS-IS
Args:
source (str): Callsign of the packet source.
packet (str): APRS packet to upload.
igate (boolean): If True, iGate the packet into APRS-IS
(i.e. use the original source call, but add SONDEGATE and our callsign to the path.)
retries (int): Number of times to retry uploading.
"""
# If we are inhibited, just return immediately.
if self.inhibit:
self.log_info("Upload Inhibited: %s" % packet)
return True
self.aprsis_upload_lock.acquire()
# If we have not connected in a long time, reset the APRS-IS connection.
if (time.time() - self.aprsis_lastconnect) > (self.aprsis_reconnect * 60):
self.disconnect()
time.sleep(1)
self.connect()
# Generate APRS packet
if igate:
# If we are emulating an IGATE, then we need to add in a path, a q-construct, and our own callsign.
# We have the TOCALL field 'APRARX' allocated by Bob WB4APR, so we can now use this to indicate
# that these packets have arrived via radiosonde_auto_rx!
_packet = "%s>APRARX,SONDEGATE,TCPIP,qAR,%s:%s\r\n" % (
source,
self.aprs_callsign,
packet,
)
else:
# Otherwise, we are probably just placing an object, usually sourced by our own callsign
_packet = "%s>APRS:%s\r\n" % (source, packet)
_attempts = 1
while _attempts < retries:
try:
# Immediately throw exception if we're not connected.
# This will trigger a reconnect.
if self.aprsis_socket is None:
raise IOError("Socket not connected.")
# Attempt to send the packet.
# This will timeout if the socket is locked up.
self.aprsis_socket.sendall(_packet.encode("ascii"))
# If OK, return.
self.log_info("Uploaded to APRS-IS: %s" % str(_packet).strip())
self.aprsis_upload_lock.release()
return True
except Exception as e:
# If something broke, forcibly shutdown the socket, then reconnect.
self.log_error("Upload Error: %s" % str(e))
self.log_info("Attempting to reconnect...")
self.disconnect()
time.sleep(1)
self.connect()
_attempts += 1
# If we end up here, something has really broken.
self.aprsis_upload_lock.release()
return False
def disconnect(self):
""" Close APRS-IS connection """
try:
self.aprsis_socket.shutdown(0)
except Exception as e:
self.log_debug("Socket shutdown failed - %s" % str(e))
try:
self.aprsis_socket.close()
except Exception as e:
self.log_debug("Socket close failed - %s" % str(e))
def beacon_station_position(self):
""" Send a station position beacon into APRS-IS """
if self.station_beacon["enabled"]:
if (self.station_beacon["position"][0] == 0.0) and (
self.station_beacon["position"][1] == 0.0
):
self.log_error(
"Station position is 0,0, not uploading position beacon."
)
self.last_user_position_upload = time.time()
return
# Generate the station position packet
# Note - this is now generated as an APRS position report, for radiosondy.info compatability.
_packet = generate_station_object(
self.aprs_callsign,
self.station_beacon["position"][0],
self.station_beacon["position"][1],
self.station_beacon["comment"],
self.station_beacon["icon"],
position_report=True,
)
# Send the packet as an iGated packet.
self.aprsis_upload(self.aprs_callsign, _packet, igate=True)
self.last_user_position_upload = time.time()
def update_station_position(self, lat, lon, alt):
""" Update the internal station position record. Used when determining the station position by GPSD """
self.station_beacon["position"] = (lat, lon, alt)
def aprs_upload_thread(self):
""" Handle uploading of packets to APRS """
self.log_debug("Started APRS Uploader Thread.")
while self.upload_thread_running:
if self.aprs_upload_queue.qsize() > 0:
# If the queue is completely full, jump to the most recent telemetry sentence.
if self.aprs_upload_queue.qsize() == self.upload_queue_size:
while not self.aprs_upload_queue.empty():
_telem = self.aprs_upload_queue.get()
self.log_warning(
"Uploader queue was full - possible connectivity issue."
)
else:
# Otherwise, get the first item in the queue.
_telem = self.aprs_upload_queue.get()
# Convert to a packet.
try:
(_packet, _call) = telemetry_to_aprs_position(
_telem,
object_name=self.object_name_override,
aprs_comment=self.object_comment,
position_report=self.position_report,
)
except Exception as e:
self.log_error(
"Error converting telemetry to APRS packet - %s" % str(e)
)
_packet = None
# Attempt to upload it.
if _packet is not None:
# If we are uploading position reports, the source call is the generated callsign
# usually based on the sonde serial number, and we iGate the position report.
# Otherwise, we upload APRS Objects, sourced by our own callsign, but still iGated via us.
if self.position_report:
self.aprsis_upload(_call, _packet, igate=True)
else:
self.aprsis_upload(self.aprs_callsign, _packet, igate=True)
else:
# Wait for a short time before checking the queue again.
time.sleep(0.1)
self.log_debug("Stopped APRS Uploader Thread.")
def upload_timer(self):
""" Add packets to the aprs upload queue if it is time for us to upload. """
while self.timer_thread_running:
if time.monotonic() > self.next_upload:
# Time to upload!
for _id in self.observed_payloads.keys():
# If no data, continue...
if self.observed_payloads[_id]["data"].empty():
continue
else:
# Otherwise, dump the queue and keep the latest telemetry.
while not self.observed_payloads[_id]["data"].empty():
_telem = self.observed_payloads[_id]["data"].get()
# Attept to add it to the habitat uploader queue.
try:
self.aprs_upload_queue.put_nowait(_telem)
except Exception as e:
self.log_error(
"Error adding sentence to queue: %s" % str(e)
)
# Sleep a second so we don't hit the synchronous upload time again.
time.sleep(1)
# Flush APRS-IS RX buffer
self.flush_rx()
# Reset upload timer
self.next_upload = time.monotonic() + self.upload_time
else:
# Not yet time to upload, wait for a bit.
time.sleep(0.1)
def process_queue(self):
""" Process packets from the input queue.
This thread handles packets from the input queue (provided by the decoders)
Packets are sorted by ID, and a dictionary entry is created.
"""
while self.input_processing_running:
# Process everything in the queue.
while self.input_queue.qsize() > 0:
# Grab latest telem dictionary.
_telem = self.input_queue.get_nowait()
_id = _telem["id"]
if _id not in self.observed_payloads:
# We haven't seen this ID before, so create a new dictionary entry for it.
self.observed_payloads[_id] = {"count": 1, "data": Queue()}
self.log_debug(
"New Payload %s. Not observed enough to allow upload." % _id
)
# However, we don't yet add anything to the queue for this payload...
else:
# We have seen this payload before!
# Increment the 'seen' counter.
self.observed_payloads[_id]["count"] += 1
# If we have seen this particular ID enough times, add the data to the ID's queue.
if (
self.observed_payloads[_id]["count"]
>= self.callsign_validity_threshold
):
# Add the telemetry to the queue
self.observed_payloads[_id]["data"].put(_telem)
else:
self.log_debug(
"Payload ID %s not observed enough to allow upload." % _id
)
if (time.time() - self.last_user_position_upload) > self.station_beacon[
"rate"
] * 60:
if self.aprsis_socket != None:
self.beacon_station_position()
time.sleep(0.1)
def add(self, telemetry):
""" Add a dictionary of telemetry to the input queue.
Args:
telemetry (dict): Telemetry dictionary to add to the input queue.
"""
# Discard any telemetry which is indicated to be encrypted.
if "encrypted" in telemetry:
if telemetry["encrypted"] == True:
return
# Check the telemetry dictionary contains the required fields.
for _field in self.REQUIRED_FIELDS:
if _field not in telemetry:
self.log_error("JSON object missing required field %s" % _field)
return
# Add it to the queue if we are running.
if self.input_processing_running:
self.input_queue.put(telemetry)
else:
self.log_error("Processing not running, discarding.")
def close(self):
""" Shutdown uploader and processing threads. """
self.log_debug("Waiting for threads to close...")
self.input_processing_running = False
self.timer_thread_running = False
self.upload_thread_running = False
self.disconnect()
# Wait for all threads to close.
if self.upload_thread is not None:
self.upload_thread.join(60)
if self.upload_thread.is_alive():
self.log_error("aprs upload thread failed to join")
if self.timer_thread is not None:
self.timer_thread.join(60)
if self.timer_thread.is_alive():
self.log_error("aprs timer thread failed to join")
if self.input_thread is not None:
self.input_thread.join(60)
if self.input_thread.is_alive():
self.log_error("aprs input thread failed to join")
def log_debug(self, line):
""" Helper function to log a debug message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.debug("APRS-IS - %s" % line)
def log_info(self, line):
""" Helper function to log an informational message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.info("APRS-IS - %s" % line)
def log_error(self, line):
""" Helper function to log an error message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.error("APRS-IS - %s" % line)
def log_warning(self, line):
""" Helper function to log a warning message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.warning("APRS-IS - %s" % line)
if __name__ == "__main__":
# Some unit tests for the APRS packet generation code.
# ['frame', 'id', 'datetime', 'lat', 'lon', 'alt', 'temp', 'type', 'freq', 'freq_float', 'datetime_dt']
test_telem = [
# These types of DFM serial IDs are deprecated
# {'id':'DFM06-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.now(datetime.timezone.utc)},
# {'id':'DFM09-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.now(datetime.timezone.utc)},
# {'id':'DFM15-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.now(datetime.timezone.utc)},
# {'id':'DFM17-12345678', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.now(datetime.timezone.utc)},
{
"id": "DFM-19123456",
"frame": 10,
"lat": -10.0,
"lon": 10.0,
"alt": 10000,
"temp": 1.0,
"humidity": 1.0,
"pressure": 1000.0,
"batt": 3.0,
"type": "DFM17",
"freq": "401.520 MHz",
"freq_float": 401.52,
"heading": 0.0,
"vel_h": 5.1,
"vel_v": -5.0,
"datetime_dt": datetime.datetime.now(datetime.timezone.utc),
},
{
"id": "DFM-123456",
"frame": 10,
"lat": -10.0,
"lon": 10.0,
"alt": 10000,
"temp": 1.0,
"humidity": 1.0,
"pressure": 1000.0,
"batt": 3.0,
"type": "DFM06",
"freq": "401.520 MHz",
"freq_float": 401.52,
"heading": 0.0,
"vel_h": 5.1,
"vel_v": -5.0,
"datetime_dt": datetime.datetime.now(datetime.timezone.utc),
},
{
"id": "N1234567",
"frame": 10,
"lat": -10.00001,
"lon": 9.99999999,
"alt": 10000,
"temp": 1.0,
"humidity": 1.0,
"pressure": 1000.0,
"batt": 3.0,
"type": "RS41",
"freq": "401.520 MHz",
"freq_float": 401.52,
"heading": 0.0,
"vel_h": 5.1,
"vel_v": -5.0,
"datetime_dt": datetime.datetime.now(datetime.timezone.utc),
},
{
"id": "M1234567",
"frame": 10,
"lat": -10.0,
"lon": 10.0,
"alt": 10000,
"temp": 1.0,
"humidity": 1.0,
"pressure": 1000.0,
"batt": 3.0,
"type": "RS92",
"freq": "401.520 MHz",
"freq_float": 401.52,
"heading": 0.0,
"vel_h": 5.1,
"vel_v": -5.0,
"datetime_dt": datetime.datetime.now(datetime.timezone.utc),
},
]
comment_field = (
"Clb=<vel_v> t=<temp> <freq> Type=<type> Radiosonde http://bit.ly/2Bj4Sfk"
)
for _telem in test_telem:
out_str = telemetry_to_aprs_position(
_telem,
object_name="<id>",
aprs_comment=comment_field,
position_report=False,
)
print(out_str)
# APRS Testing
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG
)
test = APRSUploader(
aprs_callsign="VK5QI", aprs_passcode="23032", aprsis_host="radiosondy.info"
)
test.connect()
time.sleep(5)
test.disconnect()
test.close()