Changed telnet parser from fixed size to regex

pull/66/head
Konstantin Gründger 2019-07-20 11:11:19 +02:00
rodzic 1892cce7b3
commit 7260245e15
3 zmienionych plików z 66 dodań i 34 usunięć

Wyświetl plik

@ -114,6 +114,30 @@ PATTERN_RECEIVER_STATUS_COMMENT = re.compile(r"""
)? )?
""", re.VERBOSE | re.MULTILINE) """, re.VERBOSE | re.MULTILINE)
PATTERN_TELNET_50001 = re.compile(r"""
(?P<pps_offset>\d\.\d+)sec:(?P<frequency>\d+\.\d+)MHz:\s+
(?P<aircraft_type>\d):(?P<address_type>\d):(?P<address>[A-F0-9]{6})\s
(?P<timestamp>\d{6}):\s
\[\s*(?P<latitude>[+-]\d+\.\d+),\s*(?P<longitude>[+-]\d+\.\d+)\]deg\s*
(?P<altitude>\d+)m\s*
(?P<climb_rate>[+-]\d+\.\d+)m/s\s*
(?P<ground_speed>\d+\.\d+)m/s\s*
(?P<track>\d+\.\d+)deg\s*
(?P<turn_rate>[+-]\d+\.\d+)deg/sec\s*
(?P<magic_number>\d+)\s*
(?P<gps_status>[0-9x]+)m\s*
(?P<channel>\d+)(?P<flarm_timeslot>[f_])(?P<ogn_timeslot>[o_])\s*
(?P<frequency_offset>[+-]\d+\.\d+)kHz\s*
(?P<decode_quality>\d+\.\d+)/(?P<signal_quality>\d+\.\d+)dB/(?P<demodulator_type>\d+)\s+
(?P<error_count>\d+)e\s*
(?P<distance>\d+\.\d+)km\s*
(?P<bearing>\d+\.\d+)deg\s*
(?P<phi>[+-]\d+\.\d+)deg\s*
(?P<multichannel>\+)?\s*
\?\s*
R?\s*
(B(?P<baro_altitude>\d+))?
""", re.VERBOSE | re.MULTILINE)
# The following regexp patterns are part of the ruby ogn-client. # The following regexp patterns are part of the ruby ogn-client.
# source: https://github.com/svoop/ogn_client-ruby # source: https://github.com/svoop/ogn_client-ruby

Wyświetl plik

@ -1,40 +1,43 @@
import re
from datetime import datetime from datetime import datetime
from ogn.parser import ParseError
from ogn.parser.utils import createTimestamp from ogn.parser.utils import createTimestamp
from ogn.parser.pattern import PATTERN_TELNET_50001
telnet_50001_pattern = re.compile(PATTERN_TELNET_50001)
def parse(telnet_data): def parse(telnet_data):
reference_timestamp = datetime.utcnow() reference_timestamp = datetime.utcnow()
try: match = telnet_50001_pattern.match(telnet_data)
return {'pps_offset': float(telnet_data[0:5]), if match:
'frequency': float(telnet_data[9:16]), return {'pps_offset': float(match.group('pps_offset')),
'aircraft_type': int(telnet_data[20:24]), 'frequency': float(match.group('frequency')),
'address_type': int(telnet_data[25]), 'aircraft_type': int(match.group('aircraft_type')),
'address': telnet_data[27:33], 'address_type': int(match.group('address_type')),
'timestamp': createTimestamp(telnet_data[34:40] + 'h', reference_timestamp), 'address': match.group('address'),
'latitude': float(telnet_data[43:53]), 'timestamp': createTimestamp(match.group('timestamp') + 'h', reference_timestamp),
'longitude': float(telnet_data[54:64]), 'latitude': float(match.group('latitude')),
'altitude': int(telnet_data[68:73]), 'longitude': float(match.group('longitude')),
'climb_rate': float(telnet_data[74:80]), 'altitude': int(match.group('altitude')),
'ground_speed': float(telnet_data[83:89]), 'climb_rate': float(match.group('climb_rate')),
'track': float(telnet_data[92:98]), 'ground_speed': float(match.group('ground_speed')),
'turn_rate': float(telnet_data[101:107]), 'track': float(match.group('track')),
'magic_number': int(telnet_data[114:116]), 'turn_rate': float(match.group('turn_rate')),
'gps_status': telnet_data[117:122], 'magic_number': int(match.group('magic_number')),
'channel': int(telnet_data[124:126]), 'gps_status': match.group('gps_status'),
'flarm_timeslot': telnet_data[126] == 'f', 'channel': int(match.group('channel')),
'ogn_timeslot': telnet_data[127] == 'o', 'flarm_timeslot': match.group('flarm_timeslot') == 'f',
'frequency_offset': float(telnet_data[128:134]), 'ogn_timeslot': match.group('ogn_timeslot') == 'o',
'decode_quality': float(telnet_data[137:142]), 'frequency_offset': float(match.group('frequency_offset')),
'signal_quality': float(telnet_data[143:147]), 'decode_quality': float(match.group('decode_quality')),
'demodulator_type': int(telnet_data[150:151]), 'signal_quality': float(match.group('signal_quality')),
'error_count': float(telnet_data[151:154]), 'demodulator_type': int(match.group('demodulator_type')),
'distance': float(telnet_data[155:162]), 'error_count': float(match.group('error_count')),
'bearing': float(telnet_data[164:170]), 'distance': float(match.group('distance')),
'phi': float(telnet_data[173:179]), 'bearing': float(match.group('bearing')),
'multichannel': telnet_data[183] == '+'} 'phi': float(match.group('phi')),
'multichannel': match.group('multichannel') == '+'}
except Exception: else:
raise ParseError return None

Wyświetl plik

@ -14,11 +14,11 @@ class TestStringMethods(unittest.TestCase):
parse('This is rubbish') parse('This is rubbish')
@mock.patch('ogn.parser.telnet_parser.datetime') @mock.patch('ogn.parser.telnet_parser.datetime')
def test_telnet_parse(self, datetime_mock): def test_telnet_parse_complete(self, datetime_mock):
# set the utcnow-mock near to the time in the test string # set the utcnow-mock near to the time in the test string
datetime_mock.utcnow.return_value = datetime(2015, 1, 1, 10, 0, 55) datetime_mock.utcnow.return_value = datetime(2015, 1, 1, 10, 0, 55)
message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ?') message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ? R B8949')
self.assertEqual(message['pps_offset'], 0.181) self.assertEqual(message['pps_offset'], 0.181)
self.assertEqual(message['frequency'], 868.394) self.assertEqual(message['frequency'], 868.394)
@ -48,6 +48,11 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['phi'], 1.1) self.assertEqual(message['phi'], 1.1)
self.assertEqual(message['multichannel'], True) self.assertEqual(message['multichannel'], True)
def test_telnet_parse_corrupt(self):
message = parse('0.397sec:868.407MHz: sA:1:784024 205656: [ +5.71003, +20.48951]deg 34012m +14.5m/s 109.7m/s 118.5deg +21.0deg/sec 0 27x40m 01_o +7.03kHz 17.2/27.0dB/2 12e 4719.5km 271.1deg -8.5deg ? R B34067')
self.assertIsNone(message)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()