From e63b74650406cc49fd168766d46387ba1f6aa454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Wed, 27 Sep 2017 23:04:47 +0200 Subject: [PATCH] parser: distinguish between different dstcalls --- ogn/parser/parse.py | 57 ++++++++++++++++++----------- tests/parser/test_beacon_naviter.py | 44 ++++++++++++++++++++++ tests/parser/test_parse.py | 2 +- 3 files changed, 80 insertions(+), 23 deletions(-) create mode 100644 tests/parser/test_beacon_naviter.py diff --git a/ogn/parser/parse.py b/ogn/parser/parse.py index c5d9e61..9df6746 100644 --- a/ogn/parser/parse.py +++ b/ogn/parser/parse.py @@ -40,27 +40,40 @@ def parse_aprs(message, reference_date=None, reference_time=None): raise AprsParseError(message) -def parse_ogn_aircraft_beacon(aprs_comment): - ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment) - if ac_match: - return {'address_type': int(ac_match.group('details'), 16) & 0b00000011, - 'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2, - 'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1, - 'address': ac_match.group('id'), - 'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None, - 'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None, - 'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None, - 'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None, - 'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None, - 'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None, - 'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None, - 'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None, - 'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None, - 'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None, - 'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None, - 'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None} +def parse_ogn_aircraft_beacon(aprs_comment, dstcall="APRS"): + if dstcall == "APRS": + ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment) + if ac_match: + return {'address_type': int(ac_match.group('details'), 16) & 0b00000011, + 'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2, + 'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1, + 'address': ac_match.group('id'), + 'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None, + 'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None, + 'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None, + 'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None, + 'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None, + 'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None, + 'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None, + 'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None, + 'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None, + 'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None, + 'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None, + 'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None} + else: + return None + elif dstcall == "OGLT24": + raise NotImplementedError("LT24 parser not implemented") + elif dstcall == "OGNAVI": + raise NotImplementedError("Naviter parser not implemented") + elif dstcall == "OGSKYL": + raise NotImplementedError("Skylines parser not implemented") + elif dstcall == "OGSPID": + raise NotImplementedError("Spider parser not implemented") + elif dstcall == "OGSPOT": + raise NotImplementedError("SPOT parser not implemented") else: - return None + raise ValueError("dstcall {} unknown".format(dstcall)) def parse_ogn_receiver_beacon(aprs_comment): @@ -90,11 +103,11 @@ def parse_ogn_receiver_beacon(aprs_comment): return None -def parse_ogn_beacon(aprs_comment): +def parse_ogn_beacon(aprs_comment, dstcall="APRS"): if not aprs_comment: return {'beacon_type': 'receiver_beacon'} - ac_data = parse_ogn_aircraft_beacon(aprs_comment) + ac_data = parse_ogn_aircraft_beacon(aprs_comment, dstcall) if ac_data: ac_data.update({'beacon_type': 'aircraft_beacon'}) return ac_data diff --git a/tests/parser/test_beacon_naviter.py b/tests/parser/test_beacon_naviter.py new file mode 100644 index 0000000..f66527b --- /dev/null +++ b/tests/parser/test_beacon_naviter.py @@ -0,0 +1,44 @@ +import unittest + +from ogn.parser.utils import ms2fpm +from ogn.parser.parse import parse_aprs, parse_ogn_aircraft_beacon + + +class TestStringMethods(unittest.TestCase): + def test_basic(self): + message = parse_aprs("NAV042121>OGNAVI,qAS,NAVITER:/140648h4550.36N/01314.85E'090/152/A=001086 !W47! id0440042121 +000fpm +0.5rot") + self.assertEqual(message['name'], "NAV042121") + self.assertEqual(message['dstcall'], "OGNAVI") + self.assertEqual(message['receiver_name'], "NAVITER") + # ... APRS format is tested enough + self.assertEqual(message['comment'], "id0440042121 +000fpm +0.5rot") + + naviter_message = parse_ogn_aircraft_beacon(message['comment'], dstcall=message['dstcall']) + # NAVITER specific values + # id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001 + # bit 0: stealth mode + # bit 1: do not track mode + # bits 2-5: aircraft type + # bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers) + # bits 12-15: reserved for use at a later time + # bits 16-39: device id (24-bit device identifier, same as in APRS header) + self.assertEqual(naviter_message['stealth'], False) + self.assertEqual(naviter_message['do_not_track'], False) + self.assertEqual(naviter_message['aircraft_type'], 1) + self.assertEqual(naviter_message['address_type'], 4) + self.assertEqual(naviter_message['reserved'], 0) + self.assertEqual(naviter_message['address'], "042121") + + self.assertAlmostEqual(naviter_message['climb_rate'] * ms2fpm, 0, 2) + self.assertEqual(naviter_message['turn_rate'], 0.5) + + def test_relayed(self): + message = parse_aprs("FLRFFFFFF>OGNAVI,NAVABCDEF*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot") + + self.assertEqual(message['name'], "FLRFFFFFF") + self.assertEqual(message['dstcall'], "OGNAVI") + self.assertEqual(message['relay_id'], "NAVABCDEF") + self.assertEqual(message['receiver_name'], "NAVITER") + +if __name__ == '__main__': + unittest.main() diff --git a/tests/parser/test_parse.py b/tests/parser/test_parse.py index a66b1b8..76cb75b 100644 --- a/tests/parser/test_parse.py +++ b/tests/parser/test_parse.py @@ -17,7 +17,7 @@ class TestStringMethods(unittest.TestCase): aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0)) self.assertFalse(aprs is None) if aprs['comment']: - parse_ogn_beacon(aprs['comment']) + parse_ogn_beacon(aprs['comment'], dstcall=aprs['dstcall']) def test_aprs_beacons(self): self.parse_valid_beacon_data_file('aprs.txt')