parser: distinguish between different dstcalls

pull/31/head
Konstantin Gründger 2017-09-27 23:04:47 +02:00
rodzic a36d2a76ff
commit e63b746504
3 zmienionych plików z 80 dodań i 23 usunięć

Wyświetl plik

@ -40,27 +40,40 @@ def parse_aprs(message, reference_date=None, reference_time=None):
raise AprsParseError(message)
def parse_ogn_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
def parse_ogn_aircraft_beacon(aprs_comment, dstcall="APRS"):
if dstcall == "APRS":
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
return None
elif dstcall == "OGLT24":
raise NotImplementedError("LT24 parser not implemented")
elif dstcall == "OGNAVI":
raise NotImplementedError("Naviter parser not implemented")
elif dstcall == "OGSKYL":
raise NotImplementedError("Skylines parser not implemented")
elif dstcall == "OGSPID":
raise NotImplementedError("Spider parser not implemented")
elif dstcall == "OGSPOT":
raise NotImplementedError("SPOT parser not implemented")
else:
return None
raise ValueError("dstcall {} unknown".format(dstcall))
def parse_ogn_receiver_beacon(aprs_comment):
@ -90,11 +103,11 @@ def parse_ogn_receiver_beacon(aprs_comment):
return None
def parse_ogn_beacon(aprs_comment):
def parse_ogn_beacon(aprs_comment, dstcall="APRS"):
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
ac_data = parse_ogn_aircraft_beacon(aprs_comment)
ac_data = parse_ogn_aircraft_beacon(aprs_comment, dstcall)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data

Wyświetl plik

@ -0,0 +1,44 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse import parse_aprs, parse_ogn_aircraft_beacon
class TestStringMethods(unittest.TestCase):
def test_basic(self):
message = parse_aprs("NAV042121>OGNAVI,qAS,NAVITER:/140648h4550.36N/01314.85E'090/152/A=001086 !W47! id0440042121 +000fpm +0.5rot")
self.assertEqual(message['name'], "NAV042121")
self.assertEqual(message['dstcall'], "OGNAVI")
self.assertEqual(message['receiver_name'], "NAVITER")
# ... APRS format is tested enough
self.assertEqual(message['comment'], "id0440042121 +000fpm +0.5rot")
naviter_message = parse_ogn_aircraft_beacon(message['comment'], dstcall=message['dstcall'])
# NAVITER specific values
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
# bit 0: stealth mode
# bit 1: do not track mode
# bits 2-5: aircraft type
# bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers)
# bits 12-15: reserved for use at a later time
# bits 16-39: device id (24-bit device identifier, same as in APRS header)
self.assertEqual(naviter_message['stealth'], False)
self.assertEqual(naviter_message['do_not_track'], False)
self.assertEqual(naviter_message['aircraft_type'], 1)
self.assertEqual(naviter_message['address_type'], 4)
self.assertEqual(naviter_message['reserved'], 0)
self.assertEqual(naviter_message['address'], "042121")
self.assertAlmostEqual(naviter_message['climb_rate'] * ms2fpm, 0, 2)
self.assertEqual(naviter_message['turn_rate'], 0.5)
def test_relayed(self):
message = parse_aprs("FLRFFFFFF>OGNAVI,NAVABCDEF*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot")
self.assertEqual(message['name'], "FLRFFFFFF")
self.assertEqual(message['dstcall'], "OGNAVI")
self.assertEqual(message['relay_id'], "NAVABCDEF")
self.assertEqual(message['receiver_name'], "NAVITER")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -17,7 +17,7 @@ class TestStringMethods(unittest.TestCase):
aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0))
self.assertFalse(aprs is None)
if aprs['comment']:
parse_ogn_beacon(aprs['comment'])
parse_ogn_beacon(aprs['comment'], dstcall=aprs['dstcall'])
def test_aprs_beacons(self):
self.parse_valid_beacon_data_file('aprs.txt')