Test beacon_type

pull/31/head
Konstantin Gründger 2017-09-28 08:43:50 +02:00
rodzic a57a17e644
commit 5faf78b239
5 zmienionych plików z 92 dodań i 69 usunięć

Wyświetl plik

@ -40,40 +40,27 @@ def parse_aprs(message, reference_date=None, reference_time=None):
raise AprsParseError(message)
def parse_ogn_aircraft_beacon(aprs_comment, dstcall="APRS"):
if dstcall == "APRS":
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
return None
elif dstcall == "OGLT24":
raise NotImplementedError("LT24 parser not implemented")
elif dstcall == "OGNAVI":
raise NotImplementedError("Naviter parser not implemented")
elif dstcall == "OGSKYL":
raise NotImplementedError("Skylines parser not implemented")
elif dstcall == "OGSPID":
raise NotImplementedError("Spider parser not implemented")
elif dstcall == "OGSPOT":
raise NotImplementedError("SPOT parser not implemented")
def parse_ogn_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
raise ValueError("dstcall {} unknown".format(dstcall))
return None
def parse_ogn_receiver_beacon(aprs_comment):
@ -103,18 +90,61 @@ def parse_ogn_receiver_beacon(aprs_comment):
return None
def parse_lt24_beacon(aprs_comment):
raise NotImplementedError("LT24 parser not implemented")
def parse_naviter_beacon(aprs_comment):
raise NotImplementedError("Naviter parser not implemented")
def parse_skylines_beacon(aprs_comment):
raise NotImplementedError("Skylines parser not implemented")
def parse_spider_beacon(aprs_comment):
raise NotImplementedError("Spider parser not implemented")
def parse_spot_beacon(aprs_comment):
raise NotImplementedError("SPOT parser not implemented")
def parse_ogn_beacon(aprs_comment, dstcall="APRS"):
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
if dstcall == "APRS":
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
ac_data = parse_ogn_aircraft_beacon(aprs_comment, dstcall)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
ac_data = parse_ogn_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = parse_ogn_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
raise OgnParseError(aprs_comment)
elif dstcall == "OGLT24":
ac_data = parse_lt24_beacon(aprs_comment)
ac_data.update({'beacon_type': 'lt24_beacon'})
return ac_data
rc_data = parse_ogn_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
raise OgnParseError(aprs_comment)
elif dstcall == "OGNAVI":
ac_data = parse_naviter_beacon(aprs_comment)
ac_data.update({'beacon_type': 'naviter_beacon'})
return ac_data
elif dstcall == "OGSKYL":
ac_data = parse_skylines_beacon(aprs_comment)
ac_data.update({'beacon_type': 'skylines_beacon'})
return ac_data
elif dstcall == "OGSPID":
ac_data = parse_spider_beacon(aprs_comment)
ac_data.update({'beacon_type': 'spider_beacon'})
return ac_data
elif dstcall == "OGSPOT":
ac_data = parse_spot_beacon(aprs_comment)
ac_data.update({'beacon_type': 'spot_beacon'})
return ac_data
else:
raise ValueError("dstcall {} unknown".format(dstcall))

Wyświetl plik

@ -1,13 +1,13 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse import parse_ogn_aircraft_beacon
from ogn.parser.parse import parse_naviter_beacon
class TestStringMethods(unittest.TestCase):
def test_OGNAVI_1(self):
message = "id0440042121 +000fpm +0.5rot"
naviter_message = parse_ogn_aircraft_beacon(message, dstcall='OGNAVI')
naviter_message = parse_naviter_beacon(message)
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
# bit 0: stealth mode

Wyświetl plik

@ -10,32 +10,36 @@ from ogn.parser.exceptions import AprsParseError
class TestStringMethods(unittest.TestCase):
def parse_valid_beacon_data_file(self, filename):
def parse_valid_beacon_data_file(self, filename, beacon_type):
with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f:
for line in f:
if not line[0] == '#':
aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0))
self.assertFalse(aprs is None)
if aprs['comment']:
parse_ogn_beacon(aprs['comment'], dstcall=aprs['dstcall'])
message = parse_ogn_beacon(aprs['comment'], dstcall=aprs['dstcall'])
self.assertEqual(message['beacon_type'], beacon_type)
def test_aprs_beacons(self):
self.parse_valid_beacon_data_file('aprs.txt')
def test_aprs_aircraft_beacons(self):
self.parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aircraft_beacon')
def test_aprs_receiver_beacons(self):
self.parse_valid_beacon_data_file(filename='aprs_receiver.txt', beacon_type='receiver_beacon')
def test_lt24_beacons(self):
self.parse_valid_beacon_data_file('lt24.txt')
self.parse_valid_beacon_data_file(filename='lt24.txt', beacon_type='lt24_beacon')
def test_naviter_beacons(self):
self.parse_valid_beacon_data_file('naviter.txt')
self.parse_valid_beacon_data_file(filename='naviter.txt', beacon_type='naviter_beacon')
def test_skylines_beacons(self):
self.parse_valid_beacon_data_file('skylines.txt')
self.parse_valid_beacon_data_file(filename='skylines.txt', beacon_type='skylines_beacon')
def test_spider_beacons(self):
self.parse_valid_beacon_data_file('spider.txt')
self.parse_valid_beacon_data_file(filename='spider.txt', beacon_type='spider_beacon')
def test_spot_beacons(self):
self.parse_valid_beacon_data_file('spot.txt')
self.parse_valid_beacon_data_file(filename='spot.txt', beacon_type='spot_beacon')
def test_fail_parse_aprs_none(self):
with self.assertRaises(TypeError):

Wyświetl plik

@ -3,18 +3,12 @@
#
Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB
LFGU>APRS,TCPIP*,qAC,GLIDERN2:/165556h4907.63NI00706.41E&/A=000833 v0.2.0 CPU:0.9 RAM:281.3/458.9MB NTP:0.5ms/-19.1ppm +53.0C RF:+0.70dB
FLRDDB091>APRS,qAS,Letzi:/165831h4740.04N/00806.01EX152/124/A=004881 id06DD8E80 +198fpm +0.0rot 6.5dB 13e +4.0kHz gps3x4
LSGS>APRS,TCPIP*,qAC,GLIDERN1:/165345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB
FLRDDDD33>APRS,qAS,LFNF:/165341h4344.27N/00547.41E'/A=000886 id06DDDD33 +020fpm +0.0rot 20.8dB 0e -14.3kHz gps3x4
FLRDDE026>APRS,qAS,LFNF:/165341h4358.58N/00553.89E'204/055/A=005048 id06DDE026 +257fpm +0.1rot 7.2dB 0e -0.8kHz gps4x7
ICA484A9C>APRS,qAS,LFMX:/165341h4403.50N/00559.67E'/A=001460 id05484A9C +000fpm +0.0rot 18.0dB 0e +3.5kHz gps4x7
WolvesSW>APRS,TCPIP*,qAC,GLIDERN2:/165343h5232.23NI00210.91W&/A=000377 CPU:1.5 RAM:159.9/458.7MB NTP:6.6ms/-36.7ppm +45.5C RF:+130-0.4ppm/-0.1dB
Oxford>APRS,TCPIP*,qAC,GLIDERN1:/165533h5142.96NI00109.68W&/A=000380 v0.1.3 CPU:0.9 RAM:268.8/458.6MB NTP:0.5ms/-45.9ppm +60.5C RF:+55+2.9ppm/+1.54dB
OGNE95A16>APRS,qAS,Sylwek:/165641h5001.94N/01956.91E'270/004/A=000000 id07E95A16 +000fpm +0.1rot 37.8dB 0e -0.4kHz
Salland>APRS,TCPIP*,qAC,GLIDERN2:/165426h5227.93NI00620.03E&/A=000049 v0.2.2 CPU:0.7 RAM:659.3/916.9MB NTP:2.5ms/-75.0ppm RF:+0.41dB
LSGS>APRS,TCPIP*,qAC,GLIDERN1:/165345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB
Drenstein>APRS,TCPIP*,qAC,GLIDERN1:/165011h5147.51NI00744.45E&/A=000213 v0.2.2 CPU:0.8 RAM:695.7/4025.5MB NTP:16000.0ms/+0.0ppm +63.0C
ZK-GSC>APRS,qAS,Omarama:/165202h4429.25S/16959.33E'/A=001407 id05C821EA +020fpm +0.0rot 16.8dB 0e -3.1kHz gps1x3 hear1084 hearB597 hearB598
#
# since 0.2.5 for receiver information not only the "aprs position" format is used but also the "aprs status" format (without lat/lon/alt informations)
Cordoba>APRS,TCPIP*,qAC,GLIDERN3:/194847h3112.85SI06409.56W&/A=001712 v0.2.5.ARM CPU:0.4 RAM:755.4/970.8MB NTP:6.7ms/-0.1ppm +45.5C RF:+48+18.3ppm/+3.45dB
@ -28,10 +22,5 @@ CNF3a>APRS,TCPIP*,qAC,GLIDERN3:>042143h v0.2.5.ARM CPU:0.6 RAM:514.6/970.8MB NTP
VITACURA2>APRS,TCPIP*,qAC,GLIDERN3:/042136h3322.81SI07034.95W&/A=002345 v0.2.5.ARM CPU:0.3 RAM:695.0/970.5MB NTP:0.6ms/-5.7ppm +51.5C RF:+0-0.0ppm/+1.32dB
VITACURA2>APRS,TCPIP*,qAC,GLIDERN3:>042136h v0.2.5.ARM CPU:0.3 RAM:695.0/970.5MB NTP:0.6ms/-5.7ppm +52.1C 0/0Acfts[1h] RF:+0-0.0ppm/+1.32dB/+2.1dB@10km[193897]/+9.0dB@10km[10/20]
#
# since 0.2.6
# ... the ogn comment of a receiver beacon is just optional
# since 0.2.6 the ogn comment of a receiver beacon is just optional
Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322
#
# ... and a aircraft beacon needs just an ID, climb rate and turn rate or just the ID
ICA3ECE59>APRS,qAS,GLDRTR:/171254h5144.78N/00616.67E'263/000/A=000075 id093D0930 +000fpm +0.0rot
ICA3ECE59>APRS,qAS,GLDRTR:/171254h5144.78N/00616.67E'263/000/A=000075 id053ECE59

Wyświetl plik

@ -4,4 +4,4 @@
NAV042121>OGNAVI,qAS,NAVITER:/140648h4550.36N/01314.85E'090/152/A=001086 !W47! id0440042121 +000fpm +0.5rot
NAV04220E>OGNAVI,qAS,NAVITER:/140748h4552.27N/01155.61E'090/012/A=006562 !W81! id044004220E +060fpm +1.2rot
NAV07220E>OGNAVI,qAS,NAVITER:/125447h4557.77N/01220.19E'258/056/A=006562 !W76! id1C4007220E +180fpm +0.0rot
FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot
FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot