diff --git a/tests/client/test_AprsClient.py b/tests/client/test_AprsClient.py index ac61978..20d0556 100644 --- a/tests/client/test_AprsClient.py +++ b/tests/client/test_AprsClient.py @@ -1,150 +1,156 @@ -import unittest import unittest.mock as mock +import pytest from ogn.parser import parse from ogn.client.client import create_aprs_login, AprsClient from ogn.client.settings import APRS_APP_NAME, APRS_APP_VER, APRS_KEEPALIVE_TIME -class AprsClientTest(unittest.TestCase): - def test_create_aprs_login(self): - basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1') - assert 'user klaus pass -1 vers myApp 0.1\n' == basic_login +def test_create_aprs_login(): + basic_login = create_aprs_login('klaus', -1, 'myApp', '0.1') + assert 'user klaus pass -1 vers myApp 0.1\n' == basic_login - login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100') - assert 'user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n' == login_with_filter + login_with_filter = create_aprs_login('klaus', -1, 'myApp', '0.1', 'r/48.0/11.0/100') + assert 'user klaus pass -1 vers myApp 0.1 filter r/48.0/11.0/100\n' == login_with_filter - def test_initialisation(self): - client = AprsClient(aprs_user='testuser', aprs_filter='') - assert client.aprs_user == 'testuser' - assert client.aprs_filter == '' - @mock.patch('ogn.client.client.socket') - def test_connect_full_feed(self, mock_socket): - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() - client.sock.send.assert_called_once_with('user testuser pass -1 vers {} {}\n'.format( - APRS_APP_NAME, APRS_APP_VER).encode('ascii')) - client.sock.makefile.assert_called_once_with('rb') +def test_initialisation(): + client = AprsClient(aprs_user='testuser', aprs_filter='') + assert client.aprs_user == 'testuser' + assert client.aprs_filter == '' - @mock.patch('ogn.client.client.socket') - def test_connect_client_defined_filter(self, mock_socket): - client = AprsClient(aprs_user='testuser', aprs_filter='r/50.4976/9.9495/100') - client.connect() - client.sock.send.assert_called_once_with('user testuser pass -1 vers {} {} filter r/50.4976/9.9495/100\n'.format( - APRS_APP_NAME, APRS_APP_VER).encode('ascii')) - client.sock.makefile.assert_called_once_with('rb') - @mock.patch('ogn.client.client.socket') - def test_disconnect(self, mock_socket): - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() +@mock.patch('ogn.client.client.socket') +def test_connect_full_feed(mock_socket): + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + client.sock.send.assert_called_once_with(f'user testuser pass -1 vers {APRS_APP_NAME} {APRS_APP_VER}\n'.encode('ascii')) + client.sock.makefile.assert_called_once_with('rb') + + +@mock.patch('ogn.client.client.socket') +def test_connect_client_defined_filter(mock_socket): + client = AprsClient(aprs_user='testuser', aprs_filter='r/50.4976/9.9495/100') + client.connect() + client.sock.send.assert_called_once_with(f'user testuser pass -1 vers {APRS_APP_NAME} {APRS_APP_VER} filter r/50.4976/9.9495/100\n'.encode('ascii')) + client.sock.makefile.assert_called_once_with('rb') + + +@mock.patch('ogn.client.client.socket') +def test_disconnect(mock_socket): + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + client.disconnect() + client.sock.shutdown.assert_called_once_with(0) + client.sock.close.assert_called_once_with() + assert client._kill is True + + +@mock.patch('ogn.client.client.socket') +def test_run(mock_socket): + import socket + mock_socket.error = socket.error + + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + + client.sock_file.readline = mock.MagicMock() + client.sock_file.readline.side_effect = [b'Normal text blabla', + b'my weird character \xc2\xa5', + UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2, 'This is just a fake reason!'), + b'... show must go on', + BrokenPipeError(), + b'... and on', + ConnectionResetError(), + b'... and on', + socket.error(), + b'... and on', + b'', + b'... and on', + KeyboardInterrupt()] + + try: + client.run(callback=lambda msg: print("got: {}".format(msg)), autoreconnect=True) + except KeyboardInterrupt: + pass + finally: client.disconnect() - client.sock.shutdown.assert_called_once_with(0) - client.sock.close.assert_called_once_with() - assert client._kill is True - @mock.patch('ogn.client.client.socket') - def test_run(self, mock_socket): - import socket - mock_socket.error = socket.error - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() +@mock.patch('ogn.client.client.time') +@mock.patch('ogn.client.client.socket') +def test_run_keepalive(mock_socket, mock_time): + import socket + mock_socket.error = socket.error - client.sock_file.readline = mock.MagicMock() - client.sock_file.readline.side_effect = [b'Normal text blabla', - b'my weird character \xc2\xa5', - UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2, 'This is just a fake reason!'), - b'... show must go on', - BrokenPipeError(), - b'... and on', - ConnectionResetError(), - b'... and on', - socket.error(), - b'... and on', - b'', - b'... and on', - KeyboardInterrupt()] + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + client.sock_file.readline = mock.MagicMock() + client.sock_file.readline.side_effect = [b'Normal text blabla', + KeyboardInterrupt()] + + mock_time.side_effect = [0, 0, APRS_KEEPALIVE_TIME + 1, APRS_KEEPALIVE_TIME + 1] + + timed_callback = mock.MagicMock() + + try: + client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback) + except KeyboardInterrupt: + pass + finally: + client.disconnect() + + timed_callback.assert_called_with(client) + + +def test_reset_kill_reconnect(): + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + + # .run() should be allowed to execute after .connect() + mock_callback = mock.MagicMock( + side_effect=lambda raw_msg: client.disconnect()) + + assert client._kill is False + client.run(callback=mock_callback, autoreconnect=True) + + # After .disconnect(), client._kill should be True + assert client._kill is True + assert mock_callback.call_count == 1 + + # After we reconnect, .run() should be able to run again + mock_callback.reset_mock() + client.connect() + client.run(callback=mock_callback, autoreconnect=True) + assert mock_callback.call_count == 1 + + +@pytest.mark.skip("Too much invalid APRS data on the live feed") +def test_50_live_messages(): + print("Enter") + remaining_messages = 50 + + def process_message(raw_message): + global remaining_messages + if raw_message[0] == '#': + return try: - client.run(callback=lambda msg: print("got: {}".format(msg)), autoreconnect=True) - except KeyboardInterrupt: - pass - finally: - client.disconnect() + message = parse(raw_message) + print("{}: {}".format(message['aprs_type'], raw_message)) + except NotImplementedError as e: + print("{}: {}".format(e, raw_message)) + return + if remaining_messages > 0: + remaining_messages -= 1 + else: + raise KeyboardInterrupt - @mock.patch('ogn.client.client.time') - @mock.patch('ogn.client.client.socket') - def test_run_keepalive(self, mock_socket, mock_time): - import socket - mock_socket.error = socket.error - - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() - - client.sock_file.readline = mock.MagicMock() - client.sock_file.readline.side_effect = [b'Normal text blabla', - KeyboardInterrupt()] - - mock_time.side_effect = [0, 0, APRS_KEEPALIVE_TIME + 1, APRS_KEEPALIVE_TIME + 1] - - timed_callback = mock.MagicMock() - - try: - client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback) - except KeyboardInterrupt: - pass - finally: - client.disconnect() - - timed_callback.assert_called_with(client) - - def test_reset_kill_reconnect(self): - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() - - # .run() should be allowed to execute after .connect() - mock_callback = mock.MagicMock( - side_effect=lambda raw_msg: client.disconnect()) - - self.assertFalse(client._kill) - client.run(callback=mock_callback, autoreconnect=True) - - # After .disconnect(), client._kill should be True - assert client._kill is True - assert mock_callback.call_count == 1 - - # After we reconnect, .run() should be able to run again - mock_callback.reset_mock() - client.connect() - client.run(callback=mock_callback, autoreconnect=True) - assert mock_callback.call_count == 1 - - @unittest.skip("Too much invalid APRS data on the live feed") - def test_50_live_messages(self): - print("Enter") - self.remaining_messages = 50 - - def process_message(raw_message): - if raw_message[0] == '#': - return - try: - message = parse(raw_message) - print("{}: {}".format(message['aprs_type'], raw_message)) - except NotImplementedError as e: - print("{}: {}".format(e, raw_message)) - return - if self.remaining_messages > 0: - self.remaining_messages -= 1 - else: - raise KeyboardInterrupt - - client = AprsClient(aprs_user='testuser', aprs_filter='') - client.connect() - try: - client.run(callback=process_message, autoreconnect=True) - except KeyboardInterrupt: - pass - finally: - client.disconnect() + client = AprsClient(aprs_user='testuser', aprs_filter='') + client.connect() + try: + client.run(callback=process_message, autoreconnect=True) + except KeyboardInterrupt: + pass + finally: + client.disconnect() diff --git a/tests/client/test_TelnetClient.py b/tests/client/test_TelnetClient.py index 3e39d76..48d74d1 100644 --- a/tests/client/test_TelnetClient.py +++ b/tests/client/test_TelnetClient.py @@ -1,26 +1,25 @@ -import unittest import unittest.mock as mock from ogn.client.client import TelnetClient -class TelnetClientTest(unittest.TestCase): - @mock.patch('ogn.client.client.socket') - def test_connect_disconnect(self, socket_mock): - client = TelnetClient() - client.connect() - client.sock.connect.assert_called_once_with(('localhost', 50001)) +@mock.patch('ogn.client.client.socket') +def test_connect_disconnect(socket_mock): + client = TelnetClient() + client.connect() + client.sock.connect.assert_called_once_with(('localhost', 50001)) - client.disconnect() - client.sock.shutdown.assert_called_once_with(0) - client.sock.close.assert_called_once_with() + client.disconnect() + client.sock.shutdown.assert_called_once_with(0) + client.sock.close.assert_called_once_with() - @mock.patch('ogn.client.client.socket') - def test_run(self, socket_mock): - def callback(raw_message): - raise ConnectionRefusedError - client = TelnetClient() - client.connect() +@mock.patch('ogn.client.client.socket') +def test_run(socket_mock): + def callback(raw_message): + raise ConnectionRefusedError - client.run(callback=callback) + client = TelnetClient() + client.connect() + + client.run(callback=callback) diff --git a/tests/ddb/test_utils.py b/tests/ddb/test_utils.py index 73c9e35..33104e8 100644 --- a/tests/ddb/test_utils.py +++ b/tests/ddb/test_utils.py @@ -1,10 +1,7 @@ -import unittest - from ogn.ddb import get_ddb_devices -class TestStringMethods(unittest.TestCase): - def test_get_ddb_devices(self): - devices = list(get_ddb_devices()) - self.assertGreater(len(devices), 4000) - self.assertCountEqual(devices[0].keys(), ['device_type', 'device_id', 'aircraft_model', 'registration', 'cn', 'tracked', 'identified']) +def test_get_ddb_devices(): + devices = list(get_ddb_devices()) + assert len(devices) > 4000 + assert len(devices[0].keys()), len(['device_type', 'device_id', 'aircraft_model', 'registration', 'cn', 'tracked', 'identified']) diff --git a/tests/parser/test_parse.py b/tests/parser/test_parse.py index e596c31..f28561f 100644 --- a/tests/parser/test_parse.py +++ b/tests/parser/test_parse.py @@ -1,5 +1,5 @@ -import unittest import unittest.mock as mock +import pytest import os from datetime import datetime @@ -9,124 +9,141 @@ from ogn.parser.parse import parse from ogn.parser.exceptions import AprsParseError, OgnParseError -class TestStringMethods(unittest.TestCase): - def parse_valid_beacon_data_file(self, filename, beacon_type): - with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f: - for line in f: - try: - message = parse(line, datetime(2015, 4, 10, 17, 0)) - self.assertFalse(message is None) - if message['aprs_type'] == 'position' or message['aprs_type'] == 'status': - assert message['beacon_type'] == beacon_type - except NotImplementedError as e: - print(e) - - def test_aprs_aircraft_beacons(self): - self.parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aprs_aircraft') - - def test_aprs_receiver_beacons(self): - self.parse_valid_beacon_data_file(filename='aprs_receiver.txt', beacon_type='aprs_receiver') - - def test_aprs_fanet_beacons(self): - self.parse_valid_beacon_data_file(filename='fanet.txt', beacon_type='fanet') - - def test_flarm_beacons(self): - self.parse_valid_beacon_data_file(filename='flarm.txt', beacon_type='flarm') - - def test_receiver_beacons(self): - self.parse_valid_beacon_data_file(filename='receiver.txt', beacon_type='receiver') - - def test_tracker_beacons(self): - self.parse_valid_beacon_data_file(filename='tracker.txt', beacon_type='tracker') - - def test_capturs_beacons(self): - self.parse_valid_beacon_data_file(filename='capturs.txt', beacon_type='capturs') - - def test_flymaster_beacons(self): - self.parse_valid_beacon_data_file(filename='flymaster.txt', beacon_type='flymaster') - - def test_inreach_beacons(self): - self.parse_valid_beacon_data_file(filename='inreach.txt', beacon_type='inreach') - - def test_lt24_beacons(self): - self.parse_valid_beacon_data_file(filename='lt24.txt', beacon_type='lt24') - - def test_naviter_beacons(self): - self.parse_valid_beacon_data_file(filename='naviter.txt', beacon_type='naviter') - - def test_pilot_aware_beacons(self): - self.parse_valid_beacon_data_file(filename='pilot_aware.txt', beacon_type='pilot_aware') - - def test_skylines_beacons(self): - self.parse_valid_beacon_data_file(filename='skylines.txt', beacon_type='skylines') - - def test_spider_beacons(self): - self.parse_valid_beacon_data_file(filename='spider.txt', beacon_type='spider') - - def test_spot_beacons(self): - self.parse_valid_beacon_data_file(filename='spot.txt', beacon_type='spot') - - def test_generic_beacons(self): - message = parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment") - assert message['beacon_type'] == 'unknown' - assert message['comment'] == "this is a comment" - - def test_fail_parse_aprs_none(self): - with self.assertRaises(TypeError): - parse(None) - - def test_fail_empty(self): - with self.assertRaises(AprsParseError): - parse("") - - def test_fail_bad_string(self): - with self.assertRaises(AprsParseError): - parse("Lachens>APRS,TCPIwontbeavalidstring") - - def test_v026_chile(self): - # receiver beacons from chile have a APRS position message with a pure user comment - message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura") - - assert message['user_comment'] == "Vitacura Municipal Aerodrome, Club de Planeadores Vitacura" - - message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs") - - assert message_with_id['user_comment'] == "Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs" - - @mock.patch('ogn.parser.parse_module.createTimestamp') - def test_default_reference_date(self, createTimestamp_mock): - valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB" - - parse(valid_aprs_string) - call_args_before = createTimestamp_mock.call_args - - sleep(1) - - parse(valid_aprs_string) - call_args_seconds_later = createTimestamp_mock.call_args - - self.assertNotEqual(call_args_before, call_args_seconds_later) - - def test_copy_constructor(self): - valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5" - message = parse(valid_aprs_string) - - assert message['name'] == 'FLRDDA5BA' - assert message['address'] == 'DDA5BA' - - def test_bad_naviter_format(self): - with self.assertRaises(OgnParseError): - parse("FLRA51D93>OGNAVI,qAS,NAVITER2:/204507h4444.98N/09323.34W'000/000/A=000925 !W67! id06A51D93 +000fpm +0.0rot") - - def test_no_receiver(self): - result = parse("EDFW>OGNSDR:/102713h4949.02NI00953.88E&/A=000984") - - assert result['aprs_type'] == 'position' - assert result['beacon_type'] == 'receiver' - assert result['name'] == 'EDFW' - assert result['dstcall'] == 'OGNSDR' - assert result['receiver_name'] is None +def _parse_valid_beacon_data_file(filename, beacon_type): + with open(os.path.dirname(__file__) + '/valid_beacon_data/' + filename) as f: + for line in f: + try: + message = parse(line, datetime(2015, 4, 10, 17, 0)) + assert message is not None + if message['aprs_type'] == 'position' or message['aprs_type'] == 'status': + assert message['beacon_type'] == beacon_type + except NotImplementedError as e: + print(e) -if __name__ == '__main__': - unittest.main() +def test_aprs_aircraft_beacons(): + _parse_valid_beacon_data_file(filename='aprs_aircraft.txt', beacon_type='aprs_aircraft') + + +def test_aprs_receiver_beacons(): + _parse_valid_beacon_data_file(filename='aprs_receiver.txt', beacon_type='aprs_receiver') + + +def test_aprs_fanet_beacons(): + _parse_valid_beacon_data_file(filename='fanet.txt', beacon_type='fanet') + + +def test_flarm_beacons(): + _parse_valid_beacon_data_file(filename='flarm.txt', beacon_type='flarm') + + +def test_receiver_beacons(): + _parse_valid_beacon_data_file(filename='receiver.txt', beacon_type='receiver') + + +def test_tracker_beacons(): + _parse_valid_beacon_data_file(filename='tracker.txt', beacon_type='tracker') + + +def test_capturs_beacons(): + _parse_valid_beacon_data_file(filename='capturs.txt', beacon_type='capturs') + + +def test_flymaster_beacons(): + _parse_valid_beacon_data_file(filename='flymaster.txt', beacon_type='flymaster') + + +def test_inreach_beacons(): + _parse_valid_beacon_data_file(filename='inreach.txt', beacon_type='inreach') + + +def test_lt24_beacons(): + _parse_valid_beacon_data_file(filename='lt24.txt', beacon_type='lt24') + + +def test_naviter_beacons(): + _parse_valid_beacon_data_file(filename='naviter.txt', beacon_type='naviter') + + +def test_pilot_aware_beacons(): + _parse_valid_beacon_data_file(filename='pilot_aware.txt', beacon_type='pilot_aware') + + +def test_skylines_beacons(): + _parse_valid_beacon_data_file(filename='skylines.txt', beacon_type='skylines') + + +def test_spider_beacons(): + _parse_valid_beacon_data_file(filename='spider.txt', beacon_type='spider') + + +def test_spot_beacons(): + _parse_valid_beacon_data_file(filename='spot.txt', beacon_type='spot') + + +def test_generic_beacons(): + message = parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment") + assert message['beacon_type'] == 'unknown' + assert message['comment'] == "this is a comment" + + +def test_fail_parse_aprs_none(): + with pytest.raises(TypeError): + parse(None) + + +def test_fail_empty(): + with pytest.raises(AprsParseError): + parse("") + + +def test_fail_bad_string(): + with pytest.raises(AprsParseError): + parse("Lachens>APRS,TCPIwontbeavalidstring") + + +def test_v026_chile(): + # receiver beacons from chile have a APRS position message with a pure user comment + message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura") + assert message['user_comment'] == "Vitacura Municipal Aerodrome, Club de Planeadores Vitacura" + + message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs") + assert message_with_id['user_comment'] == "Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs" + + +@mock.patch('ogn.parser.parse_module.createTimestamp') +def test_default_reference_date(createTimestamp_mock): + valid_aprs_string = "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB" + + parse(valid_aprs_string) + call_args_before = createTimestamp_mock.call_args + + sleep(1) + + parse(valid_aprs_string) + call_args_seconds_later = createTimestamp_mock.call_args + + assert call_args_before != call_args_seconds_later + + +def test_copy_constructor(): + valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5" + message = parse(valid_aprs_string) + + assert message['name'] == 'FLRDDA5BA' + assert message['address'] == 'DDA5BA' + + +def test_bad_naviter_format(): + with pytest.raises(OgnParseError): + parse("FLRA51D93>OGNAVI,qAS,NAVITER2:/204507h4444.98N/09323.34W'000/000/A=000925 !W67! id06A51D93 +000fpm +0.0rot") + + +def test_no_receiver(): + result = parse("EDFW>OGNSDR:/102713h4949.02NI00953.88E&/A=000984") + + assert result['aprs_type'] == 'position' + assert result['beacon_type'] == 'receiver' + assert result['name'] == 'EDFW' + assert result['dstcall'] == 'OGNSDR' + assert result['receiver_name'] is None diff --git a/tests/parser/test_parse_aprs.py b/tests/parser/test_parse_aprs.py index b22c4cb..04497e6 100644 --- a/tests/parser/test_parse_aprs.py +++ b/tests/parser/test_parse_aprs.py @@ -1,4 +1,4 @@ -import unittest +import pytest from datetime import datetime @@ -7,158 +7,169 @@ from ogn.parser.parse import parse_aprs from ogn.parser.exceptions import AprsParseError -class TestStringMethods(unittest.TestCase): - def test_fail_validation(self): - with self.assertRaises(AprsParseError): - parse_aprs("notAValidString") - - def test_basic(self): - message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment") - - assert message['aprs_type'] == 'position' - assert message['name'] == "FLRDDA5BA" - assert message['dstcall'] == "APRS" - assert message['receiver_name'] == "LFMX" - assert message['timestamp'].strftime('%H:%M:%S') == "16:08:29" - self.assertAlmostEqual(message['latitude'], 44.25683, 5) - assert message['symboltable'] == '/' - self.assertAlmostEqual(message['longitude'], 6.0005, 5) - assert message['symbolcode'] == '\'' - assert message['track'] == 342 - assert message['ground_speed'] == 49 * KNOTS_TO_MS / KPH_TO_MS - self.assertAlmostEqual(message['altitude'], 5524 * FEETS_TO_METER, 5) - assert message['comment'] == "this is a comment" - - def test_v024(self): - # higher precision datum format introduced - raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56" - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position' - self.assertAlmostEqual(message['latitude'] - 44.2568 - 1 / 30000, 2 / 1000 / 60, 10) - self.assertAlmostEqual(message['longitude'] - 6.0005, 6 / 1000 / 60, 10) - - def test_v025(self): - # introduced the "aprs status" format where many informations (lat, lon, alt, speed, ...) are just optional - raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment" - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'status' - assert message['name'] == "EPZR" - assert message['receiver_name'] == "GLIDERN1" - assert message['timestamp'].strftime('%H:%M:%S') == "09:34:56" - assert message['comment'] == "this is a comment" - - def test_v026(self): - # from 0.2.6 the ogn comment of a receiver beacon is just optional - raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322" - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position' - assert message['comment'] == '' - - def test_v026_relay(self): - # beacons can be relayed - raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot" - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position' - assert message['relay'] == "NAV07220E" - - def test_v027_ddhhmm(self): - # beacons can have hhmmss or ddhhmm timestamp - raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1" - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position' - assert message['timestamp'].strftime('%d %H:%M') == "30 10:46" - - def test_v028_fanet_position_weather(self): - # with v0.2.8 fanet devices can report weather data - raw_message = 'FNTFC9002>OGNFNT,qAS,LSXI2:/163051h4640.33N/00752.21E_187/004g007t075h78b63620 29.0dB -8.0kHz' - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position_weather' - assert message['wind_direction'] == 187 - assert message['wind_speed'] == 4 * KNOTS_TO_MS / KPH_TO_MS - assert message['wind_speed_peak'] == 7 * KNOTS_TO_MS / KPH_TO_MS - assert message['temperature'] == fahrenheit_to_celsius(75) - assert message['humidity'] == 78 * 0.01 - assert message['barometric_pressure'] == 63620 - - assert message['comment'] == '29.0dB -8.0kHz' - - def test_GXAirCom_fanet_position_weather_rainfall(self): - raw_message = 'FNT08F298>OGNFNT,qAS,DREIFBERG:/082654h4804.90N/00845.74E_273/005g008t057r123p234h90b10264 0.0dB' - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position_weather' - assert message['rainfall_1h'] == 123 / 100 * INCH_TO_MM - assert message['rainfall_24h'] == 234 / 100 * INCH_TO_MM - - def test_v028_fanet_position_weather_empty(self): - raw_message = 'FNT010115>OGNFNT,qAS,DB7MJ:/065738h4727.72N/01012.83E_.../...g...t... 27.8dB -13.8kHz' - message = parse_aprs(raw_message) - - assert message['aprs_type'] == 'position_weather' - assert message['wind_direction'] is None - assert message['wind_speed'] is None - assert message['wind_speed_peak'] is None - assert message['temperature'] is None - assert message['humidity'] is None - assert message['barometric_pressure'] is None - - def test_negative_altitude(self): - # some devices can report negative altitudes - raw_message = "OGNF71F40>APRS,qAS,NAVITER:/080852h4414.37N/01532.06E'253/052/A=-00013 !W73! id1EF71F40 -060fpm +0.0rot" - message = parse_aprs(raw_message) - - self.assertAlmostEqual(message['altitude'], -13 * FEETS_TO_METER, 5) - - def test_no_altitude(self): - # altitude is not a 'must have' - raw_message = "FLRDDEEF1>OGCAPT,qAS,CAPTURS:/065511h4837.63N/00233.79E'000/000" - message = parse_aprs(raw_message) - - assert message['altitude'] is None - - def test_invalid_coordinates(self): - # sometimes the coordinates leave their valid range: -90<=latitude<=90 or -180<=longitude<=180 - with self.assertRaises(AprsParseError): - parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17") - - with self.assertRaises(AprsParseError): - parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17") - - def test_invalid_timestamp(self): - with self.assertRaises(AprsParseError): - parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm") - - with self.assertRaises(AprsParseError): - parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322") - - def test_invalid_altitude(self): - with self.assertRaises(AprsParseError): - parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345") - - def test_bad_comment(self): - raw_message = "# bad configured ogn receiver" - message = parse_aprs(raw_message) - - assert message['comment'] == raw_message - assert message['aprs_type'] == 'comment' - - def test_server_comment(self): - raw_message = "# aprsc 2.1.4-g408ed49 17 Mar 2018 09:30:36 GMT GLIDERN1 37.187.40.234:10152" - message = parse_aprs(raw_message) - - assert message['version'] == '2.1.4-g408ed49' - assert message['timestamp'] == datetime(2018, 3, 17, 9, 30, 36) - assert message['server'] == 'GLIDERN1' - assert message['ip_address'] == '37.187.40.234' - assert message['port'] == '10152' - assert message['aprs_type'] == 'server' +def test_fail_validationassert(): + with pytest.raises(AprsParseError): + parse_aprs("notAValidString") -if __name__ == '__main__': - unittest.main() +def test_basic(): + message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment") + + assert message['aprs_type'] == 'position' + assert message['name'] == "FLRDDA5BA" + assert message['dstcall'] == "APRS" + assert message['receiver_name'] == "LFMX" + assert message['timestamp'].strftime('%H:%M:%S') == "16:08:29" + assert message['latitude'] == pytest.approx(44.25683, 5) + assert message['symboltable'] == '/' + assert message['longitude'] == pytest.approx(6.0005, 5) + assert message['symbolcode'] == '\'' + assert message['track'] == 342 + assert message['ground_speed'] == 49 * KNOTS_TO_MS / KPH_TO_MS + assert message['altitude'] == pytest.approx(5524 * FEETS_TO_METER, 5) + assert message['comment'] == "this is a comment" + + +def test_v024(): + # higher precision datum format introduced + raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56" + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position' + assert message['latitude'] - 44.2568 - 1 / 30000 == pytest.approx(2 / 1000 / 60, 10) + assert message['longitude'] - 6.0005 == pytest.approx(6 / 1000 / 60, 10) + + +def test_v025(): + # introduced the "aprs status" format where many informations (lat, lon, alt, speed, ...) are just optional + raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment" + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'status' + assert message['name'] == "EPZR" + assert message['receiver_name'] == "GLIDERN1" + assert message['timestamp'].strftime('%H:%M:%S') == "09:34:56" + assert message['comment'] == "this is a comment" + + +def test_v026(): + # from 0.2.6 the ogn comment of a receiver beacon is just optional + raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322" + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position' + assert message['comment'] == '' + + +def test_v026_relay(): + # beacons can be relayed + raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot" + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position' + assert message['relay'] == "NAV07220E" + + +def test_v027_ddhhmm(): + # beacons can have hhmmss or ddhhmm timestamp + raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1" + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position' + assert message['timestamp'].strftime('%d %H:%M') == "30 10:46" + + +def test_v028_fanet_position_weather(): + # with v0.2.8 fanet devices can report weather data + raw_message = 'FNTFC9002>OGNFNT,qAS,LSXI2:/163051h4640.33N/00752.21E_187/004g007t075h78b63620 29.0dB -8.0kHz' + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position_weather' + assert message['wind_direction'] == 187 + assert message['wind_speed'] == 4 * KNOTS_TO_MS / KPH_TO_MS + assert message['wind_speed_peak'] == 7 * KNOTS_TO_MS / KPH_TO_MS + assert message['temperature'] == fahrenheit_to_celsius(75) + assert message['humidity'] == 78 * 0.01 + assert message['barometric_pressure'] == 63620 + + assert message['comment'] == '29.0dB -8.0kHz' + + +def test_GXAirCom_fanet_position_weather_rainfall(): + raw_message = 'FNT08F298>OGNFNT,qAS,DREIFBERG:/082654h4804.90N/00845.74E_273/005g008t057r123p234h90b10264 0.0dB' + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position_weather' + assert message['rainfall_1h'] == 123 / 100 * INCH_TO_MM + assert message['rainfall_24h'] == 234 / 100 * INCH_TO_MM + + +def test_v028_fanet_position_weather_empty(): + raw_message = 'FNT010115>OGNFNT,qAS,DB7MJ:/065738h4727.72N/01012.83E_.../...g...t... 27.8dB -13.8kHz' + message = parse_aprs(raw_message) + + assert message['aprs_type'] == 'position_weather' + assert message['wind_direction'] is None + assert message['wind_speed'] is None + assert message['wind_speed_peak'] is None + assert message['temperature'] is None + assert message['humidity'] is None + assert message['barometric_pressure'] is None + + +def test_negative_altitude(): + # some devices can report negative altitudes + raw_message = "OGNF71F40>APRS,qAS,NAVITER:/080852h4414.37N/01532.06E'253/052/A=-00013 !W73! id1EF71F40 -060fpm +0.0rot" + message = parse_aprs(raw_message) + + assert message['altitude'] == pytest.approx(-13 * FEETS_TO_METER, 5) + + +def test_no_altitude(): + # altitude is not a 'must have' + raw_message = "FLRDDEEF1>OGCAPT,qAS,CAPTURS:/065511h4837.63N/00233.79E'000/000" + message = parse_aprs(raw_message) + + assert message['altitude'] is None + + +def test_invalid_coordinates(): + # sometimes the coordinates leave their valid range: -90<=latitude<=90 or -180<=longitude<=180 + with pytest.raises(AprsParseError): + parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17") + + with pytest.raises(AprsParseError): + parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17") + + +def test_invalid_timestamp(): + with pytest.raises(AprsParseError): + parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm") + + with pytest.raises(AprsParseError): + parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322") + + +def test_invalid_altitude(): + with pytest.raises(AprsParseError): + parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345") + + +def test_bad_comment(): + raw_message = "# bad configured ogn receiver" + message = parse_aprs(raw_message) + + assert message['comment'] == raw_message + assert message['aprs_type'] == 'comment' + + +def test_server_comment(): + raw_message = "# aprsc 2.1.4-g408ed49 17 Mar 2018 09:30:36 GMT GLIDERN1 37.187.40.234:10152" + message = parse_aprs(raw_message) + + assert message['version'] == '2.1.4-g408ed49' + assert message['timestamp'] == datetime(2018, 3, 17, 9, 30, 36) + assert message['server'] == 'GLIDERN1' + assert message['ip_address'] == '37.187.40.234' + assert message['port'] == '10152' + assert message['aprs_type'] == 'server' diff --git a/tests/parser/test_parse_fanet.py b/tests/parser/test_parse_fanet.py index d7ed566..d14b485 100644 --- a/tests/parser/test_parse_fanet.py +++ b/tests/parser/test_parse_fanet.py @@ -1,32 +1,29 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS from ogn.parser.aprs_comment.fanet_parser import FanetParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = FanetParser().parse_position("id1E1103CE -02fpm") +def test_position_comment(): + message = FanetParser().parse_position("id1E1103CE -02fpm") - assert message['address_type'] == 2 - assert message['aircraft_type'] == 7 - self.assertFalse(message['stealth']) - assert message['address'] == "1103CE" - self.assertAlmostEqual(message['climb_rate'], -2 * FPM_TO_MS, 0.1) - - def test_pseudo_status_comment(self): - message = FanetParser().parse_position("") - - assert message == {} - - def test_v028_status(self): - message = FanetParser().parse_status('Name="Juerg Zweifel" 15.0dB -17.1kHz 1e') - - assert message['fanet_name'] == "Juerg Zweifel" - assert message['signal_quality'] == 15.0 - assert message['frequency_offset'] == -17.1 - assert message['error_count'] == 1 + assert message['address_type'] == 2 + assert message['aircraft_type'] == 7 + assert message['stealth'] is False + assert message['address'] == "1103CE" + assert message['climb_rate'] == pytest.approx(-2 * FPM_TO_MS, 0.1) -if __name__ == '__main__': - unittest.main() +def test_pseudo_status_comment(): + message = FanetParser().parse_position("") + + assert message == {} + + +def test_v028_status(): + message = FanetParser().parse_status('Name="Juerg Zweifel" 15.0dB -17.1kHz 1e') + + assert message['fanet_name'] == "Juerg Zweifel" + assert message['signal_quality'] == 15.0 + assert message['frequency_offset'] == -17.1 + assert message['error_count'] == 1 diff --git a/tests/parser/test_parse_flarm.py b/tests/parser/test_parse_flarm.py index 271b4a6..266a3e7 100644 --- a/tests/parser/test_parse_flarm.py +++ b/tests/parser/test_parse_flarm.py @@ -1,35 +1,31 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS from ogn.parser.aprs_comment.flarm_parser import FlarmParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = FlarmParser().parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267") +def test_position_comment(): + message = FlarmParser().parse_position("id21A8CBA8 -039fpm +0.1rot 3.5dB 2e -8.7kHz gps1x2 s6.09 h43 rDF0267") - assert message['address_type'] == 1 - assert message['aircraft_type'] == 8 - self.assertFalse(message['stealth']) - self.assertFalse(message['no-tracking']) - assert message['address'] == "A8CBA8" - self.assertAlmostEqual(message['climb_rate'], -39 * FPM_TO_MS, 2) - assert message['turn_rate'] == 0.1 * HPM_TO_DEGS - assert message['signal_quality'] == 3.5 - assert message['error_count'] == 2 - assert message['frequency_offset'] == -8.7 - assert message['gps_quality'] == {'horizontal': 1, 'vertical': 2} - assert message['software_version'] == 6.09 - assert message['hardware_version'] == 67 - assert message['real_address'] == "DF0267" - - def test_position_comment_relevant_keys_only(self): - # return only keys where we got informations - message = FlarmParser().parse_position("id21A8CBA8") - - self.assertIsNotNone(message) - assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) + assert message['address_type'] == 1 + assert message['aircraft_type'] == 8 + assert message['stealth'] is False + assert message['no-tracking'] is False + assert message['address'] == "A8CBA8" + assert message['climb_rate'] == pytest.approx(-39 * FPM_TO_MS, 2) + assert message['turn_rate'] == 0.1 * HPM_TO_DEGS + assert message['signal_quality'] == 3.5 + assert message['error_count'] == 2 + assert message['frequency_offset'] == -8.7 + assert message['gps_quality'] == {'horizontal': 1, 'vertical': 2} + assert message['software_version'] == 6.09 + assert message['hardware_version'] == 67 + assert message['real_address'] == "DF0267" -if __name__ == '__main__': - unittest.main() +def test_position_comment_relevant_keys_only(): + # return only keys where we got informations + message = FlarmParser().parse_position("id21A8CBA8") + + assert message is not None + assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) diff --git a/tests/parser/test_parse_generic.py b/tests/parser/test_parse_generic.py index 505b3fa..82e5d35 100644 --- a/tests/parser/test_parse_generic.py +++ b/tests/parser/test_parse_generic.py @@ -1,16 +1,9 @@ -import unittest - from ogn.parser.aprs_comment.generic_parser import GenericParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = GenericParser().parse_position("id0123456789 weather is good, climbing with 123fpm") - assert 'comment' in message +def test_position_comment(): + message = GenericParser().parse_position("id0123456789 weather is good, climbing with 123fpm") + assert 'comment' in message - message = GenericParser().parse_status("id0123456789 weather is good, climbing with 123fpm") - assert 'comment' in message - - -if __name__ == '__main__': - unittest.main() + message = GenericParser().parse_status("id0123456789 weather is good, climbing with 123fpm") + assert 'comment' in message diff --git a/tests/parser/test_parse_inreach.py b/tests/parser/test_parse_inreach.py index 79a0265..96489ac 100644 --- a/tests/parser/test_parse_inreach.py +++ b/tests/parser/test_parse_inreach.py @@ -1,21 +1,15 @@ -import unittest from ogn.parser.aprs_comment.inreach_parser import InreachParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = InreachParser().parse_position("id300434060496190 inReac True") - assert message['address'] == "300434060496190" - assert message['model'] == 'inReac' - assert message['status'] is True - assert message['pilot_name'] is None +def test_position_comment(): + message = InreachParser().parse_position("id300434060496190 inReac True") + assert message['address'] == "300434060496190" + assert message['model'] == 'inReac' + assert message['status'] is True + assert message['pilot_name'] is None - message = InreachParser().parse_position("id300434060496190 inReac True Jim Bob") - assert message['address'] == "300434060496190" - assert message['model'] == 'inReac' - assert message['status'] is True - assert message['pilot_name'] == "Jim Bob" - - -if __name__ == '__main__': - unittest.main() + message = InreachParser().parse_position("id300434060496190 inReac True Jim Bob") + assert message['address'] == "300434060496190" + assert message['model'] == 'inReac' + assert message['status'] is True + assert message['pilot_name'] == "Jim Bob" diff --git a/tests/parser/test_parse_lt24.py b/tests/parser/test_parse_lt24.py index 2692971..aa0201f 100644 --- a/tests/parser/test_parse_lt24.py +++ b/tests/parser/test_parse_lt24.py @@ -1,17 +1,12 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS from ogn.parser.aprs_comment.lt24_parser import LT24Parser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = LT24Parser().parse_position("id25387 +123fpm GPS") +def test_position_comment(): + message = LT24Parser().parse_position("id25387 +123fpm GPS") - assert message['lt24_id'] == "25387" - self.assertAlmostEqual(message['climb_rate'], 123 * FPM_TO_MS, 2) - assert message['source'] == 'GPS' - - -if __name__ == '__main__': - unittest.main() + assert message['lt24_id'] == "25387" + assert message['climb_rate'] == pytest.approx(123 * FPM_TO_MS, 2) + assert message['source'] == 'GPS' diff --git a/tests/parser/test_parse_microtrak.py b/tests/parser/test_parse_microtrak.py index 484d09f..c406291 100644 --- a/tests/parser/test_parse_microtrak.py +++ b/tests/parser/test_parse_microtrak.py @@ -1,25 +1,19 @@ -import unittest - from ogn.parser.aprs_comment.microtrak_parser import MicrotrakParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = MicrotrakParser().parse_position("id21A8CBA8") +def test_position_comment(): + message = MicrotrakParser().parse_position("id21A8CBA8") - assert message['address_type'] == 1 - assert message['aircraft_type'] == 8 - self.assertFalse(message['stealth']) - self.assertFalse(message['no-tracking']) - assert message['address'] == "A8CBA8" - - def test_position_comment_relevant_keys_only(self): - # return only keys where we got informations - message = MicrotrakParser().parse_position("id21A8CBA8") - - self.assertIsNotNone(message) - assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) + assert message['address_type'] == 1 + assert message['aircraft_type'] == 8 + assert message['stealth'] is False + assert message['no-tracking'] is False + assert message['address'] == "A8CBA8" -if __name__ == '__main__': - unittest.main() +def test_position_comment_relevant_keys_only(): + # return only keys where we got informations + message = MicrotrakParser().parse_position("id21A8CBA8") + + assert message is not None + assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) diff --git a/tests/parser/test_parse_naviter.py b/tests/parser/test_parse_naviter.py index 18ecb11..3559886 100644 --- a/tests/parser/test_parse_naviter.py +++ b/tests/parser/test_parse_naviter.py @@ -1,30 +1,25 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS from ogn.parser.aprs_comment.naviter_parser import NaviterParser -class TestStringMethods(unittest.TestCase): - def test_OGNAVI_1(self): - message = NaviterParser().parse_position("id0440042121 +123fpm +0.5rot") +def test_OGNAVI_1(): + message = NaviterParser().parse_position("id0440042121 +123fpm +0.5rot") - # id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001 - # bit 0: stealth mode - # bit 1: do not track mode - # bits 2-5: aircraft type - # bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers) - # bits 12-15: reserved for use at a later time - # bits 16-39: device id (24-bit device identifier, same as in APRS header) - assert message['stealth'] is False - assert message['do_not_track'] is False - assert message['aircraft_type'] == 1 - assert message['address_type'] == 4 - assert message['reserved'] == 0 - assert message['address'] == "042121" + # id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001 + # bit 0: stealth mode + # bit 1: do not track mode + # bits 2-5: aircraft type + # bits 6-11: address type (namespace is extended from 2 to 6 bits to avoid collisions with other tracking providers) + # bits 12-15: reserved for use at a later time + # bits 16-39: device id (24-bit device identifier, same as in APRS header) + assert message['stealth'] is False + assert message['do_not_track'] is False + assert message['aircraft_type'] == 1 + assert message['address_type'] == 4 + assert message['reserved'] == 0 + assert message['address'] == "042121" - self.assertAlmostEqual(message['climb_rate'], 123 * FPM_TO_MS, 2) - assert message['turn_rate'] == 0.5 * HPM_TO_DEGS - - -if __name__ == '__main__': - unittest.main() + assert message['climb_rate'] == pytest.approx(123 * FPM_TO_MS, 2) + assert message['turn_rate'] == 0.5 * HPM_TO_DEGS diff --git a/tests/parser/test_parse_ogn_aircraft.py b/tests/parser/test_parse_ogn_aircraft.py index 3abc74c..4b92b45 100644 --- a/tests/parser/test_parse_ogn_aircraft.py +++ b/tests/parser/test_parse_ogn_aircraft.py @@ -1,78 +1,81 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS from ogn.parser.aprs_comment.ogn_parser import OgnParser -class TestStringMethods(unittest.TestCase): - def test_invalid_token(self): - assert OgnParser().parse_aircraft_beacon("notAValidToken") is None - - def test_basic(self): - message = OgnParser().parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") - - assert message['address_type'] == 2 - assert message['aircraft_type'] == 2 - self.assertFalse(message['stealth']) - self.assertFalse(message['no-tracking']) - assert message['address'] == "DDA5BA" - self.assertAlmostEqual(message['climb_rate'], -454 * FPM_TO_MS, 2) - assert message['turn_rate'] == -1.1 * HPM_TO_DEGS - assert message['signal_quality'] == 8.8 - assert message['error_count'] == 0 - assert message['frequency_offset'] == 51.2 - assert message['gps_quality'] == {'horizontal': 4, 'vertical': 5} - assert len(message['proximity']) == 3 - assert message['proximity'][0] == '1084' - assert message['proximity'][1] == 'B597' - assert message['proximity'][2] == 'B598' - - def test_no_tracking(self): - message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") - self.assertFalse(message['no-tracking']) - - message = OgnParser().parse_aircraft_beacon("id4ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") - assert message['no-tracking'] is True - - def test_stealth(self): - message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") - assert message['stealth'] is False - - message = OgnParser().parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") - assert message['stealth'] is True - - def test_v024(self): - message = OgnParser().parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56") - - assert message['software_version'] == 6.02 - assert message['hardware_version'] == 10 - assert message['real_address'] == "DF0C56" - - def test_v024_ogn_tracker(self): - message = OgnParser().parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz") - - assert message['flightlevel'] == 4.43 - - def test_v025(self): - message = OgnParser().parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm") - - assert message['signal_power'] == 7.4 - - def test_v026(self): - # from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID - message_triple = OgnParser().parse_aircraft_beacon("id093D0930 +000fpm +0.0rot") - message_single = OgnParser().parse_aircraft_beacon("id093D0930") - - self.assertIsNotNone(message_triple) - self.assertIsNotNone(message_single) - - def test_relevant_keys_only(self): - # return only keys where we got informations - message = OgnParser().parse_aircraft_beacon("id093D0930") - - self.assertIsNotNone(message) - assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) +def test_invalid_token(): + assert OgnParser().parse_aircraft_beacon("notAValidToken") is None -if __name__ == '__main__': - unittest.main() +def test_basic(): + message = OgnParser().parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") + + assert message['address_type'] == 2 + assert message['aircraft_type'] == 2 + assert message['stealth'] is False + assert message['no-tracking'] is False + assert message['address'] == "DDA5BA" + assert message['climb_rate'] == pytest.approx(-454 * FPM_TO_MS, 2) + assert message['turn_rate'] == -1.1 * HPM_TO_DEGS + assert message['signal_quality'] == 8.8 + assert message['error_count'] == 0 + assert message['frequency_offset'] == 51.2 + assert message['gps_quality'] == {'horizontal': 4, 'vertical': 5} + assert len(message['proximity']) == 3 + assert message['proximity'][0] == '1084' + assert message['proximity'][1] == 'B597' + assert message['proximity'][2] == 'B598' + + +def test_no_tracking(): + message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") + assert message['no-tracking'] is False + + message = OgnParser().parse_aircraft_beacon("id4ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") + assert message['no-tracking'] is True + + +def test_stealth(): + message = OgnParser().parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") + assert message['stealth'] is False + + message = OgnParser().parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598") + assert message['stealth'] is True + + +def test_v024(): + message = OgnParser().parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56") + + assert message['software_version'] == 6.02 + assert message['hardware_version'] == 10 + assert message['real_address'] == "DF0C56" + + +def test_v024_ogn_tracker(): + message = OgnParser().parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz") + + assert message['flightlevel'] == 4.43 + + +def test_v025(): + message = OgnParser().parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm") + + assert message['signal_power'] == 7.4 + + +def test_v026(): + # from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID + message_triple = OgnParser().parse_aircraft_beacon("id093D0930 +000fpm +0.0rot") + message_single = OgnParser().parse_aircraft_beacon("id093D0930") + + assert message_triple is not None + assert message_single is not None + + +def test_relevant_keys_only(): + # return only keys where we got informations + message = OgnParser().parse_aircraft_beacon("id093D0930") + + assert message is not None + assert sorted(message.keys()) == sorted(['address_type', 'aircraft_type', 'stealth', 'address', 'no-tracking']) diff --git a/tests/parser/test_parse_ogn_receiver.py b/tests/parser/test_parse_ogn_receiver.py index 927f093..e59184e 100644 --- a/tests/parser/test_parse_ogn_receiver.py +++ b/tests/parser/test_parse_ogn_receiver.py @@ -1,58 +1,56 @@ -import unittest - from ogn.parser.aprs_comment.ogn_parser import OgnParser -class TestStringMethods(unittest.TestCase): - def test_fail_validation(self): - assert OgnParser().parse_receiver_beacon("notAValidToken") is None - - def test_v021(self): - message = OgnParser().parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB") - - assert message['version'] == "0.2.1" - assert message['cpu_load'] == 0.8 - assert message['free_ram'] == 25.6 - assert message['total_ram'] == 458.9 - assert message['ntp_error'] == 0.1 - assert message['rt_crystal_correction'] == 2.3 - assert message['cpu_temp'] == 51.9 - - assert message['rec_crystal_correction'] == 26 - assert message['rec_crystal_correction_fine'] == -1.4 - assert message['rec_input_noise'] == -0.25 - - def test_v022(self): - message = OgnParser().parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB") - assert message['platform'] == 'x86' - - def test_v025(self): - message = OgnParser().parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]") - assert message['voltage'] == 5.016 - assert message['amperage'] == 0.534 - assert message['senders_signal'] == 10.8 - assert message['senders_messages'] == 57282 - - message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]") - assert message['senders_visible'] == 14 - assert message['senders_total'] == 16 - assert message['senders_signal'] == 24.0 - assert message['senders_messages'] == 143717 - assert message['good_senders_signal'] == 26.7 - assert message['good_senders'] == 68 - assert message['good_and_bad_senders'] == 135 - - def test_v028(self): - message = OgnParser().parse_receiver_beacon("v0.2.8.RPI-GPU CPU:0.3 RAM:744.5/968.2MB NTP:3.6ms/+2.0ppm +68.2C 3/3Acfts[1h] Lat:1.6s RF:-8+67.8ppm/+10.33dB/+1.3dB@10km[30998]/+10.4dB@10km[3/5]") - assert message['latency'] == 1.6 - - def test_relevant_keys_only(self): - # return only keys where we got informations - message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm") - - self.assertIsNotNone(message) - assert sorted(message.keys()) == sorted(['version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction']) +def test_fail_validation(): + assert OgnParser().parse_receiver_beacon("notAValidToken") is None -if __name__ == '__main__': - unittest.main() +def test_v021(): + message = OgnParser().parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB") + + assert message['version'] == "0.2.1" + assert message['cpu_load'] == 0.8 + assert message['free_ram'] == 25.6 + assert message['total_ram'] == 458.9 + assert message['ntp_error'] == 0.1 + assert message['rt_crystal_correction'] == 2.3 + assert message['cpu_temp'] == 51.9 + + assert message['rec_crystal_correction'] == 26 + assert message['rec_crystal_correction_fine'] == -1.4 + assert message['rec_input_noise'] == -0.25 + + +def test_v022(): + message = OgnParser().parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB") + assert message['platform'] == 'x86' + + +def test_v025(): + message = OgnParser().parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]") + assert message['voltage'] == 5.016 + assert message['amperage'] == 0.534 + assert message['senders_signal'] == 10.8 + assert message['senders_messages'] == 57282 + + message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]") + assert message['senders_visible'] == 14 + assert message['senders_total'] == 16 + assert message['senders_signal'] == 24.0 + assert message['senders_messages'] == 143717 + assert message['good_senders_signal'] == 26.7 + assert message['good_senders'] == 68 + assert message['good_and_bad_senders'] == 135 + + +def test_v028(): + message = OgnParser().parse_receiver_beacon("v0.2.8.RPI-GPU CPU:0.3 RAM:744.5/968.2MB NTP:3.6ms/+2.0ppm +68.2C 3/3Acfts[1h] Lat:1.6s RF:-8+67.8ppm/+10.33dB/+1.3dB@10km[30998]/+10.4dB@10km[3/5]") + assert message['latency'] == 1.6 + + +def test_relevant_keys_only(): + # return only keys where we got informations + message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm") + + assert message is not None + assert sorted(message.keys()) == sorted(['version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction']) diff --git a/tests/parser/test_parse_receiver.py b/tests/parser/test_parse_receiver.py index 0b079f1..9121454 100644 --- a/tests/parser/test_parse_receiver.py +++ b/tests/parser/test_parse_receiver.py @@ -1,41 +1,36 @@ -import unittest - from ogn.parser.aprs_comment.receiver_parser import ReceiverParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = ReceiverParser().parse_position("Antenna: chinese, on a pylon, 20 meter above ground") +def test_position_comment(): + message = ReceiverParser().parse_position("Antenna: chinese, on a pylon, 20 meter above ground") - assert message['user_comment'] == "Antenna: chinese, on a pylon, 20 meter above ground" - - def test_position_comment_empty(self): - message = ReceiverParser().parse_position("") - - self.assertIsNotNone(message) - - def test_status_comment(self): - message = ReceiverParser().parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]") - - assert message['version'] == "0.2.7" - assert message['platform'] == 'RPI-GPU' - assert message['cpu_load'] == 0.7 - assert message['free_ram'] == 770.2 - assert message['total_ram'] == 968.2 - assert message['ntp_error'] == 1.8 - assert message['rt_crystal_correction'] == -3.3 - assert message['cpu_temp'] == 55.7 - assert message['senders_visible'] == 7 - assert message['senders_total'] == 8 - assert message['rec_crystal_correction'] == 54 - assert message['rec_crystal_correction_fine'] == -1.1 - assert message['rec_input_noise'] == -0.16 - assert message['senders_signal'] == 7.1 - assert message['senders_messages'] == 19481 - assert message['good_senders_signal'] == 16.8 - assert message['good_senders'] == 7 - assert message['good_and_bad_senders'] == 13 + assert message['user_comment'] == "Antenna: chinese, on a pylon, 20 meter above ground" -if __name__ == '__main__': - unittest.main() +def test_position_comment_empty(): + message = ReceiverParser().parse_position("") + + assert message is not None + + +def test_status_comment(): + message = ReceiverParser().parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]") + + assert message['version'] == "0.2.7" + assert message['platform'] == 'RPI-GPU' + assert message['cpu_load'] == 0.7 + assert message['free_ram'] == 770.2 + assert message['total_ram'] == 968.2 + assert message['ntp_error'] == 1.8 + assert message['rt_crystal_correction'] == -3.3 + assert message['cpu_temp'] == 55.7 + assert message['senders_visible'] == 7 + assert message['senders_total'] == 8 + assert message['rec_crystal_correction'] == 54 + assert message['rec_crystal_correction_fine'] == -1.1 + assert message['rec_input_noise'] == -0.16 + assert message['senders_signal'] == 7.1 + assert message['senders_messages'] == 19481 + assert message['good_senders_signal'] == 16.8 + assert message['good_senders'] == 7 + assert message['good_and_bad_senders'] == 13 diff --git a/tests/parser/test_parse_safesky.py b/tests/parser/test_parse_safesky.py index b3e3342..f165b30 100644 --- a/tests/parser/test_parse_safesky.py +++ b/tests/parser/test_parse_safesky.py @@ -1,20 +1,15 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS from ogn.parser.aprs_comment.safesky_parser import SafeskyParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - # "SKY3E5906>OGNSKY,qAS,SafeSky:/072555h5103.47N/00524.81E'065/031/A=001250 !W05! id1C3E5906 +010fpm gps6x1" - message = SafeskyParser().parse_position("id1C3E5906 +010fpm gps6x1") - assert message['address'] == '3E5906' - assert message['address_type'] == 0 - assert message['aircraft_type'] == 7 - self.assertFalse(message['stealth']) - self.assertAlmostEqual(message['climb_rate'], 10 * FPM_TO_MS, 2) - assert message['gps_quality'] == {'horizontal': 6, 'vertical': 1} - - -if __name__ == '__main__': - unittest.main() +def test_position_comment(): + # "SKY3E5906>OGNSKY,qAS,SafeSky:/072555h5103.47N/00524.81E'065/031/A=001250 !W05! id1C3E5906 +010fpm gps6x1" + message = SafeskyParser().parse_position("id1C3E5906 +010fpm gps6x1") + assert message['address'] == '3E5906' + assert message['address_type'] == 0 + assert message['aircraft_type'] == 7 + assert message['stealth'] is False + assert message['climb_rate'] == pytest.approx(10 * FPM_TO_MS, 2) + assert message['gps_quality'] == {'horizontal': 6, 'vertical': 1} diff --git a/tests/parser/test_parse_skylines.py b/tests/parser/test_parse_skylines.py index e0908d2..1a92933 100644 --- a/tests/parser/test_parse_skylines.py +++ b/tests/parser/test_parse_skylines.py @@ -1,16 +1,11 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS from ogn.parser.aprs_comment.skylines_parser import SkylinesParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = SkylinesParser().parse_position("id2816 -015fpm") +def test_position_comment(): + message = SkylinesParser().parse_position("id2816 -015fpm") - assert message['skylines_id'] == "2816" - self.assertAlmostEqual(message['climb_rate'], -15 * FPM_TO_MS, 2) - - -if __name__ == '__main__': - unittest.main() + assert message['skylines_id'] == "2816" + assert message['climb_rate'] == pytest.approx(-15 * FPM_TO_MS, 2) diff --git a/tests/parser/test_parse_spider.py b/tests/parser/test_parse_spider.py index be4e75a..1ca6054 100644 --- a/tests/parser/test_parse_spider.py +++ b/tests/parser/test_parse_spider.py @@ -1,17 +1,10 @@ -import unittest - from ogn.parser.aprs_comment.spider_parser import SpiderParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = SpiderParser().parse_position("id300234060668560 +30dB K23W 3D") +def test_position_comment(): + message = SpiderParser().parse_position("id300234060668560 +30dB K23W 3D") - assert message['spider_id'] == "300234060668560" - assert message['signal_power'] == 30 - assert message['spider_registration'] == "K23W" - assert message['gps_quality'] == "3D" - - -if __name__ == '__main__': - unittest.main() + assert message['spider_id'] == "300234060668560" + assert message['signal_power'] == 30 + assert message['spider_registration'] == "K23W" + assert message['gps_quality'] == "3D" diff --git a/tests/parser/test_parse_spot.py b/tests/parser/test_parse_spot.py index 7dfcb2a..0496bef 100644 --- a/tests/parser/test_parse_spot.py +++ b/tests/parser/test_parse_spot.py @@ -1,16 +1,9 @@ -import unittest - from ogn.parser.aprs_comment.spot_parser import SpotParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = SpotParser().parse_position("id0-2860357 SPOT3 GOOD") +def test_position_comment(): + message = SpotParser().parse_position("id0-2860357 SPOT3 GOOD") - assert message['spot_id'] == "0-2860357" - assert message['model'] == 'SPOT3' - assert message['status'] == "GOOD" - - -if __name__ == '__main__': - unittest.main() + assert message['spot_id'] == "0-2860357" + assert message['model'] == 'SPOT3' + assert message['status'] == "GOOD" diff --git a/tests/parser/test_parse_telnet.py b/tests/parser/test_parse_telnet.py index d0cf67f..2f399c3 100644 --- a/tests/parser/test_parse_telnet.py +++ b/tests/parser/test_parse_telnet.py @@ -1,5 +1,5 @@ -import unittest import unittest.mock as mock +import pytest from datetime import datetime @@ -7,52 +7,49 @@ from ogn.parser.telnet_parser import parse from ogn.parser.exceptions import ParseError -class TestStringMethods(unittest.TestCase): - @unittest.skip("Not yet implemented") - def test_telnet_fail_corrupt(self): - with self.assertRaises(ParseError): - parse('This is rubbish') - - @mock.patch('ogn.parser.telnet_parser.datetime') - def test_telnet_parse_complete(self, datetime_mock): - # set the now-mock near to the time in the test string - datetime_mock.now.return_value = datetime(2015, 1, 1, 10, 0, 55) - - message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ? R B8949') - - assert message['pps_offset'] == 0.181 - assert message['frequency'] == 868.394 - assert message['aircraft_type'] == 1 - assert message['address_type'] == 2 - assert message['address'] == 'DDA411' - assert message['timestamp'] == datetime(2015, 1, 1, 10, 30, 10) - assert message['latitude'] == 50.868 - assert message['longitude'] == 12.15279 - assert message['altitude'] == 988 - assert message['climb_rate'] == 0.1 - assert message['ground_speed'] == 25.7 - assert message['track'] == 85.4 - assert message['turn_rate'] == -3.5 - assert message['magic_number'] == 5 # the '5' is a magic number... 1 if ground_speed is 0.0m/s an 3 or 5 if airborne. Do you have an idea what it is? - assert message['gps_status'] == '03x04' - assert message['channel'] == 1 - assert message['flarm_timeslot'] is True - assert message['ogn_timeslot'] is False - assert message['frequency_offset'] == -12.61 - assert message['decode_quality'] == 5.8 - assert message['signal_quality'] == 15.5 - assert message['demodulator_type'] == 2 - assert message['error_count'] == 10 - assert message['distance'] == 30.9 - assert message['bearing'] == 99.5 - assert message['phi'] == 1.1 - assert message['multichannel'] is True - - def test_telnet_parse_corrupt(self): - message = parse('0.397sec:868.407MHz: sA:1:784024 205656: [ +5.71003, +20.48951]deg 34012m +14.5m/s 109.7m/s 118.5deg +21.0deg/sec 0 27x40m 01_o +7.03kHz 17.2/27.0dB/2 12e 4719.5km 271.1deg -8.5deg ? R B34067') - - self.assertIsNone(message) +@pytest.mark.skip("Not yet implemented") +def test_telnet_fail_corrupt(): + with pytest.raises(ParseError): + parse('This is rubbish') -if __name__ == '__main__': - unittest.main() +@mock.patch('ogn.parser.telnet_parser.datetime') +def test_telnet_parse_complete(datetime_mock): + # set the now-mock near to the time in the test string + datetime_mock.now.return_value = datetime(2015, 1, 1, 10, 0, 55) + + message = parse('0.181sec:868.394MHz: 1:2:DDA411 103010: [ +50.86800, +12.15279]deg 988m +0.1m/s 25.7m/s 085.4deg -3.5deg/sec 5 03x04m 01f_-12.61kHz 5.8/15.5dB/2 10e 30.9km 099.5deg +1.1deg + ? R B8949') + + assert message['pps_offset'] == 0.181 + assert message['frequency'] == 868.394 + assert message['aircraft_type'] == 1 + assert message['address_type'] == 2 + assert message['address'] == 'DDA411' + assert message['timestamp'] == datetime(2015, 1, 1, 10, 30, 10) + assert message['latitude'] == 50.868 + assert message['longitude'] == 12.15279 + assert message['altitude'] == 988 + assert message['climb_rate'] == 0.1 + assert message['ground_speed'] == 25.7 + assert message['track'] == 85.4 + assert message['turn_rate'] == -3.5 + assert message['magic_number'] == 5 # the '5' is a magic number... 1 if ground_speed is 0.0m/s an 3 or 5 if airborne. Do you have an idea what it is? + assert message['gps_status'] == '03x04' + assert message['channel'] == 1 + assert message['flarm_timeslot'] is True + assert message['ogn_timeslot'] is False + assert message['frequency_offset'] == -12.61 + assert message['decode_quality'] == 5.8 + assert message['signal_quality'] == 15.5 + assert message['demodulator_type'] == 2 + assert message['error_count'] == 10 + assert message['distance'] == 30.9 + assert message['bearing'] == 99.5 + assert message['phi'] == 1.1 + assert message['multichannel'] is True + + +def test_telnet_parse_corrupt(): + message = parse('0.397sec:868.407MHz: sA:1:784024 205656: [ +5.71003, +20.48951]deg 34012m +14.5m/s 109.7m/s 118.5deg +21.0deg/sec 0 27x40m 01_o +7.03kHz 17.2/27.0dB/2 12e 4719.5km 271.1deg -8.5deg ? R B34067') + + assert message is None diff --git a/tests/parser/test_parse_tracker.py b/tests/parser/test_parse_tracker.py index b874f46..d05b2b4 100644 --- a/tests/parser/test_parse_tracker.py +++ b/tests/parser/test_parse_tracker.py @@ -1,47 +1,44 @@ -import unittest +import pytest from ogn.parser.utils import FPM_TO_MS, HPM_TO_DEGS from ogn.parser.aprs_comment.tracker_parser import TrackerParser -class TestStringMethods(unittest.TestCase): - def test_position_comment(self): - message = TrackerParser().parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm") +def test_position_comment(): + message = TrackerParser().parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5 +12.7dBm") - assert message['address_type'] == 3 - assert message['aircraft_type'] == 1 - self.assertFalse(message['stealth']) - assert message['address'] == "2FD00F" - self.assertAlmostEqual(message['climb_rate'], -58 * FPM_TO_MS, 2) - assert message['turn_rate'] == 1.1 * HPM_TO_DEGS - assert message['flightlevel'] == 3.12 - assert message['signal_quality'] == 32.8 - assert message['error_count'] == 0 - assert message['frequency_offset'] == -0.8 - assert message['gps_quality'] == {'horizontal': 3, 'vertical': 5} - assert message['signal_power'] == 12.7 - - def test_status_comment(self): - message = TrackerParser().parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min") - - assert message['hardware_version'] == 0 - assert message['software_version'] == 0 - assert message['gps_satellites'] == 9 - assert message['gps_quality'] == 1 - assert message['gps_altitude'] == 164 - assert message['pressure'] == 1002.6 - assert message['temperature'] == 20.2 - assert message['humidity'] == 0 - assert message['voltage'] == 3.34 - assert message['transmitter_power'] == 14 - assert message['noise_level'] == -110.5 - assert message['relays'] == 1 - - def test_status_comment_comment(self): - message = TrackerParser().parse_status("Pilot=Pawel Hard=DIY/STM32") - - assert message['comment'] == "Pilot=Pawel Hard=DIY/STM32" + assert message['address_type'] == 3 + assert message['aircraft_type'] == 1 + assert message['stealth'] is False + assert message['address'] == "2FD00F" + assert message['climb_rate'] == pytest.approx(-58 * FPM_TO_MS, 2) + assert message['turn_rate'] == 1.1 * HPM_TO_DEGS + assert message['flightlevel'] == 3.12 + assert message['signal_quality'] == 32.8 + assert message['error_count'] == 0 + assert message['frequency_offset'] == -0.8 + assert message['gps_quality'] == {'horizontal': 3, 'vertical': 5} + assert message['signal_power'] == 12.7 -if __name__ == '__main__': - unittest.main() +def test_status_comment(): + message = TrackerParser().parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min") + + assert message['hardware_version'] == 0 + assert message['software_version'] == 0 + assert message['gps_satellites'] == 9 + assert message['gps_quality'] == 1 + assert message['gps_altitude'] == 164 + assert message['pressure'] == 1002.6 + assert message['temperature'] == 20.2 + assert message['humidity'] == 0 + assert message['voltage'] == 3.34 + assert message['transmitter_power'] == 14 + assert message['noise_level'] == -110.5 + assert message['relays'] == 1 + + +def test_status_comment_comment(): + message = TrackerParser().parse_status("Pilot=Pawel Hard=DIY/STM32") + + assert message['comment'] == "Pilot=Pawel Hard=DIY/STM32" diff --git a/tests/parser/test_utils.py b/tests/parser/test_utils.py index ca1d245..6b6c6b6 100644 --- a/tests/parser/test_utils.py +++ b/tests/parser/test_utils.py @@ -1,66 +1,68 @@ -import unittest +import pytest from datetime import datetime, timezone from ogn.parser.utils import parseAngle, createTimestamp, CheapRuler, normalized_quality -class TestStringMethods(unittest.TestCase): - def test_parseAngle(self): - self.assertAlmostEqual(parseAngle('05048.30'), 50.805, 5) - - def proceed_test_data(self, test_data={}): - for test in test_data: - timestamp = createTimestamp(test[0], reference_timestamp=test[1]) - assert timestamp == test[2] - - def test_createTimestamp_hhmmss(self): - test_data = [ - ('000001h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick) - ('235959h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old) - ('110000h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 11, 0, 0)), # packet 11 hours from future or 13 hours old - ('123500h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old) - ('000001h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future) - ] - - self.proceed_test_data(test_data) - - def test_createTimestamp_ddhhmm(self): - test_data = [ - ('011212z', datetime(2017, 9, 28, 0, 0, 1), datetime(2017, 10, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th, - ('281313z', datetime(2017, 10, 1, 0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st, - ('281414z', datetime(2017, 1, 1, 0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st, - ] - - self.proceed_test_data(test_data) - - def test_createTimestamp_tzinfo(self): - test_data = [ - ('000001h', datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc), (datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc))) - ] - - self.proceed_test_data(test_data) - - def test_cheap_ruler_distance(self): - koenigsdf = (11.465353, 47.829825) - hochkoenig = (13.062405, 47.420516) - - cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2) - distance = cheap_ruler.distance(koenigsdf, hochkoenig) - self.assertAlmostEqual(distance, 128381.47612138899) - - def test_cheap_ruler_bearing(self): - koenigsdf = (11.465353, 47.829825) - hochkoenig = (13.062405, 47.420516) - - cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2) - bearing = cheap_ruler.bearing(koenigsdf, hochkoenig) - self.assertAlmostEqual(bearing, 110.761300063515) - - def test_normalized_quality(self): - self.assertAlmostEqual(normalized_quality(10000, 1), 1) - self.assertAlmostEqual(normalized_quality(20000, 10), 16.020599913279625) - self.assertAlmostEqual(normalized_quality(5000, 5), -1.0205999132796242) +def _proceed_test_data(test_data={}): + for test in test_data: + timestamp = createTimestamp(test[0], reference_timestamp=test[1]) + assert timestamp == test[2] -if __name__ == '__main__': - unittest.main() +def test_parseAngle(): + assert parseAngle('05048.30') == pytest.approx(50.805, 5) + + +def test_createTimestamp_hhmmss(): + test_data = [ + ('000001h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick) + ('235959h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old) + ('110000h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 11, 0, 0)), # packet 11 hours from future or 13 hours old + ('123500h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old) + ('000001h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future) + ] + + _proceed_test_data(test_data) + + +def test_createTimestamp_ddhhmm(): + test_data = [ + ('011212z', datetime(2017, 9, 28, 0, 0, 1), datetime(2017, 10, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th, + ('281313z', datetime(2017, 10, 1, 0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st, + ('281414z', datetime(2017, 1, 1, 0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st, + ] + + _proceed_test_data(test_data) + + +def test_createTimestamp_tzinfo(): + test_data = [ + ('000001h', datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc), (datetime(2020, 9, 10, 0, 0, 1, tzinfo=timezone.utc))) + ] + + _proceed_test_data(test_data) + + +def test_cheap_ruler_distance(): + koenigsdf = (11.465353, 47.829825) + hochkoenig = (13.062405, 47.420516) + + cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2) + distance = cheap_ruler.distance(koenigsdf, hochkoenig) + assert distance == pytest.approx(128381.47612138899) + + +def test_cheap_ruler_bearing(): + koenigsdf = (11.465353, 47.829825) + hochkoenig = (13.062405, 47.420516) + + cheap_ruler = CheapRuler((koenigsdf[1] + hochkoenig[1]) / 2) + bearing = cheap_ruler.bearing(koenigsdf, hochkoenig) + assert bearing == pytest.approx(110.761300063515) + + +def test_normalized_quality(): + assert normalized_quality(10000, 1) == pytest.approx(1) + assert normalized_quality(20000, 10) == pytest.approx(16.020599913279625) + assert normalized_quality(5000, 5) == pytest.approx(-1.0205999132796242)