aprs_type of position beacon with weather data is 'position_weather'

pull/93/head
Konstantin Gründger 2020-10-13 20:42:53 +02:00
rodzic e1c4623f64
commit f500b36fb6
4 zmienionych plików z 17 dodań i 10 usunięć

Wyświetl plik

@ -1,8 +1,8 @@
# CHANGELOG
## not released
- client: changed socket mode from blocking to timeout (fixes #89)
- parser: Added optional distance/bearing calculation (fixes #86)
- parser: Added support for weather data from FANET ground stations
- parser: Added optional distance/bearing/normalized_quality calculation (fixes #86)
- parser: Added support for weather data from FANET ground stations (aprs_type: position_weather)
- parser: Added support for latency in receiver messages (OGNSDR) (fixes #87)
- parser: Added support for reference_timestamp with tzinfo (fixes #84)
- parser: Fixed textual altitude part (fixes #81)

Wyświetl plik

@ -3,9 +3,9 @@ class BaseParser():
self.beacon_type = 'unknown'
def parse(self, aprs_comment, aprs_type):
if aprs_type == "position":
if aprs_type.startswith('position'):
data = self.parse_position(aprs_comment)
elif aprs_type == "status":
elif aprs_type.startswith('status'):
data = self.parse_status(aprs_comment)
else:
raise ValueError("aprs_type {} unknown".format(aprs_type))

Wyświetl plik

@ -97,6 +97,8 @@ def parse_aprs(message, reference_timestamp=None):
match_position_weather = re.search(PATTERN_APRS_POSITION_WEATHER, aprs_body)
if match_position_weather:
result.update({
'aprs_type': 'position_weather',
'name': match.group('callsign'),
'dstcall': match.group('dstcall'),
'relay': match.group('relay') if match.group('relay') else None,

Wyświetl plik

@ -14,6 +14,8 @@ class TestStringMethods(unittest.TestCase):
def test_basic(self):
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment")
self.assertEqual(message['aprs_type'], 'position')
self.assertEqual(message['name'], "FLRDDA5BA")
self.assertEqual(message['dstcall'], "APRS")
self.assertEqual(message['receiver_name'], "LFMX")
@ -27,13 +29,12 @@ class TestStringMethods(unittest.TestCase):
self.assertAlmostEqual(message['altitude'], 5524 * FEETS_TO_METER, 5)
self.assertEqual(message['comment'], "this is a comment")
self.assertEqual(message['aprs_type'], 'position')
def test_v024(self):
# higher precision datum format introduced
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position')
self.assertAlmostEqual(message['latitude'] - 44.2568 - 1 / 30000, 2 / 1000 / 60, 10)
self.assertAlmostEqual(message['longitude'] - 6.0005, 6 / 1000 / 60, 10)
@ -42,18 +43,18 @@ class TestStringMethods(unittest.TestCase):
raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment"
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'status')
self.assertEqual(message['name'], "EPZR")
self.assertEqual(message['receiver_name'], "GLIDERN1")
self.assertEqual(message['timestamp'].strftime('%H:%M:%S'), "09:34:56")
self.assertEqual(message['comment'], "this is a comment")
self.assertEqual(message['aprs_type'], 'status')
def test_v026(self):
# from 0.2.6 the ogn comment of a receiver beacon is just optional
raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322"
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position')
self.assertEqual(message['comment'], '')
def test_v026_relay(self):
@ -61,6 +62,7 @@ class TestStringMethods(unittest.TestCase):
raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot"
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position')
self.assertEqual(message['relay'], "NAV07220E")
def test_v027_ddhhmm(self):
@ -68,13 +70,15 @@ class TestStringMethods(unittest.TestCase):
raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1"
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position')
self.assertEqual(message['timestamp'].strftime('%d %H:%M'), "30 10:46")
def test_v028_fanet_position_weather(self):
# with v0.2.8 fanet devices can report weather data
raw_message = 'FNTFC9002>OGNFNT,qAS,LSXI2:/163051h4640.33N/00752.21E_187/004g007t075h78b63620 29.0dB -8.0kHz'
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position_weather')
self.assertEqual(message['wind_direction'], 187)
self.assertEqual(message['wind_speed'], 4 * KNOTS_TO_MS / KPH_TO_MS)
self.assertEqual(message['wind_speed_peak'], 7 * KNOTS_TO_MS / KPH_TO_MS)
@ -86,8 +90,9 @@ class TestStringMethods(unittest.TestCase):
def test_v028_fanet_position_weather_empty(self):
raw_message = 'FNT010115>OGNFNT,qAS,DB7MJ:/065738h4727.72N/01012.83E_.../...g...t... 27.8dB -13.8kHz'
message = parse_aprs(raw_message)
self.assertEqual(message['aprs_type'], 'position_weather')
self.assertIsNone(message['wind_direction'])
self.assertIsNone(message['wind_speed'])
self.assertIsNone(message['wind_speed_peak'])