kopia lustrzana https://github.com/glidernet/python-ogn-client
commit
a3beaf7804
|
@ -2,6 +2,7 @@
|
|||
## Unreleased
|
||||
- parser: Added support for OGNLT24 (LT24), OGSKYL (Skylines), OGSPID (Spider), OGSPOT (Spot) and OGNFNT (Fanet)
|
||||
- parser: Added support for (server) comments
|
||||
- parser: Added parser for local receiver output (port 50001)
|
||||
|
||||
## 0.8.2: - 2018-01-20
|
||||
- parser: Better validation of timestamp, lat/lon and altitude
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import socket
|
||||
import logging
|
||||
from time import time
|
||||
from time import time, sleep
|
||||
|
||||
from ogn.client import settings
|
||||
|
||||
|
@ -83,3 +83,35 @@ class AprsClient:
|
|||
self.connect()
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
class TelnetClient:
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.logger.info("Connect to local telnet server")
|
||||
|
||||
def connect(self):
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.connect((settings.TELNET_SERVER_HOST, settings.TELNET_SERVER_PORT))
|
||||
|
||||
def run(self, callback, autoreconnect=False):
|
||||
while True:
|
||||
try:
|
||||
self.sock_file = self.sock.makefile(mode='rw', encoding='iso-8859-1')
|
||||
while True:
|
||||
packet_str = self.sock_file.readline().strip()
|
||||
callback(packet_str)
|
||||
|
||||
except ConnectionRefusedError:
|
||||
self.logger.error('Telnet server not running', exc_info=True)
|
||||
|
||||
if autoreconnect:
|
||||
sleep(1)
|
||||
self.connect()
|
||||
else:
|
||||
return
|
||||
|
||||
def disconnect(self):
|
||||
self.logger.info('Disconnect')
|
||||
self.sock.shutdown(0)
|
||||
self.sock.close()
|
||||
|
|
|
@ -8,3 +8,6 @@ PACKAGE_VERSION = '0.8.2'
|
|||
APRS_APP_VER = PACKAGE_VERSION[:3]
|
||||
|
||||
APRS_KEEPALIVE_TIME = 240
|
||||
|
||||
TELNET_SERVER_HOST = 'localhost'
|
||||
TELNET_SERVER_PORT = 50001
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
from datetime import datetime
|
||||
|
||||
from ogn.parser import ParseError
|
||||
from ogn.parser.utils import createTimestamp
|
||||
|
||||
|
||||
def parse(telnet_data):
|
||||
now = datetime.utcnow()
|
||||
reference_date = now.date()
|
||||
reference_time = now.time()
|
||||
|
||||
try:
|
||||
return {'pps_offset': float(telnet_data[0:5]),
|
||||
'frequency': float(telnet_data[9:16]),
|
||||
'aircraft_type': int(telnet_data[20:24]),
|
||||
'address_type': int(telnet_data[25]),
|
||||
'address': telnet_data[27:33],
|
||||
'timestamp': createTimestamp(telnet_data[34:40] + 'h', reference_date, reference_time),
|
||||
'latitude': float(telnet_data[43:53]),
|
||||
'longitude': float(telnet_data[54:64]),
|
||||
'altitude': int(telnet_data[68:73]),
|
||||
'climb_rate': float(telnet_data[74:80]),
|
||||
'ground_speed': float(telnet_data[83:89]),
|
||||
'track': float(telnet_data[92:98]),
|
||||
'turn_rate': float(telnet_data[101:107]),
|
||||
'magic_number': int(telnet_data[114:116]),
|
||||
'gps_status': telnet_data[117:122],
|
||||
'frequency_offset': float(telnet_data[123:129]),
|
||||
'signal_quality': float(telnet_data[132:137]),
|
||||
'error_count': float(telnet_data[139:142]),
|
||||
'distance': float(telnet_data[143:150]),
|
||||
'bearing': float(telnet_data[152:158]),
|
||||
'phi': float(telnet_data[161:167])}
|
||||
except:
|
||||
raise ParseError
|
|
@ -0,0 +1,26 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from ogn.client.client import TelnetClient
|
||||
|
||||
|
||||
class TelnetClientTest(unittest.TestCase):
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_disconnect(self, socket_mock):
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
client.sock.connect.assert_called_once_with(('localhost', 50001))
|
||||
|
||||
client.disconnect()
|
||||
client.sock.shutdown.assert_called_once_with(0)
|
||||
client.sock.close.assert_called_once_with()
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run(self, socket_mock):
|
||||
def callback(raw_message):
|
||||
raise ConnectionRefusedError
|
||||
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
|
||||
client.run(callback=callback)
|
|
@ -0,0 +1,47 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from ogn.parser.telnet_parser import parse
|
||||
from ogn.parser.exceptions import ParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
@unittest.skip("Not yet implemented")
|
||||
def test_telnet_fail_corrupt(self):
|
||||
with self.assertRaises(ParseError):
|
||||
parse('This is rubbish')
|
||||
|
||||
@mock.patch('ogn.parser.telnet_parser.datetime')
|
||||
def test_telnet_parse(self, datetime_mock):
|
||||
# set the utcnow-mock near to the time in the test string
|
||||
datetime_mock.utcnow.return_value = datetime(2015, 1, 1, 10, 0, 55)
|
||||
|
||||
message = parse('0.867sec:868.398MHz: 1:2:DD9AB2 100050: [ +47.44356, +11.77856]deg 2749m +3.2m/s 34.5m/s 082.7deg -0.7deg/sec 5 03x03m -1.6kHz 8.5dB 1e 50.6km 151.4deg +1.9deg')
|
||||
|
||||
self.assertEqual(message['pps_offset'], 0.867)
|
||||
self.assertEqual(message['frequency'], 868.398)
|
||||
self.assertEqual(message['aircraft_type'], 1)
|
||||
self.assertEqual(message['address_type'], 2)
|
||||
self.assertEqual(message['address'], 'DD9AB2')
|
||||
self.assertEqual(message['timestamp'], datetime(2015, 1, 1, 10, 0, 50))
|
||||
self.assertEqual(message['latitude'], 47.44356)
|
||||
self.assertEqual(message['longitude'], 11.77856)
|
||||
self.assertEqual(message['altitude'], 2749)
|
||||
self.assertEqual(message['climb_rate'], 3.2)
|
||||
self.assertEqual(message['ground_speed'], 34.5)
|
||||
self.assertEqual(message['track'], 82.7)
|
||||
self.assertEqual(message['turn_rate'], -0.7)
|
||||
self.assertEqual(message['magic_number'], 5) # the '5' is a magic number... 1 if ground_speed is 0.0m/s an 3 or 5 if airborne. Do you have an idea what it is?
|
||||
self.assertEqual(message['gps_status'], '03x03')
|
||||
self.assertEqual(message['frequency_offset'], -1.6)
|
||||
self.assertEqual(message['signal_quality'], 8.5)
|
||||
self.assertEqual(message['error_count'], 1)
|
||||
self.assertEqual(message['distance'], 50.6)
|
||||
self.assertEqual(message['bearing'], 151.4)
|
||||
self.assertEqual(message['phi'], 1.9)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Ładowanie…
Reference in New Issue