LoRa_APRS_iGate/HIL/aprs_con.py

87 wiersze
2.4 KiB
Python

import pytest
import socket
import logging
logger = logging.getLogger(__name__)
class AprsIs:
    """Small line-oriented APRS-IS client used as a test helper.

    The client opens a TCP connection, logs in with a callsign/passcode and
    then exchanges CRLF-terminated text lines with the server.
    """

    def __init__(self, callsign, passwd="-1", host="localhost", port=10152) -> None:
        """Store the login data and server address; no connection is opened.

        Args:
            callsign: Callsign sent in the login line.
            passwd: Passcode sent in the login line.
            host: Server host name or address.
            port: Server TCP port.
        """
        self.callsign = callsign
        self.passwd = passwd
        self.server = (host, port)
        # Connected socket object, or None while disconnected.
        self.socket = None
        # Received text that has not yet been split into complete lines.
        self.buffer = ""

    def connect(self) -> bool:
        """Connect to the server and log in.

        Returns:
            True when the login response reports the status ``verified``.
            False when a socket is already open, when no login response
            arrives, or when the login is not verified (the connection is
            closed again in the latter two cases).

        Raises:
            OSError: If the TCP connection cannot be established.
        """
        if self.socket:
            return False
        logger.debug(f"trying to connect to {self.server}")
        self.socket = socket.create_connection(self.server)
        peer = self.socket.getpeername()
        logger.info(f"Connected to {str(peer)}")
        # A 1 s timeout keeps every read bounded; settimeout() alone defines
        # the blocking behaviour, so no separate setblocking() call is needed.
        self.socket.settimeout(1)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        banner = list(self._get_line())
        if banner:
            logger.info(f"Banner: {banner[0].rstrip()}")
        else:
            logger.warning("No banner received from server")

        # send_line() appends the CRLF terminator, so it must not be part of
        # the login line itself (otherwise an extra empty line is sent).
        login_line = f"user {self.callsign} pass {self.passwd} vers testlib 0.1"
        logger.debug(login_line)
        self.send_line(login_line)

        login = list(self._get_line())
        if not login:
            logger.warning("No login response received from server")
            self.close()
            return False
        logger.info(f"login line: {login[0]}")

        # Expected shape: "# logresp <call> <status>, server <name>".
        # The status token may carry a trailing comma, which is stripped
        # before comparing.
        fields = login[0].split(' ', 4)
        status = fields[3].rstrip(",") if len(fields) > 3 else ""
        if status == "verified":
            return True
        self.close()
        return False

    def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self.socket:
            self.socket.close()
            self.socket = None

    def send_line(self, line) -> None:
        """Send ``line`` followed by CRLF, UTF-8 encoded.

        Does nothing when no connection is open.
        """
        if self.socket:
            self.socket.sendall(bytearray(f"{line}\r\n", encoding='utf8'))

    def get_line(self):
        """Read once from the server and return the received packet lines.

        Returns:
            A list of the complete lines currently available, without lines
            starting with ``#``. The list is empty when nothing was received.
        """
        return [line for line in self._get_line() if not line.startswith("#")]

    def _get_line(self):
        """Yield every complete line available after at most one ``recv``.

        Received bytes are decoded as latin-1 and appended to ``self.buffer``;
        each CRLF-terminated line is removed from the buffer and yielded
        without its terminator. An incomplete trailing line stays buffered
        for the next call. Nothing is yielded while no connection is open.
        """
        if not self.socket:
            return
        try:
            chunk = self.socket.recv(4096)
        except socket.timeout:
            # Nothing arrived within the socket timeout; this is the normal
            # idle case and simply means "no new data".
            pass
        except OSError as err:
            # Best effort: a failed read is treated like "no new data".
            logger.debug(f"recv failed: {err}")
        else:
            self.buffer = self.buffer + chunk.decode('latin-1')
        while "\r\n" in self.buffer:
            line, self.buffer = self.buffer.split("\r\n", 1)
            yield line

    def wait_for(self, towait, attempts=8) -> None:
        """Read until a line equal to ``towait`` is received.

        Args:
            towait: Exact line text (without CRLF) to wait for.
            attempts: Maximum number of ``get_line`` reads; each read may
                block until the socket timeout expires.

        Raises:
            RuntimeError: If the line was not seen within ``attempts`` reads.
        """
        for _ in range(attempts):
            if towait in self.get_line():
                return
        raise RuntimeError(
            f"line {towait!r} not received after {attempts} read attempts")
@pytest.fixture
def APRSIS():
    """Yield an ``AprsIs`` client after attempting to connect and log in.

    The client targets ``localhost:10152`` with the callsign ``OE5BPA-1``.
    The result of ``connect()`` is not checked here. After the test has
    finished, ``close()`` is called on the client so the connection is not
    left to leak into later tests.
    """
    aprs = AprsIs("OE5BPA-1", passwd="22948",
                  host="localhost", port=10152)
    aprs.connect()
    yield aprs
    # Teardown: runs after the test that used the fixture has finished.
    aprs.close()