Initial attempt at persistent APRS connections. Still need to figure out a filter that works with radiosondy.info

pull/330/head
Mark Jessop 2020-12-12 18:03:03 +10:30
rodzic 876cb19c12
commit a2a761e875
2 zmienionych plików z 147 dodań i 31 usunięć

Wyświetl plik

@ -17,7 +17,7 @@ except ImportError:
# MINOR - New sonde type support, other fairly big changes that may result in telemetry or config file incompatability issus.
# PATCH - Small changes, or minor feature additions.
__version__ = "1.3.3-beta5"
__version__ = "1.3.3-beta6"
# Global Variables

Wyświetl plik

@ -11,7 +11,7 @@ import random
import time
import traceback
import socket
from threading import Thread
from threading import Thread, Lock
from . import __version__ as auto_rx_version
try:
# Python 2
@ -281,6 +281,7 @@ class APRSUploader(object):
position_report = False,
aprsis_host = 'rotate.aprs2.net',
aprsis_port = 14580,
aprsis_reconnect = 180,
station_beacon = False,
station_beacon_rate = 30,
station_beacon_position = (0.0,0.0,0.0),
@ -289,7 +290,7 @@ class APRSUploader(object):
synchronous_upload_time = 30,
callsign_validity_threshold = 5,
upload_queue_size = 16,
upload_timeout = 10,
upload_timeout = 5,
inhibit = False
):
""" Initialise an APRS Uploader object.
@ -336,6 +337,7 @@ class APRSUploader(object):
self.position_report = position_report
self.aprsis_host = aprsis_host
self.aprsis_port = aprsis_port
self.aprsis_reconnect = aprsis_reconnect
self.upload_timeout = upload_timeout
self.upload_queue_size = upload_queue_size
self.synchronous_upload_time = synchronous_upload_time
@ -372,6 +374,14 @@ class APRSUploader(object):
# Record of when we last uploaded a user station position to Habitat.
self.last_user_position_upload = 0
# APRS-IS Socket Object
self.aprsis_socket = None
self.aprsis_lastconnect = 0
self.aprsis_upload_lock = Lock()
# Attempt to connect to the APRS-IS server.
# If this fails, we will attempt to re-connect when a packet needs to be uploaded.
self.connect()
# Start the uploader thread.
self.upload_thread_running = True
self.upload_thread = Thread(target=self.aprs_upload_thread)
@ -388,8 +398,67 @@ class APRSUploader(object):
self.log_info("APRS Uploader Started.")
def connect(self):
""" Connect to an APRS-IS Server """
# create socket & connect to server
self.aprsis_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.aprsis_socket.settimeout(self.upload_timeout)
try:
self.aprsis_socket.connect((self.aprsis_host, self.aprsis_port))
# Send logon string
#_logon = 'user %s pass %s vers VK5QI-AutoRX filter b/%s \r\n' % (self.aprs_callsign, self.aprs_passcode, self.aprs_callsign)
_logon = 'user %s pass %s vers VK5QI-AutoRX\r\n' % (self.aprs_callsign, self.aprs_passcode)
self.log_debug("Logging in: %s" % _logon)
self.aprsis_socket.sendall(_logon.encode('ascii'))
def aprsis_upload(self, source, packet, igate=False):
# Set packet filters to limit inbound bandwidth.
_filter = '#filter p/ZZ\r\n'
self.log_debug("Setting Filter: %s" % _filter)
self.aprsis_socket.sendall(_filter.encode('ascii'))
_filter = '#filter -t/po\r\n'
self.log_debug("Setting Filter: %s" % _filter)
self.aprsis_socket.sendall(_filter.encode('ascii'))
# Wait for login to complete.
time.sleep(1)
# Check response
_resp = self.aprsis_socket.recv(1024)
try:
_resp = _resp.decode('ascii').strip()
except:
print(_resp)
if _resp[0] != '#':
raise IOError("Invalid response from APRS-IS Server: %s" % _resp)
else:
self.log_debug("Server Logon Response: %s" % str(_resp))
self.log_info("Connected to APRS-IS server %s:%d" % (self.aprsis_host, self.aprsis_port))
self.aprsis_lastconnect = time.time()
return True
except Exception as e:
self.log_error("Connection to APRS-IS Failed - %s" % str(e))
self.aprsis_socket = None
return False
def flush_rx(self):
""" Flush the APRS-IS RX buffer """
try:
_start = time.time()
_data = self.aprsis_socket.recv(32768)
_dur = time.time() - _start
self.log_debug("%.2f s, Buffer Contents: %s" % (_dur, _data.decode()))
except:
# Don't handle any exceptions from attemptingto read the buffer.
pass
def aprsis_upload(self, source, packet, igate=False, retries=5):
""" Upload a packet to APRS-IS
Args:
@ -397,43 +466,67 @@ class APRSUploader(object):
packet (str): APRS packet to upload.
igate (boolean): If True, iGate the packet into APRS-IS
(i.e. use the original source call, but add SONDEGATE and our callsign to the path.)
retries (int): Number of times to retry uploading.
"""
# If we are inhibited, just return immediately.
if self.inhibit:
self.log_info("Upload Inhibited: %s" % packet)
return True
self.aprsis_upload_lock.acquire()
# Generate APRS packet
if igate:
# If we are emulating an IGATE, then we need to add in a path, a q-construct, and our own callsign.
# We have the TOCALL field 'APRARX' allocated by Bob WB4APR, so we can now use this to indicate
# that these packets have arrived via radiosonde_auto_rx!
_packet = '%s>APRARX,SONDEGATE,TCPIP,qAR,%s:%s\n' % (source, self.aprs_callsign, packet)
_packet = '%s>APRARX,SONDEGATE,TCPIP,qAR,%s:%s\r\n' % (source, self.aprs_callsign, packet)
else:
# Otherwise, we are probably just placing an object, usually sourced by our own callsign
_packet = '%s>APRS:%s\n' % (source, packet)
_packet = '%s>APRS:%s\r\n' % (source, packet)
# create socket & connect to server
_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_s.settimeout(self.upload_timeout)
_attempts = 1
while _attempts < retries:
try:
# Immediately throw exception if we're not connected.
# This will trigger a reconnect.
if self.aprsis_socket is None:
raise IOError("Socket not connected.")
# Attempt to send the packet.
# This will timeout if the socket is locked up.
self.aprsis_socket.sendall(_packet.encode('ascii'))
# If OK, return.
self.log_info("Uploaded to APRS-IS: %s" % str(_packet).strip())
self.aprsis_upload_lock.release()
return True
except Exception as e:
# If something broke, forcibly shutdown the socket, then reconnect.
self.log_error("Upload Error: %s" % str(e))
self.log_info("Attempting to reconnect...")
self.disconnect()
time.sleep(1)
self.connect()
_attempts += 1
# If we end up here, something has really broken.
self.aprsis_upload_lock.release()
return False
def disconnect(self):
""" Close APRS-IS connection """
try:
_s.connect((self.aprsis_host, self.aprsis_port))
# Send logon string
_logon = 'user %s pass %s vers VK5QI-AutoRX \n' % (self.aprs_callsign, self.aprs_passcode)
_s.send(_logon.encode('ascii'))
# send packet
_s.send(_packet.encode('ascii'))
# close socket
_s.shutdown(0)
_s.close()
self.log_info("Uploaded to APRS-IS: %s" % _packet)
return True
self.aprsis_socket.shutdown(0)
self.aprsis_socket.close()
except Exception as e:
self.log_error("Upload to APRS-IS Failed - %s" % str(e))
return False
self.log_error("Disconnection from APRS-IS Failed - %s" % str(e))
def beacon_station_position(self):
@ -466,7 +559,7 @@ class APRSUploader(object):
def aprs_upload_thread(self):
''' Handle uploading of packets to Habitat '''
''' Handle uploading of packets to APRS '''
self.log_debug("Started APRS Uploader Thread.")
@ -495,6 +588,12 @@ class APRSUploader(object):
# Attempt to upload it.
if _packet is not None:
# If we have not connected in a long time, reset the APRS-IS connection.
if (time.time() - self.aprsis_lastconnect) > (self.aprsis_reconnect * 60):
self.disconnect()
time.sleep(1)
self.connect()
# If we are uploading position reports, the source call is the generated callsign
# usually based on the sonde serial number, and we iGate the position report.
# Otherwise, we upload APRS Objects, sourced by our own callsign, but still iGated via us.
@ -512,7 +611,7 @@ class APRSUploader(object):
def upload_timer(self):
""" Add packets to the habitat upload queue if it is time for us to upload. """
""" Add packets to the aprs upload queue if it is time for us to upload. """
while self.timer_thread_running:
if int(time.time()) % self.synchronous_upload_time == 0:
@ -534,6 +633,9 @@ class APRSUploader(object):
# Sleep a second so we don't hit the synchronous upload time again.
time.sleep(1)
# Flush APRS-IS RX buffer
self.flush_rx()
else:
# Not yet time to upload, wait for a bit.
time.sleep(0.1)
@ -574,7 +676,8 @@ class APRSUploader(object):
self.log_debug("Payload ID %s not observed enough to allow upload." % _id)
if (time.time() - self.last_user_position_upload) > self.station_beacon['rate']*60:
self.beacon_station_position()
if self.aprsis_socket != None:
self.beacon_station_position()
time.sleep(0.1)
@ -614,6 +717,8 @@ class APRSUploader(object):
self.timer_thread_running = False
self.upload_thread_running = False
self.disconnect()
# Wait for all threads to close.
if self.upload_thread is not None:
self.upload_thread.join()
@ -667,10 +772,10 @@ if __name__ == "__main__":
#{'id':'DFM09-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
#{'id':'DFM15-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
#{'id':'DFM17-12345678', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'DFM-19123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM17', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'DFM-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'DFM06', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'N1234567', 'frame':10, 'lat':-10.00001, 'lon':9.99999999, 'alt':10000, 'temp':1.0, 'type':'RS41', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'M1234567', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'type':'RS92', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'DFM-19123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'humidity':1.0, 'pressure':1000.0, 'batt':3.0, 'type':'DFM17', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'DFM-123456', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'humidity':1.0, 'pressure':1000.0, 'batt':3.0, 'type':'DFM06', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'N1234567', 'frame':10, 'lat':-10.00001, 'lon':9.99999999, 'alt':10000, 'temp':1.0, 'humidity':1.0, 'pressure':1000.0, 'batt':3.0, 'type':'RS41', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
{'id':'M1234567', 'frame':10, 'lat':-10.0, 'lon':10.0, 'alt':10000, 'temp':1.0, 'humidity':1.0, 'pressure':1000.0, 'batt':3.0, 'type':'RS92', 'freq':'401.520 MHz', 'freq_float':401.52, 'heading':0.0, 'vel_h':5.1, 'vel_v':-5.0, 'datetime_dt':datetime.datetime.utcnow()},
]
@ -679,3 +784,14 @@ if __name__ == "__main__":
for _telem in test_telem:
out_str = telemetry_to_aprs_position(_telem, object_name="<id>", aprs_comment=comment_field, position_report=False)
print(out_str)
# APRS Testing
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)
test = APRSUploader(aprs_callsign="VK5QI", aprs_passcode="23032", aprsis_host="radiosondy.info")
test.connect()
time.sleep(5)
test.disconnect()
test.close()