pull/43/head
Konstantin Gründger 2017-10-05 10:36:53 +02:00
rodzic 2ccdcd90be
commit 85b7ae5adb
25 zmienionych plików z 153 dodań i 145 usunięć

Wyświetl plik

@ -0,0 +1,19 @@
class BaseParser():
def __init__(self):
self.beacon_type = 'unknown'
def parse(self, aprs_comment, aprs_type):
if aprs_type == "position":
data = self.parse_position(aprs_comment)
elif aprs_type == "status":
data = self.parse_status(aprs_comment)
else:
raise ValueError("aprs_type {} unknown".format(aprs_type))
data.update({'beacon_type': self.beacon_type})
return data
def parse_position(self, aprs_comment):
raise NotImplementedError("Position parser for parser '{}' not yet implemented".format(self.beacon_type))
def parse_status(self, aprs_comment):
raise NotImplementedError("Status parser for parser '{}' not yet implemented".format(self.beacon_type))

Wyświetl plik

@ -0,0 +1,31 @@
import re
from ogn.parser.pattern import PATTERN_AIRCRAFT_BEACON
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class FlarmParser(BaseParser):
def __init__(self):
self.beacon_type = 'aircraft_beacon'
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class LT24Parser(BaseParser):
def __init__(self):
self.beacon_type = 'lt24_beacon'

Wyświetl plik

@ -1,12 +1,14 @@
import re
from ogn.parser.baseparser import BaseParser
from ogn.parser.pattern import PATTERN_NAVITER_BEACON
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class OGNAVI(BaseParser):
beacon_type = 'naviter_beacon'
class NaviterParser(BaseParser):
def __init__(self):
self.beacon_type = 'naviter_beacon'
@staticmethod
def parse_position(aprs_comment):

Wyświetl plik

@ -2,21 +2,24 @@ import re
from ogn.parser.utils import fpm2ms
from ogn.parser.pattern import PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
from ogn.parser.baseparser import BaseParser
from .base import BaseParser
class APRS(BaseParser):
@staticmethod
def parse(aprs_comment, aprs_type):
class OgnParser(BaseParser):
def __init__(self):
self.beacon_type = 'depends...'
def parse(self, aprs_comment, aprs_type):
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
ac_data = APRS.parse_aircraft_beacon(aprs_comment)
ac_data = self.parse_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = APRS.parse_receiver_beacon(aprs_comment)
rc_data = self.parse_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data

Wyświetl plik

@ -1,10 +1,14 @@
import re
from ogn.parser.baseparser import BaseParser
from ogn.parser.pattern import PATTERN_RECEIVER_POSITION, PATTERN_RECEIVER_STATUS
from .base import BaseParser
class ReceiverParser(BaseParser):
def __init__(self):
self.beacon_type = 'receiver_beacon'
class OGNSDR(BaseParser):
@staticmethod
def parse_position(aprs_comment):
if aprs_comment is None:

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SkylinesParser(BaseParser):
def __init__(self):
self.beacon_type = 'skylines_beacon'

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SpiderParser(BaseParser):
def __init__(self):
self.beacon_type = 'spider_beacon'

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SpotParser(BaseParser):
def __init__(self):
self.beacon_type = 'spot_beacon'

Wyświetl plik

@ -1,12 +1,14 @@
import re
from ogn.parser.baseparser import BaseParser
from ogn.parser.pattern import PATTERN_TRACKER_BEACON_POSITION, PATTERN_TRACKER_BEACON_STATUS
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class OGNTRK(BaseParser):
beacon_type = "aircraft_beacon"
class TrackerParser(BaseParser):
def __init__(self):
self.beacon_type = "aircraft_beacon"
@staticmethod
def parse_position(aprs_comment):

Wyświetl plik

@ -1,21 +0,0 @@
class BaseParser():
beacon_type = "undefined"
@staticmethod
def parse(aprs_comment, aprs_type):
if aprs_type == "position":
data = __class__.parse_position(aprs_comment)
elif aprs_type == "status":
data = __class__.parse_status(aprs_comment)
else:
raise ValueError("aprs_type {} unknown".format(aprs_type))
data.update({'beacon_type': __class__.beacon_type})
return data
@staticmethod
def parse_position(aprs_comment):
raise NotImplementedError("Position parser for parser '{}' not yet implemented".format(__class__.__name__))
@staticmethod
def parse_status(aprs_comment):
raise NotImplementedError("Status parser for parser '{}' not yet implemented".format(__class__.__name__))

Wyświetl plik

@ -5,15 +5,15 @@ from ogn.parser.utils import createTimestamp, parseAngle, kts2kmh, feet2m
from ogn.parser.pattern import PATTERN_APRS_POSITION, PATTERN_APRS_STATUS
from ogn.parser.exceptions import AprsParseError, OgnParseError
from ogn.parser.parse_ogn import APRS
from ogn.parser.parse_lt24 import OGLT24
from ogn.parser.parse_naviter import OGNAVI
from ogn.parser.parse_flarm import OGFLR
from ogn.parser.parse_tracker import OGNTRK
from ogn.parser.parse_receiver import OGNSDR
from ogn.parser.parse_skylines import OGSKYL
from ogn.parser.parse_spider import OGSPID
from ogn.parser.parse_spot import OGSPOT
from ogn.parser.aprs_comment.ogn_parser import OgnParser
from ogn.parser.aprs_comment.lt24_parser import LT24Parser
from ogn.parser.aprs_comment.naviter_parser import NaviterParser
from ogn.parser.aprs_comment.flarm_parser import FlarmParser
from ogn.parser.aprs_comment.tracker_parser import TrackerParser
from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
from ogn.parser.aprs_comment.skylines_parser import SkylinesParser
from ogn.parser.aprs_comment.spider_parser import SpiderParser
from ogn.parser.aprs_comment.spot_parser import SpotParser
def parse(aprs_message, reference_date=None, reference_time=None):
@ -59,24 +59,21 @@ def parse_aprs(message, reference_date, reference_time=None):
raise AprsParseError(message)
def parse_comment(aprs_comment, dstcall="APRS", aprs_type="position"):
if dstcall == "APRS": # this can be a receiver or an aircraft
return APRS.parse(aprs_comment, aprs_type)
elif dstcall == "OGFLR":
return OGFLR.parse(aprs_comment, aprs_type)
elif dstcall == "OGNTRK":
return OGNTRK.parse(aprs_comment, aprs_type)
elif dstcall == "OGNSDR":
return OGNSDR.parse(aprs_comment, aprs_type)
elif dstcall == "OGLT24":
return OGLT24.parse(aprs_comment, aprs_type)
elif dstcall == "OGNAVI":
return OGNAVI.parse(aprs_comment, aprs_type)
elif dstcall == "OGSKYL":
return OGSKYL.parse(aprs_comment, aprs_type)
elif dstcall == "OGSPID":
return OGSPID.parse(aprs_comment, aprs_type)
elif dstcall == "OGSPOT":
return OGSPOT.parse(aprs_comment, aprs_type)
dstcall_parser_mapping = {'APRS': OgnParser(),
'OGFLR': FlarmParser(),
'OGNTRK': TrackerParser(),
'OGNSDR': ReceiverParser(),
'OGLT24': LT24Parser(),
'OGNAVI': NaviterParser(),
'OGSKYL': SkylinesParser(),
'OGSPID': SpiderParser(),
'OGSPOT': SpotParser(),
}
def parse_comment(aprs_comment, dstcall='APRS', aprs_type="position"):
parser = dstcall_parser_mapping.get(dstcall)
if parser:
return parser.parse(aprs_comment, aprs_type)
else:
raise OgnParseError("No parser for dstcall {} found. APRS comment: {}".format(dstcall, aprs_comment))

Wyświetl plik

@ -1,33 +0,0 @@
import re
from ogn.parser.baseparser import BaseParser
from ogn.parser.pattern import PATTERN_AIRCRAFT_BEACON
from ogn.parser.utils import fpm2ms
class OGFLR(BaseParser):
def __init__(self):
self.beacon_type = 'aircraft_beacon'
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
return None

Wyświetl plik

@ -1,5 +0,0 @@
from ogn.parser.baseparser import BaseParser
class OGLT24(BaseParser):
beacon_type = 'lt24_beacon'

Wyświetl plik

@ -1,5 +0,0 @@
from ogn.parser.baseparser import BaseParser
class OGSKYL(BaseParser):
beacon_type = 'skylines_beacon'

Wyświetl plik

@ -1,5 +0,0 @@
from ogn.parser.baseparser import BaseParser
class OGSPID(BaseParser):
beacon_type = 'spider_beacon'

Wyświetl plik

@ -1,5 +0,0 @@
from ogn.parser.baseparser import BaseParser
class OGSPOT(BaseParser):
beacon_type = 'spot_beacon'

Wyświetl plik

@ -1,13 +1,13 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_lt24 import OGLT24
from ogn.parser.aprs_comment.lt24_parser import LT24Parser
class TestStringMethods(unittest.TestCase):
@unittest.skip("Not yet implemented")
def test(self):
message = OGLT24.parse_position("id25387 +000fpm GPS")
message = LT24Parser.parse_position("id25387 +000fpm GPS")
self.assertEqual(message['id'], 25387)
self.assertAlmostEqual(message['climb_rate'] * ms2fpm, 0, 2)

Wyświetl plik

@ -1,12 +1,12 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_naviter import OGNAVI
from ogn.parser.aprs_comment.naviter_parser import NaviterParser
class TestStringMethods(unittest.TestCase):
def test_OGNAVI_1(self):
message = OGNAVI.parse_position("id0440042121 +123fpm +0.5rot")
message = NaviterParser.parse_position("id0440042121 +123fpm +0.5rot")
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
# bit 0: stealth mode

Wyświetl plik

@ -1,15 +1,15 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_ogn import APRS
from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_invalid_token(self):
self.assertEqual(APRS.parse_aircraft_beacon("notAValidToken"), None)
self.assertEqual(OgnParser.parse_aircraft_beacon("notAValidToken"), None)
def test_basic(self):
message = APRS.parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertEqual(message['address_type'], 2)
self.assertEqual(message['aircraft_type'], 2)
@ -27,33 +27,33 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['proximity'][2], 'B598')
def test_stealth(self):
message = APRS.parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(message['stealth'])
message = APRS.parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertTrue(message['stealth'])
def test_v024(self):
message = APRS.parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
message = OgnParser.parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
self.assertEqual(message['software_version'], 6.02)
self.assertEqual(message['hardware_version'], 10)
self.assertEqual(message['real_address'], "DF0C56")
def test_v024_ogn_tracker(self):
message = APRS.parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
message = OgnParser.parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(message['flightlevel'], 4.43)
def test_v025(self):
message = APRS.parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
message = OgnParser.parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
self.assertEqual(message['signal_power'], 7.4)
def test_v026(self):
# from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID
message_triple = APRS.parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = APRS.parse_aircraft_beacon("id093D0930")
message_triple = OgnParser.parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = OgnParser.parse_aircraft_beacon("id093D0930")
self.assertIsNotNone(message_triple)
self.assertIsNotNone(message_single)

Wyświetl plik

@ -1,14 +1,14 @@
import unittest
from ogn.parser.parse_ogn import APRS
from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
self.assertEqual(APRS.parse_receiver_beacon("notAValidToken"), None)
self.assertEqual(OgnParser.parse_receiver_beacon("notAValidToken"), None)
def test_v021(self):
message = APRS.parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
message = OgnParser.parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(message['version'], "0.2.1")
self.assertEqual(message['cpu_load'], 0.8)
@ -23,17 +23,17 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['rec_input_noise'], -0.25)
def test_v022(self):
message = APRS.parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
message = OgnParser.parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(message['platform'], 'x86')
def test_v025(self):
message = APRS.parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
message = OgnParser.parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
self.assertEqual(message['voltage'], 5.016)
self.assertEqual(message['amperage'], 0.534)
self.assertEqual(message['senders_signal'], 10.8)
self.assertEqual(message['senders_messages'], 57282)
message = APRS.parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
message = OgnParser.parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
self.assertEqual(message['senders_visible'], 14)
self.assertEqual(message['senders_total'], 16)
self.assertEqual(message['senders_signal'], 24.0)

Wyświetl plik

@ -1,21 +1,21 @@
import unittest
from ogn.parser.parse_receiver import OGNSDR
from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
class TestStringMethods(unittest.TestCase):
def test_position(self):
message = OGNSDR.parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
message = ReceiverParser.parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
self.assertEqual(message['user_comment'], "Antenna: chinese, on a pylon, 20 meter above ground")
def test_position_empty(self):
message = OGNSDR.parse_position("")
message = ReceiverParser.parse_position("")
self.assertIsNotNone(message)
def test_status(self):
message = OGNSDR.parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
message = ReceiverParser.parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
self.assertEqual(message['version'], "0.2.7")
self.assertEqual(message['platform'], 'RPI-GPU')

Wyświetl plik

@ -1,12 +1,12 @@
import unittest
from ogn.parser.parse_spot import OGSPOT
from ogn.parser.aprs_comment.spot_parser import SpotParser
class TestStringMethods(unittest.TestCase):
@unittest.skip("Not yet implemented")
def test(self):
message = OGSPOT.parse_position("id0-2860357 SPOT3 GOOD")
message = SpotParser.parse_position("id0-2860357 SPOT3 GOOD")
self.assertEqual(message['id'], "0-2860357")
self.assertEqual(message['hw_version'], 3)

Wyświetl plik

@ -1,12 +1,12 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_tracker import OGNTRK
from ogn.parser.aprs_comment.tracker_parser import TrackerParser
class TestStringMethods(unittest.TestCase):
def test_position_beacon(self):
message = OGNTRK.parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5")
message = TrackerParser.parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5")
self.assertEqual(message['address_type'], 3)
self.assertEqual(message['aircraft_type'], 1)
@ -21,7 +21,7 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['gps_status'], '3x5')
def test_status(self):
message = OGNTRK.parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
message = TrackerParser.parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
self.assertEqual(message['hardware_version'], 0)
self.assertEqual(message['software_version'], 0)