kopia lustrzana https://github.com/glidernet/ogn-python
commit
4d163838a9
|
@ -1,11 +1,12 @@
|
|||
from .model import Beacon, AircraftBeacon, ReceiverBeacon
|
||||
from ogn.exceptions import AprsParseError
|
||||
|
||||
|
||||
def parse_aprs(packet):
|
||||
if not isinstance(packet, str):
|
||||
raise TypeError("Expected packet to be str, got %s" % type(packet))
|
||||
elif packet == "":
|
||||
raise AprsParseError(substring=packet, expected_type="non-empty aprs packet")
|
||||
raise AprsParseError("(empty string)")
|
||||
elif packet[0] == "#":
|
||||
return None
|
||||
|
||||
|
|
|
@ -4,9 +4,17 @@ exception definitions
|
|||
|
||||
|
||||
class AprsParseError(Exception):
|
||||
"""Parse error while parsing an aprs packet substring."""
|
||||
def __init__(self, substring, expected_type):
|
||||
self.message = "Aprs Substring can't be parsed as %s." % expected_type
|
||||
"""Parse error while parsing an aprs packet."""
|
||||
def __init__(self, aprs_string):
|
||||
self.message = "This is not a valid APRS string: %s" % aprs_string
|
||||
super(AprsParseError, self).__init__(self.message)
|
||||
self.aprs_string = aprs_string
|
||||
|
||||
|
||||
class OgnParseError(Exception):
|
||||
"""Parse error while parsing an aprs packet substring"""
|
||||
def __init__(self, substring, expected_type):
|
||||
self.message = "For type %s this is not a valid token: %s" % (expected_type, substring)
|
||||
super(OgnParseError, self).__init__(self.message)
|
||||
self.substring = substring
|
||||
self.expected_type = expected_type
|
||||
|
|
|
@ -4,7 +4,7 @@ from time import time
|
|||
from ogn.gateway import settings
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.exceptions import AprsParseError
|
||||
from ogn.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
@ -32,6 +32,12 @@ class ognGateway:
|
|||
self.sock.send(login.encode())
|
||||
self.sock_file = self.sock.makefile('rw')
|
||||
|
||||
def disconnect(self):
|
||||
# close everything
|
||||
print('Close socket')
|
||||
self.sock.shutdown(0)
|
||||
self.sock.close()
|
||||
|
||||
def run(self):
|
||||
keepalive_time = time()
|
||||
while True:
|
||||
|
@ -40,11 +46,7 @@ class ognGateway:
|
|||
keepalive_time = time()
|
||||
|
||||
# Read packet string from socket
|
||||
try:
|
||||
packet_str = self.sock_file.readline().strip()
|
||||
except socket.error:
|
||||
print('Socket error on readline')
|
||||
continue
|
||||
packet_str = self.sock_file.readline().strip()
|
||||
|
||||
# A zero length line should not be return if keepalives are being sent
|
||||
# A zero length line will only be returned after ~30m if keepalives are not sent
|
||||
|
@ -53,17 +55,16 @@ class ognGateway:
|
|||
break
|
||||
|
||||
self.proceed_line(packet_str)
|
||||
# close everything
|
||||
print('Close socket')
|
||||
self.sock.shutdown(0)
|
||||
self.sock.close()
|
||||
|
||||
def proceed_line(self, line):
|
||||
try:
|
||||
beacon = parse_aprs(line)
|
||||
except AprsParseError as e:
|
||||
print(e.message)
|
||||
print("Substring: " % e.substring)
|
||||
return
|
||||
except OgnParseError as e:
|
||||
print(e.message)
|
||||
print("APRS line: %s" % line)
|
||||
return
|
||||
|
||||
if beacon is not None:
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import socket
|
||||
|
||||
from ogn.gateway import ognGateway
|
||||
|
||||
DB_URI = 'sqlite:///beacons.db'
|
||||
|
@ -9,12 +11,27 @@ manager = Manager()
|
|||
@manager.command
|
||||
def run(aprs_user="anon-dev"):
|
||||
"""Run the aprs client."""
|
||||
user_interrupted = False
|
||||
gateway = ognGateway()
|
||||
print("Start OGN gateway")
|
||||
|
||||
print("Connect to DB")
|
||||
gateway.connect_db()
|
||||
gateway.connect(aprs_user)
|
||||
try:
|
||||
gateway.run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
print("\nOGN gateway Exit")
|
||||
|
||||
while user_interrupted is False:
|
||||
print("Connect OGN gateway")
|
||||
gateway.connect(aprs_user)
|
||||
|
||||
try:
|
||||
gateway.run()
|
||||
except KeyboardInterrupt:
|
||||
print("User interrupted")
|
||||
user_interrupted = True
|
||||
except BrokenPipeError:
|
||||
print("BrokenPipeError")
|
||||
except socket.err:
|
||||
print("socket error")
|
||||
|
||||
print("Disconnect OGN gateway")
|
||||
gateway.disconnect()
|
||||
|
||||
print("\nExit OGN gateway")
|
||||
|
|
|
@ -4,6 +4,7 @@ from sqlalchemy import Column, String, Integer, Float, Boolean, SmallInteger
|
|||
|
||||
from ogn.aprs_utils import fpm2ms
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class AircraftBeacon(Beacon):
|
||||
|
@ -127,7 +128,7 @@ class AircraftBeacon(Beacon):
|
|||
elif flightlevel_match is not None:
|
||||
self.flightlevel = float(flightlevel_match.group(1))
|
||||
else:
|
||||
raise Exception("No valid position description: %s" % part)
|
||||
raise OgnParseError(expected_type="AircraftBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
|
||||
|
|
|
@ -32,7 +32,7 @@ class Beacon(AbstractConcreteBase, Base):
|
|||
def parse(self, text, reference_time=None):
|
||||
result = re_pattern_aprs.match(text)
|
||||
if result is None:
|
||||
raise AprsParseError(substring=text, expected_type="Beacon")
|
||||
raise AprsParseError(text)
|
||||
|
||||
self.name = result.group(1)
|
||||
self.receiver_name = result.group(2)
|
||||
|
|
|
@ -3,6 +3,7 @@ import re
|
|||
from sqlalchemy import Column, String
|
||||
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class ReceiverBeacon(Beacon):
|
||||
|
@ -82,7 +83,7 @@ class ReceiverBeacon(Beacon):
|
|||
self.rec_crystal_correction = int(rf_light2_match.group(1))
|
||||
self.rec_crystal_correction_fine = float(rf_light2_match.group(2))
|
||||
else:
|
||||
raise Exception("No valid receiver description: %s" % part)
|
||||
raise OgnParseError(expected_type="ReceiverBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<ReceiverBeacon %s: %s>" % (self.name, self.version)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#DEVICE_TYPE,DEVICE_ID,AIRCRAFT_MODEL,REGISTRATION,CN,TRACKED,IDENTIFIED
|
||||
'F','DD4711','HK36 TTC','D-EULE','','Y','Y'
|
||||
'F','DD4711','HK36 TTC','D-EULE','CU','Y','Y'
|
||||
'F','DD0815','Ventus 2cxM','D-1234','','Y','Y'
|
||||
'F','DD3141','Arcus T','OE-4321','','Y','Y'
|
|
@ -2,58 +2,64 @@ import unittest
|
|||
|
||||
from ogn.aprs_utils import ms2fpm
|
||||
from ogn.model import Beacon, AircraftBeacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
with self.assertRaises(OgnParseError):
|
||||
aircraft_beacon.parse("notAValidToken")
|
||||
|
||||
def test_basic(self):
|
||||
position = AircraftBeacon()
|
||||
position.parse("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
|
||||
self.assertFalse(position.stealth)
|
||||
self.assertEqual(position.address, "DDA5BA")
|
||||
self.assertAlmostEqual(position.climb_rate*ms2fpm, -454, 2)
|
||||
self.assertEqual(position.turn_rate, -1.1)
|
||||
self.assertEqual(position.signal_strength, 8.8)
|
||||
self.assertEqual(position.error_count, 0)
|
||||
self.assertEqual(position.frequency_offset, 51.2)
|
||||
self.assertEqual(position.gps_status, '4x5')
|
||||
self.assertFalse(aircraft_beacon.stealth)
|
||||
self.assertEqual(aircraft_beacon.address, "DDA5BA")
|
||||
self.assertAlmostEqual(aircraft_beacon.climb_rate*ms2fpm, -454, 2)
|
||||
self.assertEqual(aircraft_beacon.turn_rate, -1.1)
|
||||
self.assertEqual(aircraft_beacon.signal_strength, 8.8)
|
||||
self.assertEqual(aircraft_beacon.error_count, 0)
|
||||
self.assertEqual(aircraft_beacon.frequency_offset, 51.2)
|
||||
self.assertEqual(aircraft_beacon.gps_status, '4x5')
|
||||
|
||||
self.assertEqual(len(position.heared_aircraft_IDs), 3)
|
||||
self.assertEqual(position.heared_aircraft_IDs[0], '1084')
|
||||
self.assertEqual(position.heared_aircraft_IDs[1], 'B597')
|
||||
self.assertEqual(position.heared_aircraft_IDs[2], 'B598')
|
||||
self.assertEqual(len(aircraft_beacon.heared_aircraft_IDs), 3)
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_IDs[0], '1084')
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_IDs[1], 'B597')
|
||||
self.assertEqual(aircraft_beacon.heared_aircraft_IDs[2], 'B598')
|
||||
|
||||
def test_stealth(self):
|
||||
position = AircraftBeacon()
|
||||
position.parse("id0ADD1234")
|
||||
self.assertFalse(position.stealth)
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("id0ADD1234")
|
||||
self.assertFalse(aircraft_beacon.stealth)
|
||||
|
||||
position.parse("id8ADD1234")
|
||||
self.assertTrue(position.stealth)
|
||||
aircraft_beacon.parse("id8ADD1234")
|
||||
self.assertTrue(aircraft_beacon.stealth)
|
||||
|
||||
def test_v024(self):
|
||||
position = AircraftBeacon()
|
||||
position.parse("!W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("!W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
|
||||
|
||||
self.assertEqual(position.latitude, 2 / 1000 / 60)
|
||||
self.assertEqual(position.longitude, 6 / 1000 / 60)
|
||||
self.assertEqual(position.software_version, 6.02)
|
||||
self.assertEqual(position.hardware_version, 44)
|
||||
self.assertEqual(position.real_id, "DF0C56")
|
||||
self.assertEqual(aircraft_beacon.latitude, 2 / 1000 / 60)
|
||||
self.assertEqual(aircraft_beacon.longitude, 6 / 1000 / 60)
|
||||
self.assertEqual(aircraft_beacon.software_version, 6.02)
|
||||
self.assertEqual(aircraft_beacon.hardware_version, 44)
|
||||
self.assertEqual(aircraft_beacon.real_id, "DF0C56")
|
||||
|
||||
def test_v024_ogn_tracker(self):
|
||||
position = AircraftBeacon()
|
||||
position.parse("!W34! id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
aircraft_beacon = AircraftBeacon()
|
||||
aircraft_beacon.parse("!W34! id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
|
||||
self.assertEqual(position.flightlevel, 4.43)
|
||||
self.assertEqual(aircraft_beacon.flightlevel, 4.43)
|
||||
|
||||
def test_copy_constructor(self):
|
||||
beacon = Beacon()
|
||||
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5")
|
||||
position = AircraftBeacon(beacon)
|
||||
aircraft_beacon = AircraftBeacon(beacon)
|
||||
|
||||
self.assertEqual(position.name, 'FLRDDA5BA')
|
||||
self.assertEqual(position.address, 'DDA5BA')
|
||||
self.assertEqual(aircraft_beacon.name, 'FLRDDA5BA')
|
||||
self.assertEqual(aircraft_beacon.address, 'DDA5BA')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -2,12 +2,13 @@ import unittest
|
|||
|
||||
from ogn.aprs_utils import dmsToDeg, kts2kmh, m2feet
|
||||
from ogn.model import Beacon
|
||||
from ogn.exceptions import AprsParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
beacon = Beacon()
|
||||
with self.assertRaises(Exception):
|
||||
with self.assertRaises(AprsParseError):
|
||||
beacon.parse("notAValidString")
|
||||
|
||||
def test_basic(self):
|
||||
|
|
|
@ -1,31 +1,37 @@
|
|||
import unittest
|
||||
|
||||
from ogn.model import ReceiverBeacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_v022(self):
|
||||
receiver = ReceiverBeacon()
|
||||
def test_fail_validation(self):
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
with self.assertRaises(OgnParseError):
|
||||
receiver_beacon.parse("notAValidToken")
|
||||
|
||||
receiver.parse("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
self.assertEqual(receiver.version, '0.2.2')
|
||||
self.assertEqual(receiver.platform, 'x86')
|
||||
self.assertEqual(receiver.cpu_load, 0.5)
|
||||
self.assertEqual(receiver.cpu_temp, 52.0)
|
||||
self.assertEqual(receiver.free_ram, 669.9)
|
||||
self.assertEqual(receiver.total_ram, 887.7)
|
||||
self.assertEqual(receiver.ntp_error, 1.0)
|
||||
self.assertEqual(receiver.rec_crystal_correction, 0.0)
|
||||
self.assertEqual(receiver.rec_crystal_correction_fine, 0.0)
|
||||
self.assertEqual(receiver.rec_input_noise, 0.06)
|
||||
def test_v022(self):
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
|
||||
receiver_beacon.parse("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
self.assertEqual(receiver_beacon.version, '0.2.2')
|
||||
self.assertEqual(receiver_beacon.platform, 'x86')
|
||||
self.assertEqual(receiver_beacon.cpu_load, 0.5)
|
||||
self.assertEqual(receiver_beacon.cpu_temp, 52.0)
|
||||
self.assertEqual(receiver_beacon.free_ram, 669.9)
|
||||
self.assertEqual(receiver_beacon.total_ram, 887.7)
|
||||
self.assertEqual(receiver_beacon.ntp_error, 1.0)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction, 0.0)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, 0.0)
|
||||
self.assertEqual(receiver_beacon.rec_input_noise, 0.06)
|
||||
|
||||
def test_v021(self):
|
||||
receiver = ReceiverBeacon()
|
||||
receiver_beacon = ReceiverBeacon()
|
||||
|
||||
receiver.parse("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
self.assertEqual(receiver.rec_crystal_correction, 26)
|
||||
self.assertEqual(receiver.rec_crystal_correction_fine, -1.4)
|
||||
self.assertEqual(receiver.rec_input_noise, -0.25)
|
||||
receiver_beacon.parse("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.0ms/+0.0ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction, 26)
|
||||
self.assertEqual(receiver_beacon.rec_crystal_correction_fine, -1.4)
|
||||
self.assertEqual(receiver_beacon.rec_input_noise, -0.25)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import unittest
|
||||
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
|
@ -30,12 +31,25 @@ class TestStringMethods(unittest.TestCase):
|
|||
parse_aprs(line)
|
||||
|
||||
def test_fail_none(self):
|
||||
with self.assertRaises(Exception):
|
||||
with self.assertRaises(TypeError):
|
||||
parse_aprs(None)
|
||||
|
||||
def test_fail_empty(self):
|
||||
with self.assertRaises(Exception):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("")
|
||||
|
||||
def test_fail_bad_string(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
def test_concated_device_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -395")
|
||||
|
||||
def test_concated_receiver_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from datetime import datetime
|
||||
import unittest
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.aprs_utils import dmsToDeg, createTimestamp
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import unittest
|
||||
|
||||
from ogn.utils import get_ddb, get_country_code, wgs84_to_sphere
|
||||
from ogn.model.address_origin import AddressOrigin
|
||||
from ogn.model import AddressOrigin
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
|
@ -16,7 +17,7 @@ class TestStringMethods(unittest.TestCase):
|
|||
self.assertEqual(device.address, 'DD4711')
|
||||
self.assertEqual(device.aircraft, 'HK36 TTC')
|
||||
self.assertEqual(device.registration, 'D-EULE')
|
||||
self.assertEqual(device.competition, '')
|
||||
self.assertEqual(device.competition, 'CU')
|
||||
self.assertTrue(device.tracked)
|
||||
self.assertTrue(device.identified)
|
||||
|
Ładowanie…
Reference in New Issue