kopia lustrzana https://github.com/glidernet/python-ogn-client
Refactoring: replace unittest with pytest
rodzic
f046d647cc
commit
3059b43dce
|
@ -1,150 +1,156 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
import pytest
|
||||
|
||||
from ogn.parser import parse
|
||||
from ogn.client.client import create_aprs_login, AprsClient
|
||||
from ogn.client.settings import APRS_APP_NAME, APRS_APP_VER, APRS_KEEPALIVE_TIME
|
||||
|
||||
|
||||
class AprsClientTest(unittest.TestCase):
|
||||
def test_create_aprs_login(self):
|
||||
basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1')
|
||||
assert 'user klaus pass -1 vers myApp 0.1\n' == basic_login
|
||||
def test_create_aprs_login():
|
||||
basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1')
|
||||
assert 'user klaus pass -1 vers myApp 0.1\n' == basic_login
|
||||
|
||||
login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100')
|
||||
assert 'user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n' == login_with_filter
|
||||
login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100')
|
||||
assert 'user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n' == login_with_filter
|
||||
|
||||
def test_initialisation(self):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
assert client.aprs_user == 'testuser'
|
||||
assert client.aprs_filter == ''
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_full_feed(self, mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
client.sock.send.assert_called_once_with('user testuser pass -1 vers {} {}\n'.format(
|
||||
APRS_APP_NAME, APRS_APP_VER).encode('ascii'))
|
||||
client.sock.makefile.assert_called_once_with('rb')
|
||||
def test_initialisation():
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
assert client.aprs_user == 'testuser'
|
||||
assert client.aprs_filter == ''
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_client_defined_filter(self, mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='r/50.4976/9.9495/100')
|
||||
client.connect()
|
||||
client.sock.send.assert_called_once_with('user testuser pass -1 vers {} {} filter r/50.4976/9.9495/100\n'.format(
|
||||
APRS_APP_NAME, APRS_APP_VER).encode('ascii'))
|
||||
client.sock.makefile.assert_called_once_with('rb')
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_disconnect(self, mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_full_feed(mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
client.sock.send.assert_called_once_with(f'user testuser pass -1 vers {APRS_APP_NAME} {APRS_APP_VER}\n'.encode('ascii'))
|
||||
client.sock.makefile.assert_called_once_with('rb')
|
||||
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_client_defined_filter(mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='r/50.4976/9.9495/100')
|
||||
client.connect()
|
||||
client.sock.send.assert_called_once_with(f'user testuser pass -1 vers {APRS_APP_NAME} {APRS_APP_VER} filter r/50.4976/9.9495/100\n'.encode('ascii'))
|
||||
client.sock.makefile.assert_called_once_with('rb')
|
||||
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_disconnect(mock_socket):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
client.disconnect()
|
||||
client.sock.shutdown.assert_called_once_with(0)
|
||||
client.sock.close.assert_called_once_with()
|
||||
assert client._kill is True
|
||||
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run(mock_socket):
|
||||
import socket
|
||||
mock_socket.error = socket.error
|
||||
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
client.sock_file.readline = mock.MagicMock()
|
||||
client.sock_file.readline.side_effect = [b'Normal text blabla',
|
||||
b'my weird character \xc2\xa5',
|
||||
UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2, 'This is just a fake reason!'),
|
||||
b'... show must go on',
|
||||
BrokenPipeError(),
|
||||
b'... and on',
|
||||
ConnectionResetError(),
|
||||
b'... and on',
|
||||
socket.error(),
|
||||
b'... and on',
|
||||
b'',
|
||||
b'... and on',
|
||||
KeyboardInterrupt()]
|
||||
|
||||
try:
|
||||
client.run(callback=lambda msg: print("got: {}".format(msg)), autoreconnect=True)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
client.sock.shutdown.assert_called_once_with(0)
|
||||
client.sock.close.assert_called_once_with()
|
||||
assert client._kill is True
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run(self, mock_socket):
|
||||
import socket
|
||||
mock_socket.error = socket.error
|
||||
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
@mock.patch('ogn.client.client.time')
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run_keepalive(mock_socket, mock_time):
|
||||
import socket
|
||||
mock_socket.error = socket.error
|
||||
|
||||
client.sock_file.readline = mock.MagicMock()
|
||||
client.sock_file.readline.side_effect = [b'Normal text blabla',
|
||||
b'my weird character \xc2\xa5',
|
||||
UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2, 'This is just a fake reason!'),
|
||||
b'... show must go on',
|
||||
BrokenPipeError(),
|
||||
b'... and on',
|
||||
ConnectionResetError(),
|
||||
b'... and on',
|
||||
socket.error(),
|
||||
b'... and on',
|
||||
b'',
|
||||
b'... and on',
|
||||
KeyboardInterrupt()]
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
client.sock_file.readline = mock.MagicMock()
|
||||
client.sock_file.readline.side_effect = [b'Normal text blabla',
|
||||
KeyboardInterrupt()]
|
||||
|
||||
mock_time.side_effect = [0, 0, APRS_KEEPALIVE_TIME + 1, APRS_KEEPALIVE_TIME + 1]
|
||||
|
||||
timed_callback = mock.MagicMock()
|
||||
|
||||
try:
|
||||
client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
||||
timed_callback.assert_called_with(client)
|
||||
|
||||
|
||||
def test_reset_kill_reconnect():
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
# .run() should be allowed to execute after .connect()
|
||||
mock_callback = mock.MagicMock(
|
||||
side_effect=lambda raw_msg: client.disconnect())
|
||||
|
||||
assert client._kill is False
|
||||
client.run(callback=mock_callback, autoreconnect=True)
|
||||
|
||||
# After .disconnect(), client._kill should be True
|
||||
assert client._kill is True
|
||||
assert mock_callback.call_count == 1
|
||||
|
||||
# After we reconnect, .run() should be able to run again
|
||||
mock_callback.reset_mock()
|
||||
client.connect()
|
||||
client.run(callback=mock_callback, autoreconnect=True)
|
||||
assert mock_callback.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.skip("Too much invalid APRS data on the live feed")
|
||||
def test_50_live_messages():
|
||||
print("Enter")
|
||||
remaining_messages = 50
|
||||
|
||||
def process_message(raw_message):
|
||||
global remaining_messages
|
||||
if raw_message[0] == '#':
|
||||
return
|
||||
try:
|
||||
client.run(callback=lambda msg: print("got: {}".format(msg)), autoreconnect=True)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
message = parse(raw_message)
|
||||
print("{}: {}".format(message['aprs_type'], raw_message))
|
||||
except NotImplementedError as e:
|
||||
print("{}: {}".format(e, raw_message))
|
||||
return
|
||||
if remaining_messages > 0:
|
||||
remaining_messages -= 1
|
||||
else:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
@mock.patch('ogn.client.client.time')
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run_keepalive(self, mock_socket, mock_time):
|
||||
import socket
|
||||
mock_socket.error = socket.error
|
||||
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
client.sock_file.readline = mock.MagicMock()
|
||||
client.sock_file.readline.side_effect = [b'Normal text blabla',
|
||||
KeyboardInterrupt()]
|
||||
|
||||
mock_time.side_effect = [0, 0, APRS_KEEPALIVE_TIME + 1, APRS_KEEPALIVE_TIME + 1]
|
||||
|
||||
timed_callback = mock.MagicMock()
|
||||
|
||||
try:
|
||||
client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
||||
timed_callback.assert_called_with(client)
|
||||
|
||||
def test_reset_kill_reconnect(self):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
# .run() should be allowed to execute after .connect()
|
||||
mock_callback = mock.MagicMock(
|
||||
side_effect=lambda raw_msg: client.disconnect())
|
||||
|
||||
self.assertFalse(client._kill)
|
||||
client.run(callback=mock_callback, autoreconnect=True)
|
||||
|
||||
# After .disconnect(), client._kill should be True
|
||||
assert client._kill is True
|
||||
assert mock_callback.call_count == 1
|
||||
|
||||
# After we reconnect, .run() should be able to run again
|
||||
mock_callback.reset_mock()
|
||||
client.connect()
|
||||
client.run(callback=mock_callback, autoreconnect=True)
|
||||
assert mock_callback.call_count == 1
|
||||
|
||||
@unittest.skip("Too much invalid APRS data on the live feed")
|
||||
def test_50_live_messages(self):
|
||||
print("Enter")
|
||||
self.remaining_messages = 50
|
||||
|
||||
def process_message(raw_message):
|
||||
if raw_message[0] == '#':
|
||||
return
|
||||
try:
|
||||
message = parse(raw_message)
|
||||
print("{}: {}".format(message['aprs_type'], raw_message))
|
||||
except NotImplementedError as e:
|
||||
print("{}: {}".format(e, raw_message))
|
||||
return
|
||||
if self.remaining_messages > 0:
|
||||
self.remaining_messages -= 1
|
||||
else:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
try:
|
||||
client.run(callback=process_message, autoreconnect=True)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
try:
|
||||
client.run(callback=process_message, autoreconnect=True)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
|
|
@ -1,26 +1,25 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from ogn.client.client import TelnetClient
|
||||
|
||||
|
||||
class TelnetClientTest(unittest.TestCase):
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_disconnect(self, socket_mock):
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
client.sock.connect.assert_called_once_with(('localhost', 50001))
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_connect_disconnect(socket_mock):
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
client.sock.connect.assert_called_once_with(('localhost', 50001))
|
||||
|
||||
client.disconnect()
|
||||
client.sock.shutdown.assert_called_once_with(0)
|
||||
client.sock.close.assert_called_once_with()
|
||||
client.disconnect()
|
||||
client.sock.shutdown.assert_called_once_with(0)
|
||||
client.sock.close.assert_called_once_with()
|
||||
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run(self, socket_mock):
|
||||
def callback(raw_message):
|
||||
raise ConnectionRefusedError
|
||||
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run(socket_mock):
|
||||
def callback(raw_message):
|
||||
raise ConnectionRefusedError
|
||||
|
||||
client.run(callback=callback)
|
||||
client = TelnetClient()
|
||||
client.connect()
|
||||
|
||||
client.run(callback=callback)
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
import unittest
|
||||
|
||||
from ogn.ddb import get_ddb_devices
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_get_ddb_devices(self):
|
||||
devices = list(get_ddb_devices())
|
||||
self.assertGreater(len(devices), 4000)
|
||||
self.assertCountEqual(devices[0].keys(), ['device_type', 'device_id', 'aircraft_model', 'registration', 'cn', 'tracked', 'identified'])
|
||||
def test_get_ddb_devices():
|
||||
devices = list(get_ddb_devices())
|
||||
assert len(devices) > 4000
|
||||
assert len(devices[0].keys()), len(['device_type', 'device_id', 'aircraft_model', 'registration', 'cn', 'tracked', 'identified'])
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
import pytest
|
||||
import os
|
||||
|
||||
from datetime import datetime
|
||||
|
@ -9,124 +9,141 @@ from ogn.parser.parse import parse
|
|||
from ogn.parser.exceptions import AprsParseError, OgnParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def parse_valid_beacon_data_file(self, filename, beacon_type):
|
||||
with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f:
|
||||
for line in f:
|
||||
try:
|
||||
message = parse(line, datetime(2015, 4, 10, 17, 0))
|
||||
self.assertFalse(message is None)
|
||||
if message['aprs_type'] == 'position' or message['aprs_type'] == 'status':
|
||||
assert message['beacon_type'] == beacon_type
|
||||
except NotImplementedError as e:
|
||||
print(e)
|
||||
|
||||
def test_aprs_aircraft_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aprs_aircraft')
|
||||
|
||||
def test_aprs_receiver_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='aprs_receiver.txt', beacon_type='aprs_receiver')
|
||||
|
||||
def test_aprs_fanet_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='fanet.txt', beacon_type='fanet')
|
||||
|
||||
def test_flarm_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='flarm.txt', beacon_type='flarm')
|
||||
|
||||
def test_receiver_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='receiver.txt', beacon_type='receiver')
|
||||
|
||||
def test_tracker_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='tracker.txt', beacon_type='tracker')
|
||||
|
||||
def test_capturs_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='capturs.txt', beacon_type='capturs')
|
||||
|
||||
def test_flymaster_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='flymaster.txt', beacon_type='flymaster')
|
||||
|
||||
def test_inreach_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='inreach.txt', beacon_type='inreach')
|
||||
|
||||
def test_lt24_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='lt24.txt', beacon_type='lt24')
|
||||
|
||||
def test_naviter_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='naviter.txt', beacon_type='naviter')
|
||||
|
||||
def test_pilot_aware_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='pilot_aware.txt', beacon_type='pilot_aware')
|
||||
|
||||
def test_skylines_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='skylines.txt', beacon_type='skylines')
|
||||
|
||||
def test_spider_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='spider.txt', beacon_type='spider')
|
||||
|
||||
def test_spot_beacons(self):
|
||||
self.parse_valid_beacon_data_file(filename='spot.txt', beacon_type='spot')
|
||||
|
||||
def test_generic_beacons(self):
|
||||
message = parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment")
|
||||
assert message['beacon_type'] == 'unknown'
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
def test_fail_parse_aprs_none(self):
|
||||
with self.assertRaises(TypeError):
|
||||
parse(None)
|
||||
|
||||
def test_fail_empty(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse("")
|
||||
|
||||
def test_fail_bad_string(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
def test_v026_chile(self):
|
||||
# receiver beacons from chile have a APRS position message with a pure user comment
|
||||
message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura")
|
||||
|
||||
assert message['user_comment'] == "Vitacura Municipal Aerodrome, Club de Planeadores Vitacura"
|
||||
|
||||
message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs")
|
||||
|
||||
assert message_with_id['user_comment'] == "Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs"
|
||||
|
||||
@mock.patch('ogn.parser.parse_module.createTimestamp')
|
||||
def test_default_reference_date(self, createTimestamp_mock):
|
||||
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB"
|
||||
|
||||
parse(valid_aprs_string)
|
||||
call_args_before = createTimestamp_mock.call_args
|
||||
|
||||
sleep(1)
|
||||
|
||||
parse(valid_aprs_string)
|
||||
call_args_seconds_later = createTimestamp_mock.call_args
|
||||
|
||||
self.assertNotEqual(call_args_before, call_args_seconds_later)
|
||||
|
||||
def test_copy_constructor(self):
|
||||
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
|
||||
message = parse(valid_aprs_string)
|
||||
|
||||
assert message['name'] == 'FLRDDA5BA'
|
||||
assert message['address'] == 'DDA5BA'
|
||||
|
||||
def test_bad_naviter_format(self):
|
||||
with self.assertRaises(OgnParseError):
|
||||
parse("FLRA51D93>OGNAVI,qAS,NAVITER2:/204507h4444.98N/09323.34W'000/000/A=000925 !W67! id06A51D93 +000fpm +0.0rot")
|
||||
|
||||
def test_no_receiver(self):
|
||||
result = parse("EDFW>OGNSDR:/102713h4949.02NI00953.88E&/A=000984")
|
||||
|
||||
assert result['aprs_type'] == 'position'
|
||||
assert result['beacon_type'] == 'receiver'
|
||||
assert result['name'] == 'EDFW'
|
||||
assert result['dstcall'] == 'OGNSDR'
|
||||
assert result['receiver_name'] is None
|
||||
def _parse_valid_beacon_data_file(filename, beacon_type):
|
||||
with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f:
|
||||
for line in f:
|
||||
try:
|
||||
message = parse(line, datetime(2015, 4, 10, 17, 0))
|
||||
assert message is not None
|
||||
if message['aprs_type'] == 'position' or message['aprs_type'] == 'status':
|
||||
assert message['beacon_type'] == beacon_type
|
||||
except NotImplementedError as e:
|
||||
print(e)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_aprs_aircraft_beacons():
|
||||
_parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aprs_aircraft')
|
||||
|
||||
|
||||
def test_aprs_receiver_beacons():
|
||||
_parse_valid_beacon_data_file(filename='aprs_receiver.txt', beacon_type='aprs_receiver')
|
||||
|
||||
|
||||
def test_aprs_fanet_beacons():
|
||||
_parse_valid_beacon_data_file(filename='fanet.txt', beacon_type='fanet')
|
||||
|
||||
|
||||
def test_flarm_beacons():
|
||||
_parse_valid_beacon_data_file(filename='flarm.txt', beacon_type='flarm')
|
||||
|
||||
|
||||
def test_receiver_beacons():
|
||||
_parse_valid_beacon_data_file(filename='receiver.txt', beacon_type='receiver')
|
||||
|
||||
|
||||
def test_tracker_beacons():
|
||||
_parse_valid_beacon_data_file(filename='tracker.txt', beacon_type='tracker')
|
||||
|
||||
|
||||
def test_capturs_beacons():
|
||||
_parse_valid_beacon_data_file(filename='capturs.txt', beacon_type='capturs')
|
||||
|
||||
|
||||
def test_flymaster_beacons():
|
||||
_parse_valid_beacon_data_file(filename='flymaster.txt', beacon_type='flymaster')
|
||||
|
||||
|
||||
def test_inreach_beacons():
|
||||
_parse_valid_beacon_data_file(filename='inreach.txt', beacon_type='inreach')
|
||||
|
||||
|
||||
def test_lt24_beacons():
|
||||
_parse_valid_beacon_data_file(filename='lt24.txt', beacon_type='lt24')
|
||||
|
||||
|
||||
def test_naviter_beacons():
|
||||
_parse_valid_beacon_data_file(filename='naviter.txt', beacon_type='naviter')
|
||||
|
||||
|
||||
def test_pilot_aware_beacons():
|
||||
_parse_valid_beacon_data_file(filename='pilot_aware.txt', beacon_type='pilot_aware')
|
||||
|
||||
|
||||
def test_skylines_beacons():
|
||||
_parse_valid_beacon_data_file(filename='skylines.txt', beacon_type='skylines')
|
||||
|
||||
|
||||
def test_spider_beacons():
|
||||
_parse_valid_beacon_data_file(filename='spider.txt', beacon_type='spider')
|
||||
|
||||
|
||||
def test_spot_beacons():
|
||||
_parse_valid_beacon_data_file(filename='spot.txt', beacon_type='spot')
|
||||
|
||||
|
||||
def test_generic_beacons():
|
||||
message = parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment")
|
||||
assert message['beacon_type'] == 'unknown'
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
|
||||
def test_fail_parse_aprs_none():
|
||||
with pytest.raises(TypeError):
|
||||
parse(None)
|
||||
|
||||
|
||||
def test_fail_empty():
|
||||
with pytest.raises(AprsParseError):
|
||||
parse("")
|
||||
|
||||
|
||||
def test_fail_bad_string():
|
||||
with pytest.raises(AprsParseError):
|
||||
parse("Lachens>APRS,TCPIwontbeavalidstring")
|
||||
|
||||
|
||||
def test_v026_chile():
|
||||
# receiver beacons from chile have a APRS position message with a pure user comment
|
||||
message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura")
|
||||
assert message['user_comment'] == "Vitacura Municipal Aerodrome, Club de Planeadores Vitacura"
|
||||
|
||||
message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs")
|
||||
assert message_with_id['user_comment'] == "Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs"
|
||||
|
||||
|
||||
@mock.patch('ogn.parser.parse_module.createTimestamp')
|
||||
def test_default_reference_date(createTimestamp_mock):
|
||||
valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB"
|
||||
|
||||
parse(valid_aprs_string)
|
||||
call_args_before = createTimestamp_mock.call_args
|
||||
|
||||
sleep(1)
|
||||
|
||||
parse(valid_aprs_string)
|
||||
call_args_seconds_later = createTimestamp_mock.call_args
|
||||
|
||||
assert call_args_before != call_args_seconds_later
|
||||
|
||||
|
||||
def test_copy_constructor():
|
||||
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
|
||||
message = parse(valid_aprs_string)
|
||||
|
||||
assert message['name'] == 'FLRDDA5BA'
|
||||
assert message['address'] == 'DDA5BA'
|
||||
|
||||
|
||||
def test_bad_naviter_format():
|
||||
with pytest.raises(OgnParseError):
|
||||
parse("FLRA51D93>OGNAVI,qAS,NAVITER2:/204507h4444.98N/09323.34W'000/000/A=000925 !W67! id06A51D93 +000fpm +0.0rot")
|
||||
|
||||
|
||||
def test_no_receiver():
|
||||
result = parse("EDFW>OGNSDR:/102713h4949.02NI00953.88E&/A=000984")
|
||||
|
||||
assert result['aprs_type'] == 'position'
|
||||
assert result['beacon_type'] == 'receiver'
|
||||
assert result['name'] == 'EDFW'
|
||||
assert result['dstcall'] == 'OGNSDR'
|
||||
assert result['receiver_name'] is None
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
@ -7,158 +7,169 @@ from ogn.parser.parse import parse_aprs
|
|||
from ogn.parser.exceptions import AprsParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("notAValidString")
|
||||
|
||||
def test_basic(self):
|
||||
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment")
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['name'] == "FLRDDA5BA"
|
||||
assert message['dstcall'] == "APRS"
|
||||
assert message['receiver_name'] == "LFMX"
|
||||
assert message['timestamp'].strftime('%H:%M:%S') == "16:08:29"
|
||||
self.assertAlmostEqual(message['latitude'], 44.25683, 5)
|
||||
assert message['symboltable'] == '/'
|
||||
self.assertAlmostEqual(message['longitude'], 6.0005, 5)
|
||||
assert message['symbolcode'] == '\''
|
||||
assert message['track'] == 342
|
||||
assert message['ground_speed'] == 49 * KNOTS_TO_MS / KPH_TO_MS
|
||||
self.assertAlmostEqual(message['altitude'], 5524 * FEETS_TO_METER, 5)
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
def test_v024(self):
|
||||
# higher precision datum format introduced
|
||||
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
self.assertAlmostEqual(message['latitude'] - 44.2568 - 1 / 30000, 2 / 1000 / 60, 10)
|
||||
self.assertAlmostEqual(message['longitude'] - 6.0005, 6 / 1000 / 60, 10)
|
||||
|
||||
def test_v025(self):
|
||||
# introduced the "aprs status" format where many informations (lat, lon, alt, speed, ...) are just optional
|
||||
raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'status'
|
||||
assert message['name'] == "EPZR"
|
||||
assert message['receiver_name'] == "GLIDERN1"
|
||||
assert message['timestamp'].strftime('%H:%M:%S') == "09:34:56"
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
def test_v026(self):
|
||||
# from 0.2.6 the ogn comment of a receiver beacon is just optional
|
||||
raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['comment'] == ''
|
||||
|
||||
def test_v026_relay(self):
|
||||
# beacons can be relayed
|
||||
raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['relay'] == "NAV07220E"
|
||||
|
||||
def test_v027_ddhhmm(self):
|
||||
# beacons can have hhmmss or ddhhmm timestamp
|
||||
raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['timestamp'].strftime('%d %H:%M') == "30 10:46"
|
||||
|
||||
def test_v028_fanet_position_weather(self):
|
||||
# with v0.2.8 fanet devices can report weather data
|
||||
raw_message = 'FNTFC9002>OGNFNT,qAS,LSXI2:/163051h4640.33N/00752.21E_187/004g007t075h78b63620 29.0dB -8.0kHz'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['wind_direction'] == 187
|
||||
assert message['wind_speed'] == 4 * KNOTS_TO_MS / KPH_TO_MS
|
||||
assert message['wind_speed_peak'] == 7 * KNOTS_TO_MS / KPH_TO_MS
|
||||
assert message['temperature'] == fahrenheit_to_celsius(75)
|
||||
assert message['humidity'] == 78 * 0.01
|
||||
assert message['barometric_pressure'] == 63620
|
||||
|
||||
assert message['comment'] == '29.0dB -8.0kHz'
|
||||
|
||||
def test_GXAirCom_fanet_position_weather_rainfall(self):
|
||||
raw_message = 'FNT08F298>OGNFNT,qAS,DREIFBERG:/082654h4804.90N/00845.74E_273/005g008t057r123p234h90b10264 0.0dB'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['rainfall_1h'] == 123 / 100 * INCH_TO_MM
|
||||
assert message['rainfall_24h'] == 234 / 100 * INCH_TO_MM
|
||||
|
||||
def test_v028_fanet_position_weather_empty(self):
|
||||
raw_message = 'FNT010115>OGNFNT,qAS,DB7MJ:/065738h4727.72N/01012.83E_.../...g...t... 27.8dB -13.8kHz'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['wind_direction'] is None
|
||||
assert message['wind_speed'] is None
|
||||
assert message['wind_speed_peak'] is None
|
||||
assert message['temperature'] is None
|
||||
assert message['humidity'] is None
|
||||
assert message['barometric_pressure'] is None
|
||||
|
||||
def test_negative_altitude(self):
|
||||
# some devices can report negative altitudes
|
||||
raw_message = "OGNF71F40>APRS,qAS,NAVITER:/080852h4414.37N/01532.06E'253/052/A=-00013 !W73! id1EF71F40 -060fpm +0.0rot"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
self.assertAlmostEqual(message['altitude'], -13 * FEETS_TO_METER, 5)
|
||||
|
||||
def test_no_altitude(self):
|
||||
# altitude is not a 'must have'
|
||||
raw_message = "FLRDDEEF1>OGCAPT,qAS,CAPTURS:/065511h4837.63N/00233.79E'000/000"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['altitude'] is None
|
||||
|
||||
def test_invalid_coordinates(self):
|
||||
# sometimes the coordinates leave their valid range: -90<=latitude<=90 or -180<=longitude<=180
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
|
||||
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
|
||||
|
||||
def test_invalid_timestamp(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm")
|
||||
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322")
|
||||
|
||||
def test_invalid_altitude(self):
|
||||
with self.assertRaises(AprsParseError):
|
||||
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345")
|
||||
|
||||
def test_bad_comment(self):
|
||||
raw_message = "# bad configured ogn receiver"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['comment'] == raw_message
|
||||
assert message['aprs_type'] == 'comment'
|
||||
|
||||
def test_server_comment(self):
|
||||
raw_message = "# aprsc 2.1.4-g408ed49 17 Mar 2018 09:30:36 GMT GLIDERN1 37.187.40.234:10152"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['version'] == '2.1.4-g408ed49'
|
||||
assert message['timestamp'] == datetime(2018, 3, 17, 9, 30, 36)
|
||||
assert message['server'] == 'GLIDERN1'
|
||||
assert message['ip_address'] == '37.187.40.234'
|
||||
assert message['port'] == '10152'
|
||||
assert message['aprs_type'] == 'server'
|
||||
def test_fail_validationassert():
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("notAValidString")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_basic():
|
||||
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment")
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['name'] == "FLRDDA5BA"
|
||||
assert message['dstcall'] == "APRS"
|
||||
assert message['receiver_name'] == "LFMX"
|
||||
assert message['timestamp'].strftime('%H:%M:%S') == "16:08:29"
|
||||
assert message['latitude'] == pytest.approx(44.25683, 5)
|
||||
assert message['symboltable'] == '/'
|
||||
assert message['longitude'] == pytest.approx(6.0005, 5)
|
||||
assert message['symbolcode'] == '\''
|
||||
assert message['track'] == 342
|
||||
assert message['ground_speed'] == 49 * KNOTS_TO_MS / KPH_TO_MS
|
||||
assert message['altitude'] == pytest.approx(5524 * FEETS_TO_METER, 5)
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
|
||||
def test_v024():
|
||||
# higher precision datum format introduced
|
||||
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['latitude'] - 44.2568 - 1 / 30000 == pytest.approx(2 / 1000 / 60, 10)
|
||||
assert message['longitude'] - 6.0005 == pytest.approx(6 / 1000 / 60, 10)
|
||||
|
||||
|
||||
def test_v025():
|
||||
# introduced the "aprs status" format where many informations (lat, lon, alt, speed, ...) are just optional
|
||||
raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'status'
|
||||
assert message['name'] == "EPZR"
|
||||
assert message['receiver_name'] == "GLIDERN1"
|
||||
assert message['timestamp'].strftime('%H:%M:%S') == "09:34:56"
|
||||
assert message['comment'] == "this is a comment"
|
||||
|
||||
|
||||
def test_v026():
|
||||
# from 0.2.6 the ogn comment of a receiver beacon is just optional
|
||||
raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['comment'] == ''
|
||||
|
||||
|
||||
def test_v026_relay():
|
||||
# beacons can be relayed
|
||||
raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['relay'] == "NAV07220E"
|
||||
|
||||
|
||||
def test_v027_ddhhmm():
|
||||
# beacons can have hhmmss or ddhhmm timestamp
|
||||
raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position'
|
||||
assert message['timestamp'].strftime('%d %H:%M') == "30 10:46"
|
||||
|
||||
|
||||
def test_v028_fanet_position_weather():
|
||||
# with v0.2.8 fanet devices can report weather data
|
||||
raw_message = 'FNTFC9002>OGNFNT,qAS,LSXI2:/163051h4640.33N/00752.21E_187/004g007t075h78b63620 29.0dB -8.0kHz'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['wind_direction'] == 187
|
||||
assert message['wind_speed'] == 4 * KNOTS_TO_MS / KPH_TO_MS
|
||||
assert message['wind_speed_peak'] == 7 * KNOTS_TO_MS / KPH_TO_MS
|
||||
assert message['temperature'] == fahrenheit_to_celsius(75)
|
||||
assert message['humidity'] == 78 * 0.01
|
||||
assert message['barometric_pressure'] == 63620
|
||||
|
||||
assert message['comment'] == '29.0dB -8.0kHz'
|
||||
|
||||
|
||||
def test_GXAirCom_fanet_position_weather_rainfall():
|
||||
raw_message = 'FNT08F298>OGNFNT,qAS,DREIFBERG:/082654h4804.90N/00845.74E_273/005g008t057r123p234h90b10264 0.0dB'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['rainfall_1h'] == 123 / 100 * INCH_TO_MM
|
||||
assert message['rainfall_24h'] == 234 / 100 * INCH_TO_MM
|
||||
|
||||
|
||||
def test_v028_fanet_position_weather_empty():
|
||||
raw_message = 'FNT010115>OGNFNT,qAS,DB7MJ:/065738h4727.72N/01012.83E_.../...g...t... 27.8dB -13.8kHz'
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['aprs_type'] == 'position_weather'
|
||||
assert message['wind_direction'] is None
|
||||
assert message['wind_speed'] is None
|
||||
assert message['wind_speed_peak'] is None
|
||||
assert message['temperature'] is None
|
||||
assert message['humidity'] is None
|
||||
assert message['barometric_pressure'] is None
|
||||
|
||||
|
||||
def test_negative_altitude():
|
||||
# some devices can report negative altitudes
|
||||
raw_message = "OGNF71F40>APRS,qAS,NAVITER:/080852h4414.37N/01532.06E'253/052/A=-00013 !W73! id1EF71F40 -060fpm +0.0rot"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['altitude'] == pytest.approx(-13 * FEETS_TO_METER, 5)
|
||||
|
||||
|
||||
def test_no_altitude():
|
||||
# altitude is not a 'must have'
|
||||
raw_message = "FLRDDEEF1>OGCAPT,qAS,CAPTURS:/065511h4837.63N/00233.79E'000/000"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['altitude'] is None
|
||||
|
||||
|
||||
def test_invalid_coordinates():
|
||||
# sometimes the coordinates leave their valid range: -90<=latitude<=90 or -180<=longitude<=180
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
|
||||
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
|
||||
|
||||
|
||||
def test_invalid_timestamp():
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm")
|
||||
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322")
|
||||
|
||||
|
||||
def test_invalid_altitude():
|
||||
with pytest.raises(AprsParseError):
|
||||
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345")
|
||||
|
||||
|
||||
def test_bad_comment():
|
||||
raw_message = "# bad configured ogn receiver"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['comment'] == raw_message
|
||||
assert message['aprs_type'] == 'comment'
|
||||
|
||||
|
||||
def test_server_comment():
|
||||
raw_message = "# aprsc 2.1.4-g408ed49 17 Mar 2018 09:30:36 GMT GLIDERN1 37.187.40.234:10152"
|
||||
message = parse_aprs(raw_message)
|
||||
|
||||
assert message['version'] == '2.1.4-g408ed49'
|
||||
assert message['timestamp'] == datetime(2018, 3, 17, 9, 30, 36)
|
||||
assert message['server'] == 'GLIDERN1'
|
||||
assert message['ip_address'] == '37.187.40.234'
|
||||
assert message['port'] == '10152'
|
||||
assert message['aprs_type'] == 'server'
|
||||
|
|
|
@ -1,32 +1,29 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS
|
||||
from ogn.parser.aprs_comment.fanet_parser import FanetParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = FanetParser().parse_position("id1E1103CE -02fpm")
|
||||
def test_position_comment():
|
||||
message = FanetParser().parse_position("id1E1103CE -02fpm")
|
||||
|
||||
assert message['address_type'] == 2
|
||||
assert message['aircraft_type'] == 7
|
||||
self.assertFalse(message['stealth'])
|
||||
assert message['address'] == "1103CE"
|
||||
self.assertAlmostEqual(message['climb_rate'], -2 * FPM_TO_MS, 0.1)
|
||||
|
||||
def test_pseudo_status_comment(self):
|
||||
message = FanetParser().parse_position("")
|
||||
|
||||
assert message == {}
|
||||
|
||||
def test_v028_status(self):
|
||||
message = FanetParser().parse_status('Name="Juerg Zweifel" 15.0dB -17.1kHz 1e')
|
||||
|
||||
assert message['fanet_name'] == "Juerg Zweifel"
|
||||
assert message['signal_quality'] == 15.0
|
||||
assert message['frequency_offset'] == -17.1
|
||||
assert message['error_count'] == 1
|
||||
assert message['address_type'] == 2
|
||||
assert message['aircraft_type'] == 7
|
||||
assert message['stealth'] is False
|
||||
assert message['address'] == "1103CE"
|
||||
assert message['climb_rate'] == pytest.approx(-2 * FPM_TO_MS, 0.1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_pseudo_status_comment():
|
||||
message = FanetParser().parse_position("")
|
||||
|
||||
assert message == {}
|
||||
|
||||
|
||||
def test_v028_status():
|
||||
message = FanetParser().parse_status('Name="Juerg Zweifel" 15.0dB -17.1kHz 1e')
|
||||
|
||||
assert message['fanet_name'] == "Juerg Zweifel"
|
||||
assert message['signal_quality'] == 15.0
|
||||
assert message['frequency_offset'] == -17.1
|
||||
assert message['error_count'] == 1
|
||||
|
|
|
@ -1,35 +1,31 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS
|
||||
from ogn.parser.aprs_comment.flarm_parser import FlarmParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = FlarmParser().parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267")
|
||||
def test_position_comment():
|
||||
message = FlarmParser().parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267")
|
||||
|
||||
assert message['address_type'] == 1
|
||||
assert message['aircraft_type'] == 8
|
||||
self.assertFalse(message['stealth'])
|
||||
self.assertFalse(message['no-tracking'])
|
||||
assert message['address'] == "A8CBA8"
|
||||
self.assertAlmostEqual(message['climb_rate'], -39 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 0.1 * HPM_TO_DEGS
|
||||
assert message['signal_quality'] == 3.5
|
||||
assert message['error_count'] == 2
|
||||
assert message['frequency_offset'] == -8.7
|
||||
assert message['gps_quality'] == {'horizontal': 1, 'vertical': 2}
|
||||
assert message['software_version'] == 6.09
|
||||
assert message['hardware_version'] == 67
|
||||
assert message['real_address'] == "DF0267"
|
||||
|
||||
def test_position_comment_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = FlarmParser().parse_position("id21A8CBA8")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
assert message['address_type'] == 1
|
||||
assert message['aircraft_type'] == 8
|
||||
assert message['stealth'] is False
|
||||
assert message['no-tracking'] is False
|
||||
assert message['address'] == "A8CBA8"
|
||||
assert message['climb_rate'] == pytest.approx(-39 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 0.1 * HPM_TO_DEGS
|
||||
assert message['signal_quality'] == 3.5
|
||||
assert message['error_count'] == 2
|
||||
assert message['frequency_offset'] == -8.7
|
||||
assert message['gps_quality'] == {'horizontal': 1, 'vertical': 2}
|
||||
assert message['software_version'] == 6.09
|
||||
assert message['hardware_version'] == 67
|
||||
assert message['real_address'] == "DF0267"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_position_comment_relevant_keys_only():
|
||||
# return only keys where we got informations
|
||||
message = FlarmParser().parse_position("id21A8CBA8")
|
||||
|
||||
assert message is not None
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
|
|
|
@ -1,16 +1,9 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.generic_parser import GenericParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = GenericParser().parse_position("id0123456789 weather is good, climbing with 123fpm")
|
||||
assert 'comment' in message
|
||||
def test_position_comment():
|
||||
message = GenericParser().parse_position("id0123456789 weather is good, climbing with 123fpm")
|
||||
assert 'comment' in message
|
||||
|
||||
message = GenericParser().parse_status("id0123456789 weather is good, climbing with 123fpm")
|
||||
assert 'comment' in message
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
message = GenericParser().parse_status("id0123456789 weather is good, climbing with 123fpm")
|
||||
assert 'comment' in message
|
||||
|
|
|
@ -1,21 +1,15 @@
|
|||
import unittest
|
||||
from ogn.parser.aprs_comment.inreach_parser import InreachParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = InreachParser().parse_position("id300434060496190 inReac True")
|
||||
assert message['address'] == "300434060496190"
|
||||
assert message['model'] == 'inReac'
|
||||
assert message['status'] is True
|
||||
assert message['pilot_name'] is None
|
||||
def test_position_comment():
|
||||
message = InreachParser().parse_position("id300434060496190 inReac True")
|
||||
assert message['address'] == "300434060496190"
|
||||
assert message['model'] == 'inReac'
|
||||
assert message['status'] is True
|
||||
assert message['pilot_name'] is None
|
||||
|
||||
message = InreachParser().parse_position("id300434060496190 inReac True Jim Bob")
|
||||
assert message['address'] == "300434060496190"
|
||||
assert message['model'] == 'inReac'
|
||||
assert message['status'] is True
|
||||
assert message['pilot_name'] == "Jim Bob"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
message = InreachParser().parse_position("id300434060496190 inReac True Jim Bob")
|
||||
assert message['address'] == "300434060496190"
|
||||
assert message['model'] == 'inReac'
|
||||
assert message['status'] is True
|
||||
assert message['pilot_name'] == "Jim Bob"
|
||||
|
|
|
@ -1,17 +1,12 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS
|
||||
from ogn.parser.aprs_comment.lt24_parser import LT24Parser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = LT24Parser().parse_position("id25387 +123fpm GPS")
|
||||
def test_position_comment():
|
||||
message = LT24Parser().parse_position("id25387 +123fpm GPS")
|
||||
|
||||
assert message['lt24_id'] == "25387"
|
||||
self.assertAlmostEqual(message['climb_rate'], 123 * FPM_TO_MS, 2)
|
||||
assert message['source'] == 'GPS'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert message['lt24_id'] == "25387"
|
||||
assert message['climb_rate'] == pytest.approx(123 * FPM_TO_MS, 2)
|
||||
assert message['source'] == 'GPS'
|
||||
|
|
|
@ -1,25 +1,19 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.microtrak_parser import MicrotrakParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = MicrotrakParser().parse_position("id21A8CBA8")
|
||||
def test_position_comment():
|
||||
message = MicrotrakParser().parse_position("id21A8CBA8")
|
||||
|
||||
assert message['address_type'] == 1
|
||||
assert message['aircraft_type'] == 8
|
||||
self.assertFalse(message['stealth'])
|
||||
self.assertFalse(message['no-tracking'])
|
||||
assert message['address'] == "A8CBA8"
|
||||
|
||||
def test_position_comment_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = MicrotrakParser().parse_position("id21A8CBA8")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
assert message['address_type'] == 1
|
||||
assert message['aircraft_type'] == 8
|
||||
assert message['stealth'] is False
|
||||
assert message['no-tracking'] is False
|
||||
assert message['address'] == "A8CBA8"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_position_comment_relevant_keys_only():
|
||||
# return only keys where we got informations
|
||||
message = MicrotrakParser().parse_position("id21A8CBA8")
|
||||
|
||||
assert message is not None
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
|
|
|
@ -1,30 +1,25 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS
|
||||
from ogn.parser.aprs_comment.naviter_parser import NaviterParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_OGNAVI_1(self):
|
||||
message = NaviterParser().parse_position("id0440042121 +123fpm +0.5rot")
|
||||
def test_OGNAVI_1():
|
||||
message = NaviterParser().parse_position("id0440042121 +123fpm +0.5rot")
|
||||
|
||||
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
|
||||
# bit 0: stealth mode
|
||||
# bit 1: do not track mode
|
||||
# bits 2-5: aircraft type
|
||||
# bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers)
|
||||
# bits 12-15: reserved for use at a later time
|
||||
# bits 16-39: device id (24-bit device identifier, same as in APRS header)
|
||||
assert message['stealth'] is False
|
||||
assert message['do_not_track'] is False
|
||||
assert message['aircraft_type'] == 1
|
||||
assert message['address_type'] == 4
|
||||
assert message['reserved'] == 0
|
||||
assert message['address'] == "042121"
|
||||
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
|
||||
# bit 0: stealth mode
|
||||
# bit 1: do not track mode
|
||||
# bits 2-5: aircraft type
|
||||
# bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers)
|
||||
# bits 12-15: reserved for use at a later time
|
||||
# bits 16-39: device id (24-bit device identifier, same as in APRS header)
|
||||
assert message['stealth'] is False
|
||||
assert message['do_not_track'] is False
|
||||
assert message['aircraft_type'] == 1
|
||||
assert message['address_type'] == 4
|
||||
assert message['reserved'] == 0
|
||||
assert message['address'] == "042121"
|
||||
|
||||
self.assertAlmostEqual(message['climb_rate'], 123 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 0.5 * HPM_TO_DEGS
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert message['climb_rate'] == pytest.approx(123 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 0.5 * HPM_TO_DEGS
|
||||
|
|
|
@ -1,78 +1,81 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS
|
||||
from ogn.parser.aprs_comment.ogn_parser import OgnParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_invalid_token(self):
|
||||
assert OgnParser().parse_aircraft_beacon("notAValidToken") is None
|
||||
|
||||
def test_basic(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
|
||||
assert message['address_type'] == 2
|
||||
assert message['aircraft_type'] == 2
|
||||
self.assertFalse(message['stealth'])
|
||||
self.assertFalse(message['no-tracking'])
|
||||
assert message['address'] == "DDA5BA"
|
||||
self.assertAlmostEqual(message['climb_rate'], -454 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == -1.1 * HPM_TO_DEGS
|
||||
assert message['signal_quality'] == 8.8
|
||||
assert message['error_count'] == 0
|
||||
assert message['frequency_offset'] == 51.2
|
||||
assert message['gps_quality'] == {'horizontal': 4, 'vertical': 5}
|
||||
assert len(message['proximity']) == 3
|
||||
assert message['proximity'][0] == '1084'
|
||||
assert message['proximity'][1] == 'B597'
|
||||
assert message['proximity'][2] == 'B598'
|
||||
|
||||
def test_no_tracking(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
self.assertFalse(message['no-tracking'])
|
||||
|
||||
message = OgnParser().parse_aircraft_beacon("id4ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['no-tracking'] is True
|
||||
|
||||
def test_stealth(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['stealth'] is False
|
||||
|
||||
message = OgnParser().parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['stealth'] is True
|
||||
|
||||
def test_v024(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
|
||||
|
||||
assert message['software_version'] == 6.02
|
||||
assert message['hardware_version'] == 10
|
||||
assert message['real_address'] == "DF0C56"
|
||||
|
||||
def test_v024_ogn_tracker(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
|
||||
assert message['flightlevel'] == 4.43
|
||||
|
||||
def test_v025(self):
|
||||
message = OgnParser().parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
|
||||
|
||||
assert message['signal_power'] == 7.4
|
||||
|
||||
def test_v026(self):
|
||||
# from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID
|
||||
message_triple = OgnParser().parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
|
||||
message_single = OgnParser().parse_aircraft_beacon("id093D0930")
|
||||
|
||||
self.assertIsNotNone(message_triple)
|
||||
self.assertIsNotNone(message_single)
|
||||
|
||||
def test_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_aircraft_beacon("id093D0930")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
def test_invalid_token():
|
||||
assert OgnParser().parse_aircraft_beacon("notAValidToken") is None
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_basic():
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
|
||||
assert message['address_type'] == 2
|
||||
assert message['aircraft_type'] == 2
|
||||
assert message['stealth'] is False
|
||||
assert message['no-tracking'] is False
|
||||
assert message['address'] == "DDA5BA"
|
||||
assert message['climb_rate'] == pytest.approx(-454 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == -1.1 * HPM_TO_DEGS
|
||||
assert message['signal_quality'] == 8.8
|
||||
assert message['error_count'] == 0
|
||||
assert message['frequency_offset'] == 51.2
|
||||
assert message['gps_quality'] == {'horizontal': 4, 'vertical': 5}
|
||||
assert len(message['proximity']) == 3
|
||||
assert message['proximity'][0] == '1084'
|
||||
assert message['proximity'][1] == 'B597'
|
||||
assert message['proximity'][2] == 'B598'
|
||||
|
||||
|
||||
def test_no_tracking():
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['no-tracking'] is False
|
||||
|
||||
message = OgnParser().parse_aircraft_beacon("id4ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['no-tracking'] is True
|
||||
|
||||
|
||||
def test_stealth():
|
||||
message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['stealth'] is False
|
||||
|
||||
message = OgnParser().parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
|
||||
assert message['stealth'] is True
|
||||
|
||||
|
||||
def test_v024():
|
||||
message = OgnParser().parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
|
||||
|
||||
assert message['software_version'] == 6.02
|
||||
assert message['hardware_version'] == 10
|
||||
assert message['real_address'] == "DF0C56"
|
||||
|
||||
|
||||
def test_v024_ogn_tracker():
|
||||
message = OgnParser().parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
|
||||
|
||||
assert message['flightlevel'] == 4.43
|
||||
|
||||
|
||||
def test_v025():
|
||||
message = OgnParser().parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
|
||||
|
||||
assert message['signal_power'] == 7.4
|
||||
|
||||
|
||||
def test_v026():
|
||||
# from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID
|
||||
message_triple = OgnParser().parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
|
||||
message_single = OgnParser().parse_aircraft_beacon("id093D0930")
|
||||
|
||||
assert message_triple is not None
|
||||
assert message_single is not None
|
||||
|
||||
|
||||
def test_relevant_keys_only():
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_aircraft_beacon("id093D0930")
|
||||
|
||||
assert message is not None
|
||||
assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking'])
|
||||
|
|
|
@ -1,58 +1,56 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.ogn_parser import OgnParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_fail_validation(self):
|
||||
assert OgnParser().parse_receiver_beacon("notAValidToken") is None
|
||||
|
||||
def test_v021(self):
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
|
||||
assert message['version'] == "0.2.1"
|
||||
assert message['cpu_load'] == 0.8
|
||||
assert message['free_ram'] == 25.6
|
||||
assert message['total_ram'] == 458.9
|
||||
assert message['ntp_error'] == 0.1
|
||||
assert message['rt_crystal_correction'] == 2.3
|
||||
assert message['cpu_temp'] == 51.9
|
||||
|
||||
assert message['rec_crystal_correction'] == 26
|
||||
assert message['rec_crystal_correction_fine'] == -1.4
|
||||
assert message['rec_input_noise'] == -0.25
|
||||
|
||||
def test_v022(self):
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
assert message['platform'] == 'x86'
|
||||
|
||||
def test_v025(self):
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
|
||||
assert message['voltage'] == 5.016
|
||||
assert message['amperage'] == 0.534
|
||||
assert message['senders_signal'] == 10.8
|
||||
assert message['senders_messages'] == 57282
|
||||
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
|
||||
assert message['senders_visible'] == 14
|
||||
assert message['senders_total'] == 16
|
||||
assert message['senders_signal'] == 24.0
|
||||
assert message['senders_messages'] == 143717
|
||||
assert message['good_senders_signal'] == 26.7
|
||||
assert message['good_senders'] == 68
|
||||
assert message['good_and_bad_senders'] == 135
|
||||
|
||||
def test_v028(self):
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.8.RPI-GPU CPU:0.3 RAM:744.5/968.2MB NTP:3.6ms/+2.0ppm +68.2C 3/3Acfts[1h] Lat:1.6s RF:-8+67.8ppm/+10.33dB/+1.3dB@10km[30998]/+10.4dB@10km[3/5]")
|
||||
assert message['latency'] == 1.6
|
||||
|
||||
def test_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
assert sorted(message.keys()) == sorted(['version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction'])
|
||||
def test_fail_validation():
|
||||
assert OgnParser().parse_receiver_beacon("notAValidToken") is None
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_v021():
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
|
||||
|
||||
assert message['version'] == "0.2.1"
|
||||
assert message['cpu_load'] == 0.8
|
||||
assert message['free_ram'] == 25.6
|
||||
assert message['total_ram'] == 458.9
|
||||
assert message['ntp_error'] == 0.1
|
||||
assert message['rt_crystal_correction'] == 2.3
|
||||
assert message['cpu_temp'] == 51.9
|
||||
|
||||
assert message['rec_crystal_correction'] == 26
|
||||
assert message['rec_crystal_correction_fine'] == -1.4
|
||||
assert message['rec_input_noise'] == -0.25
|
||||
|
||||
|
||||
def test_v022():
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
|
||||
assert message['platform'] == 'x86'
|
||||
|
||||
|
||||
def test_v025():
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
|
||||
assert message['voltage'] == 5.016
|
||||
assert message['amperage'] == 0.534
|
||||
assert message['senders_signal'] == 10.8
|
||||
assert message['senders_messages'] == 57282
|
||||
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
|
||||
assert message['senders_visible'] == 14
|
||||
assert message['senders_total'] == 16
|
||||
assert message['senders_signal'] == 24.0
|
||||
assert message['senders_messages'] == 143717
|
||||
assert message['good_senders_signal'] == 26.7
|
||||
assert message['good_senders'] == 68
|
||||
assert message['good_and_bad_senders'] == 135
|
||||
|
||||
|
||||
def test_v028():
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.8.RPI-GPU CPU:0.3 RAM:744.5/968.2MB NTP:3.6ms/+2.0ppm +68.2C 3/3Acfts[1h] Lat:1.6s RF:-8+67.8ppm/+10.33dB/+1.3dB@10km[30998]/+10.4dB@10km[3/5]")
|
||||
assert message['latency'] == 1.6
|
||||
|
||||
|
||||
def test_relevant_keys_only():
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm")
|
||||
|
||||
assert message is not None
|
||||
assert sorted(message.keys()) == sorted(['version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction'])
|
||||
|
|
|
@ -1,41 +1,36 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = ReceiverParser().parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
|
||||
def test_position_comment():
|
||||
message = ReceiverParser().parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
|
||||
|
||||
assert message['user_comment'] == "Antenna: chinese, on a pylon, 20 meter above ground"
|
||||
|
||||
def test_position_comment_empty(self):
|
||||
message = ReceiverParser().parse_position("")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
|
||||
def test_status_comment(self):
|
||||
message = ReceiverParser().parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
|
||||
|
||||
assert message['version'] == "0.2.7"
|
||||
assert message['platform'] == 'RPI-GPU'
|
||||
assert message['cpu_load'] == 0.7
|
||||
assert message['free_ram'] == 770.2
|
||||
assert message['total_ram'] == 968.2
|
||||
assert message['ntp_error'] == 1.8
|
||||
assert message['rt_crystal_correction'] == -3.3
|
||||
assert message['cpu_temp'] == 55.7
|
||||
assert message['senders_visible'] == 7
|
||||
assert message['senders_total'] == 8
|
||||
assert message['rec_crystal_correction'] == 54
|
||||
assert message['rec_crystal_correction_fine'] == -1.1
|
||||
assert message['rec_input_noise'] == -0.16
|
||||
assert message['senders_signal'] == 7.1
|
||||
assert message['senders_messages'] == 19481
|
||||
assert message['good_senders_signal'] == 16.8
|
||||
assert message['good_senders'] == 7
|
||||
assert message['good_and_bad_senders'] == 13
|
||||
assert message['user_comment'] == "Antenna: chinese, on a pylon, 20 meter above ground"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_position_comment_empty():
|
||||
message = ReceiverParser().parse_position("")
|
||||
|
||||
assert message is not None
|
||||
|
||||
|
||||
def test_status_comment():
|
||||
message = ReceiverParser().parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
|
||||
|
||||
assert message['version'] == "0.2.7"
|
||||
assert message['platform'] == 'RPI-GPU'
|
||||
assert message['cpu_load'] == 0.7
|
||||
assert message['free_ram'] == 770.2
|
||||
assert message['total_ram'] == 968.2
|
||||
assert message['ntp_error'] == 1.8
|
||||
assert message['rt_crystal_correction'] == -3.3
|
||||
assert message['cpu_temp'] == 55.7
|
||||
assert message['senders_visible'] == 7
|
||||
assert message['senders_total'] == 8
|
||||
assert message['rec_crystal_correction'] == 54
|
||||
assert message['rec_crystal_correction_fine'] == -1.1
|
||||
assert message['rec_input_noise'] == -0.16
|
||||
assert message['senders_signal'] == 7.1
|
||||
assert message['senders_messages'] == 19481
|
||||
assert message['good_senders_signal'] == 16.8
|
||||
assert message['good_senders'] == 7
|
||||
assert message['good_and_bad_senders'] == 13
|
||||
|
|
|
@ -1,20 +1,15 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS
|
||||
from ogn.parser.aprs_comment.safesky_parser import SafeskyParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
# "SKY3E5906>OGNSKY,qAS,SafeSky:/072555h5103.47N/00524.81E'065/031/A=001250 !W05! id1C3E5906 +010fpm gps6x1"
|
||||
message = SafeskyParser().parse_position("id1C3E5906 +010fpm gps6x1")
|
||||
assert message['address'] == '3E5906'
|
||||
assert message['address_type'] == 0
|
||||
assert message['aircraft_type'] == 7
|
||||
self.assertFalse(message['stealth'])
|
||||
self.assertAlmostEqual(message['climb_rate'], 10 * FPM_TO_MS, 2)
|
||||
assert message['gps_quality'] == {'horizontal': 6, 'vertical': 1}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_position_comment():
|
||||
# "SKY3E5906>OGNSKY,qAS,SafeSky:/072555h5103.47N/00524.81E'065/031/A=001250 !W05! id1C3E5906 +010fpm gps6x1"
|
||||
message = SafeskyParser().parse_position("id1C3E5906 +010fpm gps6x1")
|
||||
assert message['address'] == '3E5906'
|
||||
assert message['address_type'] == 0
|
||||
assert message['aircraft_type'] == 7
|
||||
assert message['stealth'] is False
|
||||
assert message['climb_rate'] == pytest.approx(10 * FPM_TO_MS, 2)
|
||||
assert message['gps_quality'] == {'horizontal': 6, 'vertical': 1}
|
||||
|
|
|
@ -1,16 +1,11 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS
|
||||
from ogn.parser.aprs_comment.skylines_parser import SkylinesParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = SkylinesParser().parse_position("id2816 -015fpm")
|
||||
def test_position_comment():
|
||||
message = SkylinesParser().parse_position("id2816 -015fpm")
|
||||
|
||||
assert message['skylines_id'] == "2816"
|
||||
self.assertAlmostEqual(message['climb_rate'], -15 * FPM_TO_MS, 2)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert message['skylines_id'] == "2816"
|
||||
assert message['climb_rate'] == pytest.approx(-15 * FPM_TO_MS, 2)
|
||||
|
|
|
@ -1,17 +1,10 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.spider_parser import SpiderParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = SpiderParser().parse_position("id300234060668560 +30dB K23W 3D")
|
||||
def test_position_comment():
|
||||
message = SpiderParser().parse_position("id300234060668560 +30dB K23W 3D")
|
||||
|
||||
assert message['spider_id'] == "300234060668560"
|
||||
assert message['signal_power'] == 30
|
||||
assert message['spider_registration'] == "K23W"
|
||||
assert message['gps_quality'] == "3D"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert message['spider_id'] == "300234060668560"
|
||||
assert message['signal_power'] == 30
|
||||
assert message['spider_registration'] == "K23W"
|
||||
assert message['gps_quality'] == "3D"
|
||||
|
|
|
@ -1,16 +1,9 @@
|
|||
import unittest
|
||||
|
||||
from ogn.parser.aprs_comment.spot_parser import SpotParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = SpotParser().parse_position("id0-2860357 SPOT3 GOOD")
|
||||
def test_position_comment():
|
||||
message = SpotParser().parse_position("id0-2860357 SPOT3 GOOD")
|
||||
|
||||
assert message['spot_id'] == "0-2860357"
|
||||
assert message['model'] == 'SPOT3'
|
||||
assert message['status'] == "GOOD"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
assert message['spot_id'] == "0-2860357"
|
||||
assert message['model'] == 'SPOT3'
|
||||
assert message['status'] == "GOOD"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import unittest
|
||||
import unittest.mock as mock
|
||||
import pytest
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
@ -7,52 +7,49 @@ from ogn.parser.telnet_parser import parse
|
|||
from ogn.parser.exceptions import ParseError
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
@unittest.skip("Not yet implemented")
|
||||
def test_telnet_fail_corrupt(self):
|
||||
with self.assertRaises(ParseError):
|
||||
parse('This is rubbish')
|
||||
|
||||
@mock.patch('ogn.parser.telnet_parser.datetime')
|
||||
def test_telnet_parse_complete(self, datetime_mock):
|
||||
# set the now-mock near to the time in the test string
|
||||
datetime_mock.now.return_value = datetime(2015, 1, 1, 10, 0, 55)
|
||||
|
||||
message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ? R B8949')
|
||||
|
||||
assert message['pps_offset'] == 0.181
|
||||
assert message['frequency'] == 868.394
|
||||
assert message['aircraft_type'] == 1
|
||||
assert message['address_type'] == 2
|
||||
assert message['address'] == 'DDA411'
|
||||
assert message['timestamp'] == datetime(2015, 1, 1, 10, 30, 10)
|
||||
assert message['latitude'] == 50.868
|
||||
assert message['longitude'] == 12.15279
|
||||
assert message['altitude'] == 988
|
||||
assert message['climb_rate'] == 0.1
|
||||
assert message['ground_speed'] == 25.7
|
||||
assert message['track'] == 85.4
|
||||
assert message['turn_rate'] == -3.5
|
||||
assert message['magic_number'] == 5 # the '5' is a magic number... 1 if ground_speed is 0.0m/s an 3 or 5 if airborne. Do you have an idea what it is?
|
||||
assert message['gps_status'] == '03x04'
|
||||
assert message['channel'] == 1
|
||||
assert message['flarm_timeslot'] is True
|
||||
assert message['ogn_timeslot'] is False
|
||||
assert message['frequency_offset'] == -12.61
|
||||
assert message['decode_quality'] == 5.8
|
||||
assert message['signal_quality'] == 15.5
|
||||
assert message['demodulator_type'] == 2
|
||||
assert message['error_count'] == 10
|
||||
assert message['distance'] == 30.9
|
||||
assert message['bearing'] == 99.5
|
||||
assert message['phi'] == 1.1
|
||||
assert message['multichannel'] is True
|
||||
|
||||
def test_telnet_parse_corrupt(self):
|
||||
message = parse('0.397sec:868.407MHz: sA:1:784024 205656: [ +5.71003, +20.48951]deg 34012m +14.5m/s 109.7m/s 118.5deg +21.0deg/sec 0 27x40m 01_o +7.03kHz 17.2/27.0dB/2 12e 4719.5km 271.1deg -8.5deg ? R B34067')
|
||||
|
||||
self.assertIsNone(message)
|
||||
@pytest.mark.skip("Not yet implemented")
|
||||
def test_telnet_fail_corrupt():
|
||||
with pytest.raises(ParseError):
|
||||
parse('This is rubbish')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@mock.patch('ogn.parser.telnet_parser.datetime')
|
||||
def test_telnet_parse_complete(datetime_mock):
|
||||
# set the now-mock near to the time in the test string
|
||||
datetime_mock.now.return_value = datetime(2015, 1, 1, 10, 0, 55)
|
||||
|
||||
message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ? R B8949')
|
||||
|
||||
assert message['pps_offset'] == 0.181
|
||||
assert message['frequency'] == 868.394
|
||||
assert message['aircraft_type'] == 1
|
||||
assert message['address_type'] == 2
|
||||
assert message['address'] == 'DDA411'
|
||||
assert message['timestamp'] == datetime(2015, 1, 1, 10, 30, 10)
|
||||
assert message['latitude'] == 50.868
|
||||
assert message['longitude'] == 12.15279
|
||||
assert message['altitude'] == 988
|
||||
assert message['climb_rate'] == 0.1
|
||||
assert message['ground_speed'] == 25.7
|
||||
assert message['track'] == 85.4
|
||||
assert message['turn_rate'] == -3.5
|
||||
assert message['magic_number'] == 5 # the '5' is a magic number... 1 if ground_speed is 0.0m/s an 3 or 5 if airborne. Do you have an idea what it is?
|
||||
assert message['gps_status'] == '03x04'
|
||||
assert message['channel'] == 1
|
||||
assert message['flarm_timeslot'] is True
|
||||
assert message['ogn_timeslot'] is False
|
||||
assert message['frequency_offset'] == -12.61
|
||||
assert message['decode_quality'] == 5.8
|
||||
assert message['signal_quality'] == 15.5
|
||||
assert message['demodulator_type'] == 2
|
||||
assert message['error_count'] == 10
|
||||
assert message['distance'] == 30.9
|
||||
assert message['bearing'] == 99.5
|
||||
assert message['phi'] == 1.1
|
||||
assert message['multichannel'] is True
|
||||
|
||||
|
||||
def test_telnet_parse_corrupt():
|
||||
message = parse('0.397sec:868.407MHz: sA:1:784024 205656: [ +5.71003, +20.48951]deg 34012m +14.5m/s 109.7m/s 118.5deg +21.0deg/sec 0 27x40m 01_o +7.03kHz 17.2/27.0dB/2 12e 4719.5km 271.1deg -8.5deg ? R B34067')
|
||||
|
||||
assert message is None
|
||||
|
|
|
@ -1,47 +1,44 @@
|
|||
import unittest
|
||||
import pytest
|
||||
|
||||
from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS
|
||||
from ogn.parser.aprs_comment.tracker_parser import TrackerParser
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_position_comment(self):
|
||||
message = TrackerParser().parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm")
|
||||
def test_position_comment():
|
||||
message = TrackerParser().parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm")
|
||||
|
||||
assert message['address_type'] == 3
|
||||
assert message['aircraft_type'] == 1
|
||||
self.assertFalse(message['stealth'])
|
||||
assert message['address'] == "2FD00F"
|
||||
self.assertAlmostEqual(message['climb_rate'], -58 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 1.1 * HPM_TO_DEGS
|
||||
assert message['flightlevel'] == 3.12
|
||||
assert message['signal_quality'] == 32.8
|
||||
assert message['error_count'] == 0
|
||||
assert message['frequency_offset'] == -0.8
|
||||
assert message['gps_quality'] == {'horizontal': 3, 'vertical': 5}
|
||||
assert message['signal_power'] == 12.7
|
||||
|
||||
def test_status_comment(self):
|
||||
message = TrackerParser().parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
|
||||
|
||||
assert message['hardware_version'] == 0
|
||||
assert message['software_version'] == 0
|
||||
assert message['gps_satellites'] == 9
|
||||
assert message['gps_quality'] == 1
|
||||
assert message['gps_altitude'] == 164
|
||||
assert message['pressure'] == 1002.6
|
||||
assert message['temperature'] == 20.2
|
||||
assert message['humidity'] == 0
|
||||
assert message['voltage'] == 3.34
|
||||
assert message['transmitter_power'] == 14
|
||||
assert message['noise_level'] == -110.5
|
||||
assert message['relays'] == 1
|
||||
|
||||
def test_status_comment_comment(self):
|
||||
message = TrackerParser().parse_status("Pilot=Pawel Hard=DIY/STM32")
|
||||
|
||||
assert message['comment'] == "Pilot=Pawel Hard=DIY/STM32"
|
||||
assert message['address_type'] == 3
|
||||
assert message['aircraft_type'] == 1
|
||||
assert message['stealth'] is False
|
||||
assert message['address'] == "2FD00F"
|
||||
assert message['climb_rate'] == pytest.approx(-58 * FPM_TO_MS, 2)
|
||||
assert message['turn_rate'] == 1.1 * HPM_TO_DEGS
|
||||
assert message['flightlevel'] == 3.12
|
||||
assert message['signal_quality'] == 32.8
|
||||
assert message['error_count'] == 0
|
||||
assert message['frequency_offset'] == -0.8
|
||||
assert message['gps_quality'] == {'horizontal': 3, 'vertical': 5}
|
||||
assert message['signal_power'] == 12.7
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_status_comment():
|
||||
message = TrackerParser().parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
|
||||
|
||||
assert message['hardware_version'] == 0
|
||||
assert message['software_version'] == 0
|
||||
assert message['gps_satellites'] == 9
|
||||
assert message['gps_quality'] == 1
|
||||
assert message['gps_altitude'] == 164
|
||||
assert message['pressure'] == 1002.6
|
||||
assert message['temperature'] == 20.2
|
||||
assert message['humidity'] == 0
|
||||
assert message['voltage'] == 3.34
|
||||
assert message['transmitter_power'] == 14
|
||||
assert message['noise_level'] == -110.5
|
||||
assert message['relays'] == 1
|
||||
|
||||
|
||||
def test_status_comment_comment():
|
||||
message = TrackerParser().parse_status("Pilot=Pawel Hard=DIY/STM32")
|
||||
|
||||
assert message['comment'] == "Pilot=Pawel Hard=DIY/STM32"
|
||||
|
|
|
@ -1,66 +1,68 @@
|
|||
import unittest
|
||||
import pytest
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from ogn.parser.utils import parseAngle, createTimestamp, CheapRuler, normalized_quality
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_parseAngle(self):
|
||||
self.assertAlmostEqual(parseAngle('05048.30'), 50.805, 5)
|
||||
|
||||
def proceed_test_data(self, test_data={}):
|
||||
for test in test_data:
|
||||
timestamp = createTimestamp(test[0], reference_timestamp=test[1])
|
||||
assert timestamp == test[2]
|
||||
|
||||
def test_createTimestamp_hhmmss(self):
|
||||
test_data = [
|
||||
('000001h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
|
||||
('235959h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
|
||||
('110000h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 11, 0, 0)), # packet 11 hours from future or 13 hours old
|
||||
('123500h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
|
||||
('000001h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
|
||||
]
|
||||
|
||||
self.proceed_test_data(test_data)
|
||||
|
||||
def test_createTimestamp_ddhhmm(self):
|
||||
test_data = [
|
||||
('011212z', datetime(2017, 9, 28, 0, 0, 1), datetime(2017, 10, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th,
|
||||
('281313z', datetime(2017, 10, 1, 0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st,
|
||||
('281414z', datetime(2017, 1, 1, 0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st,
|
||||
]
|
||||
|
||||
self.proceed_test_data(test_data)
|
||||
|
||||
def test_createTimestamp_tzinfo(self):
|
||||
test_data = [
|
||||
('000001h', datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc), (datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc)))
|
||||
]
|
||||
|
||||
self.proceed_test_data(test_data)
|
||||
|
||||
def test_cheap_ruler_distance(self):
|
||||
koenigsdf = (11.465353, 47.829825)
|
||||
hochkoenig = (13.062405, 47.420516)
|
||||
|
||||
cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2)
|
||||
distance = cheap_ruler.distance(koenigsdf, hochkoenig)
|
||||
self.assertAlmostEqual(distance, 128381.47612138899)
|
||||
|
||||
def test_cheap_ruler_bearing(self):
|
||||
koenigsdf = (11.465353, 47.829825)
|
||||
hochkoenig = (13.062405, 47.420516)
|
||||
|
||||
cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2)
|
||||
bearing = cheap_ruler.bearing(koenigsdf, hochkoenig)
|
||||
self.assertAlmostEqual(bearing, 110.761300063515)
|
||||
|
||||
def test_normalized_quality(self):
|
||||
self.assertAlmostEqual(normalized_quality(10000, 1), 1)
|
||||
self.assertAlmostEqual(normalized_quality(20000, 10), 16.020599913279625)
|
||||
self.assertAlmostEqual(normalized_quality(5000, 5), -1.0205999132796242)
|
||||
def _proceed_test_data(test_data={}):
|
||||
for test in test_data:
|
||||
timestamp = createTimestamp(test[0], reference_timestamp=test[1])
|
||||
assert timestamp == test[2]
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_parseAngle():
|
||||
assert parseAngle('05048.30') == pytest.approx(50.805, 5)
|
||||
|
||||
|
||||
def test_createTimestamp_hhmmss():
|
||||
test_data = [
|
||||
('000001h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
|
||||
('235959h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
|
||||
('110000h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 11, 0, 0)), # packet 11 hours from future or 13 hours old
|
||||
('123500h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
|
||||
('000001h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
|
||||
]
|
||||
|
||||
_proceed_test_data(test_data)
|
||||
|
||||
|
||||
def test_createTimestamp_ddhhmm():
|
||||
test_data = [
|
||||
('011212z', datetime(2017, 9, 28, 0, 0, 1), datetime(2017, 10, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th,
|
||||
('281313z', datetime(2017, 10, 1, 0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st,
|
||||
('281414z', datetime(2017, 1, 1, 0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st,
|
||||
]
|
||||
|
||||
_proceed_test_data(test_data)
|
||||
|
||||
|
||||
def test_createTimestamp_tzinfo():
|
||||
test_data = [
|
||||
('000001h', datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc), (datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc)))
|
||||
]
|
||||
|
||||
_proceed_test_data(test_data)
|
||||
|
||||
|
||||
def test_cheap_ruler_distance():
|
||||
koenigsdf = (11.465353, 47.829825)
|
||||
hochkoenig = (13.062405, 47.420516)
|
||||
|
||||
cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2)
|
||||
distance = cheap_ruler.distance(koenigsdf, hochkoenig)
|
||||
assert distance == pytest.approx(128381.47612138899)
|
||||
|
||||
|
||||
def test_cheap_ruler_bearing():
|
||||
koenigsdf = (11.465353, 47.829825)
|
||||
hochkoenig = (13.062405, 47.420516)
|
||||
|
||||
cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2)
|
||||
bearing = cheap_ruler.bearing(koenigsdf, hochkoenig)
|
||||
assert bearing == pytest.approx(110.761300063515)
|
||||
|
||||
|
||||
def test_normalized_quality():
|
||||
assert normalized_quality(10000, 1) == pytest.approx(1)
|
||||
assert normalized_quality(20000, 10) == pytest.approx(16.020599913279625)
|
||||
assert normalized_quality(5000, 5) == pytest.approx(-1.0205999132796242)
|
||||
|
|
Ładowanie…
Reference in New Issue