kopia lustrzana https://github.com/glidernet/ogn-python
Merge pull request #43 from kerel-fs/refactor/parser
Move parsing from `ogn.model.*` to `ogn.parser`pull/46/head
commit
2a3aa5a595
|
@ -1,5 +1,9 @@
|
|||
# CHANGELOG
|
||||
|
||||
## Unreleased
|
||||
- Moved exceptions from `ogn.exceptions` to `ogn.parser.exceptions`
|
||||
- Moved parsing from `ogn.model.*` to `ogn.parser`
|
||||
|
||||
## 0.2.1 - 2016-02-17
|
||||
First release via PyPi.
|
||||
- Added CHANGELOG.
|
||||
|
|
21
README.md
21
README.md
|
@ -27,16 +27,23 @@ lets you process the incoming data.
|
|||
Example:
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon
|
||||
from ogn.gateway.client import ognGateway
|
||||
from ogn.parser.parse import parse_aprs, parse_ogn_beacon
|
||||
from ogn.parser.exceptions import ParseError
|
||||
|
||||
|
||||
def process_beacon(beacon):
|
||||
if type(beacon) is AircraftBeacon:
|
||||
print('Received aircraft beacon from {}'.format(beacon.name))
|
||||
elif type(beacon) is ReceiverBeacon:
|
||||
print('Received receiver beacon from {}'.format(beacon.name))
|
||||
def process_beacon(raw_message):
|
||||
if raw_message[0] == '#':
|
||||
print('Server Status: {}'.format(raw_message))
|
||||
return
|
||||
|
||||
try:
|
||||
message = parse_aprs(raw_message)
|
||||
message.update(parse_ogn_beacon(message['comment']))
|
||||
|
||||
print('Received {beacon_type} from {name}'.format(**message))
|
||||
except ParseError as e:
|
||||
print('Error, {}'.format(e.message))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
from datetime import datetime
|
||||
|
||||
from .model import Beacon, AircraftBeacon, ReceiverBeacon
|
||||
from ogn.exceptions import AprsParseError
|
||||
|
||||
|
||||
def parse_aprs(packet, reference_date=None):
|
||||
if reference_date is None:
|
||||
reference_date = datetime.utcnow()
|
||||
if not isinstance(packet, str):
|
||||
raise TypeError("Expected packet to be str, got %s" % type(packet))
|
||||
elif packet == "":
|
||||
raise AprsParseError("(empty string)")
|
||||
elif packet[0] == "#":
|
||||
return None
|
||||
|
||||
beacon = Beacon()
|
||||
beacon.parse(packet, reference_date)
|
||||
|
||||
# symboltable / symbolcodes used by OGN:
|
||||
# I&: used as receiver
|
||||
# /X: helicopter_rotorcraft
|
||||
# /': glider_or_motorglider
|
||||
# \^: powered_aircraft
|
||||
# /g: para_glider
|
||||
# /O: ?
|
||||
# /^: ?
|
||||
# \n: ?
|
||||
# /z: ?
|
||||
# /o: ?
|
||||
|
||||
if beacon.symboltable == "I" and beacon.symbolcode == "&":
|
||||
return ReceiverBeacon(beacon)
|
||||
else:
|
||||
return AircraftBeacon(beacon)
|
|
@ -3,9 +3,13 @@ import logging
|
|||
from time import time
|
||||
|
||||
from ogn.gateway import settings
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.aprs_utils import create_aprs_login
|
||||
from ogn.exceptions import AprsParseError, OgnParseError, AmbigousTimeError
|
||||
|
||||
|
||||
def create_aprs_login(user_name, pass_code, app_name, app_version, aprs_filter=None):
|
||||
if not aprs_filter:
|
||||
return "user {} pass {} vers {} {}\n".format(user_name, pass_code, app_name, app_version)
|
||||
else:
|
||||
return "user {} pass {} vers {} {} filter {}\n".format(user_name, pass_code, app_name, app_version, aprs_filter)
|
||||
|
||||
|
||||
class ognGateway:
|
||||
|
@ -42,8 +46,6 @@ class ognGateway:
|
|||
self.logger.error('Socket close error', exc_info=True)
|
||||
|
||||
def run(self, callback, autoreconnect=False):
|
||||
self.process_beacon = callback
|
||||
|
||||
while True:
|
||||
try:
|
||||
keepalive_time = time()
|
||||
|
@ -62,7 +64,7 @@ class ognGateway:
|
|||
self.logger.warning('Read returns zero length string. Failure. Orderly closeout')
|
||||
break
|
||||
|
||||
self.proceed_line(packet_str)
|
||||
callback(packet_str)
|
||||
except BrokenPipeError:
|
||||
self.logger.error('BrokenPipeError', exc_info=True)
|
||||
except socket.error:
|
||||
|
@ -72,20 +74,3 @@ class ognGateway:
|
|||
self.connect()
|
||||
else:
|
||||
return
|
||||
|
||||
def proceed_line(self, line):
|
||||
try:
|
||||
beacon = parse_aprs(line)
|
||||
self.logger.debug('Received beacon: {}'.format(beacon))
|
||||
except AprsParseError:
|
||||
self.logger.error('AprsParseError while parsing line: {}'.format(line), exc_info=True)
|
||||
return
|
||||
except OgnParseError:
|
||||
self.logger.error('OgnParseError while parsing line: {}'.format(line), exc_info=True)
|
||||
return
|
||||
except AmbigousTimeError as e:
|
||||
self.logger.error('Drop packet, {:.0f}s from past: {}'.format(e.timedelta.total_seconds(), line))
|
||||
return
|
||||
|
||||
if beacon is not None:
|
||||
self.process_beacon(beacon)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
|
||||
from ogn.gateway.client import ognGateway
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.gateway.process import process_beacon
|
||||
|
||||
from manager import Manager
|
||||
|
||||
|
@ -33,10 +33,6 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
|
|||
gateway = ognGateway(aprs_user)
|
||||
gateway.connect()
|
||||
|
||||
def process_beacon(beacon):
|
||||
session.add(beacon)
|
||||
session.commit()
|
||||
|
||||
try:
|
||||
gateway.run(callback=process_beacon, autoreconnect=True)
|
||||
except KeyboardInterrupt:
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
import logging
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon
|
||||
from ogn.parser.parse import parse_aprs, parse_ogn_receiver_beacon, parse_ogn_aircraft_beacon
|
||||
from ogn.parser.exceptions import AprsParseError, OgnParseError, AmbigousTimeError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def process_beacon(raw_message):
|
||||
if raw_message[0] == '#':
|
||||
return
|
||||
try:
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
# symboltable / symbolcodes used by OGN:
|
||||
# I&: used as receiver
|
||||
# /X: helicopter_rotorcraft
|
||||
# /': glider_or_motorglider
|
||||
# \^: powered_aircraft
|
||||
# /g: para_glider
|
||||
# /O: ?
|
||||
# /^: ?
|
||||
# \n: ?
|
||||
# /z: ?
|
||||
# /o: ?
|
||||
if message['symboltable'] == "I" and message['symbolcode'] == '&':
|
||||
message.update(parse_ogn_receiver_beacon(message['comment']))
|
||||
beacon = ReceiverBeacon(**message)
|
||||
else:
|
||||
message.update(parse_ogn_aircraft_beacon(message['comment']))
|
||||
beacon = AircraftBeacon(**message)
|
||||
session.add(beacon)
|
||||
session.commit()
|
||||
logger.debug('Received message: {}'.format(raw_message))
|
||||
except (AprsParseError, OgnParseError, AmbigousTimeError) as e:
|
||||
logger.error('Received message: {}'.format(raw_message))
|
||||
logger.error('Drop packet, {}'.format(e.message))
|
|
@ -1,10 +1,6 @@
|
|||
import re
|
||||
|
||||
from sqlalchemy import Column, String, Integer, Float, Boolean, SmallInteger
|
||||
|
||||
from ogn.aprs_utils import fpm2ms
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class AircraftBeacon(Beacon):
|
||||
|
@ -35,103 +31,6 @@ class AircraftBeacon(Beacon):
|
|||
|
||||
flight_state = Column(SmallInteger)
|
||||
|
||||
# Pattern
|
||||
address_pattern = re.compile(r"id(\S{2})(\S{6})")
|
||||
climb_rate_pattern = re.compile(r"([\+\-]\d+)fpm")
|
||||
turn_rate_pattern = re.compile(r"([\+\-]\d+\.\d+)rot")
|
||||
signal_strength_pattern = re.compile(r"(\d+\.\d+)dB")
|
||||
error_count_pattern = re.compile(r"(\d+)e")
|
||||
coordinates_extension_pattern = re.compile(r"\!W(.)(.)!")
|
||||
hear_address_pattern = re.compile(r"hear(\w{4})")
|
||||
frequency_offset_pattern = re.compile(r"([\+\-]\d+\.\d+)kHz")
|
||||
gps_status_pattern = re.compile(r"gps(\d+x\d+)")
|
||||
|
||||
software_version_pattern = re.compile(r"s(\d+\.\d+)")
|
||||
hardware_version_pattern = re.compile(r"h(\d+)")
|
||||
real_address_pattern = re.compile(r"r(\w{6})")
|
||||
|
||||
flightlevel_pattern = re.compile(r"FL(\d{3}\.\d{2})")
|
||||
|
||||
def __init__(self, beacon=None):
|
||||
self.heared_aircraft_addresses = list()
|
||||
|
||||
if beacon is not None:
|
||||
self.name = beacon.name
|
||||
self.receiver_name = beacon.receiver_name
|
||||
self.timestamp = beacon.timestamp
|
||||
self.latitude = beacon.latitude
|
||||
self.longitude = beacon.longitude
|
||||
self.ground_speed = beacon.ground_speed
|
||||
self.track = beacon.track
|
||||
self.altitude = beacon.altitude
|
||||
self.comment = beacon.comment
|
||||
|
||||
self.parse(beacon.comment)
|
||||
else:
|
||||
self.latitude = 0.0
|
||||
self.longitude = 0.0
|
||||
|
||||
def parse(self, text):
|
||||
for part in text.split(' '):
|
||||
address_match = self.address_pattern.match(part)
|
||||
climb_rate_match = self.climb_rate_pattern.match(part)
|
||||
turn_rate_match = self.turn_rate_pattern.match(part)
|
||||
signal_strength_match = self.signal_strength_pattern.match(part)
|
||||
error_count_match = self.error_count_pattern.match(part)
|
||||
coordinates_extension_match = self.coordinates_extension_pattern.match(part)
|
||||
hear_address_match = self.hear_address_pattern.match(part)
|
||||
frequency_offset_match = self.frequency_offset_pattern.match(part)
|
||||
gps_status_match = self.gps_status_pattern.match(part)
|
||||
|
||||
software_version_match = self.software_version_pattern.match(part)
|
||||
hardware_version_match = self.hardware_version_pattern.match(part)
|
||||
real_address_match = self.real_address_pattern.match(part)
|
||||
|
||||
flightlevel_match = self.flightlevel_pattern.match(part)
|
||||
|
||||
if address_match is not None:
|
||||
# Flarm ID type byte in APRS msg: PTTT TTII
|
||||
# P => stealth mode
|
||||
# TTTTT => aircraftType
|
||||
# II => IdType: 0=Random, 1=ICAO, 2=FLARM, 3=OGN
|
||||
# (see https://groups.google.com/forum/#!msg/openglidernetwork/lMzl5ZsaCVs/YirmlnkaJOYJ).
|
||||
self.address_type = int(address_match.group(1), 16) & 0b00000011
|
||||
self.aircraft_type = (int(address_match.group(1), 16) & 0b01111100) >> 2
|
||||
self.stealth = ((int(address_match.group(1), 16) & 0b10000000) >> 7 == 1)
|
||||
self.address = address_match.group(2)
|
||||
elif climb_rate_match is not None:
|
||||
self.climb_rate = int(climb_rate_match.group(1)) * fpm2ms
|
||||
elif turn_rate_match is not None:
|
||||
self.turn_rate = float(turn_rate_match.group(1))
|
||||
elif signal_strength_match is not None:
|
||||
self.signal_strength = float(signal_strength_match.group(1))
|
||||
elif error_count_match is not None:
|
||||
self.error_count = int(error_count_match.group(1))
|
||||
elif coordinates_extension_match is not None:
|
||||
dlat = int(coordinates_extension_match.group(1)) / 1000 / 60
|
||||
dlon = int(coordinates_extension_match.group(2)) / 1000 / 60
|
||||
|
||||
self.latitude = self.latitude + dlat
|
||||
self.longitude = self.longitude + dlon
|
||||
elif hear_address_match is not None:
|
||||
self.heared_aircraft_addresses.append(hear_address_match.group(1))
|
||||
elif frequency_offset_match is not None:
|
||||
self.frequency_offset = float(frequency_offset_match.group(1))
|
||||
elif gps_status_match is not None:
|
||||
self.gps_status = gps_status_match.group(1)
|
||||
|
||||
elif software_version_match is not None:
|
||||
self.software_version = float(software_version_match.group(1))
|
||||
elif hardware_version_match is not None:
|
||||
self.hardware_version = int(hardware_version_match.group(1))
|
||||
elif real_address_match is not None:
|
||||
self.real_address = real_address_match.group(1)
|
||||
|
||||
elif flightlevel_match is not None:
|
||||
self.flightlevel = float(flightlevel_match.group(1))
|
||||
else:
|
||||
raise OgnParseError(expected_type="AircraftBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
|
||||
self.name,
|
||||
|
|
|
@ -1,19 +1,9 @@
|
|||
import re
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import Column, String, Integer, Float, DateTime
|
||||
from sqlalchemy.ext.declarative import AbstractConcreteBase
|
||||
|
||||
from ogn.aprs_utils import createTimestamp, dmsToDeg, kts2kmh, feet2m
|
||||
from ogn.exceptions import AprsParseError
|
||||
from .base import Base
|
||||
|
||||
|
||||
# "original" pattern from OGN: "(.+?)>APRS,.+,(.+?):/(\\d{6})+h(\\d{4}\\.\\d{2})(N|S).(\\d{5}\\.\\d{2})(E|W).((\\d{3})/(\\d{3}))?/A=(\\d{6}).*?"
|
||||
PATTERN_APRS = r"^(.+?)>APRS,.+,(.+?):/(\d{6})+h(\d{4}\.\d{2})(N|S)(.)(\d{5}\.\d{2})(E|W)(.)((\d{3})/(\d{3}))?/A=(\d{6})\s(.*)$"
|
||||
re_pattern_aprs = re.compile(PATTERN_APRS)
|
||||
|
||||
|
||||
class Beacon(AbstractConcreteBase, Base):
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
|
@ -29,38 +19,3 @@ class Beacon(AbstractConcreteBase, Base):
|
|||
ground_speed = Column(Float)
|
||||
altitude = Column(Integer)
|
||||
comment = None
|
||||
|
||||
def parse(self, text, reference_date=None):
|
||||
if reference_date is None:
|
||||
reference_date = datetime.utcnow()
|
||||
result = re_pattern_aprs.match(text)
|
||||
if result is None:
|
||||
raise AprsParseError(text)
|
||||
|
||||
self.name = result.group(1)
|
||||
self.receiver_name = result.group(2)
|
||||
|
||||
self.timestamp = createTimestamp(result.group(3), reference_date)
|
||||
|
||||
self.latitude = dmsToDeg(float(result.group(4)) / 100)
|
||||
if result.group(5) == "S":
|
||||
self.latitude = -self.latitude
|
||||
|
||||
self.symboltable = result.group(6)
|
||||
|
||||
self.longitude = dmsToDeg(float(result.group(7)) / 100)
|
||||
if result.group(8) == "W":
|
||||
self.longitude = -self.longitude
|
||||
|
||||
self.symbolcode = result.group(9)
|
||||
|
||||
if result.group(10) is not None:
|
||||
self.track = int(result.group(11))
|
||||
self.ground_speed = int(result.group(12)) * kts2kmh
|
||||
else:
|
||||
self.track = 0
|
||||
self.ground_speed = 0
|
||||
|
||||
self.altitude = int(result.group(13)) * feet2m
|
||||
|
||||
self.comment = result.group(14)
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
import re
|
||||
|
||||
from sqlalchemy import Column, Float, String
|
||||
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class ReceiverBeacon(Beacon):
|
||||
|
@ -23,67 +20,5 @@ class ReceiverBeacon(Beacon):
|
|||
rec_crystal_correction_fine = 0 # obsolete since 0.2.0
|
||||
rec_input_noise = Column(Float)
|
||||
|
||||
# Pattern
|
||||
version_pattern = re.compile(r"v(\d+\.\d+\.\d+)\.?(.+)?")
|
||||
cpu_pattern = re.compile(r"CPU:(\d+\.\d+)")
|
||||
cpu_temp_pattern = re.compile(r"([\+\-]\d+\.\d+)C")
|
||||
ram_pattern = re.compile(r"RAM:(\d+\.\d+)/(\d+\.\d+)MB")
|
||||
ntp_pattern = re.compile(r"NTP:(\d+\.\d+)ms/([\+\-]\d+\.\d+)ppm")
|
||||
|
||||
rf_pattern_full = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm/([\+\-]\d+\.\d+)dB")
|
||||
rf_pattern_light1 = re.compile(r"RF:([\+\-]\d+\.\d+)dB")
|
||||
rf_pattern_light2 = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm")
|
||||
|
||||
def __init__(self, beacon=None):
|
||||
if beacon is not None:
|
||||
self.name = beacon.name
|
||||
self.receiver_name = beacon.receiver_name
|
||||
self.timestamp = beacon.timestamp
|
||||
self.latitude = beacon.latitude
|
||||
self.longitude = beacon.longitude
|
||||
self.ground_speed = beacon.ground_speed
|
||||
self.track = beacon.track
|
||||
self.altitude = beacon.altitude
|
||||
self.comment = beacon.comment
|
||||
|
||||
self.parse(beacon.comment)
|
||||
|
||||
def parse(self, text):
|
||||
for part in text.split(' '):
|
||||
version_match = self.version_pattern.match(part)
|
||||
cpu_match = self.cpu_pattern.match(part)
|
||||
cpu_temp_match = self.cpu_temp_pattern.match(part)
|
||||
ram_match = self.ram_pattern.match(part)
|
||||
ntp_match = self.ntp_pattern.match(part)
|
||||
|
||||
rf_full_match = self.rf_pattern_full.match(part)
|
||||
rf_light1_match = self.rf_pattern_light1.match(part)
|
||||
rf_light2_match = self.rf_pattern_light2.match(part)
|
||||
|
||||
if version_match is not None:
|
||||
self.version = version_match.group(1)
|
||||
self.platform = version_match.group(2)
|
||||
elif cpu_match is not None:
|
||||
self.cpu_load = float(cpu_match.group(1))
|
||||
elif cpu_temp_match is not None:
|
||||
self.cpu_temp = float(cpu_temp_match.group(1))
|
||||
elif ram_match is not None:
|
||||
self.free_ram = float(ram_match.group(1))
|
||||
self.total_ram = float(ram_match.group(2))
|
||||
elif ntp_match is not None:
|
||||
self.ntp_error = float(ntp_match.group(1))
|
||||
self.rt_crystal_correction = float(ntp_match.group(2))
|
||||
elif rf_full_match is not None:
|
||||
self.rec_crystal_correction = int(rf_full_match.group(1))
|
||||
self.rec_crystal_correction_fine = float(rf_full_match.group(2))
|
||||
self.rec_input_noise = float(rf_full_match.group(3))
|
||||
elif rf_light1_match is not None:
|
||||
self.rec_input_noise = float(rf_light1_match.group(1))
|
||||
elif rf_light2_match is not None:
|
||||
self.rec_crystal_correction = int(rf_light2_match.group(1))
|
||||
self.rec_crystal_correction_fine = float(rf_light2_match.group(2))
|
||||
else:
|
||||
raise OgnParseError(expected_type="ReceiverBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<ReceiverBeacon %s: %s>" % (self.name, self.version)
|
||||
|
|
|
@ -4,26 +4,29 @@ exception definitions
|
|||
from datetime import datetime
|
||||
|
||||
|
||||
class AprsParseError(Exception):
|
||||
class ParseError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AprsParseError(ParseError):
|
||||
"""Parse error while parsing an aprs packet."""
|
||||
def __init__(self, aprs_string):
|
||||
self.aprs_string = aprs_string
|
||||
|
||||
self.message = "This is not a valid APRS string: {}".format(aprs_string)
|
||||
self.message = "This is not a valid APRS packet: {}".format(aprs_string)
|
||||
super(AprsParseError, self).__init__(self.message)
|
||||
|
||||
|
||||
class OgnParseError(Exception):
|
||||
"""Parse error while parsing an aprs packet substring."""
|
||||
def __init__(self, substring, expected_type):
|
||||
self.substring = substring
|
||||
self.expected_type = expected_type
|
||||
class OgnParseError(ParseError):
|
||||
"""Parse error while parsing an ogn message from aprs comment."""
|
||||
def __init__(self, aprs_comment):
|
||||
self.aprs_comment = aprs_comment
|
||||
|
||||
self.message = "For type {} this is not a valid token: {}".format(expected_type, substring)
|
||||
self.message = "This is not a valid OGN message: {}".format(aprs_comment)
|
||||
super(OgnParseError, self).__init__(self.message)
|
||||
|
||||
|
||||
class AmbigousTimeError(Exception):
|
||||
class AmbigousTimeError(ParseError):
|
||||
"""Timstamp from the past/future, can't fully reconstruct datetime from timestamp."""
|
||||
def __init__(self, reference, packet_time):
|
||||
self.reference = reference
|
|
@ -0,0 +1,84 @@
|
|||
import re
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.parser.utils import createTimestamp, dmsToDeg, kts2kmh, feet2m, fpm2ms
|
||||
from ogn.parser.pattern import PATTERN_APRS, PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
|
||||
from ogn.parser.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
def parse_aprs(message, reference_date=None):
|
||||
if reference_date is None:
|
||||
reference_date = datetime.utcnow()
|
||||
|
||||
match = re.search(PATTERN_APRS, message)
|
||||
if match:
|
||||
return {'name': match.group('callsign'),
|
||||
'receiver_name': match.group('receiver'),
|
||||
'timestamp': createTimestamp(match.group('time'), reference_date),
|
||||
'latitude': dmsToDeg(float(match.group('latitude')) / 100) *
|
||||
(-1 if match.group('latitude_sign') == 'S' else 1) +
|
||||
(int(match.group('latitude_enhancement')) / 1000 / 60 if match.group('latitude_enhancement') else 0),
|
||||
'symboltable': match.group('symbol_table'),
|
||||
'longitude': dmsToDeg(float(match.group('longitude')) / 100) *
|
||||
(-1 if match.group('longitude_sign') == 'W' else 1) +
|
||||
(int(match.group('longitude_enhancement')) / 1000 / 60 if match.group('longitude_enhancement') else 0),
|
||||
'symbolcode': match.group('symbol'),
|
||||
'track': int(match.group('course')) if match.group('course_extension') else 0,
|
||||
'ground_speed': int(match.group('ground_speed')) * kts2kmh if match.group('ground_speed') else 0,
|
||||
'altitude': int(match.group('altitude')) * feet2m,
|
||||
'comment': match.group('comment')}
|
||||
|
||||
raise AprsParseError(message)
|
||||
|
||||
|
||||
def parse_ogn_aircraft_beacon(aprs_comment):
|
||||
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
|
||||
if ac_match:
|
||||
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
|
||||
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
|
||||
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
|
||||
'address': ac_match.group('id'),
|
||||
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms,
|
||||
'turn_rate': float(ac_match.group('turn_rate')),
|
||||
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
|
||||
'signal_strength': float(ac_match.group('signal')),
|
||||
'error_count': float(ac_match.group('errors')),
|
||||
'frequency_offset': float(ac_match.group('frequency_offset')),
|
||||
'gps_status': ac_match.group('gps_accuracy'),
|
||||
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
|
||||
'hardware_version': int(ac_match.group('flarm_hardware_version')) if ac_match.group('flarm_hardware_version') else None,
|
||||
'real_address': ac_match.group('flarm_id')}
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def parse_ogn_receiver_beacon(aprs_comment):
|
||||
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
|
||||
if rec_match:
|
||||
return {'version': rec_match.group('version'),
|
||||
'platform': rec_match.group('platform'),
|
||||
'cpu_load': float(rec_match.group('cpu_load')),
|
||||
'free_ram': float(rec_match.group('ram_free')),
|
||||
'total_ram': float(rec_match.group('ram_total')),
|
||||
'ntp_error': float(rec_match.group('ntp_offset')),
|
||||
'rt_crystal_correction': float(rec_match.group('ntp_correction')),
|
||||
'cpu_temp': float(rec_match.group('cpu_temperature')) if rec_match.group('cpu_temperature') else None,
|
||||
'rec_crystal_correction': int(rec_match.group('manual_correction')) if rec_match.group('manual_correction') else 0,
|
||||
'rec_crystal_correction_fine': float(rec_match.group('automatic_correction')) if rec_match.group('automatic_correction') else 0.0,
|
||||
'rec_input_noise': float(rec_match.group('input_noise')) if rec_match.group('input_noise') else None}
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def parse_ogn_beacon(aprs_comment):
|
||||
ac_data = parse_ogn_aircraft_beacon(aprs_comment)
|
||||
if ac_data:
|
||||
ac_data.update({'beacon_type': 'aircraft_beacon'})
|
||||
return ac_data
|
||||
|
||||
rc_data = parse_ogn_receiver_beacon(aprs_comment)
|
||||
if rc_data:
|
||||
rc_data.update({'beacon_type': 'receiver_beacon'})
|
||||
return rc_data
|
||||
|
||||
raise OgnParseError(aprs_comment)
|
|
@ -0,0 +1,63 @@
|
|||
import re
|
||||
|
||||
|
||||
PATTERN_APRS = re.compile(r"^(?P<callsign>.+?)>APRS,.+,(?P<receiver>.+?):/(?P<time>\d{6})+h(?P<latitude>\d{4}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>\d{5}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>\d{6})(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?\s(?P<comment>.*)$")
|
||||
|
||||
# The following regexp patterns are part of the ruby ogn-client.
|
||||
# source: https://github.com/svoop/ogn_client-ruby
|
||||
|
||||
# The MIT License (MIT)
|
||||
#
|
||||
# Copyright (c) 2015 Sven Schwyn
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
PATTERN_RECEIVER_BEACON = re.compile(r"""
|
||||
(?:
|
||||
v(?P<version>\d+\.\d+\.\d+)
|
||||
\.?(?P<platform>.+?)?
|
||||
\s)?
|
||||
CPU:(?P<cpu_load>[\d.]+)\s
|
||||
RAM:(?P<ram_free>[\d.]+)\/(?P<ram_total>[\d.]+)MB\s
|
||||
NTP:(?P<ntp_offset>[\d.]+)ms\/(?P<ntp_correction>[+-][\d.]+)ppm\s?
|
||||
(?:(?P<cpu_temperature>[+-][\d.]+)C\s*)?
|
||||
(?:RF:
|
||||
(?:
|
||||
(?P<manual_correction>[+-][\d]+)
|
||||
(?P<automatic_correction>[+-][\d.]+)ppm\/
|
||||
)?
|
||||
(?P<input_noise>[+-][\d.]+)dB
|
||||
)?
|
||||
""", re.VERBOSE | re.MULTILINE)
|
||||
|
||||
|
||||
PATTERN_AIRCRAFT_BEACON = re.compile(r"""
|
||||
id(?P<details>\w{2})(?P<id>\w+?)\s
|
||||
(?P<climb_rate>[+-]\d+?)fpm\s
|
||||
(?P<turn_rate>[+-][\d.]+?)rot\s
|
||||
(?:FL(?P<flight_level>[\d.]+)\s)?
|
||||
(?P<signal>[\d.]+?)dB\s
|
||||
(?P<errors>\d+)e\s
|
||||
(?P<frequency_offset>[+-][\d.]+?)kHz\s?
|
||||
(?:gps(?P<gps_accuracy>\d+x\d+)\s?)?
|
||||
(?:s(?P<flarm_software_version>[\d.]+)\s?)?
|
||||
(?:h(?P<flarm_hardware_version>[\dA-F]{2})\s?)?
|
||||
(?:r(?P<flarm_id>[\dA-F]+)\s?)?
|
||||
(?:hear(?P<proximity>.+))?
|
||||
""", re.VERBOSE | re.MULTILINE)
|
|
@ -1,7 +1,7 @@
|
|||
from datetime import datetime, timedelta
|
||||
import math
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from ogn.exceptions import AmbigousTimeError
|
||||
from ogn.parser.exceptions import AmbigousTimeError
|
||||
|
||||
|
||||
kmh2kts = 0.539957
|
||||
|
@ -33,10 +33,3 @@ def createTimestamp(hhmmss, reference):
|
|||
raise AmbigousTimeError(reference, packet_time)
|
||||
|
||||
return timestamp
|
||||
|
||||
|
||||
def create_aprs_login(user_name, pass_code, app_name, app_version, aprs_filter=None):
|
||||
if not aprs_filter:
|
||||
return "user %s pass %s vers %s %s\n" % (user_name, pass_code, app_name, app_version)
|
||||
else:
|
||||
return "user %s pass %s vers %s %s filter %s\n" % (user_name, pass_code, app_name, app_version, aprs_filter)
|
|
@ -1,11 +1,18 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from ogn.gateway.client import ognGateway
|
||||
from ogn.gateway.client import create_aprs_login, ognGateway
|
||||
from ogn.gateway.settings import APRS_APP_NAME, APRS_APP_VER
|
||||
|
||||
|
||||
class GatewayTest(unittest.TestCase):
|
||||
def test_create_aprs_login(self):
|
||||
basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1')
|
||||
self.assertEqual('user klaus pass -1 vers myApp 0.1\n', basic_login)
|
||||
|
||||
login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100')
|
||||
self.assertEqual('user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n', login_with_filter)
|
||||
|
||||
def test_initialisation(self):
|
||||
self.gw = ognGateway(aprs_user='testuser', aprs_filter='')
|
||||
self.assertEqual(self.gw.aprs_user, 'testuser')
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.aprs_utils import ms2fpm
|
||||
from ogn.model import Beacon, AircraftBeacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
with self.assertRaises(OgnParseError):
|
||||
aircraft_beacon.parse("notAValidToken")
|
||||
|
||||
def test_basic(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
|
||||
self.assertFalse(aircraft_beacon.stealth)
|
||||
self.assertEqual(aircraft_beacon.address, "DDA5BA")
|
||||
self.assertAlmostEqual(aircraft_beacon.climb_rate * ms2fpm, -454, 2)
|
||||
self.assertEqual(aircraft_beacon.turn_rate, -1.1)
|
||||
self.assertEqual(aircraft_beacon.signal_strength, 8.8)
|
||||
self.assertEqual(aircraft_beacon.error_count, 0)
|
||||
self.assertEqual(aircraft_beacon.frequency_offset, 51.2)
|
||||
self.assertEqual(aircraft_beacon.gps_status, '4x5')
|
||||
|
||||
self.assertEqual(len(aircraft_beacon.heared_aircraft_addresses), 3)
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[0], '1084')
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[1], 'B597')
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_addresses[2], 'B598')
|
||||
|
||||
def test_stealth(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("id0ADD1234")
|
||||
self.assertFalse(aircraft_beacon.stealth)
|
||||
|
||||
aircraft_beacon.parse("id8ADD1234")
|
||||
self.assertTrue(aircraft_beacon.stealth)
|
||||
|
||||
def test_v024(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("!W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
|
||||
|
||||
self.assertEqual(aircraft_beacon.latitude, 2 / 1000 / 60)
|
||||
self.assertEqual(aircraft_beacon.longitude, 6 / 1000 / 60)
|
||||
self.assertEqual(aircraft_beacon.software_version, 6.02)
|
||||
self.assertEqual(aircraft_beacon.hardware_version, 44)
|
||||
self.assertEqual(aircraft_beacon.real_address, "DF0C56")
|
||||
|
||||
def test_v024_ogn_tracker(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("!W34! id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
|
||||
self.assertEqual(aircraft_beacon.flightlevel, 4.43)
|
||||
|
||||
def test_copy_constructor(self):
|
||||
beacon = Beacon()
|
||||
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5",
|
||||
reference_date=datetime(2015, 1, 1, 16, 8, 29))
|
||||
aircraft_beacon = AircraftBeacon(beacon)
|
||||
|
||||
self.assertEqual(aircraft_beacon.name, 'FLRDDA5BA')
|
||||
self.assertEqual(aircraft_beacon.address, 'DDA5BA')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -1,35 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.aprs_utils import dmsToDeg, kts2kmh, m2feet
|
||||
from ogn.model import Beacon
|
||||
from ogn.exceptions import AprsParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
beacon = Beacon()
|
||||
with self.assertRaises(AprsParseError):
|
||||
beacon.parse("notAValidString")
|
||||
|
||||
def test_basic(self):
|
||||
beacon = Beacon()
|
||||
|
||||
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
|
||||
reference_date=datetime(2015, 1, 1, 16, 8, 29))
|
||||
self.assertEqual(beacon.name, "FLRDDA5BA")
|
||||
self.assertEqual(beacon.receiver_name, "LFMX")
|
||||
self.assertEqual(beacon.timestamp.strftime('%H:%M:%S'), "16:08:29")
|
||||
self.assertAlmostEqual(beacon.latitude, dmsToDeg(44.1541), 5)
|
||||
self.assertEqual(beacon.symboltable, '/')
|
||||
self.assertAlmostEqual(beacon.longitude, dmsToDeg(6.0003), 5)
|
||||
self.assertEqual(beacon.symbolcode, '\'')
|
||||
self.assertEqual(beacon.track, 342)
|
||||
self.assertEqual(beacon.ground_speed, 49 * kts2kmh)
|
||||
self.assertAlmostEqual(beacon.altitude * m2feet, 5524, 5)
|
||||
self.assertEqual(beacon.comment, "this is a comment")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -1,38 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from ogn.model import ReceiverBeacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
with self.assertRaises(OgnParseError):
|
||||
receiver_beacon.parse("notAValidToken")
|
||||
|
||||
def test_v022(self):
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
|
||||
receiver_beacon.parse("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
self.assertEqual(receiver_beacon.version, '0.2.2')
|
||||
self.assertEqual(receiver_beacon.platform, 'x86')
|
||||
self.assertEqual(receiver_beacon.cpu_load, 0.5)
|
||||
self.assertEqual(receiver_beacon.cpu_temp, 52.0)
|
||||
self.assertEqual(receiver_beacon.free_ram, 669.9)
|
||||
self.assertEqual(receiver_beacon.total_ram, 887.7)
|
||||
self.assertEqual(receiver_beacon.ntp_error, 1.0)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction, 0.0)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, 0.0)
|
||||
self.assertEqual(receiver_beacon.rec_input_noise, 0.06)
|
||||
|
||||
def test_v021(self):
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
|
||||
receiver_beacon.parse("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction, 26)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, -1.4)
|
||||
self.assertEqual(receiver_beacon.rec_input_noise, -0.25)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,49 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.utils import ms2fpm
|
||||
from ogn.parser.parse import parse_ogn_aircraft_beacon
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_invalid_token(self):
|
||||
self.assertEqual(parse_ogn_aircraft_beacon("notAValidToken"), None)
|
||||
|
||||
def test_basic(self):
|
||||
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
|
||||
self.assertFalse(aircraft_beacon['stealth'])
|
||||
self.assertEqual(aircraft_beacon['address'], "DDA5BA")
|
||||
self.assertAlmostEqual(aircraft_beacon['climb_rate'] * ms2fpm, -454, 2)
|
||||
self.assertEqual(aircraft_beacon['turn_rate'], -1.1)
|
||||
self.assertEqual(aircraft_beacon['signal_strength'], 8.8)
|
||||
self.assertEqual(aircraft_beacon['error_count'], 0)
|
||||
self.assertEqual(aircraft_beacon['frequency_offset'], 51.2)
|
||||
self.assertEqual(aircraft_beacon['gps_status'], '4x5')
|
||||
|
||||
# self.assertEqual(len(aircraft_beacon['heared_aircraft_addresses']), 3)
|
||||
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][0], '1084')
|
||||
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][1], 'B597')
|
||||
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][2], 'B598')
|
||||
|
||||
def test_stealth(self):
|
||||
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
self.assertFalse(aircraft_beacon['stealth'])
|
||||
|
||||
aircraft_beacon = parse_ogn_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
self.assertTrue(aircraft_beacon['stealth'])
|
||||
|
||||
def test_v024(self):
|
||||
aircraft_beacon = parse_ogn_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
|
||||
|
||||
self.assertEqual(aircraft_beacon['software_version'], 6.02)
|
||||
self.assertEqual(aircraft_beacon['hardware_version'], 44)
|
||||
self.assertEqual(aircraft_beacon['real_address'], "DF0C56")
|
||||
|
||||
def test_v024_ogn_tracker(self):
|
||||
aircraft_beacon = parse_ogn_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
|
||||
self.assertEqual(aircraft_beacon['flightlevel'], 4.43)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,39 @@
|
|||
import unittest
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.parser.utils import dmsToDeg, kts2kmh, m2feet
|
||||
from ogn.parser.parse import parse_aprs
|
||||
from ogn.parser.exceptions import AprsParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("notAValidString")
|
||||
|
||||
def test_basic(self):
|
||||
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
|
||||
reference_date=datetime(2015, 1, 1, 16, 8, 29))
|
||||
self.assertEqual(message['name'], "FLRDDA5BA")
|
||||
self.assertEqual(message['receiver_name'], "LFMX")
|
||||
self.assertEqual(message['timestamp'].strftime('%H:%M:%S'), "16:08:29")
|
||||
self.assertAlmostEqual(message['latitude'], dmsToDeg(44.1541), 5)
|
||||
self.assertEqual(message['symboltable'], '/')
|
||||
self.assertAlmostEqual(message['longitude'], dmsToDeg(6.0003), 5)
|
||||
self.assertEqual(message['symbolcode'], '\'')
|
||||
self.assertEqual(message['track'], 342)
|
||||
self.assertEqual(message['ground_speed'], 49 * kts2kmh)
|
||||
self.assertAlmostEqual(message['altitude'] * m2feet, 5524, 5)
|
||||
self.assertEqual(message['comment'], "this is a comment")
|
||||
|
||||
def test_v024(self):
|
||||
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
|
||||
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 16, 8, 29))
|
||||
|
||||
self.assertAlmostEqual(message['latitude'] - dmsToDeg(44.1541), 2 / 1000 / 60, 10)
|
||||
self.assertAlmostEqual(message['longitude'] - dmsToDeg(6.0003), 6 / 1000 / 60, 10)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,67 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from datetime import datetime
|
||||
from time import sleep
|
||||
|
||||
from ogn.parser.parse import parse_aprs, parse_ogn_beacon
|
||||
from ogn.parser.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_valid_beacons(self):
|
||||
with open('tests/valid_beacons.txt') as f:
|
||||
for line in f:
|
||||
if not line[0] == '#':
|
||||
aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0))
|
||||
parse_ogn_beacon(aprs['comment'])
|
||||
|
||||
def test_fail_none(self):
|
||||
with self.assertRaises(TypeError):
|
||||
parse_aprs(None)
|
||||
|
||||
def test_fail_empty(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("")
|
||||
|
||||
def test_fail_bad_string(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
def test_incomplete_device_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
aprs = parse_aprs("ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -395",
|
||||
datetime(2015, 4, 10, 7, 24))
|
||||
parse_ogn_beacon(aprs['comment'])
|
||||
|
||||
def test_incomplete_receiver_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
aprs = parse_aprs("Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21",
|
||||
datetime(2015, 4, 10, 16, 54))
|
||||
parse_ogn_beacon(aprs['comment'])
|
||||
|
||||
@mock.patch('ogn.parser.parse.createTimestamp')
|
||||
def test_default_reference_date(self, createTimestamp_mock):
|
||||
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21"
|
||||
|
||||
parse_aprs(valid_aprs_string)
|
||||
call_args_before = createTimestamp_mock.call_args
|
||||
|
||||
sleep(1)
|
||||
|
||||
parse_aprs(valid_aprs_string)
|
||||
call_args_seconds_later = createTimestamp_mock.call_args
|
||||
|
||||
self.assertNotEqual(call_args_before, call_args_seconds_later)
|
||||
|
||||
def test_copy_constructor(self):
|
||||
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
|
||||
aprs = parse_aprs(valid_aprs_string, reference_date=datetime(2015, 1, 1, 16, 8, 29))
|
||||
aircraft_beacon = parse_ogn_beacon(aprs['comment'])
|
||||
|
||||
self.assertEqual(aprs['name'], 'FLRDDA5BA')
|
||||
self.assertEqual(aircraft_beacon['address'], 'DDA5BA')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,31 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.parse import parse_ogn_receiver_beacon
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
self.assertEqual(parse_ogn_receiver_beacon("notAValidToken"), None)
|
||||
|
||||
def test_v022(self):
|
||||
receiver_beacon = parse_ogn_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
self.assertEqual(receiver_beacon['version'], '0.2.2')
|
||||
self.assertEqual(receiver_beacon['platform'], 'x86')
|
||||
self.assertEqual(receiver_beacon['cpu_load'], 0.5)
|
||||
self.assertEqual(receiver_beacon['cpu_temp'], 52.0)
|
||||
self.assertEqual(receiver_beacon['free_ram'], 669.9)
|
||||
self.assertEqual(receiver_beacon['total_ram'], 887.7)
|
||||
self.assertEqual(receiver_beacon['ntp_error'], 1.0)
|
||||
self.assertEqual(receiver_beacon['rec_crystal_correction'], 0.0)
|
||||
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], 0.0)
|
||||
self.assertEqual(receiver_beacon['rec_input_noise'], 0.06)
|
||||
|
||||
def test_v021(self):
|
||||
receiver_beacon = parse_ogn_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
self.assertEqual(receiver_beacon['rec_crystal_correction'], 26)
|
||||
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], -1.4)
|
||||
self.assertEqual(receiver_beacon['rec_input_noise'], -0.25)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -1,8 +1,8 @@
|
|||
import unittest
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.aprs_utils import dmsToDeg, createTimestamp, create_aprs_login
|
||||
from ogn.exceptions import AmbigousTimeError
|
||||
from ogn.parser.utils import dmsToDeg, createTimestamp
|
||||
from ogn.parser.exceptions import AmbigousTimeError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
|
@ -22,13 +22,6 @@ class TestStringMethods(unittest.TestCase):
|
|||
with self.assertRaises(AmbigousTimeError):
|
||||
createTimestamp('123456', reference=datetime(2015, 10, 15, 23, 59, 59))
|
||||
|
||||
def test_create_aprs_login(self):
|
||||
basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1')
|
||||
self.assertEqual('user klaus pass -1 vers myApp 0.1\n', basic_login)
|
||||
|
||||
login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100')
|
||||
self.assertEqual('user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n', login_with_filter)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -1,54 +0,0 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from datetime import datetime
|
||||
from time import sleep
|
||||
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_valid_beacons(self):
|
||||
with open('tests/valid_beacons.txt') as f:
|
||||
for line in f:
|
||||
parse_aprs(line, datetime(2015, 4, 10, 17, 0))
|
||||
|
||||
def test_fail_none(self):
|
||||
with self.assertRaises(TypeError):
|
||||
parse_aprs(None)
|
||||
|
||||
def test_fail_empty(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("")
|
||||
|
||||
def test_fail_bad_string(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
def test_incomplete_device_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -395",
|
||||
datetime(2015, 4, 10, 7, 24))
|
||||
|
||||
def test_incomplete_receiver_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21",
|
||||
datetime(2015, 4, 10, 16, 54))
|
||||
|
||||
@mock.patch('ogn.aprs_parser.Beacon')
|
||||
def test_default_reference_date(self, beacon_mock):
|
||||
instance = beacon_mock.return_value
|
||||
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21"
|
||||
|
||||
parse_aprs(valid_aprs_string)
|
||||
call_args = instance.parse.call_args
|
||||
sleep(1)
|
||||
parse_aprs(valid_aprs_string)
|
||||
call_args_one_second_later = instance.parse.call_args
|
||||
|
||||
self.assertNotEqual(call_args, call_args_one_second_later)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Ładowanie…
Reference in New Issue