2021-09-18 21:07:13 +00:00
|
|
|
import pytest
|
|
|
|
import socket
|
2021-11-07 20:27:07 +00:00
|
|
|
import logging
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
2021-09-18 21:07:13 +00:00
|
|
|
|
|
|
|
|
|
|
|
class AprsIs:
|
|
|
|
def __init__(self, callsign, passwd="-1", host="localhost", port=10152) -> None:
|
|
|
|
self.callsign = callsign
|
|
|
|
self.passwd = passwd
|
|
|
|
self.server = (host, port)
|
|
|
|
self.socket = None
|
|
|
|
self.buffer = ""
|
|
|
|
|
|
|
|
def connect(self):
|
|
|
|
if self.socket:
|
|
|
|
return False
|
|
|
|
|
2021-11-07 20:27:07 +00:00
|
|
|
logger.debug(f"trying to connect to {self.server}")
|
2021-09-18 21:07:13 +00:00
|
|
|
self.socket = socket.create_connection(self.server)
|
|
|
|
peer = self.socket.getpeername()
|
2021-11-07 20:27:07 +00:00
|
|
|
logger.info(f"Connected to {str(peer)}")
|
2021-09-18 21:07:13 +00:00
|
|
|
self.socket.setblocking(1)
|
|
|
|
self.socket.settimeout(1)
|
|
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
|
|
|
banner = list(self._get_line())
|
2021-11-07 20:27:07 +00:00
|
|
|
logger.info(f"Banner: {banner[0].rstrip()}")
|
2021-09-18 21:07:13 +00:00
|
|
|
|
2021-11-07 20:27:07 +00:00
|
|
|
login_line = f"user {self.callsign} pass {self.passwd} vers testlib 0.1\r\n"
|
|
|
|
logger.debug(login_line)
|
|
|
|
self.send_line(login_line)
|
2021-09-18 21:07:13 +00:00
|
|
|
|
|
|
|
login = list(self._get_line())
|
2021-11-07 20:27:07 +00:00
|
|
|
logger.info(f"login line: {login[0]}")
|
2021-09-18 21:07:13 +00:00
|
|
|
_, _, _, status, _ = login[0].split(' ', 4)
|
|
|
|
if status == "verified":
|
|
|
|
return True
|
|
|
|
self.close()
|
|
|
|
return False
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if not self.socket:
|
|
|
|
self.socket.close()
|
|
|
|
self.socket = None
|
|
|
|
|
|
|
|
def send_line(self, line):
|
|
|
|
if self.socket:
|
|
|
|
self.socket.sendall(bytearray(f"{line}\r\n", encoding='utf8'))
|
|
|
|
|
|
|
|
def get_line(self):
|
2021-09-22 20:06:32 +00:00
|
|
|
line_list = list(self._get_line())
|
|
|
|
new_list = []
|
|
|
|
if line_list:
|
|
|
|
for line in line_list:
|
|
|
|
if not line.startswith("#"):
|
|
|
|
new_list.append(line)
|
|
|
|
return new_list
|
2021-09-18 21:07:13 +00:00
|
|
|
|
|
|
|
def _get_line(self):
|
|
|
|
if self.socket:
|
|
|
|
try:
|
|
|
|
buf = self.socket.recv(4096)
|
|
|
|
self.buffer = self.buffer + buf.decode('latin-1')
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
while "\r\n" in self.buffer:
|
|
|
|
line, self.buffer = self.buffer.split("\r\n", 1)
|
|
|
|
yield line
|
|
|
|
return None
|
|
|
|
|
2021-09-25 19:24:33 +00:00
|
|
|
def wait_for(self, towait):
|
|
|
|
for i in range(2, 10):
|
|
|
|
line = self.get_line()
|
|
|
|
for l in line:
|
|
|
|
if l == towait:
|
|
|
|
return
|
|
|
|
raise
|
|
|
|
|
2021-09-18 21:07:13 +00:00
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
def APRSIS():
|
|
|
|
aprs = AprsIs("OE5BPA-1", passwd="22948",
|
|
|
|
host="localhost", port=10152)
|
|
|
|
aprs.connect()
|
|
|
|
return aprs
|