adding documentation and test case

pull/291/head
Andrew Mirsky 2025-08-09 13:38:55 -04:00
rodzic b5b05fa19c
commit 6f56facbba
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
2 zmienionych plików z 93 dodań i 20 usunięć

Wyświetl plik

@ -3,6 +3,7 @@ import logging
import asyncio import asyncio
from asyncio import StreamWriter, StreamReader, Event from asyncio import StreamWriter, StreamReader, Event
from functools import partial from functools import partial
from pathlib import Path
import typer import typer
@ -14,15 +15,24 @@ from amqtt.plugins.manager import PluginManager
from amqtt.session import Session from amqtt.session import Session
from amqtt.adapters import ReaderAdapter, WriterAdapter from amqtt.adapters import ReaderAdapter, WriterAdapter
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = typer.Typer(add_completion=False, rich_markup_mode=None) app = typer.Typer(add_completion=False, rich_markup_mode=None)
# Usage: unix_sockets.py [OPTIONS] COMMAND [ARGS]...
#
# Options:
# --help Show this message and exit.
#
# Commands:
# broker Run an mqtt broker that communicates over a unix (file) socket.
# client Run an mqtt client that communicates over a unix (file) socket.
class UnixStreamReaderAdapter(ReaderAdapter): class UnixStreamReaderAdapter(ReaderAdapter):
def __init__(self, reader: StreamReader): def __init__(self, reader: StreamReader) -> None:
self._reader = reader self._reader = reader
async def read(self, n:int = -1) -> bytes: async def read(self, n:int = -1) -> bytes:
@ -30,29 +40,29 @@ class UnixStreamReaderAdapter(ReaderAdapter):
return await self._reader.read() return await self._reader.read()
return await self._reader.readexactly(n) return await self._reader.readexactly(n)
def feed_eof(self): def feed_eof(self) -> None:
return self._reader.feed_eof() return self._reader.feed_eof()
class UnixStreamWriterAdapter(WriterAdapter): class UnixStreamWriterAdapter(WriterAdapter):
def __init__(self, writer: StreamWriter): def __init__(self, writer: StreamWriter) -> None:
self._writer = writer self._writer = writer
self.is_closed = Event() self.is_closed = Event()
def write(self, data): def write(self, data: bytes) -> None:
if not self.is_closed.is_set(): if not self.is_closed.is_set():
self._writer.write(data) self._writer.write(data)
async def drain(self): async def drain(self) -> None:
if self.is_closed.is_set(): if self.is_closed.is_set():
await self._writer.drain() await self._writer.drain()
def get_peer_info(self): def get_peer_info(self) -> tuple[str, int]:
extra_info = self._writer.get_extra_info('socket') extra_info = self._writer.get_extra_info('socket')
return extra_info.getsockname(), 0 return extra_info.getsockname(), 0
async def close(self): async def close(self) -> None:
if self.is_closed.is_set(): if self.is_closed.is_set():
return return
self.is_closed.set() self.is_closed.set()
@ -67,9 +77,9 @@ class UnixStreamWriterAdapter(WriterAdapter):
await self._writer.wait_closed() await self._writer.wait_closed()
async def run_broker(socket_file: Path):
async def run_broker(): # configure the broker with a single, external listener
cfg = BrokerConfig( cfg = BrokerConfig(
listeners={ listeners={
'default': ListenerConfig( 'default': ListenerConfig(
@ -85,38 +95,62 @@ async def run_broker():
b = Broker(cfg) b = Broker(cfg)
async def unix_stream_connected(reader, writer, listener_name): # new connection handler
async def unix_stream_connected(reader: StreamReader, writer: StreamWriter, listener_name: str):
logger.info("received new unix connection....") logger.info("received new unix connection....")
# wraps the reader/writer in a compatible interface
r = UnixStreamReaderAdapter(reader) r = UnixStreamReaderAdapter(reader)
w = UnixStreamWriterAdapter(writer) w = UnixStreamWriterAdapter(writer)
await b.external_connected(reader=r, writer=w, listener_name='default')
await asyncio.start_unix_server(partial(unix_stream_connected, listener_name='default'), path="/tmp/mqtt") # passes the connection to the broker for protocol communications
await b.external_connected(reader=r, writer=w, listener_name=listener_name)
await asyncio.start_unix_server(partial(unix_stream_connected, listener_name='default'), path=socket_file)
await b.start() await b.start()
try: try:
logger.info("starting mqtt unix server") logger.info("starting mqtt unix server")
# run until ctrl-c
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
await b.shutdown() await b.shutdown()
@app.command()
def broker():
asyncio.run(run_broker())
async def run_client(): @app.command()
def broker(
socket_file: str | None = typer.Option("/tmp/mqtt", "-s", "--socket", help="path and file for unix socket"),
verbose: bool = typer.Option(False, "-v", "--verbose", help="set logging level to DEBUG"),
):
"""Run an mqtt broker that communicates over a unix (file) socket."""
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
asyncio.run(run_broker(Path(socket_file)))
async def run_client(socket_file: Path):
# 'MQTTClient' establishes the connection but uses the ClientProtocolHandler for MQTT protocol communications
# create a plugin manager
config = ClientConfig() config = ClientConfig()
context = ClientContext() context = ClientContext()
context.config = config context.config = config
plugins_manager = PluginManager("amqtt.client.plugins", context) plugins_manager = PluginManager("amqtt.client.plugins", context)
# create a client protocol handler
cph = ClientProtocolHandler(plugins_manager) cph = ClientProtocolHandler(plugins_manager)
conn_reader, conn_writer = await asyncio.open_unix_connection(path="/tmp/mqtt")
# connect to the unix socket
conn_reader, conn_writer = await asyncio.open_unix_connection(path=socket_file)
# anonymous session connection just needs a client_id
s = Session() s = Session()
s.client_id = "myUnixClientID" s.client_id = "myUnixClientID"
# wraps the reader/writer in compatible interface
r = UnixStreamReaderAdapter(conn_reader) r = UnixStreamReaderAdapter(conn_reader)
w = UnixStreamWriterAdapter(conn_writer) w = UnixStreamWriterAdapter(conn_writer)
# pass the connection to the protocol handler for mqtt communications and initiate CONNECT/CONNACK
cph.attach(session=s, reader=r, writer=w) cph.attach(session=s, reader=r, writer=w)
logger.debug("handler attached") logger.debug("handler attached")
ret = await cph.mqtt_connect() ret = await cph.mqtt_connect()
@ -124,14 +158,21 @@ async def run_client():
try: try:
while True: while True:
# periodically send a message
await cph.mqtt_publish('my/topic', b'my message', 0, False) await cph.mqtt_publish('my/topic', b'my message', 0, False)
await asyncio.sleep(1) await asyncio.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
cph.detach() cph.detach()
@app.command() @app.command()
def client(): def client(
asyncio.run(run_client()) socket_file: str | None = typer.Option("/tmp/mqtt", "-s", "--socket", help="path and file for unix socket"),
verbose: bool = typer.Option(False, "-v", "--verbose", help="set logging level to DEBUG"),
):
"""Run an mqtt client that communicates over a unix (file) socket."""
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
asyncio.run(run_client(Path(socket_file)))
if __name__ == "__main__": if __name__ == "__main__":

Wyświetl plik

@ -5,7 +5,11 @@ import subprocess
from multiprocessing import Process from multiprocessing import Process
from pathlib import Path from pathlib import Path
from typer.testing import CliRunner
from samples.http_server_integration import main as http_server_main from samples.http_server_integration import main as http_server_main
from samples.unix_sockets import app as unix_sockets_app
import pytest import pytest
@ -298,3 +302,31 @@ async def test_external_http_server(external_http_server):
await client.disconnect() await client.disconnect()
# Send the interrupt signal # Send the interrupt signal
await asyncio.sleep(1) await asyncio.sleep(1)
@pytest.mark.asyncio
async def test_unix_connection():
unix_socket_script = Path(__file__).parent.parent / "samples/unix_sockets.py"
broker_process = subprocess.Popen(["python", unix_socket_script, "broker", "-s", "/tmp/mqtt"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# start the broker
await asyncio.sleep(1)
# start the client
client_process = subprocess.Popen(["python", unix_socket_script, "client", "-s", "/tmp/mqtt"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(3)
# stop the client (ctrl-c)
client_process.send_signal(signal.SIGINT)
_ = client_process.communicate()
# stop the broker (ctrl-c)
broker_process.send_signal(signal.SIGINT)
broker_stdout, broker_stderr = broker_process.communicate()
logger.debug(broker_stderr.decode("utf-8"))
# verify that the broker received client connected/disconnected
assert "on_broker_client_connected" in broker_stderr.decode("utf-8")
assert "on_broker_client_disconnected" in broker_stderr.decode("utf-8")