kopia lustrzana https://github.com/glidernet/python-ogn-client
Added telnet parser and test
Added TelnetClient Change encoding, added connect() and disconnect() Bugfixespull/51/head
rodzic
52468a4bd1
commit
c58739705f
|
@ -1,6 +1,6 @@
|
||||||
import socket
|
import socket
|
||||||
import logging
|
import logging
|
||||||
from time import time
|
from time import time, sleep
|
||||||
|
|
||||||
from ogn.client import settings
|
from ogn.client import settings
|
||||||
|
|
||||||
|
@ -83,3 +83,35 @@ class AprsClient:
|
||||||
self.connect()
|
self.connect()
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class TelnetClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
self.logger.info("Connect to local telnet server")
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
self.sock.connect((settings.TELNET_SERVER_HOST, settings.TELNET_SERVER_PORT))
|
||||||
|
|
||||||
|
def run(self, callback, autoreconnect=False):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self.sock_file = self.sock.makefile(mode='rw', encoding='iso-8859-1')
|
||||||
|
while True:
|
||||||
|
packet_str = self.sock_file.readline().strip()
|
||||||
|
callback(packet_str)
|
||||||
|
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
self.logger.error('Telnet server not running', exc_info=True)
|
||||||
|
|
||||||
|
if autoreconnect:
|
||||||
|
sleep(1)
|
||||||
|
self.connect()
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
self.logger.info('Disconnect')
|
||||||
|
self.sock.shutdown(0)
|
||||||
|
self.sock.close()
|
||||||
|
|
|
@ -8,3 +8,6 @@ PACKAGE_VERSION = '0.8.2'
|
||||||
APRS_APP_VER = PACKAGE_VERSION[:3]
|
APRS_APP_VER = PACKAGE_VERSION[:3]
|
||||||
|
|
||||||
APRS_KEEPALIVE_TIME = 240
|
APRS_KEEPALIVE_TIME = 240
|
||||||
|
|
||||||
|
TELNET_SERVER_HOST = 'localhost'
|
||||||
|
TELNET_SERVER_PORT = 50001
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
import unittest
|
||||||
|
import unittest.mock as mock
|
||||||
|
|
||||||
|
from ogn.client.client import TelnetClient
|
||||||
|
|
||||||
|
|
||||||
|
class TelnetClientTest(unittest.TestCase):
|
||||||
|
@mock.patch('ogn.client.client.socket')
|
||||||
|
def test_connect(self, socket_mock):
|
||||||
|
def callback(raw_message):
|
||||||
|
pass
|
||||||
|
|
||||||
|
client = TelnetClient()
|
||||||
|
client.run(callback=callback)
|
Ładowanie…
Reference in New Issue