From e738335c7f3a0c206970f29cf214d07be09e5975 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sat, 14 Jun 2025 23:30:10 -0400 Subject: [PATCH 1/3] for 'amqtt' (broker script) was creating a new event loop but not passing it to the broker, so the broker was running in one event loop and all of its coroutines were executing in another, not a supported behavior --- amqtt/broker.py | 2 +- amqtt/scripts/broker_script.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index b8e2fd4..744ffb2 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -180,7 +180,7 @@ class Broker: self.config.update(config) self._build_listeners_config(self.config) - self._loop = loop or asyncio.new_event_loop() + self._loop = loop or asyncio.get_running_loop() self._servers: dict[str, Server] = {} self._init_states() self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {} diff --git a/amqtt/scripts/broker_script.py b/amqtt/scripts/broker_script.py index 6a469d3..8901ccc 100644 --- a/amqtt/scripts/broker_script.py +++ b/amqtt/scripts/broker_script.py @@ -54,11 +54,10 @@ def broker_main( typer.echo(f"❌ Config file error: {exc}", err=True) raise typer.Exit(code=1) from exc - loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - broker = Broker(config) + broker = Broker(config, loop=loop) except (BrokerError, ParserError, PluginError) as exc: typer.echo(f"❌ Broker failed to start: {exc}", err=True) raise typer.Exit(code=1) from exc From 012a744b6614c23d38b0cb3eea0dca025ada92f9 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sun, 15 Jun 2025 06:29:25 -0400 Subject: [PATCH 2/3] removing xfail from test_sys --- amqtt/plugins/manager.py | 3 +-- tests/plugins/test_sys.py | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index 39e9bb0..fdb8ee3 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -8,9 +8,8 @@ from importlib.metadata import EntryPoint, EntryPoints, entry_points import logging from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Optional, TypeVar -from amqtt.session import Session - from amqtt.errors import PluginImportError, PluginInitError +from amqtt.session import Session _LOGGER = logging.getLogger(__name__) diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index 30e4f95..19341e2 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -13,10 +13,6 @@ logger = logging.getLogger(__name__) # test broker sys @pytest.mark.asyncio -@pytest.mark.xfail( - reason="see https://github.com/Yakifo/aio-amqtt/issues/215", - strict=False, -) async def test_broker_sys_plugin() -> None: class MockEntryPoints: From efd8c15375631691b8a5ab1ce411015bda28a416 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sun, 15 Jun 2025 06:31:48 -0400 Subject: [PATCH 3/3] removing xfail --- tests/test_broker.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_broker.py b/tests/test_broker.py index 3591112..e313d35 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -630,10 +630,6 @@ async def test_client_subscribe_publish_dollar_topic_2(broker): @pytest.mark.asyncio -@pytest.mark.xfail( - reason="see https://github.com/Yakifo/aio-amqtt/issues/16", - strict=False, -) async def test_client_publish_retain_subscribe(broker): sub_client = MQTTClient() await sub_client.connect("mqtt://127.0.0.1", cleansession=False)