From 6ff7c5a87abddeb18239c6ff5816a30f08607851 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Mon, 4 Aug 2025 19:31:41 -0400 Subject: [PATCH] Yakifo/amqtt#73 : adding test case for external http server integration. comments and documentation. --- amqtt/broker.py | 18 +++-- amqtt/contexts.py | 4 - amqtt/mqtt/protocol/handler.py | 5 -- .../{web.py => http_server_integration.py} | 77 ++++++++++++++----- tests/test_samples.py | 30 +++++++- 5 files changed, 95 insertions(+), 39 deletions(-) rename samples/{web.py => http_server_integration.py} (55%) diff --git a/amqtt/broker.py b/amqtt/broker.py index 7bcbeec..d2e8bd1 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -52,6 +52,8 @@ class RetainedApplicationMessage(ApplicationMessage): class Server: + """Used to encapsulate the server associated with a listener. Allows broker to interact with the connection lifecycle.""" + def __init__( self, listener_name: str, @@ -90,8 +92,10 @@ class Server: class ExternalServer(Server): - def __init__(self): - super().__init__('aiohttp', None) + """For external listeners, the connection lifecycle is handled by that implementation so these are no-ops.""" + + def __init__(self) -> None: + super().__init__("aiohttp", None) # type: ignore[arg-type] async def acquire_connection(self) -> None: pass @@ -104,10 +108,7 @@ class ExternalServer(Server): class BrokerContext(BaseContext): - """BrokerContext is used as the context passed to plugins interacting with the broker. - - It act as an adapter to broker services from plugins developed for HBMQTT broker. - """ + """BrokerContext is used as the context passed to plugins interacting with the broker.""" def __init__(self, broker: "Broker") -> None: super().__init__() @@ -257,10 +258,14 @@ class Broker: max_connections = listener.get("max_connections", -1) ssl_context = self._create_ssl_context(listener) if listener.get("ssl", False) else None + # for listeners which are external, don't need to create a server if listener.type == ListenerType.EXTERNAL: + + # broker still needs to associate a new connection to the listener self.logger.info(f"External listener exists for '{listener_name}' ") self._servers[listener_name] = ExternalServer() else: + # for tcp and websockets, start servers to listen for inbound connections try: address, port = self._split_bindaddr_port(listener["bind"], DEFAULT_PORTS[listener["type"]]) except ValueError as e: @@ -404,6 +409,7 @@ class Broker: await self._client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer)) async def external_connected(self, reader: ReaderAdapter, writer: WriterAdapter, listener_name: str) -> None: + """Engage the broker in handling the data stream to/from an established connection.""" await self._client_connected(listener_name, reader, writer) async def _client_connected(self, listener_name: str, reader: ReaderAdapter, writer: WriterAdapter) -> None: diff --git a/amqtt/contexts.py b/amqtt/contexts.py index b397d62..66d7050 100644 --- a/amqtt/contexts.py +++ b/amqtt/contexts.py @@ -128,10 +128,6 @@ class ListenerConfig(Dictable): if isinstance(getattr(self, fn), str): setattr(self, fn, Path(getattr(self, fn))) - # if self.type == ListenerType.EXTERNAL and not all([self.reader, self.writer]): - # msg = "external type requires specifying reader, writer and server classes" - # raise ValueError(msg) - def apply(self, other: "ListenerConfig") -> None: """Apply the field from 'other', if 'self' field is default.""" for f in fields(self): diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index 8c53dc1..66bc1cb 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -1,5 +1,4 @@ import asyncio -import traceback try: from asyncio import InvalidStateError, QueueFull, QueueShutDown @@ -536,10 +535,6 @@ class ProtocolHandler(Generic[C]): self.handle_read_timeout() except NoDataError: self.logger.debug(f"{self.session.client_id} No data available") - except RuntimeError: - self.logger.debug(f"{self.session.client_id} websocket closed") - traceback.print_exc() - break except Exception as e: # noqa: BLE001 self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}") break diff --git a/samples/web.py b/samples/http_server_integration.py similarity index 55% rename from samples/web.py rename to samples/http_server_integration.py index 787ccf4..243d4ba 100644 --- a/samples/web.py +++ b/samples/http_server_integration.py @@ -8,12 +8,14 @@ from aiohttp import web from amqtt.adapters import ReaderAdapter, WriterAdapter from amqtt.broker import Broker from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType - +from amqtt.errors import ConnectError logger = logging.getLogger(__name__) +MQTT_LISTENER_NAME = "myMqttListener" async def hello(request): + """get request handler""" return web.Response(text="Hello, world") class WebSocketResponseReader(ReaderAdapter): @@ -25,26 +27,34 @@ class WebSocketResponseReader(ReaderAdapter): async def read(self, n: int = -1) -> bytes: """ + read 'n' bytes from the datastream, if < 0 read all available bytes + Raises: BrokerPipeError : if reading on a closed websocket connection """ # continue until buffer contains at least the amount of data being requested while not self.buffer or len(self.buffer) < n: + # if the websocket is closed if self.ws.closed: raise BrokenPipeError() + try: - async with asyncio.timeout(0.5): - msg = await self.ws.receive() - if msg.type == aiohttp.WSMsgType.BINARY: - self.buffer.extend(msg.data) - elif msg.type == aiohttp.WSMsgType.CLOSE: - raise BrokenPipeError() + # read from stream + msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5) + # mqtt streams should always be binary... + if msg.type == aiohttp.WSMsgType.BINARY: + self.buffer.extend(msg.data) + elif msg.type == aiohttp.WSMsgType.CLOSE: + raise BrokenPipeError() + except asyncio.TimeoutError: raise BrokenPipeError() + # return all bytes currently in the buffer if n == -1: result = bytes(self.buffer) self.buffer.clear() + # return the requested number of bytes from the buffer else: result = self.buffer[:n] del self.buffer[:n] @@ -75,9 +85,11 @@ class WebSocketResponseWriter(WriterAdapter): self._stream = io.BytesIO(b"") def write(self, data: bytes) -> None: + """Add bytes to stream buffer.""" self._stream.write(data) async def drain(self) -> None: + """Send the collected bytes in the buffer to the websocket connection.""" data = self._stream.getvalue() if data and len(data): await self.ws.send_bytes(data) @@ -87,54 +99,79 @@ class WebSocketResponseWriter(WriterAdapter): return self.client_ip, self.port async def close(self) -> None: + # no clean up needed, stream will be gc along with instance pass +async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse: -async def websocket_handler(request: web.Request) -> web.StreamResponse: - - # respond to the websocket request with the 'mqtt' protocol + # establish connection by responding to the websocket request with the 'mqtt' protocol ws = web.WebSocketResponse(protocols=['mqtt',]) await ws.prepare(request) - # access the broker created when the server started and notify the broker of this new connection + # access the broker created when the server started b: Broker = request.app['broker'] - # send/receive data to the websocket. must pass the name of the externalized listener in the broker config - await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), 'myAIOHttp') + # hand-off the websocket data stream to the broker for handling + # `listener_name` is the same name of the externalized listener in the broker config + await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), MQTT_LISTENER_NAME) logger.debug('websocket connection closed') return ws -def main(): +async def websocket_handler(request: web.Request) -> web.StreamResponse: + ws = web.WebSocketResponse() + await ws.prepare(request) + async for msg in ws: + logging.info(msg) + + logging.info("websocket connection closed") + return ws + +def main(): + # create an `aiohttp` server + lp = asyncio.get_event_loop() app = web.Application() app.add_routes( [ - web.get('/', hello), - web.get('/ws', websocket_handler) + web.get('/', hello), # http get request/response route + web.get('/ws', websocket_handler), # standard websocket handler + web.get('/mqtt', mqtt_websocket_handler), # websocket handler for mqtt connections ]) + # create background task for running the `amqtt` broker app.cleanup_ctx.append(run_broker) - web.run_app(app) + + # make sure that both `aiohttp` server and `amqtt` broker run in the same loop + # so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`) + web.run_app(app, loop=lp) async def run_broker(_app): - """https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks""" - loop = asyncio.get_event_loop() + """App init function to start (and then shutdown) the `amqtt` broker. + https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks""" + # standard TCP connection as well as an externalized-listener cfg = BrokerConfig( listeners={ 'default':ListenerConfig(type=ListenerType.TCP, bind='127.0.0.1:1883'), - 'myAIOHttp': ListenerConfig(type=ListenerType.EXTERNAL), + MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL), } ) + # make sure the `Broker` runs in the same loop as the aiohttp server + loop = asyncio.get_event_loop() broker = Broker(config=cfg, loop=loop) + + # store broker instance so that incoming requests can hand off processing of a datastream _app['broker'] = broker + # start the broker await broker.start() + # pass control back to web app yield + # closing activities await broker.shutdown() diff --git a/tests/test_samples.py b/tests/test_samples.py index 1e46046..fca99f3 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -2,14 +2,15 @@ import asyncio import logging import signal import subprocess + +from multiprocessing import Process from pathlib import Path +from samples.http_server_integration import main as http_server_main import pytest from amqtt.broker import Broker -from samples.client_publish import __main__ as client_publish_main -from samples.client_subscribe import __main__ as client_subscribe_main -from samples.client_keepalive import __main__ as client_keepalive_main +from amqtt.client import MQTTClient from samples.broker_acl import config as broker_acl_config from samples.broker_taboo import config as broker_taboo_config @@ -275,4 +276,25 @@ async def test_client_subscribe_plugin_taboo(): assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") - await broker.shutdown() \ No newline at end of file + await broker.shutdown() + + +@pytest.fixture +def external_http_server(): + p = Process(target=http_server_main) + p.start() + yield p + p.terminate() + + +@pytest.mark.asyncio +async def test_external_http_server(external_http_server): + + await asyncio.sleep(1) + client = MQTTClient(config={'auto_reconnect': False}) + await client.connect("ws://127.0.0.1:8080/mqtt") + assert client.session is not None + await client.publish("my/topic", b'test message') + await client.disconnect() + # Send the interrupt signal + await asyncio.sleep(1)