Use precompiled regex

pull/65/head
Konstantin Gründger 2019-06-03 21:09:57 +02:00
rodzic b0bf5c82ee
commit dab3ca3ddb
21 zmienionych plików z 66 dodań i 66 usunięć

Wyświetl plik

@ -9,10 +9,10 @@ from .base import BaseParser
class FanetParser(BaseParser):
def __init__(self):
self.beacon_type = 'fanet'
self.position_parser = re.compile(PATTERN_FANET_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_FANET_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_parser.match(aprs_comment)
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011 if ac_match.group('details') else None,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2 if ac_match.group('details') else None,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1 if ac_match.group('details') else None,

Wyświetl plik

@ -9,10 +9,10 @@ from .base import BaseParser
class FlarmParser(BaseParser):
def __init__(self):
self.beacon_type = 'flarm'
self.position_pattern = re.compile(PATTERN_FLARM_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_FLARM_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_pattern.match(aprs_comment)
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,

Wyświetl plik

@ -9,10 +9,10 @@ from .base import BaseParser
class LT24Parser(BaseParser):
def __init__(self):
self.beacon_type = 'lt24'
self.position_pattern = re.compile(PATTERN_LT24_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_LT24_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_pattern.match(aprs_comment)
return {'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * FPM_TO_MS if ac_match.group('climb_rate') else None,
'source': ac_match.group('source') if ac_match.group('source') else None}

Wyświetl plik

@ -9,10 +9,10 @@ from .base import BaseParser
class NaviterParser(BaseParser):
def __init__(self):
self.beacon_type = 'naviter'
self.position_pattern = re.compile(PATTERN_NAVITER_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
match = re.search(PATTERN_NAVITER_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
match = self.position_pattern.match(aprs_comment)
return {'stealth': (int(match.group('details'), 16) & 0b1000000000000000) >> 15 == 1,
'do_not_track': (int(match.group('details'), 16) & 0b0100000000000000) >> 14 == 1,
'aircraft_type': (int(match.group('details'), 16) & 0b0011110000000000) >> 10,

Wyświetl plik

@ -9,6 +9,8 @@ from .base import BaseParser
class OgnParser(BaseParser):
def __init__(self):
self.beacon_type = None
self.aircraft_pattern = re.compile(PATTERN_AIRCRAFT_BEACON)
self.receiver_pattern = re.compile(PATTERN_RECEIVER_BEACON)
def parse(self, aprs_comment, aprs_type):
if not aprs_comment:
@ -27,9 +29,8 @@ class OgnParser(BaseParser):
return {'user_comment': aprs_comment,
'beacon_type': 'aprs_receiver'}
@staticmethod
def parse_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
def parse_aircraft_beacon(self, aprs_comment):
ac_match = self.aircraft_pattern.match(aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
@ -51,9 +52,8 @@ class OgnParser(BaseParser):
else:
return None
@staticmethod
def parse_receiver_beacon(aprs_comment):
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
def parse_receiver_beacon(self, aprs_comment):
rec_match = self.receiver_pattern.match(aprs_comment)
if rec_match:
return {'version': rec_match.group('version'),
'platform': rec_match.group('platform'),

Wyświetl plik

@ -8,18 +8,18 @@ from .base import BaseParser
class ReceiverParser(BaseParser):
def __init__(self):
self.beacon_type = 'receiver'
self.position_pattern = re.compile(PATTERN_RECEIVER_POSITION_COMMENT)
self.status_pattern = re.compile(PATTERN_RECEIVER_STATUS_COMMENT)
@staticmethod
def parse_position(aprs_comment):
def parse_position(self, aprs_comment):
if aprs_comment is None:
return {}
else:
match = re.search(PATTERN_RECEIVER_POSITION_COMMENT, aprs_comment)
match = self.position_pattern.match(aprs_comment)
return {'user_comment': match.group('user_comment') if match.group('user_comment') else None}
@staticmethod
def parse_status(aprs_comment):
match = re.search(PATTERN_RECEIVER_STATUS_COMMENT, aprs_comment)
def parse_status(self, aprs_comment):
match = self.status_pattern.match(aprs_comment)
return {'version': match.group('version'),
'platform': match.group('platform'),
'cpu_load': float(match.group('cpu_load')),

Wyświetl plik

@ -9,9 +9,9 @@ from .base import BaseParser
class SkylinesParser(BaseParser):
def __init__(self):
self.beacon_type = 'skylines'
self.position_pattern = re.compile(PATTERN_SKYLINES_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_SKYLINES_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_pattern.match(aprs_comment)
return {'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * FPM_TO_MS if ac_match.group('climb_rate') else None}

Wyświetl plik

@ -8,10 +8,10 @@ from .base import BaseParser
class SpiderParser(BaseParser):
def __init__(self):
self.beacon_type = 'spider'
self.position_pattern = re.compile(PATTERN_SPIDER_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_SPIDER_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_pattern.match(aprs_comment)
return {'address': ac_match.group('id'),
'signal_power': int(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'spider_id': ac_match.group('spider_id') if ac_match.group('spider_id') else None,

Wyświetl plik

@ -8,10 +8,10 @@ from .base import BaseParser
class SpotParser(BaseParser):
def __init__(self):
self.beacon_type = 'spot'
self.position_pattern = re.compile(PATTERN_SPOT_POSITION_COMMENT)
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_SPOT_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
ac_match = self.position_pattern.match(aprs_comment)
return {'address': ac_match.group('id'),
'model': ac_match.group('model') if ac_match.group('model') else None,
'status': ac_match.group('status') if ac_match.group('status') else None}

Wyświetl plik

@ -10,10 +10,11 @@ from .base import BaseParser
class TrackerParser(BaseParser):
def __init__(self):
self.beacon_type = 'tracker'
self.position_pattern = re.compile(PATTERN_TRACKER_POSITION_COMMENT)
self.status_pattern = re.compile(PATTERN_TRACKER_STATUS_COMMENT)
@staticmethod
def parse_position(aprs_comment):
match = re.search(PATTERN_TRACKER_POSITION_COMMENT, aprs_comment)
def parse_position(self, aprs_comment):
match = self.position_pattern.match(aprs_comment)
return {'address_type': int(match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(match.group('details'), 16) & 0b10000000) >> 7 == 1,
@ -28,9 +29,8 @@ class TrackerParser(BaseParser):
'vertical': int(match.group('gps_quality_vertical'))} if match.group('gps_quality') else None,
'signal_power': float(match.group('signal_power')) if match.group('signal_power') else None}
@staticmethod
def parse_status(aprs_comment):
match = re.search(PATTERN_TRACKER_STATUS_COMMENT, aprs_comment)
def parse_status(self, aprs_comment):
match = self.status_pattern.match(aprs_comment)
if match:
return {'hardware_version': int(match.group('hardware_version')) if match.group('hardware_version') else None,
'software_version': int(match.group('software_version')) if match.group('software_version') else None,

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.fanet_parser import FanetParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = FanetParser.parse_position("id1E1103CE -02fpm")
message = FanetParser().parse_position("id1E1103CE -02fpm")
self.assertEqual(message['address_type'], 2)
self.assertEqual(message['aircraft_type'], 7)
@ -15,7 +15,7 @@ class TestStringMethods(unittest.TestCase):
self.assertAlmostEqual(message['climb_rate'], -2 * FPM_TO_MS, 0.1)
def test_pseudo_status_comment(self):
message = FanetParser.parse_position("")
message = FanetParser().parse_position("")
self.assertIsNone(message['address_type'])
self.assertIsNone(message['aircraft_type'])

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.flarm_parser import FlarmParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = FlarmParser.parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267")
message = FlarmParser().parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267")
self.assertEqual(message['address_type'], 1)
self.assertEqual(message['aircraft_type'], 8)

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.lt24_parser import LT24Parser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = LT24Parser.parse_position("id25387 +123fpm GPS")
message = LT24Parser().parse_position("id25387 +123fpm GPS")
self.assertEqual(message['address'], "25387")
self.assertAlmostEqual(message['climb_rate'], 123 * FPM_TO_MS, 2)

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.naviter_parser import NaviterParser
class TestStringMethods(unittest.TestCase):
def test_OGNAVI_1(self):
message = NaviterParser.parse_position("id0440042121 +123fpm +0.5rot")
message = NaviterParser().parse_position("id0440042121 +123fpm +0.5rot")
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
# bit 0: stealth mode

Wyświetl plik

@ -6,10 +6,10 @@ from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_invalid_token(self):
self.assertEqual(OgnParser.parse_aircraft_beacon("notAValidToken"), None)
self.assertEqual(OgnParser().parse_aircraft_beacon("notAValidToken"), None)
def test_basic(self):
message = OgnParser.parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser().parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertEqual(message['address_type'], 2)
self.assertEqual(message['aircraft_type'], 2)
@ -27,33 +27,33 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['proximity'][2], 'B598')
def test_stealth(self):
message = OgnParser.parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(message['stealth'])
message = OgnParser.parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser().parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertTrue(message['stealth'])
def test_v024(self):
message = OgnParser.parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
message = OgnParser().parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
self.assertEqual(message['software_version'], 6.02)
self.assertEqual(message['hardware_version'], 10)
self.assertEqual(message['real_address'], "DF0C56")
def test_v024_ogn_tracker(self):
message = OgnParser.parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
message = OgnParser().parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(message['flightlevel'], 4.43)
def test_v025(self):
message = OgnParser.parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
message = OgnParser().parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
self.assertEqual(message['signal_power'], 7.4)
def test_v026(self):
# from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID
message_triple = OgnParser.parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = OgnParser.parse_aircraft_beacon("id093D0930")
message_triple = OgnParser().parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = OgnParser().parse_aircraft_beacon("id093D0930")
self.assertIsNotNone(message_triple)
self.assertIsNotNone(message_single)

Wyświetl plik

@ -5,10 +5,10 @@ from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
self.assertEqual(OgnParser.parse_receiver_beacon("notAValidToken"), None)
self.assertEqual(OgnParser().parse_receiver_beacon("notAValidToken"), None)
def test_v021(self):
message = OgnParser.parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
message = OgnParser().parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(message['version'], "0.2.1")
self.assertEqual(message['cpu_load'], 0.8)
@ -23,17 +23,17 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['rec_input_noise'], -0.25)
def test_v022(self):
message = OgnParser.parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
message = OgnParser().parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(message['platform'], 'x86')
def test_v025(self):
message = OgnParser.parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
message = OgnParser().parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
self.assertEqual(message['voltage'], 5.016)
self.assertEqual(message['amperage'], 0.534)
self.assertEqual(message['senders_signal'], 10.8)
self.assertEqual(message['senders_messages'], 57282)
message = OgnParser.parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
self.assertEqual(message['senders_visible'], 14)
self.assertEqual(message['senders_total'], 16)
self.assertEqual(message['senders_signal'], 24.0)

Wyświetl plik

@ -5,17 +5,17 @@ from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = ReceiverParser.parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
message = ReceiverParser().parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
self.assertEqual(message['user_comment'], "Antenna: chinese, on a pylon, 20 meter above ground")
def test_position_comment_empty(self):
message = ReceiverParser.parse_position("")
message = ReceiverParser().parse_position("")
self.assertIsNotNone(message)
def test_status_comment(self):
message = ReceiverParser.parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
message = ReceiverParser().parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
self.assertEqual(message['version'], "0.2.7")
self.assertEqual(message['platform'], 'RPI-GPU')

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.skylines_parser import SkylinesParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = SkylinesParser.parse_position("id2816 -015fpm")
message = SkylinesParser().parse_position("id2816 -015fpm")
self.assertEqual(message['address'], "2816")
self.assertAlmostEqual(message['climb_rate'], -15 * FPM_TO_MS, 2)

Wyświetl plik

@ -5,7 +5,7 @@ from ogn.parser.aprs_comment.spider_parser import SpiderParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = SpiderParser.parse_position("id300234010617040 +19dB LWE 3D")
message = SpiderParser().parse_position("id300234010617040 +19dB LWE 3D")
self.assertEqual(message['address'], "300234010617040")
self.assertEqual(message['signal_power'], 19)

Wyświetl plik

@ -5,7 +5,7 @@ from ogn.parser.aprs_comment.spot_parser import SpotParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = SpotParser.parse_position("id0-2860357 SPOT3 GOOD")
message = SpotParser().parse_position("id0-2860357 SPOT3 GOOD")
self.assertEqual(message['address'], "0-2860357")
self.assertEqual(message['model'], 'SPOT3')

Wyświetl plik

@ -6,7 +6,7 @@ from ogn.parser.aprs_comment.tracker_parser import TrackerParser
class TestStringMethods(unittest.TestCase):
def test_position_comment(self):
message = TrackerParser.parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm")
message = TrackerParser().parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm")
self.assertEqual(message['address_type'], 3)
self.assertEqual(message['aircraft_type'], 1)
@ -22,7 +22,7 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['signal_power'], 12.7)
def test_status_comment(self):
message = TrackerParser.parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
message = TrackerParser().parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
self.assertEqual(message['hardware_version'], 0)
self.assertEqual(message['software_version'], 0)