Merge pull request #33 from Meisterschueler/fix/+parser

Fix/+parser
pull/34/head
Meisterschueler 2017-09-30 19:29:04 +02:00 zatwierdzone przez GitHub
commit cd6b5a62c9
11 zmienionych plików z 126 dodań i 58 usunięć

Wyświetl plik

@ -2,7 +2,7 @@
## Unreleased
- parser: Added support for heared aircrafts
- parser: Added support for naviter beacons
- parser: Added support for OGNSDR (receiver), OGNTRK (ogn tracker), OGNFLR (flarm) and OGNAV (Naviter) beacons
- client: Allow client to do sequential connect-disconnect
## 0.7.1 - 2017-06-05

Wyświetl plik

@ -15,27 +15,25 @@ A full featured gateway with build-in database is provided by [ogn-python](https
Parse APRS/OGN packet.
```
from ogn.parser import parse_aprs, parse_ogn_beacon
from ogn.parser import parse
from datetime import date, time
beacon = parse_aprs("FLRDDDEAD>APRS,qAS,EDER:/114500h5029.86N/00956.98E'342/049/A=005524 id0ADDDEAD -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5",
reference_date=date(2016,1,1), reference_time=time(11,46))
beacon.update(parse_ogn_beacon(beacon['comment']))
beacon = parse("FLRDDDEAD>APRS,qAS,EDER:/114500h5029.86N/00956.98E'342/049/A=005524 id0ADDDEAD -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5",
reference_date=date(2016,1,1), reference_time=time(11,46))
```
Connect to OGN and display all incoming beacons.
```
from ogn.client import AprsClient
from ogn.parser import parse_aprs, parse_ogn_beacon, ParseError
from ogn.parser import parse, ParseError
def process_beacon(raw_message):
if raw_message[0] == '#':
print('Server Status: {}'.format(raw_message))
return
try:
beacon = parse_aprs(raw_message)
beacon.update(parse_ogn_beacon(beacon['comment']))
beacon = parse(raw_message)
print('Received {beacon_type} from {name}'.format(**beacon))
except ParseError as e:
print('Error, {}'.format(e.message))

Wyświetl plik

@ -1,2 +1,3 @@
from ogn.parser.parse import parse_aprs, parse_ogn_beacon, parse_receiver_beacon, parse_aircraft_beacon # flake8: noqa
from ogn.parser import parse as parse_module # only for test functions. Without this a mock of parse would mock the function instead of the module
from ogn.parser.parse import parse, parse_aprs, parse_comment # flake8: noqa
from ogn.parser.exceptions import ParseError, AprsParseError, OgnParseError, AmbigousTimeError # flake8: noqa

Wyświetl plik

@ -17,26 +17,25 @@ from ogn.parser.parse_receiver import parse_position as parse_receiver_position
from ogn.parser.parse_receiver import parse_status as parse_receiver_status
def parse_aprs(message, reference_date=None, reference_time=None):
def parse(aprs_message, reference_date=None, reference_time=None):
if reference_date is None:
now = datetime.utcnow()
reference_date = now.date()
reference_time = now.time()
message = parse_aprs(aprs_message, reference_date, reference_time)
message.update(parse_comment(message['comment'], dstcall=message['dstcall'], aprs_type=message['aprs_type']))
return message
def parse_aprs(message, reference_date=None, reference_time=None):
match_position = re.search(PATTERN_APRS_POSITION, message)
if match_position:
if match_position.group('time_hhmmss'):
timestamp = createTimestamp(match_position.group('time_hhmmss'), reference_date, reference_time)
else:
timestamp_ddmmhh = match_position.group('time_ddmmhh')
reference_date = reference_date.replace(day=int(timestamp_ddmmhh[:2]))
timestamp = createTimestamp(timestamp_ddmmhh[2:] + '00', reference_date)
return {'name': match_position.group('callsign'),
'dstcall': match_position.group('dstcall'),
'relay': match_position.group('relay') if match_position.group('relay') else None,
'receiver_name': match_position.group('receiver'),
'timestamp': timestamp,
'timestamp': createTimestamp(match_position.group('time'), reference_date, reference_time),
'latitude': parseAngle('0' + match_position.group('latitude') + (match_position.group('latitude_enhancement') or '0')) *
(-1 if match_position.group('latitude_sign') == 'S' else 1),
'symboltable': match_position.group('symbol_table'),
@ -46,7 +45,7 @@ def parse_aprs(message, reference_date=None, reference_time=None):
'track': int(match_position.group('course')) if match_position.group('course_extension') else None,
'ground_speed': int(match_position.group('ground_speed')) * kts2kmh if match_position.group('ground_speed') else None,
'altitude': int(match_position.group('altitude')) * feet2m,
'comment': match_position.group('comment'),
'comment': match_position.group('comment') if match_position.group('comment') else "",
'aprs_type': 'position'}
match_status = re.search(PATTERN_APRS_STATUS, message)
@ -55,13 +54,13 @@ def parse_aprs(message, reference_date=None, reference_time=None):
'dstcall': match_status.group('dstcall'),
'receiver_name': match_status.group('receiver'),
'timestamp': createTimestamp(match_status.group('time'), reference_date, reference_time),
'comment': match_status.group('comment'),
'comment': match_status.group('comment') if match_status.group('comment') else "",
'aprs_type': 'status'}
raise AprsParseError(message)
def parse_ogn_beacon(aprs_comment, dstcall="APRS", aprs_type="position"):
def parse_comment(aprs_comment, dstcall="APRS", aprs_type="position"):
if dstcall == "APRS": # this can be a receiver or an aircraft
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}

Wyświetl plik

@ -4,8 +4,11 @@ from ogn.parser.pattern import PATTERN_RECEIVER_POSITION, PATTERN_RECEIVER_STATU
def parse_position(aprs_comment):
match = re.search(PATTERN_RECEIVER_POSITION, aprs_comment)
return {'user_comment': match.group('user_comment') if match.group('user_comment') else None}
if aprs_comment is None:
return {}
else:
match = re.search(PATTERN_RECEIVER_POSITION, aprs_comment)
return {'user_comment': match.group('user_comment') if match.group('user_comment') else None}
def parse_status(aprs_comment):

Wyświetl plik

@ -1,8 +1,8 @@
import re
PATTERN_APRS_POSITION = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),((?P<relay>[A-Za-z0-9]+)\*)?.*,(?P<receiver>.+?):/((?P<time_hhmmss>\d{6})h|(?P<time_ddmmhh>\d{6})z)(?P<latitude>\d{4}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>\d{5}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>\d{6})(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?(?:\s(?P<comment>.*))?$")
PATTERN_APRS_STATUS = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),.+,(?P<receiver>.+?):>(?P<time>\d{6})+h\s(?P<comment>.*)$")
PATTERN_APRS_POSITION = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),((?P<relay>[A-Za-z0-9]+)\*)?.*,(?P<receiver>.+?):/(?P<time>\d{6}(h|z))(?P<latitude>\d{4}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>\d{5}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>\d{6})(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?(?:\s(?P<comment>.*))?$")
PATTERN_APRS_STATUS = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),.+,(?P<receiver>.+?):>(?P<time>\d{6}(h|z))\s(?P<comment>.*)$")
PATTERN_NAVITER_BEACON = re.compile("""
id(?P<details>[\dA-F]{4})(?P<id>[\dA-F]{6})\s

Wyświetl plik

@ -16,8 +16,25 @@ def parseAngle(dddmmhht):
return float(dddmmhht[:3]) + float(dddmmhht[3:]) / 60
def createTimestamp(hhmmss, reference_date, reference_time=None):
packet_time = datetime.strptime(hhmmss, '%H%M%S').time()
def createTimestamp(timestamp, reference_date, reference_time=None):
if timestamp[-1] == "z":
day = int(timestamp[0:2])
hhmm = timestamp[2:6]
if reference_date.day < day:
if reference_date.month == 1:
reference_date = reference_date.replace(year=reference_date.year - 1, month=12, day=day)
else:
reference_date = reference_date.replace(month=reference_date.month - 1, day=day)
else:
reference_date = reference_date.replace(day=day)
packet_time = datetime.strptime(hhmm, '%H%M').time()
return datetime.combine(reference_date, packet_time)
elif timestamp[-1] == "h":
hhmmss = timestamp[:-1]
packet_time = datetime.strptime(hhmmss, '%H%M%S').time()
else:
raise ValueError()
if reference_time is None:
return datetime.combine(reference_date, packet_time)
else:

Wyświetl plik

@ -5,7 +5,7 @@ import os
from datetime import datetime
from time import sleep
from ogn.parser.parse import parse_aprs, parse_ogn_beacon
from ogn.parser.parse import parse
from ogn.parser.exceptions import AprsParseError
@ -14,11 +14,12 @@ class TestStringMethods(unittest.TestCase):
with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f:
for line in f:
if not line[0] == '#':
aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0))
self.assertFalse(aprs is None)
if aprs['comment']:
message = parse_ogn_beacon(aprs['comment'], dstcall=aprs['dstcall'], aprs_type=aprs['aprs_type'])
try:
message = parse(line, datetime(2015, 4, 10, 17, 0))
self.assertFalse(message is None)
self.assertEqual(message['beacon_type'], beacon_type)
except NotImplementedError as e:
print(e)
def test_aprs_aircraft_beacons(self):
self.parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aircraft_beacon')
@ -52,40 +53,36 @@ class TestStringMethods(unittest.TestCase):
def test_fail_parse_aprs_none(self):
with self.assertRaises(TypeError):
parse_aprs(None)
def test_parse_ogn_none(self):
parse_ogn_beacon(None)
parse(None)
def test_fail_empty(self):
with self.assertRaises(AprsParseError):
parse_aprs("")
parse("")
def test_fail_bad_string(self):
with self.assertRaises(AprsParseError):
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
parse("Lachens>APRS,TCPIwontbeavalidstring")
@mock.patch('ogn.parser.parse.createTimestamp')
@mock.patch('ogn.parser.parse_module.createTimestamp')
def test_default_reference_date(self, createTimestamp_mock):
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21"
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB"
parse_aprs(valid_aprs_string)
parse(valid_aprs_string)
call_args_before = createTimestamp_mock.call_args
sleep(1)
parse_aprs(valid_aprs_string)
parse(valid_aprs_string)
call_args_seconds_later = createTimestamp_mock.call_args
self.assertNotEqual(call_args_before, call_args_seconds_later)
def test_copy_constructor(self):
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
aprs = parse_aprs(valid_aprs_string, reference_date=datetime(2015, 1, 1, 16, 8, 29))
aircraft_beacon = parse_ogn_beacon(aprs['comment'])
message = parse(valid_aprs_string, reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertEqual(aprs['name'], 'FLRDDA5BA')
self.assertEqual(aircraft_beacon['address'], 'DDA5BA')
self.assertEqual(message['name'], 'FLRDDA5BA')
self.assertEqual(message['address'], 'DDA5BA')
if __name__ == '__main__':

Wyświetl plik

@ -55,7 +55,7 @@ class TestStringMethods(unittest.TestCase):
raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 8, 56, 0))
self.assertEqual(message['comment'], None)
self.assertEqual(message['comment'], '')
def test_v026_relay(self):
# beacons can be relayed

Wyświetl plik

@ -0,0 +1,41 @@
import unittest
from ogn.parser.parse_receiver import parse_position, parse_status
class TestStringMethods(unittest.TestCase):
def test_position(self):
message = parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
self.assertEqual(message['user_comment'], "Antenna: chinese, on a pylon, 20 meter above ground")
def test_position_empty(self):
message = parse_position("")
self.assertIsNotNone(message)
def test_status(self):
message = parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
self.assertEqual(message['version'], "0.2.7")
self.assertEqual(message['platform'], 'RPI-GPU')
self.assertEqual(message['cpu_load'], 0.7)
self.assertEqual(message['free_ram'], 770.2)
self.assertEqual(message['total_ram'], 968.2)
self.assertEqual(message['ntp_error'], 1.8)
self.assertEqual(message['rt_crystal_correction'], -3.3)
self.assertEqual(message['cpu_temp'], 55.7)
self.assertEqual(message['senders_visible'], 7)
self.assertEqual(message['senders_total'], 8)
self.assertEqual(message['rec_crystal_correction'], 54)
self.assertEqual(message['rec_crystal_correction_fine'], -1.1)
self.assertEqual(message['rec_input_noise'], -0.16)
self.assertEqual(message['senders_signal'], 7.1)
self.assertEqual(message['senders_messages'], 19481)
self.assertEqual(message['good_senders_signal'], 16.8)
self.assertEqual(message['good_senders'], 7)
self.assertEqual(message['good_and_bad_senders'], 13)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -9,17 +9,7 @@ class TestStringMethods(unittest.TestCase):
def test_parseAngle(self):
self.assertAlmostEqual(parseAngle('05048.30'), 50.805, 5)
def test_createTimestamp(self):
test_data = [
('000001', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000', date(2015, 1, 10), time(0, 0, 1), None), # packet 11 hours from future or 13 hours old
('123500', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
('000001', date(2015, 1, 10), None, datetime(2015, 1, 10, 0, 0, 1)), # first packet of a specific day
('235959', date(2015, 1, 10), None, datetime(2015, 1, 10, 23, 59, 59)), # last packet of a specific day
]
def proceed_test_data(self, test_data):
for test in test_data:
if test[3]:
timestamp = createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
@ -28,6 +18,28 @@ class TestStringMethods(unittest.TestCase):
with self.assertRaises(AmbigousTimeError):
createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
def test_createTimestamp_hhmmss(self):
test_data = [
('000001h', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959h', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000h', date(2015, 1, 10), time(0, 0, 1), None), # packet 11 hours from future or 13 hours old
('123500h', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001h', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
('000001h', date(2015, 1, 10), None, datetime(2015, 1, 10, 0, 0, 1)), # first packet of a specific day
('235959h', date(2015, 1, 10), None, datetime(2015, 1, 10, 23, 59, 59)), # last packet of a specific day
]
self.proceed_test_data(test_data)
def test_createTimestamp_ddhhmm(self):
test_data = [
('011212z', date(2017, 9, 28), time(0, 0, 1), datetime(2017, 9, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th,
('281313z', date(2017, 10, 1), time(0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st,
('281414z', date(2017, 1, 1), time(0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st,
]
self.proceed_test_data(test_data)
if __name__ == '__main__':
unittest.main()