kopia lustrzana https://github.com/glidernet/ogn-python
Changed exception handling
rodzic
be4b4fdc22
commit
e66291e5ea
|
@ -1,11 +1,12 @@
|
|||
from .model import Beacon, AircraftBeacon, ReceiverBeacon
|
||||
from ogn.exceptions import AprsParseError
|
||||
|
||||
|
||||
def parse_aprs(packet):
|
||||
if not isinstance(packet, str):
|
||||
raise TypeError("Expected packet to be str, got %s" % type(packet))
|
||||
elif packet == "":
|
||||
raise AprsParseError(substring=packet, expected_type="non-empty aprs packet")
|
||||
raise AprsParseError("(empty string)")
|
||||
elif packet[0] == "#":
|
||||
return None
|
||||
|
||||
|
|
|
@ -4,9 +4,17 @@ exception definitions
|
|||
|
||||
|
||||
class AprsParseError(Exception):
|
||||
"""Parse error while parsing an aprs packet substring."""
|
||||
def __init__(self, substring, expected_type):
|
||||
self.message = "Aprs Substring can't be parsed as %s." % expected_type
|
||||
"""Parse error while parsing an aprs packet."""
|
||||
def __init__(self, aprs_string):
|
||||
self.message = "This is not a valid APRS string: %s" % aprs_string
|
||||
super(AprsParseError, self).__init__(self.message)
|
||||
self.aprs_string = aprs_string
|
||||
|
||||
|
||||
class OgnParseError(Exception):
|
||||
"""Parse error while parsing an aprs packet substring"""
|
||||
def __init__(self, substring, expected_type):
|
||||
self.message = "For type %s this is not a valid token: %s" % (expected_type, substring)
|
||||
super(OgnParseError, self).__init__(self.message)
|
||||
self.substring = substring
|
||||
self.expected_type = expected_type
|
||||
|
|
|
@ -4,7 +4,7 @@ from time import time
|
|||
from ogn.gateway import settings
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.exceptions import AprsParseError
|
||||
from ogn.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
@ -32,6 +32,12 @@ class ognGateway:
|
|||
self.sock.send(login.encode())
|
||||
self.sock_file = self.sock.makefile('rw')
|
||||
|
||||
def disconnect(self):
|
||||
# close everything
|
||||
print('Close socket')
|
||||
self.sock.shutdown(0)
|
||||
self.sock.close()
|
||||
|
||||
def run(self):
|
||||
keepalive_time = time()
|
||||
while True:
|
||||
|
@ -40,11 +46,7 @@ class ognGateway:
|
|||
keepalive_time = time()
|
||||
|
||||
# Read packet string from socket
|
||||
try:
|
||||
packet_str = self.sock_file.readline().strip()
|
||||
except socket.error:
|
||||
print('Socket error on readline')
|
||||
continue
|
||||
packet_str = self.sock_file.readline().strip()
|
||||
|
||||
# A zero length line should not be return if keepalives are being sent
|
||||
# A zero length line will only be returned after ~30m if keepalives are not sent
|
||||
|
@ -53,17 +55,16 @@ class ognGateway:
|
|||
break
|
||||
|
||||
self.proceed_line(packet_str)
|
||||
# close everything
|
||||
print('Close socket')
|
||||
self.sock.shutdown(0)
|
||||
self.sock.close()
|
||||
|
||||
def proceed_line(self, line):
|
||||
try:
|
||||
beacon = parse_aprs(line)
|
||||
except AprsParseError as e:
|
||||
print(e.message)
|
||||
print("Substring: " % e.substring)
|
||||
return
|
||||
except OgnParseError as e:
|
||||
print(e.message)
|
||||
print("APRS line: %s" % line)
|
||||
return
|
||||
|
||||
if beacon is not None:
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import socket
|
||||
|
||||
from ogn.gateway import ognGateway
|
||||
|
||||
DB_URI = 'sqlite:///beacons.db'
|
||||
|
@ -9,12 +11,27 @@ manager = Manager()
|
|||
@manager.command
|
||||
def run(aprs_user="anon-dev"):
|
||||
"""Run the aprs client."""
|
||||
user_interrupted = False
|
||||
gateway = ognGateway()
|
||||
print("Start OGN gateway")
|
||||
|
||||
print("Connect to DB")
|
||||
gateway.connect_db()
|
||||
gateway.connect(aprs_user)
|
||||
try:
|
||||
gateway.run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
print("\nOGN gateway Exit")
|
||||
|
||||
while user_interrupted is False:
|
||||
print("Connect OGN gateway")
|
||||
gateway.connect(aprs_user)
|
||||
|
||||
try:
|
||||
gateway.run()
|
||||
except KeyboardInterrupt:
|
||||
print("User interrupted")
|
||||
user_interrupted = True
|
||||
except BrokenPipeError:
|
||||
print("BrokenPipeError")
|
||||
except socket.err:
|
||||
print("socket error")
|
||||
|
||||
print("Disconnect OGN gateway")
|
||||
gateway.disconnect()
|
||||
|
||||
print("\nExit OGN gateway")
|
||||
|
|
|
@ -4,6 +4,7 @@ from sqlalchemy import Column, String, Integer, Float, Boolean, SmallInteger
|
|||
|
||||
from ogn.aprs_utils import fpm2ms
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class AircraftBeacon(Beacon):
|
||||
|
@ -127,7 +128,7 @@ class AircraftBeacon(Beacon):
|
|||
elif flightlevel_match is not None:
|
||||
self.flightlevel = float(flightlevel_match.group(1))
|
||||
else:
|
||||
raise Exception("No valid position description: %s" % part)
|
||||
raise OgnParseError(expected_type="AircraftBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
|
||||
|
|
|
@ -32,7 +32,7 @@ class Beacon(AbstractConcreteBase, Base):
|
|||
def parse(self, text, reference_time=None):
|
||||
result = re_pattern_aprs.match(text)
|
||||
if result is None:
|
||||
raise AprsParseError(substring=text, expected_type="Beacon")
|
||||
raise AprsParseError(text)
|
||||
|
||||
self.name = result.group(1)
|
||||
self.receiver_name = result.group(2)
|
||||
|
|
|
@ -3,6 +3,7 @@ import re
|
|||
from sqlalchemy import Column, String
|
||||
|
||||
from .beacon import Beacon
|
||||
from ogn.exceptions import OgnParseError
|
||||
|
||||
|
||||
class ReceiverBeacon(Beacon):
|
||||
|
@ -82,7 +83,7 @@ class ReceiverBeacon(Beacon):
|
|||
self.rec_crystal_correction = int(rf_light2_match.group(1))
|
||||
self.rec_crystal_correction_fine = float(rf_light2_match.group(2))
|
||||
else:
|
||||
raise Exception("No valid receiver description: %s" % part)
|
||||
raise OgnParseError(expected_type="ReceiverBeacon", substring=part)
|
||||
|
||||
def __repr__(self):
|
||||
return "<ReceiverBeacon %s: %s>" % (self.name, self.version)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import unittest
|
||||
|
||||
from ogn.aprs_parser import parse_aprs
|
||||
from ogn.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
|
@ -30,12 +31,25 @@ class TestStringMethods(unittest.TestCase):
|
|||
parse_aprs(line)
|
||||
|
||||
def test_fail_none(self):
|
||||
with self.assertRaises(Exception):
|
||||
with self.assertRaises(TypeError):
|
||||
parse_aprs(None)
|
||||
|
||||
def test_fail_empty(self):
|
||||
with self.assertRaises(Exception):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("")
|
||||
|
||||
def test_fail_bad_string(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
def test_concated_device_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -395")
|
||||
|
||||
def test_concated_receiver_string(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse_aprs("Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/21")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Ładowanie…
Reference in New Issue