Merge pull request #49 from kerel-fs/use-ogn-client

Use python-ogn-client
pull/55/head
Meisterschueler 2016-03-19 09:38:44 +01:00
commit 28555e43e7
21 zmienionych plików z 21 dodań i 635 usunięć

Wyświetl plik

@ -7,60 +7,16 @@
[![PyPi Version](https://img.shields.io/pypi/v/ogn-python.svg)]
(https://pypi.python.org/pypi/ogn-python)
A python module for the [Open Glider Network](http://wiki.glidernet.org/).
The submodule 'ogn.gateway' is an aprs client which could be invoked via a CLI
or used by other python projects.
The CLI allows to save all received beacons into a database with [SQLAlchemy](http://www.sqlalchemy.org/).
The [sqlite](https://www.sqlite.org/)-backend is sufficient for simple testing,
but some tasks (e.g. logbook generation) require a proper backend like [postgresql](http://www.postgresql.org/).
An external python project would instantiate ogn.gateway and register a custom callback,
called each time a beacon is received.
A database backend for the [Open Glider Network](http://wiki.glidernet.org/).
The ogn-python module saves all received beacons into a database with [SQLAlchemy](http://www.sqlalchemy.org/).
It connects to the OGN aprs servers with [python-ogn-client](https://github.com/glidernet/python-ogn-client).
For simple tests a [sqlite](https://www.sqlite.org/)-backend is sufficient,
but some tasks (e.g. logbook generation) require a proper database backend like [postgresql](http://www.postgresql.org/).
[Examples](https://github.com/glidernet/ogn-python/wiki/Examples)
## Usage - python module
Implement your own gateway by using ogn.gateway with a custom callback function.
Each time a beacon is received, this function gets called and
lets you process the incoming data.
Example:
```python
#!/usr/bin/env python3
from ogn.gateway.client import ognGateway
from ogn.parser.parse import parse_aprs, parse_ogn_beacon
from ogn.parser.exceptions import ParseError
def process_beacon(raw_message):
if raw_message[0] == '#':
print('Server Status: {}'.format(raw_message))
return
try:
message = parse_aprs(raw_message)
message.update(parse_ogn_beacon(message['comment']))
print('Received {beacon_type} from {name}'.format(**message))
except ParseError as e:
print('Error, {}'.format(e.message))
if __name__ == '__main__':
gateway = ognGateway(aprs_user='N0CALL')
gateway.connect()
try:
gateway.run(callback=process_beacon, autoreconnect=True)
except KeyboardInterrupt:
print('\nStop ogn gateway')
gateway.disconnect()
```
## Usage - CLI
### Installation and Setup
## Installation and Setup
1. Checkout the repository
```
@ -85,6 +41,7 @@ if __name__ == '__main__':
./manage.py db.init
```
## Usage
### Running the aprs client and task server
To schedule tasks like takeoff/landing-detection (`logbook.compute`),
[Celery](http://www.celeryproject.org/) with [Redis](http://www.redis.io/) is used.

Wyświetl plik

Wyświetl plik

@ -1,76 +0,0 @@
import socket
import logging
from time import time
from ogn.gateway import settings
def create_aprs_login(user_name, pass_code, app_name, app_version, aprs_filter=None):
if not aprs_filter:
return "user {} pass {} vers {} {}\n".format(user_name, pass_code, app_name, app_version)
else:
return "user {} pass {} vers {} {} filter {}\n".format(user_name, pass_code, app_name, app_version, aprs_filter)
class ognGateway:
def __init__(self, aprs_user, aprs_filter=''):
self.logger = logging.getLogger(__name__)
self.logger.info("Connect to OGN as {} with filter '{}'".format(aprs_user, (aprs_filter if aprs_filter else 'full-feed')))
self.aprs_user = aprs_user
self.aprs_filter = aprs_filter
def connect(self):
# create socket, connect to server, login and make a file object associated with the socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if self.aprs_filter:
port = settings.APRS_SERVER_PORT_CLIENT_DEFINED_FILTERS
else:
port = settings.APRS_SERVER_PORT_FULL_FEED
self.sock.connect((settings.APRS_SERVER_HOST, port))
self.logger.debug('Server port {}'.format(port))
login = create_aprs_login(self.aprs_user, -1, settings.APRS_APP_NAME, settings.APRS_APP_VER, self.aprs_filter)
self.sock.send(login.encode())
self.sock_file = self.sock.makefile('rw')
def disconnect(self):
self.logger.info('Disconnect')
try:
# close everything
self.sock.shutdown(0)
self.sock.close()
except OSError:
self.logger.error('Socket close error', exc_info=True)
def run(self, callback, autoreconnect=False):
while True:
try:
keepalive_time = time()
while True:
if time() - keepalive_time > settings.APRS_KEEPALIVE_TIME:
self.logger.info('Send keepalive')
self.sock.send('#keepalive'.encode())
keepalive_time = time()
# Read packet string from socket
packet_str = self.sock_file.readline().strip()
# A zero length line should not be return if keepalives are being sent
# A zero length line will only be returned after ~30m if keepalives are not sent
if len(packet_str) == 0:
self.logger.warning('Read returns zero length string. Failure. Orderly closeout')
break
callback(packet_str)
except BrokenPipeError:
self.logger.error('BrokenPipeError', exc_info=True)
except socket.error:
self.logger.error('socket.error', exc_info=True)
if autoreconnect:
self.connect()
else:
return

Wyświetl plik

@ -1,6 +1,6 @@
import logging
from ogn.gateway.client import ognGateway
from ogn.client import AprsClient
from ogn.gateway.process import process_beacon
from manager import Manager
@ -30,13 +30,13 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers)
print('Start ogn gateway')
gateway = ognGateway(aprs_user)
gateway.connect()
client = AprsClient(aprs_user)
client.connect()
try:
gateway.run(callback=process_beacon, autoreconnect=True)
client.run(callback=process_beacon, autoreconnect=True)
except KeyboardInterrupt:
print('\nStop ogn gateway')
gateway.disconnect()
client.disconnect()
logging.shutdown()

Wyświetl plik

@ -1,8 +1,7 @@
import logging
from ogn.commands.dbutils import session
from ogn.model import AircraftBeacon, ReceiverBeacon
from ogn.parser.parse import parse_aprs, parse_ogn_receiver_beacon, parse_ogn_aircraft_beacon
from ogn.parser.exceptions import AprsParseError, OgnParseError, AmbigousTimeError
from ogn.parser import parse_aprs, parse_ogn_receiver_beacon, parse_ogn_aircraft_beacon, ParseError
logger = logging.getLogger(__name__)
@ -33,6 +32,6 @@ def process_beacon(raw_message):
session.add(beacon)
session.commit()
logger.debug('Received message: {}'.format(raw_message))
except (AprsParseError, OgnParseError, AmbigousTimeError) as e:
except ParseError as e:
logger.error('Received message: {}'.format(raw_message))
logger.error('Drop packet, {}'.format(e.message))

Wyświetl plik

@ -1,10 +0,0 @@
APRS_SERVER_HOST = 'aprs.glidernet.org'
APRS_SERVER_PORT_FULL_FEED = 10152
APRS_SERVER_PORT_CLIENT_DEFINED_FILTERS = 14580
APRS_APP_NAME = 'ogn-gateway-python'
PACKAGE_VERSION = '0.2.1'
APRS_APP_VER = PACKAGE_VERSION[:3]
APRS_KEEPALIVE_TIME = 240

Wyświetl plik

@ -1,37 +0,0 @@
"""
exception definitions
"""
from datetime import datetime
class ParseError(Exception):
pass
class AprsParseError(ParseError):
"""Parse error while parsing an aprs packet."""
def __init__(self, aprs_string):
self.aprs_string = aprs_string
self.message = "This is not a valid APRS packet: {}".format(aprs_string)
super(AprsParseError, self).__init__(self.message)
class OgnParseError(ParseError):
"""Parse error while parsing an ogn message from aprs comment."""
def __init__(self, aprs_comment):
self.aprs_comment = aprs_comment
self.message = "This is not a valid OGN message: {}".format(aprs_comment)
super(OgnParseError, self).__init__(self.message)
class AmbigousTimeError(ParseError):
"""Timstamp from the past/future, can't fully reconstruct datetime from timestamp."""
def __init__(self, reference, packet_time):
self.reference = reference
self.packet_time = packet_time
self.timedelta = reference - datetime.combine(reference, packet_time)
self.message = "Can't reconstruct timstamp, {:.0f}s from past.".format(self.timedelta.total_seconds())
super(AmbigousTimeError, self).__init__(self.message)

Wyświetl plik

@ -1,84 +0,0 @@
import re
from datetime import datetime
from ogn.parser.utils import createTimestamp, dmsToDeg, kts2kmh, feet2m, fpm2ms
from ogn.parser.pattern import PATTERN_APRS, PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
from ogn.parser.exceptions import AprsParseError, OgnParseError
def parse_aprs(message, reference_date=None):
if reference_date is None:
reference_date = datetime.utcnow()
match = re.search(PATTERN_APRS, message)
if match:
return {'name': match.group('callsign'),
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match.group('time'), reference_date),
'latitude': dmsToDeg(float(match.group('latitude')) / 100) *
(-1 if match.group('latitude_sign') == 'S' else 1) +
(int(match.group('latitude_enhancement')) / 1000 / 60 if match.group('latitude_enhancement') else 0),
'symboltable': match.group('symbol_table'),
'longitude': dmsToDeg(float(match.group('longitude')) / 100) *
(-1 if match.group('longitude_sign') == 'W' else 1) +
(int(match.group('longitude_enhancement')) / 1000 / 60 if match.group('longitude_enhancement') else 0),
'symbolcode': match.group('symbol'),
'track': int(match.group('course')) if match.group('course_extension') else 0,
'ground_speed': int(match.group('ground_speed')) * kts2kmh if match.group('ground_speed') else 0,
'altitude': int(match.group('altitude')) * feet2m,
'comment': match.group('comment')}
raise AprsParseError(message)
def parse_ogn_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms,
'turn_rate': float(ac_match.group('turn_rate')),
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_strength': float(ac_match.group('signal')),
'error_count': float(ac_match.group('errors')),
'frequency_offset': float(ac_match.group('frequency_offset')),
'gps_status': ac_match.group('gps_accuracy'),
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id')}
else:
return None
def parse_ogn_receiver_beacon(aprs_comment):
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
if rec_match:
return {'version': rec_match.group('version'),
'platform': rec_match.group('platform'),
'cpu_load': float(rec_match.group('cpu_load')),
'free_ram': float(rec_match.group('ram_free')),
'total_ram': float(rec_match.group('ram_total')),
'ntp_error': float(rec_match.group('ntp_offset')),
'rt_crystal_correction': float(rec_match.group('ntp_correction')),
'cpu_temp': float(rec_match.group('cpu_temperature')) if rec_match.group('cpu_temperature') else None,
'rec_crystal_correction': int(rec_match.group('manual_correction')) if rec_match.group('manual_correction') else 0,
'rec_crystal_correction_fine': float(rec_match.group('automatic_correction')) if rec_match.group('automatic_correction') else 0.0,
'rec_input_noise': float(rec_match.group('input_noise')) if rec_match.group('input_noise') else None}
else:
return None
def parse_ogn_beacon(aprs_comment):
ac_data = parse_ogn_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = parse_ogn_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
raise OgnParseError(aprs_comment)

Wyświetl plik

@ -1,63 +0,0 @@
import re
PATTERN_APRS = re.compile(r"^(?P<callsign>.+?)>APRS,.+,(?P<receiver>.+?):/(?P<time>\d{6})+h(?P<latitude>\d{4}\.\d{2})(?P<latitude_sign>N|S)(?P<symbol_table>.)(?P<longitude>\d{5}\.\d{2})(?P<longitude_sign>E|W)(?P<symbol>.)(?P<course_extension>(?P<course>\d{3})/(?P<ground_speed>\d{3}))?/A=(?P<altitude>\d{6})(?P<pos_extension>\s!W((?P<latitude_enhancement>\d)(?P<longitude_enhancement>\d))!)?\s(?P<comment>.*)$")
# The following regexp patterns are part of the ruby ogn-client.
# source: https://github.com/svoop/ogn_client-ruby
# The MIT License (MIT)
#
# Copyright (c) 2015 Sven Schwyn
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
PATTERN_RECEIVER_BEACON = re.compile(r"""
(?:
v(?P<version>\d+\.\d+\.\d+)
\.?(?P<platform>.+?)?
\s)?
CPU:(?P<cpu_load>[\d.]+)\s
RAM:(?P<ram_free>[\d.]+)\/(?P<ram_total>[\d.]+)MB\s
NTP:(?P<ntp_offset>[\d.]+)ms\/(?P<ntp_correction>[+-][\d.]+)ppm\s?
(?:(?P<cpu_temperature>[+-][\d.]+)C\s*)?
(?:RF:
(?:
(?P<manual_correction>[+-][\d]+)
(?P<automatic_correction>[+-][\d.]+)ppm\/
)?
(?P<input_noise>[+-][\d.]+)dB
)?
""", re.VERBOSE | re.MULTILINE)
PATTERN_AIRCRAFT_BEACON = re.compile(r"""
id(?P<details>\w{2})(?P<id>\w+?)\s
(?P<climb_rate>[+-]\d+?)fpm\s
(?P<turn_rate>[+-][\d.]+?)rot\s
(?:FL(?P<flight_level>[\d.]+)\s)?
(?P<signal>[\d.]+?)dB\s
(?P<errors>\d+)e\s
(?P<frequency_offset>[+-][\d.]+?)kHz\s?
(?:gps(?P<gps_accuracy>\d+x\d+)\s?)?
(?:s(?P<flarm_software_version>[\d.]+)\s?)?
(?:h(?P<flarm_hardware_version>[\dA-F]{2})\s?)?
(?:r(?P<flarm_id>[\dA-F]+)\s?)?
(?:hear(?P<proximity>.+))?
""", re.VERBOSE | re.MULTILINE)

Wyświetl plik

@ -1,35 +0,0 @@
import math
from datetime import datetime, timedelta
from ogn.parser.exceptions import AmbigousTimeError
kmh2kts = 0.539957
feet2m = 0.3048
ms2fpm = 196.85
kts2kmh = 1 / kmh2kts
m2feet = 1 / feet2m
fpm2ms = 1 / ms2fpm
def dmsToDeg(dms):
absDms = abs(dms)
d = math.floor(absDms)
m = (absDms - d) * 100 / 60
return d + m
def createTimestamp(hhmmss, reference):
packet_time = datetime.strptime(hhmmss, '%H%M%S').time()
timestamp = datetime.combine(reference, packet_time)
if reference.hour == 23 and timestamp.hour == 0:
timestamp = timestamp + timedelta(days=1)
elif reference.hour == 0 and timestamp.hour == 23:
timestamp = timestamp - timedelta(days=1)
if reference - timestamp > timedelta(hours=1):
raise AmbigousTimeError(reference, packet_time)
return timestamp

Wyświetl plik

@ -3,8 +3,6 @@
from os import path
from setuptools import setup, find_packages
from ogn.gateway.settings import PACKAGE_VERSION
here = path.abspath(path.dirname(__file__))
@ -14,8 +12,8 @@ with open(path.join(here, 'README.md'), encoding='utf-8') as f:
setup(
name='ogn-python',
version=PACKAGE_VERSION,
description='A python module for the Open Glider Network',
version='0.2.1',
description='A database backend for the Open Glider Network',
long_description=long_description,
url='https://github.com/glidernet/ogn-python',
author='Konstantin Gründger aka Meisterschueler, Fabian P. Schmidt aka kerel',
@ -38,7 +36,8 @@ setup(
'geopy==1.11.0',
'manage.py==0.2.10',
'celery[redis]>=3.1,<3.2',
'alembic==0.8.3'
'alembic==0.8.3',
'ogn-client==0.3.0'
],
extras_require={
'dev': [

Wyświetl plik

@ -6,9 +6,9 @@ from ogn.gateway.manage import run
class GatewayManagerTest(unittest.TestCase):
# try simple user interrupt
@mock.patch('ogn.gateway.manage.ognGateway')
def test_run_user_interruption(self, mock_gateway):
instance = mock_gateway.return_value
@mock.patch('ogn.gateway.manage.AprsClient')
def test_run_user_interruption(self, mock_aprs_client):
instance = mock_aprs_client.return_value
instance.run.side_effect = KeyboardInterrupt()
run(aprs_user="testuser")

Wyświetl plik

@ -1,34 +0,0 @@
import unittest
import unittest.mock as mock
from ogn.gateway.client import create_aprs_login, ognGateway
from ogn.gateway.settings import APRS_APP_NAME, APRS_APP_VER
class GatewayTest(unittest.TestCase):
def test_create_aprs_login(self):
basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1')
self.assertEqual('user klaus pass -1 vers myApp 0.1\n', basic_login)
login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100')
self.assertEqual('user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n', login_with_filter)
def test_initialisation(self):
self.gw = ognGateway(aprs_user='testuser', aprs_filter='')
self.assertEqual(self.gw.aprs_user, 'testuser')
self.assertEqual(self.gw.aprs_filter, '')
@mock.patch('ogn.gateway.client.socket')
def test_connect(self, mock_socket):
self.gw = ognGateway(aprs_user='testuser', aprs_filter='')
self.gw.connect()
self.gw.sock.send.assert_called_once_with('user testuser pass -1 vers {} {}\n'.format(APRS_APP_NAME, APRS_APP_VER).encode('ascii'))
self.gw.sock.makefile.asser_called_once_with('rw')
@mock.patch('ogn.gateway.client.socket')
def test_disconnect(self, mock_socket):
self.gw = ognGateway(aprs_user='testuser', aprs_filter='')
self.gw.connect()
self.gw.disconnect()
self.gw.sock.shutdown.assert_called_once_with(0)
self.gw.sock.close.assert_called_once_with()

Wyświetl plik

@ -1,49 +0,0 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse import parse_ogn_aircraft_beacon
class TestStringMethods(unittest.TestCase):
def test_invalid_token(self):
self.assertEqual(parse_ogn_aircraft_beacon("notAValidToken"), None)
def test_basic(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(aircraft_beacon['stealth'])
self.assertEqual(aircraft_beacon['address'], "DDA5BA")
self.assertAlmostEqual(aircraft_beacon['climb_rate'] * ms2fpm, -454, 2)
self.assertEqual(aircraft_beacon['turn_rate'], -1.1)
self.assertEqual(aircraft_beacon['signal_strength'], 8.8)
self.assertEqual(aircraft_beacon['error_count'], 0)
self.assertEqual(aircraft_beacon['frequency_offset'], 51.2)
self.assertEqual(aircraft_beacon['gps_status'], '4x5')
# self.assertEqual(len(aircraft_beacon['heared_aircraft_addresses']), 3)
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][0], '1084')
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][1], 'B597')
# self.assertEqual(aircraft_beacon['heared_aircraft_addresses'][2], 'B598')
def test_stealth(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(aircraft_beacon['stealth'])
aircraft_beacon = parse_ogn_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertTrue(aircraft_beacon['stealth'])
def test_v024(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
self.assertEqual(aircraft_beacon['software_version'], 6.02)
self.assertEqual(aircraft_beacon['hardware_version'], 10)
self.assertEqual(aircraft_beacon['real_address'], "DF0C56")
def test_v024_ogn_tracker(self):
aircraft_beacon = parse_ogn_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(aircraft_beacon['flightlevel'], 4.43)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,39 +0,0 @@
import unittest
from datetime import datetime
from ogn.parser.utils import dmsToDeg, kts2kmh, m2feet
from ogn.parser.parse import parse_aprs
from ogn.parser.exceptions import AprsParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
with self.assertRaises(AprsParseError):
parse_aprs("notAValidString")
def test_basic(self):
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertEqual(message['name'], "FLRDDA5BA")
self.assertEqual(message['receiver_name'], "LFMX")
self.assertEqual(message['timestamp'].strftime('%H:%M:%S'), "16:08:29")
self.assertAlmostEqual(message['latitude'], dmsToDeg(44.1541), 5)
self.assertEqual(message['symboltable'], '/')
self.assertAlmostEqual(message['longitude'], dmsToDeg(6.0003), 5)
self.assertEqual(message['symbolcode'], '\'')
self.assertEqual(message['track'], 342)
self.assertEqual(message['ground_speed'], 49 * kts2kmh)
self.assertAlmostEqual(message['altitude'] * m2feet, 5524, 5)
self.assertEqual(message['comment'], "this is a comment")
def test_v024(self):
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 16, 8, 29))
self.assertAlmostEqual(message['latitude'] - dmsToDeg(44.1541), 2 / 1000 / 60, 10)
self.assertAlmostEqual(message['longitude'] - dmsToDeg(6.0003), 6 / 1000 / 60, 10)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,67 +0,0 @@
import unittest
import unittest.mock as mock
from datetime import datetime
from time import sleep
from ogn.parser.parse import parse_aprs, parse_ogn_beacon
from ogn.parser.exceptions import AprsParseError, OgnParseError
class TestStringMethods(unittest.TestCase):
def test_valid_beacons(self):
with open('tests/valid_beacons.txt') as f:
for line in f:
if not line[0] == '#':
aprs = parse_aprs(line, datetime(2015, 4, 10, 17, 0))
parse_ogn_beacon(aprs['comment'])
def test_fail_none(self):
with self.assertRaises(TypeError):
parse_aprs(None)
def test_fail_empty(self):
with self.assertRaises(AprsParseError):
parse_aprs("")
def test_fail_bad_string(self):
with self.assertRaises(AprsParseError):
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
def test_incomplete_device_string(self):
with self.assertRaises(OgnParseError):
aprs = parse_aprs("ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -395",
datetime(2015, 4, 10, 7, 24))
parse_ogn_beacon(aprs['comment'])
def test_incomplete_receiver_string(self):
with self.assertRaises(OgnParseError):
aprs = parse_aprs("Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21",
datetime(2015, 4, 10, 16, 54))
parse_ogn_beacon(aprs['comment'])
@mock.patch('ogn.parser.parse.createTimestamp')
def test_default_reference_date(self, createTimestamp_mock):
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21"
parse_aprs(valid_aprs_string)
call_args_before = createTimestamp_mock.call_args
sleep(1)
parse_aprs(valid_aprs_string)
call_args_seconds_later = createTimestamp_mock.call_args
self.assertNotEqual(call_args_before, call_args_seconds_later)
def test_copy_constructor(self):
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
aprs = parse_aprs(valid_aprs_string, reference_date=datetime(2015, 1, 1, 16, 8, 29))
aircraft_beacon = parse_ogn_beacon(aprs['comment'])
self.assertEqual(aprs['name'], 'FLRDDA5BA')
self.assertEqual(aircraft_beacon['address'], 'DDA5BA')
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,31 +0,0 @@
import unittest
from ogn.parser.parse import parse_ogn_receiver_beacon
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
self.assertEqual(parse_ogn_receiver_beacon("notAValidToken"), None)
def test_v022(self):
receiver_beacon = parse_ogn_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(receiver_beacon['version'], '0.2.2')
self.assertEqual(receiver_beacon['platform'], 'x86')
self.assertEqual(receiver_beacon['cpu_load'], 0.5)
self.assertEqual(receiver_beacon['cpu_temp'], 52.0)
self.assertEqual(receiver_beacon['free_ram'], 669.9)
self.assertEqual(receiver_beacon['total_ram'], 887.7)
self.assertEqual(receiver_beacon['ntp_error'], 1.0)
self.assertEqual(receiver_beacon['rec_crystal_correction'], 0.0)
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], 0.0)
self.assertEqual(receiver_beacon['rec_input_noise'], 0.06)
def test_v021(self):
receiver_beacon = parse_ogn_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(receiver_beacon['rec_crystal_correction'], 26)
self.assertEqual(receiver_beacon['rec_crystal_correction_fine'], -1.4)
self.assertEqual(receiver_beacon['rec_input_noise'], -0.25)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,27 +0,0 @@
import unittest
from datetime import datetime
from ogn.parser.utils import dmsToDeg, createTimestamp
from ogn.parser.exceptions import AmbigousTimeError
class TestStringMethods(unittest.TestCase):
def test_dmsToDeg(self):
dms = 50.4830
self.assertAlmostEqual(dmsToDeg(dms), 50.805, 5)
def test_createTimestamp_seconds_behind(self):
timestamp = createTimestamp('235959', reference=datetime(2015, 10, 16, 0, 0, 1))
self.assertEqual(timestamp, datetime(2015, 10, 15, 23, 59, 59))
def test_createTimestamp_seconds_before(self):
timestamp = createTimestamp('000001', reference=datetime(2015, 10, 15, 23, 59, 59))
self.assertEqual(timestamp, datetime(2015, 10, 16, 0, 0, 1))
def test_createTimestamp_big_difference(self):
with self.assertRaises(AmbigousTimeError):
createTimestamp('123456', reference=datetime(2015, 10, 15, 23, 59, 59))
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,17 +0,0 @@
# aprsc 2.0.14-g28c5a6a 10 Apr 2015 18:58:47 GMT GLIDERN1 37.187.40.234:14580
FLRDDA5BA>APRS,qAS,LFMX:/165829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5
ICA4B0E3A>APRS,qAS,Letzi:/165319h4711.75N\00802.59E^327/149/A=006498 id154B0E3A -3959fpm +0.5rot 9.0dB 0e -6.3kHz gps1x3
Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB
LFGU>APRS,TCPIP*,qAC,GLIDERN2:/165556h4907.63NI00706.41E&/A=000833 v0.2.0 CPU:0.9 RAM:281.3/458.9MB NTP:0.5ms/-19.1ppm +53.0C RF:+0.70dB
FLRDDB091>APRS,qAS,Letzi:/165831h4740.04N/00806.01EX152/124/A=004881 id06DD8E80 +198fpm +0.0rot 6.5dB 13e +4.0kHz gps3x4
LSGS>APRS,TCPIP*,qAC,GLIDERN1:/165345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB
FLRDDDD33>APRS,qAS,LFNF:/165341h4344.27N/00547.41E'/A=000886 id06DDDD33 +020fpm +0.0rot 20.8dB 0e -14.3kHz gps3x4
FLRDDE026>APRS,qAS,LFNF:/165341h4358.58N/00553.89E'204/055/A=005048 id06DDE026 +257fpm +0.1rot 7.2dB 0e -0.8kHz gps4x7
ICA484A9C>APRS,qAS,LFMX:/165341h4403.50N/00559.67E'/A=001460 id05484A9C +000fpm +0.0rot 18.0dB 0e +3.5kHz gps4x7
WolvesSW>APRS,TCPIP*,qAC,GLIDERN2:/165343h5232.23NI00210.91W&/A=000377 CPU:1.5 RAM:159.9/458.7MB NTP:6.6ms/-36.7ppm +45.5C RF:+130-0.4ppm/-0.1dB
Oxford>APRS,TCPIP*,qAC,GLIDERN1:/165533h5142.96NI00109.68W&/A=000380 v0.1.3 CPU:0.9 RAM:268.8/458.6MB NTP:0.5ms/-45.9ppm +60.5C RF:+55+2.9ppm/+1.54dB
OGNE95A16>APRS,qAS,Sylwek:/165641h5001.94N/01956.91E'270/004/A=000000 id07E95A16 +000fpm +0.1rot 37.8dB 0e -0.4kHz
Salland>APRS,TCPIP*,qAC,GLIDERN2:/165426h5227.93NI00620.03E&/A=000049 v0.2.2 CPU:0.7 RAM:659.3/916.9MB NTP:2.5ms/-75.0ppm RF:+0.41dB
LSGS>APRS,TCPIP*,qAC,GLIDERN1:/165345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB
Drenstein>APRS,TCPIP*,qAC,GLIDERN1:/165011h5147.51NI00744.45E&/A=000213 v0.2.2 CPU:0.8 RAM:695.7/4025.5MB NTP:16000.0ms/+0.0ppm +63.0C
ZK-GSC>APRS,qAS,Omarama:/165202h4429.25S/16959.33E'/A=001407 id05C821EA +020fpm +0.0rot 16.8dB 0e -3.1kHz gps1x3 hear1084 hearB597 hearB598