Introduce generic APRS pattern

pull/51/head^2
Konstantin Gründger 2018-04-10 08:33:48 +02:00
rodzic aa682a9e10
commit a8a83e960b
3 zmienionych plików z 39 dodań i 31 usunięć

Wyświetl plik

@ -2,7 +2,7 @@ import re
from datetime import datetime
from ogn.parser.utils import createTimestamp, parseAngle, kts2kmh, feet2m
from ogn.parser.pattern import PATTERN_APRS_POSITION, PATTERN_APRS_STATUS, PATTERN_APRS_SERVER
from ogn.parser.pattern import PATTERN_APRS, PATTERN_APRS_POSITION, PATTERN_APRS_STATUS, PATTERN_APRS_SERVER
from ogn.parser.exceptions import AprsParseError, OgnParseError
from ogn.parser.aprs_comment.ogn_parser import OgnParser
@ -44,33 +44,38 @@ def parse_aprs(message, reference_date, reference_time=None):
return {'comment': message,
'aprs_type': 'comment'}
match_position = re.search(PATTERN_APRS_POSITION, message)
if match_position:
return {'name': match_position.group('callsign'),
'dstcall': match_position.group('dstcall'),
'relay': match_position.group('relay') if match_position.group('relay') else None,
'receiver_name': match_position.group('receiver'),
'timestamp': createTimestamp(match_position.group('time'), reference_date, reference_time),
'latitude': parseAngle('0' + match_position.group('latitude') + (match_position.group('latitude_enhancement') or '0')) *
(-1 if match_position.group('latitude_sign') == 'S' else 1),
'symboltable': match_position.group('symbol_table'),
'longitude': parseAngle(match_position.group('longitude') + (match_position.group('longitude_enhancement') or '0')) *
(-1 if match_position.group('longitude_sign') == 'W' else 1),
'symbolcode': match_position.group('symbol'),
'track': int(match_position.group('course')) if match_position.group('course_extension') else None,
'ground_speed': int(match_position.group('ground_speed')) * kts2kmh if match_position.group('ground_speed') else None,
'altitude': int(match_position.group('altitude')) * feet2m,
'comment': match_position.group('comment') if match_position.group('comment') else "",
'aprs_type': 'position'}
match_status = re.search(PATTERN_APRS_STATUS, message)
if match_status:
return {'name': match_status.group('callsign'),
'dstcall': match_status.group('dstcall'),
'receiver_name': match_status.group('receiver'),
'timestamp': createTimestamp(match_status.group('time'), reference_date, reference_time),
'comment': match_status.group('comment') if match_status.group('comment') else "",
'aprs_type': 'status'}
match = re.search(PATTERN_APRS, message)
if match:
aprs_type = 'position' if match.group('aprs_type') == '/' else 'status'
aprs_body = match.group('aprs_body')
if aprs_type == 'position':
match_position = re.search(PATTERN_APRS_POSITION, aprs_body)
if match_position:
return {'name': match.group('callsign'),
'dstcall': match.group('dstcall'),
'relay': match.group('relay') if match.group('relay') else None,
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match_position.group('time'), reference_date, reference_time),
'latitude': parseAngle('0' + match_position.group('latitude') + (match_position.group('latitude_enhancement') or '0')) *
(-1 if match_position.group('latitude_sign') == 'S' else 1),
'symboltable': match_position.group('symbol_table'),
'longitude': parseAngle(match_position.group('longitude') + (match_position.group('longitude_enhancement') or '0')) *
(-1 if match_position.group('longitude_sign') == 'W' else 1),
'symbolcode': match_position.group('symbol'),
'track': int(match_position.group('course')) if match_position.group('course_extension') else None,
'ground_speed': int(match_position.group('ground_speed')) * kts2kmh if match_position.group('ground_speed') else None,
'altitude': int(match_position.group('altitude')) * feet2m,
'comment': match_position.group('comment') if match_position.group('comment') else "",
'aprs_type': aprs_type}
elif aprs_type == 'status':
match_status = re.search(PATTERN_APRS_STATUS, aprs_body)
if match_status:
return {'name': match.group('callsign'),
'dstcall': match.group('dstcall'),
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match_status.group('time'), reference_date, reference_time),
'comment': match_status.group('comment') if match_status.group('comment') else "",
'aprs_type': aprs_type}
raise AprsParseError(message)

Wyświetl plik

@ -1,8 +1,8 @@
import re
PATTERN_APRS_POSITION = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),((?P<relay>[A-Za-z0-9]+)\*)?.*,(?P<receiver>.+?):/(?P<time>(([0-1]\d|2[0-3])[0-5]\d[0-5]\dh|([0-2]\d|3[0-1])([0-1]\d|2[0-3])[0-5]\dz))(?P<latitude>9000\.00|[0-8]\d{3}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>18000\.00|1[0-7]\d{3}\.\d{2}|0\d{4}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>(-\d{5}|\d{6}))(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?(?:\s(?P<comment>.*))?$")
PATTERN_APRS_STATUS = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),.+,(?P<receiver>.+?):>(?P<time>\d{6}(h|z))\s(?P<comment>.*)$")
PATTERN_APRS = re.compile(r"^(?P<callsign>.+?)>(?P<dstcall>[A-Z0-9]+),((?P<relay>[A-Za-z0-9]+)\*)?.*,(?P<receiver>.+?):(?P<aprs_type>(/|>))(?P<aprs_body>.*)$")
PATTERN_APRS_POSITION = re.compile(r"^(?P<time>(([0-1]\d|2[0-3])[0-5]\d[0-5]\dh|([0-2]\d|3[0-1])([0-1]\d|2[0-3])[0-5]\dz))(?P<latitude>9000\.00|[0-8]\d{3}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>18000\.00|1[0-7]\d{3}\.\d{2}|0\d{4}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>(-\d{5}|\d{6}))(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?(?:\s(?P<comment>.*))?$")
PATTERN_APRS_STATUS = re.compile(r"^(?P<time>(([0-1]\d|2[0-3])[0-5]\d[0-5]\dh|([0-2]\d|3[0-1])([0-1]\d|2[0-3])[0-5]\dz))\s(?P<comment>.*)$")
PATTERN_APRS_SERVER = re.compile(r"^# aprsc (?P<version>[a-z0-9\.\-]+) (?P<timestamp>\d+ [A-Za-z]+ \d+ \d{2}:\d{2}:\d{2} GMT) (?P<server>[A-Z0-9]+) (?P<ip_address>\d+\.\d+\.\d+\.\d+):(?P<port>\d+)$")
PATTERN_LT24_POSITION_COMMENT = re.compile("""

Wyświetl plik

@ -90,6 +90,9 @@ class TestStringMethods(unittest.TestCase):
with self.assertRaises(AprsParseError):
parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm", reference_date=datetime(2015, 1, 1))
with self.assertRaises(AprsParseError):
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322", reference_date=datetime(2015, 1, 1))
def test_invalid_altitude(self):
with self.assertRaises(AprsParseError):
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345", reference_date=datetime(2015, 1, 1))