kopia lustrzana https://github.com/glidernet/python-ogn-client
Test keepalive
rodzic
b309cfe805
commit
9a90c347fa
|
@ -3,7 +3,7 @@ import unittest.mock as mock
|
|||
|
||||
from ogn.parser import parse
|
||||
from ogn.client.client import create_aprs_login, AprsClient
|
||||
from ogn.client.settings import APRS_APP_NAME, APRS_APP_VER
|
||||
from ogn.client.settings import APRS_APP_NAME, APRS_APP_VER, APRS_KEEPALIVE_TIME
|
||||
|
||||
|
||||
class AprsClientTest(unittest.TestCase):
|
||||
|
@ -72,6 +72,32 @@ class AprsClientTest(unittest.TestCase):
|
|||
finally:
|
||||
client.disconnect()
|
||||
|
||||
@mock.patch('ogn.client.client.time')
|
||||
@mock.patch('ogn.client.client.socket')
|
||||
def test_run_keepalive(self, mock_socket, mock_time):
|
||||
import socket
|
||||
mock_socket.error = socket.error
|
||||
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
||||
client.sock_file.readline = mock.MagicMock()
|
||||
client.sock_file.readline.side_effect = ['Normal text blabla',
|
||||
KeyboardInterrupt()]
|
||||
|
||||
mock_time.side_effect = [0, 0, APRS_KEEPALIVE_TIME + 1, APRS_KEEPALIVE_TIME + 1]
|
||||
|
||||
timed_callback = mock.MagicMock()
|
||||
|
||||
try:
|
||||
client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
||||
timed_callback.assert_called_with(client)
|
||||
|
||||
def test_reset_kill_reconnect(self):
|
||||
client = AprsClient(aprs_user='testuser', aprs_filter='')
|
||||
client.connect()
|
||||
|
|
Ładowanie…
Reference in New Issue