Move parsing from 'ogn.model.*' to 'ogn.parser'

Use named capture groups for parsing.
Regular expressions for receiver and aircraft beacons are from ruby ogn-client.
The separated regular expressions for parsing of aircraft beacons were
concatenated to harmonize with other ogn clients (at zero costs,
currently all packets stick to the common token sequence).

regexp for receiver and aircraft beacons:
Copyright (c) 2015 Sven Schwyn; licensed under The MIT License
pull/43/head
Fabian P. Schmidt 2016-02-18 22:11:33 +01:00
rodzic 1b8f27331e
commit 6a31a648cb
13 zmienionych plików z 267 dodań i 353 usunięć

Wyświetl plik

@ -2,6 +2,7 @@
## Unreleased
- Moved exceptions from `ogn.exceptions` to `ogn.parser.exceptions`
- Moved parsing from `ogn.model.*` to `ogn.parser`
## 0.2.1 - 2016-02-17
First release via PyPi.

Wyświetl plik

@ -1,10 +1,6 @@
import re
from sqlalchemy import Column, String, Integer, Float, Boolean, SmallInteger
from ogn.parser.utils import fpm2ms
from .beacon import Beacon
from ogn.parser.exceptions import OgnParseError
class AircraftBeacon(Beacon):
@ -35,103 +31,6 @@ class AircraftBeacon(Beacon):
flight_state = Column(SmallInteger)
# Pattern
address_pattern = re.compile(r"id(\S{2})(\S{6})")
climb_rate_pattern = re.compile(r"([\+\-]\d+)fpm")
turn_rate_pattern = re.compile(r"([\+\-]\d+\.\d+)rot")
signal_strength_pattern = re.compile(r"(\d+\.\d+)dB")
error_count_pattern = re.compile(r"(\d+)e")
coordinates_extension_pattern = re.compile(r"\!W(.)(.)!")
hear_address_pattern = re.compile(r"hear(\w{4})")
frequency_offset_pattern = re.compile(r"([\+\-]\d+\.\d+)kHz")
gps_status_pattern = re.compile(r"gps(\d+x\d+)")
software_version_pattern = re.compile(r"s(\d+\.\d+)")
hardware_version_pattern = re.compile(r"h(\d+)")
real_address_pattern = re.compile(r"r(\w{6})")
flightlevel_pattern = re.compile(r"FL(\d{3}\.\d{2})")
def __init__(self, beacon=None):
self.heared_aircraft_addresses = list()
if beacon is not None:
self.name = beacon.name
self.receiver_name = beacon.receiver_name
self.timestamp = beacon.timestamp
self.latitude = beacon.latitude
self.longitude = beacon.longitude
self.ground_speed = beacon.ground_speed
self.track = beacon.track
self.altitude = beacon.altitude
self.comment = beacon.comment
self.parse(beacon.comment)
else:
self.latitude = 0.0
self.longitude = 0.0
def parse(self, text):
for part in text.split(' '):
address_match = self.address_pattern.match(part)
climb_rate_match = self.climb_rate_pattern.match(part)
turn_rate_match = self.turn_rate_pattern.match(part)
signal_strength_match = self.signal_strength_pattern.match(part)
error_count_match = self.error_count_pattern.match(part)
coordinates_extension_match = self.coordinates_extension_pattern.match(part)
hear_address_match = self.hear_address_pattern.match(part)
frequency_offset_match = self.frequency_offset_pattern.match(part)
gps_status_match = self.gps_status_pattern.match(part)
software_version_match = self.software_version_pattern.match(part)
hardware_version_match = self.hardware_version_pattern.match(part)
real_address_match = self.real_address_pattern.match(part)
flightlevel_match = self.flightlevel_pattern.match(part)
if address_match is not None:
# Flarm ID type byte in APRS msg: PTTT TTII
# P => stealth mode
# TTTTT => aircraftType
# II => IdType: 0=Random, 1=ICAO, 2=FLARM, 3=OGN
# (see https://groups.google.com/forum/#!msg/openglidernetwork/lMzl5ZsaCVs/YirmlnkaJOYJ).
self.address_type = int(address_match.group(1), 16) & 0b00000011
self.aircraft_type = (int(address_match.group(1), 16) & 0b01111100) >> 2
self.stealth = ((int(address_match.group(1), 16) & 0b10000000) >> 7 == 1)
self.address = address_match.group(2)
elif climb_rate_match is not None:
self.climb_rate = int(climb_rate_match.group(1)) * fpm2ms
elif turn_rate_match is not None:
self.turn_rate = float(turn_rate_match.group(1))
elif signal_strength_match is not None:
self.signal_strength = float(signal_strength_match.group(1))
elif error_count_match is not None:
self.error_count = int(error_count_match.group(1))
elif coordinates_extension_match is not None:
dlat = int(coordinates_extension_match.group(1)) / 1000 / 60
dlon = int(coordinates_extension_match.group(2)) / 1000 / 60
self.latitude = self.latitude + dlat
self.longitude = self.longitude + dlon
elif hear_address_match is not None:
self.heared_aircraft_addresses.append(hear_address_match.group(1))
elif frequency_offset_match is not None:
self.frequency_offset = float(frequency_offset_match.group(1))
elif gps_status_match is not None:
self.gps_status = gps_status_match.group(1)
elif software_version_match is not None:
self.software_version = float(software_version_match.group(1))
elif hardware_version_match is not None:
self.hardware_version = int(hardware_version_match.group(1))
elif real_address_match is not None:
self.real_address = real_address_match.group(1)
elif flightlevel_match is not None:
self.flightlevel = float(flightlevel_match.group(1))
else:
raise OgnParseError(expected_type="AircraftBeacon", substring=part)
def __repr__(self):
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
self.name,

Wyświetl plik

@ -1,19 +1,9 @@
import re
from datetime import datetime
from sqlalchemy import Column, String, Integer, Float, DateTime
from sqlalchemy.ext.declarative import AbstractConcreteBase
from ogn.parser.utils import createTimestamp, dmsToDeg, kts2kmh, feet2m
from ogn.parser.exceptions import AprsParseError
from .base import Base
# "original" pattern from OGN: "(.+?)>APRS,.+,(.+?):/(\\d{6})+h(\\d{4}\\.\\d{2})(N|S).(\\d{5}\\.\\d{2})(E|W).((\\d{3})/(\\d{3}))?/A=(\\d{6}).*?"
PATTERN_APRS = r"^(.+?)>APRS,.+,(.+?):/(\d{6})+h(\d{4}\.\d{2})(N|S)(.)(\d{5}\.\d{2})(E|W)(.)((\d{3})/(\d{3}))?/A=(\d{6})\s(.*)$"
re_pattern_aprs = re.compile(PATTERN_APRS)
class Beacon(AbstractConcreteBase, Base):
id = Column(Integer, primary_key=True)
@ -29,38 +19,3 @@ class Beacon(AbstractConcreteBase, Base):
ground_speed = Column(Float)
altitude = Column(Integer)
comment = None
def parse(self, text, reference_date=None):
if reference_date is None:
reference_date = datetime.utcnow()
result = re_pattern_aprs.match(text)
if result is None:
raise AprsParseError(text)
self.name = result.group(1)
self.receiver_name = result.group(2)
self.timestamp = createTimestamp(result.group(3), reference_date)
self.latitude = dmsToDeg(float(result.group(4)) / 100)
if result.group(5) == "S":
self.latitude = -self.latitude
self.symboltable = result.group(6)
self.longitude = dmsToDeg(float(result.group(7)) / 100)
if result.group(8) == "W":
self.longitude = -self.longitude
self.symbolcode = result.group(9)
if result.group(10) is not None:
self.track = int(result.group(11))
self.ground_speed = int(result.group(12)) * kts2kmh
else:
self.track = 0
self.ground_speed = 0
self.altitude = int(result.group(13)) * feet2m
self.comment = result.group(14)

Wyświetl plik

@ -1,9 +1,6 @@
import re
from sqlalchemy import Column, Float, String
from .beacon import Beacon
from ogn.parser.exceptions import OgnParseError
class ReceiverBeacon(Beacon):
@ -23,67 +20,5 @@ class ReceiverBeacon(Beacon):
rec_crystal_correction_fine = 0 # obsolete since 0.2.0
rec_input_noise = Column(Float)
# Pattern
version_pattern = re.compile(r"v(\d+\.\d+\.\d+)\.?(.+)?")
cpu_pattern = re.compile(r"CPU:(\d+\.\d+)")
cpu_temp_pattern = re.compile(r"([\+\-]\d+\.\d+)C")
ram_pattern = re.compile(r"RAM:(\d+\.\d+)/(\d+\.\d+)MB")
ntp_pattern = re.compile(r"NTP:(\d+\.\d+)ms/([\+\-]\d+\.\d+)ppm")
rf_pattern_full = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm/([\+\-]\d+\.\d+)dB")
rf_pattern_light1 = re.compile(r"RF:([\+\-]\d+\.\d+)dB")
rf_pattern_light2 = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm")
def __init__(self, beacon=None):
if beacon is not None:
self.name = beacon.name
self.receiver_name = beacon.receiver_name
self.timestamp = beacon.timestamp
self.latitude = beacon.latitude
self.longitude = beacon.longitude
self.ground_speed = beacon.ground_speed
self.track = beacon.track
self.altitude = beacon.altitude
self.comment = beacon.comment
self.parse(beacon.comment)
def parse(self, text):
for part in text.split(' '):
version_match = self.version_pattern.match(part)
cpu_match = self.cpu_pattern.match(part)
cpu_temp_match = self.cpu_temp_pattern.match(part)
ram_match = self.ram_pattern.match(part)
ntp_match = self.ntp_pattern.match(part)
rf_full_match = self.rf_pattern_full.match(part)
rf_light1_match = self.rf_pattern_light1.match(part)
rf_light2_match = self.rf_pattern_light2.match(part)
if version_match is not None:
self.version = version_match.group(1)
self.platform = version_match.group(2)
elif cpu_match is not None:
self.cpu_load = float(cpu_match.group(1))
elif cpu_temp_match is not None:
self.cpu_temp = float(cpu_temp_match.group(1))
elif ram_match is not None:
self.free_ram = float(ram_match.group(1))
self.total_ram = float(ram_match.group(2))
elif ntp_match is not None:
self.ntp_error = float(ntp_match.group(1))
self.rt_crystal_correction = float(ntp_match.group(2))
elif rf_full_match is not None:
self.rec_crystal_correction = int(rf_full_match.group(1))
self.rec_crystal_correction_fine = float(rf_full_match.group(2))
self.rec_input_noise = float(rf_full_match.group(3))
elif rf_light1_match is not None:
self.rec_input_noise = float(rf_light1_match.group(1))
elif rf_light2_match is not None:
self.rec_crystal_correction = int(rf_light2_match.group(1))
self.rec_crystal_correction_fine = float(rf_light2_match.group(2))
else:
raise OgnParseError(expected_type="ReceiverBeacon", substring=part)
def __repr__(self):
return "<ReceiverBeacon %s: %s>" % (self.name, self.version)

Wyświetl plik

@ -0,0 +1,84 @@
import re
from datetime import datetime
from ogn.parser.utils import createTimestamp, dmsToDeg, kts2kmh, feet2m, fpm2ms
from ogn.parser.pattern import PATTERN_APRS, PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
from ogn.parser.exceptions import AprsParseError, OgnParseError
def parse_aprs(message, reference_date=None):
if reference_date is None:
reference_date = datetime.utcnow()
match = re.search(PATTERN_APRS, message)
if match:
return {'name': match.group('callsign'),
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match.group('time'), reference_date),
'latitude': dmsToDeg(float(match.group('latitude')) / 100) *
(-1 if match.group('latitude_sign') == 'S' else 1) +
(int(match.group('latitude_enhancement')) / 1000 / 60 if match.group('latitude_enhancement') else 0),
'symboltable': match.group('symbol_table'),
'longitude': dmsToDeg(float(match.group('longitude')) / 100) *
(-1 if match.group('longitude_sign') == 'W' else 1) +
(int(match.group('longitude_enhancement')) / 1000 / 60 if match.group('longitude_enhancement') else 0),
'symbolcode': match.group('symbol'),
'track': int(match.group('course')) if match.group('course_extension') else 0,
'ground_speed': int(match.group('ground_speed')) * kts2kmh if match.group('ground_speed') else 0,
'altitude': int(match.group('altitude')) * feet2m,
'comment': match.group('comment')}
raise AprsParseError(message)
def parse_ogn_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms,
'turn_rate': float(ac_match.group('turn_rate')),
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_strength': float(ac_match.group('signal')),
'error_count': float(ac_match.group('errors')),
'frequency_offset': float(ac_match.group('frequency_offset')),
'gps_status': ac_match.group('gps_accuracy'),
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version')) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id')}
else:
return None
def parse_ogn_receiver_beacon(aprs_comment):
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
if rec_match:
return {'version': rec_match.group('version'),
'platform': rec_match.group('platform'),
'cpu_load': float(rec_match.group('cpu_load')),
'free_ram': float(rec_match.group('ram_free')),
'total_ram': float(rec_match.group('ram_total')),
'ntp_error': float(rec_match.group('ntp_offset')),
'rt_crystal_correction': float(rec_match.group('ntp_correction')),
'cpu_temp': float(rec_match.group('cpu_temperature')) if rec_match.group('cpu_temperature') else None,
'rec_crystal_correction': int(rec_match.group('manual_correction')) if rec_match.group('manual_correction') else 0,
'rec_crystal_correction_fine': float(rec_match.group('automatic_correction')) if rec_match.group('automatic_correction') else 0.0,
'rec_input_noise': float(rec_match.group('input_noise')) if rec_match.group('input_noise') else None}
else:
return None
def parse_ogn_beacon(aprs_comment):
ac_data = parse_ogn_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = parse_ogn_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
raise OgnParseError(aprs_comment)

Wyświetl plik

@ -0,0 +1,63 @@
import re
PATTERN_APRS = re.compile(r"^(?P<callsign>.+?)>APRS,.+,(?P<receiver>.+?):/(?P<time>\d{6})+h(?P<latitude>\d{4}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>\d{5}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>\d{6})(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?\s(?P<comment>.*)$")
# The following regexp patterns are part of the ruby ogn-client.
# source: https://github.com/svoop/ogn_client-ruby
# The MIT License (MIT)
#
# Copyright (c) 2015 Sven Schwyn
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
PATTERN_RECEIVER_BEACON = re.compile(r"""
(?:
v(?P<version>\d+\.\d+\.\d+)
\.?(?P<platform>.+?)?
\s)?
CPU:(?P<cpu_load>[\d.]+)\s
RAM:(?P<ram_free>[\d.]+)\/(?P<ram_total>[\d.]+)MB\s
NTP:(?P<ntp_offset>[\d.]+)ms\/(?P<ntp_correction>[+-][\d.]+)ppm\s?
(?:(?P<cpu_temperature>[+-][\d.]+)C\s*)?
(?:RF:
(?:
(?P<manual_correction>[+-][\d]+)
(?P<automatic_correction>[+-][\d.]+)ppm\/
)?
(?P<input_noise>[+-][\d.]+)dB
)?
""", re.VERBOSE | re.MULTILINE)
PATTERN_AIRCRAFT_BEACON = re.compile(r"""
id(?P<details>\w{2})(?P<id>\w+?)\s
(?P<climb_rate>[+-]\d+?)fpm\s
(?P<turn_rate>[+-][\d.]+?)rot\s
(?:FL(?P<flight_level>[\d.]+)\s)?
(?P<signal>[\d.]+?)dB\s
(?P<errors>\d+)e\s
(?P<frequency_offset>[+-][\d.]+?)kHz\s?
(?:gps(?P<gps_accuracy>\d+x\d+)\s?)?
(?:s(?P<flarm_software_version>[\d.]+)\s?)?
(?:h(?P<flarm_hardware_version>[\dA-F]{2})\s?)?
(?:r(?P<flarm_id>[\dA-F]+)\s?)?
(?:hear(?P<proximity>.+))?
""", re.VERBOSE | re.MULTILINE)

Wyświetl plik

@ -1,69 +0,0 @@
import unittest
from datetime import datetime
from ogn.parser.utils import ms2fpm
from ogn.model import Beacon, AircraftBeacon
from ogn.parser.exceptions import OgnParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
aircraft_beacon = AircraftBeacon()
with self.assertRaises(OgnParseError):
aircraft_beacon.parse("notAValidToken")
def test_basic(self):
aircraft_beacon = AircraftBeacon()
aircraft_beacon.parse("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(aircraft_beacon.stealth)
self.assertEqual(aircraft_beacon.address, "DDA5BA")
self.assertAlmostEqual(aircraft_beacon.climb_rate * ms2fpm, -454, 2)
self.assertEqual(aircraft_beacon.turn_rate, -1.1)
self.assertEqual(aircraft_beacon.signal_strength, 8.8)
self.assertEqual(aircraft_beacon.error_count, 0)
self.assertEqual(aircraft_beacon.frequency_offset, 51.2)
self.assertEqual(aircraft_beacon.gps_status, '4x5')
self.assertEqual(len(aircraft_beacon.heared_aircraft_addresses), 3)
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[0], '1084')
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[1], 'B597')
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[2], 'B598')
def test_stealth(self):
aircraft_beacon = AircraftBeacon()
aircraft_beacon.parse("id0ADD1234")
self.assertFalse(aircraft_beacon.stealth)
aircraft_beacon.parse("id8ADD1234")
self.assertTrue(aircraft_beacon.stealth)
def test_v024(self):
aircraft_beacon = AircraftBeacon()
aircraft_beacon.parse("!W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
self.assertEqual(aircraft_beacon.latitude, 2 / 1000 / 60)
self.assertEqual(aircraft_beacon.longitude, 6 / 1000 / 60)
self.assertEqual(aircraft_beacon.software_version, 6.02)
self.assertEqual(aircraft_beacon.hardware_version, 44)
self.assertEqual(aircraft_beacon.real_address, "DF0C56")
def test_v024_ogn_tracker(self):
aircraft_beacon = AircraftBeacon()
aircraft_beacon.parse("!W34! id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(aircraft_beacon.flightlevel, 4.43)
def test_copy_constructor(self):
beacon = Beacon()
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5",
reference_date=datetime(2015, 1, 1, 16, 8, 29))
aircraft_beacon = AircraftBeacon(beacon)
self.assertEqual(aircraft_beacon.name, 'FLRDDA5BA')
self.assertEqual(aircraft_beacon.address, 'DDA5BA')
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,35 +0,0 @@
import unittest
from datetime import datetime
from ogn.parser.utils import dmsToDeg, kts2kmh, m2feet
from ogn.model import Beacon
from ogn.parser.exceptions import AprsParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
beacon = Beacon()
with self.assertRaises(AprsParseError):
beacon.parse("notAValidString")
def test_basic(self):
beacon = Beacon()
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertEqual(beacon.name, "FLRDDA5BA")
self.assertEqual(beacon.receiver_name, "LFMX")
self.assertEqual(beacon.timestamp.strftime('%H:%M:%S'), "16:08:29")
self.assertAlmostEqual(beacon.latitude, dmsToDeg(44.1541), 5)
self.assertEqual(beacon.symboltable, '/')
self.assertAlmostEqual(beacon.longitude, dmsToDeg(6.0003), 5)
self.assertEqual(beacon.symbolcode, '\'')
self.assertEqual(beacon.track, 342)
self.assertEqual(beacon.ground_speed, 49 * kts2kmh)
self.assertAlmostEqual(beacon.altitude * m2feet, 5524, 5)
self.assertEqual(beacon.comment, "this is a comment")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,38 +0,0 @@
import unittest
from ogn.model import ReceiverBeacon
from ogn.parser.exceptions import OgnParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
receiver_beacon = ReceiverBeacon()
with self.assertRaises(OgnParseError):
receiver_beacon.parse("notAValidToken")
def test_v022(self):
receiver_beacon = ReceiverBeacon()
receiver_beacon.parse("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(receiver_beacon.version, '0.2.2')
self.assertEqual(receiver_beacon.platform, 'x86')
self.assertEqual(receiver_beacon.cpu_load, 0.5)
self.assertEqual(receiver_beacon.cpu_temp, 52.0)
self.assertEqual(receiver_beacon.free_ram, 669.9)
self.assertEqual(receiver_beacon.total_ram, 887.7)
self.assertEqual(receiver_beacon.ntp_error, 1.0)
self.assertEqual(receiver_beacon.rec_crystal_correction, 0.0)
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, 0.0)
self.assertEqual(receiver_beacon.rec_input_noise, 0.06)
def test_v021(self):
receiver_beacon = ReceiverBeacon()
receiver_beacon.parse("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(receiver_beacon.rec_crystal_correction, 26)
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, -1.4)
self.assertEqual(receiver_beacon.rec_input_noise, -0.25)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,49 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse import parse_ogn_aircraft_beacon
class TestStringMethods(unittest.TestCase):
def test_invalid_token(self):
self.assertEqual(parse_ogn_aircraft_beacon("notAValidToken"), None)
def test_basic(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(aircraft_beacon['stealth'])
self.assertEqual(aircraft_beacon['address'], "DDA5BA")
self.assertAlmostEqual(aircraft_beacon['climb_rate'] * ms2fpm, -454, 2)
self.assertEqual(aircraft_beacon['turn_rate'], -1.1)
self.assertEqual(aircraft_beacon['signal_strength'], 8.8)
self.assertEqual(aircraft_beacon['error_count'], 0)
self.assertEqual(aircraft_beacon['frequency_offset'], 51.2)
self.assertEqual(aircraft_beacon['gps_status'], '4x5')
# self.assertEqual(len(aircraft_beacon['heared_aircraft_addresses']), 3)
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][0], '1084')
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][1], 'B597')
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][2], 'B598')
def test_stealth(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(aircraft_beacon['stealth'])
aircraft_beacon = parse_ogn_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertTrue(aircraft_beacon['stealth'])
def test_v024(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
self.assertEqual(aircraft_beacon['software_version'], 6.02)
self.assertEqual(aircraft_beacon['hardware_version'], 44)
self.assertEqual(aircraft_beacon['real_address'], "DF0C56")
def test_v024_ogn_tracker(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(aircraft_beacon['flightlevel'], 4.43)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,39 @@
import unittest
from datetime import datetime
from ogn.parser.utils import dmsToDeg, kts2kmh, m2feet
from ogn.parser.parse import parse_aprs
from ogn.parser.exceptions import AprsParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
with self.assertRaises(AprsParseError):
parse_aprs("notAValidString")
def test_basic(self):
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertEqual(message['name'], "FLRDDA5BA")
self.assertEqual(message['receiver_name'], "LFMX")
self.assertEqual(message['timestamp'].strftime('%H:%M:%S'), "16:08:29")
self.assertAlmostEqual(message['latitude'], dmsToDeg(44.1541), 5)
self.assertEqual(message['symboltable'], '/')
self.assertAlmostEqual(message['longitude'], dmsToDeg(6.0003), 5)
self.assertEqual(message['symbolcode'], '\'')
self.assertEqual(message['track'], 342)
self.assertEqual(message['ground_speed'], 49 * kts2kmh)
self.assertAlmostEqual(message['altitude'] * m2feet, 5524, 5)
self.assertEqual(message['comment'], "this is a comment")
def test_v024(self):
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertAlmostEqual(message['latitude'] - dmsToDeg(44.1541), 2 / 1000 / 60, 10)
self.assertAlmostEqual(message['longitude'] - dmsToDeg(6.0003), 6 / 1000 / 60, 10)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,31 @@
import unittest
from ogn.parser.parse import parse_ogn_receiver_beacon
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
self.assertEqual(parse_ogn_receiver_beacon("notAValidToken"), None)
def test_v022(self):
receiver_beacon = parse_ogn_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(receiver_beacon['version'], '0.2.2')
self.assertEqual(receiver_beacon['platform'], 'x86')
self.assertEqual(receiver_beacon['cpu_load'], 0.5)
self.assertEqual(receiver_beacon['cpu_temp'], 52.0)
self.assertEqual(receiver_beacon['free_ram'], 669.9)
self.assertEqual(receiver_beacon['total_ram'], 887.7)
self.assertEqual(receiver_beacon['ntp_error'], 1.0)
self.assertEqual(receiver_beacon['rec_crystal_correction'], 0.0)
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], 0.0)
self.assertEqual(receiver_beacon['rec_input_noise'], 0.06)
def test_v021(self):
receiver_beacon = parse_ogn_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(receiver_beacon['rec_crystal_correction'], 26)
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], -1.4)
self.assertEqual(receiver_beacon['rec_input_noise'], -0.25)
if __name__ == '__main__':
unittest.main()