From 69a10523824fe279a6e08637fbf81f5a3479875d Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 3 Jun 2025 11:42:50 -0400 Subject: [PATCH 1/8] fixes Yakifo/amqtt#157 : get_event_loop is deprecated, replacing with asyncio.run and new_event_loop (where applicable) --- .pre-commit-config.yaml | 2 -- amqtt/broker.py | 4 ++-- amqtt/scripts/broker_script.py | 7 ++++--- amqtt/scripts/pub_script.py | 6 +----- amqtt/scripts/sub_script.py | 6 ++---- 5 files changed, 9 insertions(+), 16 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8301d3d..e13067c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,8 +16,6 @@ repos: hooks: - id: ruff args: - - --fix - - --unsafe-fixes - --line-length=130 - --exit-non-zero-on-fix diff --git a/amqtt/broker.py b/amqtt/broker.py index 5690227..5416f52 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -142,7 +142,7 @@ class Broker: Args: config: dictionary of configuration options (see [broker configuration](broker_config.md)). - loop: asyncio loop. defaults to `asyncio.get_event_loop()`. + loop: asyncio loop. defaults to `asyncio.new_event_loop()`. plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`. """ @@ -170,7 +170,7 @@ class Broker: self.config.update(config) self._build_listeners_config(self.config) - self._loop = loop or asyncio.get_event_loop() + self._loop = loop or asyncio.new_event_loop() self._servers: dict[str, Server] = {} self._init_states() self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {} diff --git a/amqtt/scripts/broker_script.py b/amqtt/scripts/broker_script.py index 7790108..dc3d87b 100644 --- a/amqtt/scripts/broker_script.py +++ b/amqtt/scripts/broker_script.py @@ -55,20 +55,21 @@ def broker_main( raise typer.Exit(code=1) from exc - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) try: broker = Broker(config) except (BrokerError, ParserError) as exc: typer.echo(f"❌ Broker failed to start: {exc}", err=True) raise typer.Exit(code=1) from exc + _ = loop.create_task(broker.start()) #noqa : RUF006 try: - loop.run_until_complete(broker.start()) loop.run_forever() except KeyboardInterrupt: loop.run_until_complete(broker.shutdown()) except Exception as exc: - typer.echo("❌ Connection failed", err=True) + typer.echo("❌ Broker execution halted", err=True) raise typer.Exit(code=1) from exc finally: loop.close() diff --git a/amqtt/scripts/pub_script.py b/amqtt/scripts/pub_script.py index cf53209..80d5aa3 100644 --- a/amqtt/scripts/pub_script.py +++ b/amqtt/scripts/pub_script.py @@ -182,8 +182,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 logger.debug(f"Using default configuration from {default_config_path}") config = read_yaml_config(default_config_path) - loop = asyncio.get_event_loop() - if not client_id: client_id = _gen_client_id() @@ -217,7 +215,7 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 ) with contextlib.suppress(KeyboardInterrupt): try: - loop.run_until_complete( + asyncio.run( do_pub( client=client, message_input=message_input, @@ -234,8 +232,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 typer.echo("❌ Connection failed", err=True) raise typer.Exit(code=1) from exc - loop.close() - if __name__ == "__main__": typer.run(main) diff --git a/amqtt/scripts/sub_script.py b/amqtt/scripts/sub_script.py index c348379..e837676 100644 --- a/amqtt/scripts/sub_script.py +++ b/amqtt/scripts/sub_script.py @@ -147,8 +147,6 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 logger.debug(f"Using default configuration from {default_config_path}") config = read_yaml_config(default_config_path) - loop = asyncio.get_event_loop() - if not client_id: client_id = _gen_client_id() @@ -175,7 +173,7 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 ) with contextlib.suppress(KeyboardInterrupt): try: - loop.run_until_complete(do_sub(client, + asyncio.run(do_sub(client, url=url, topics=topics, ca_info=ca_info, @@ -184,10 +182,10 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913 max_count=max_count, clean_session=clean_session, )) + except (ClientError, ConnectError) as exc: typer.echo("❌ Connection failed", err=True) raise typer.Exit(code=1) from exc - loop.close() if __name__ == "__main__": From e7882a3755cf086139619b6170aa4445b8bda723 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 12 Jun 2025 10:52:44 -0400 Subject: [PATCH 2/8] fixes amqtt/Yakifo#210 : when reconnect is false, authentication failure causes NoDataError instead of ConnectError --- amqtt/mqtt/protocol/client_handler.py | 8 +++-- tests/plugins/mocks.py | 18 ++++++++++++ tests/test_client.py | 42 +++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 tests/plugins/mocks.py diff --git a/amqtt/mqtt/protocol/client_handler.py b/amqtt/mqtt/protocol/client_handler.py index 32755f8..9b7d9d8 100644 --- a/amqtt/mqtt/protocol/client_handler.py +++ b/amqtt/mqtt/protocol/client_handler.py @@ -1,7 +1,7 @@ import asyncio from typing import Any -from amqtt.errors import AMQTTError +from amqtt.errors import AMQTTError, NoDataError from amqtt.mqtt.connack import ConnackPacket from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader from amqtt.mqtt.disconnect import DisconnectPacket @@ -87,8 +87,10 @@ class ClientProtocolHandler(ProtocolHandler): if self.reader is None: msg = "Reader is not initialized." raise AMQTTError(msg) - - connack = await ConnackPacket.from_stream(self.reader) + try: + connack = await ConnackPacket.from_stream(self.reader) + except NoDataError as e: + raise ConnectionError from e await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack, session=self.session) return connack.return_code diff --git a/tests/plugins/mocks.py b/tests/plugins/mocks.py new file mode 100644 index 0000000..3d80ca2 --- /dev/null +++ b/tests/plugins/mocks.py @@ -0,0 +1,18 @@ +import logging + +from amqtt.plugins.authentication import BaseAuthPlugin +from amqtt.session import Session + +logger = logging.getLogger(__name__) + + +class NoAuthPlugin(BaseAuthPlugin): + + async def authenticate(self, *, session: Session) -> bool | None: + return False + +class AuthPlugin(BaseAuthPlugin): + + async def authenticate(self, *, session: Session) -> bool | None: + return True + diff --git a/tests/test_client.py b/tests/test_client.py index 59547ce..ad0a5cc 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,8 +1,11 @@ import asyncio import logging +from importlib.metadata import EntryPoint +from unittest.mock import patch import pytest +from amqtt.broker import Broker from amqtt.client import MQTTClient from amqtt.errors import ConnectError from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 @@ -295,3 +298,42 @@ async def test_client_publish_will_with_retain(broker_fixture, client_config): assert message3.topic == 'test/will/topic' assert message3.data == b'client ABC has disconnected' await client3.disconnect() + + +@pytest.mark.asyncio +async def test_client_no_auth(): + + + class MockEntryPoints: + + def select(self, group) -> list[EntryPoint]: + match group: + case 'tests.mock_plugins': + return [ + EntryPoint(name='auth_plugin', group='tests.mock_plugins', value='tests.plugins.mocks:NoAuthPlugin'), + ] + case _: + return list() + + + with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish: + + config = { + "listeners": { + "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, + }, + 'sys_interval': 1, + 'auth': { + 'plugins': ['auth_plugin', ] + } + } + + client = MQTTClient(client_id="client1", config={'auto_reconnect': False}) + + broker = Broker(plugin_namespace='tests.mock_plugins', config=config) + await broker.start() + + + with pytest.raises(ConnectError): + await client.connect("mqtt://127.0.0.1:1883/") + From fed5b8eb082aae81f63d2cfd760827f6cc3247a8 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 12 Jun 2025 11:03:28 -0400 Subject: [PATCH 3/8] shutdown broker when test completes --- tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client.py b/tests/test_client.py index ad0a5cc..8d49035 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -333,7 +333,7 @@ async def test_client_no_auth(): broker = Broker(plugin_namespace='tests.mock_plugins', config=config) await broker.start() - with pytest.raises(ConnectError): await client.connect("mqtt://127.0.0.1:1883/") + await broker.shutdown() From a2cbef1bc4bb6be9214ccbe7f97e6bead89111f8 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 12 Jun 2025 14:20:31 -0400 Subject: [PATCH 4/8] running pytest to generate htmlcov for readthedocs --- .readthedocs.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 2146144..62a145f 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -7,7 +7,9 @@ jobs: pre_install: - pip install --upgrade pip - - pip install --group docs + - pip install uv + - uv pip install --group dev --group docs + - uv run pytest mkdocs: configuration: mkdocs.rtd.yml From 24f99fdc2d7d5520a3d03328bb6b4aa7475187ed Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 12 Jun 2025 14:28:50 -0400 Subject: [PATCH 5/8] consistent description in readme file --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index 72b302f..9953197 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,7 @@ - Communication over TCP and/or websocket, including support for SSL/TLS - Support QoS 0, QoS 1 and QoS 2 messages flow - Client auto-reconnection on network lost -- Functionality expansion; plugins included: - - Authentication through password file - - Basic `$SYS` topics +- Functionality expansion; plugins included: authentication and `$SYS` topic publishing ## Installation From 51102a435b61e0628624452b0e17f2f4bfe97f29 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Fri, 13 Jun 2025 14:48:20 -0400 Subject: [PATCH 6/8] converting from event strings to a string enumeration --- amqtt/broker.py | 47 ++++++++++++++++++++++++--------------- tests/plugins/test_sys.py | 1 - tests/test_broker.py | 37 +++++++++++------------------- tests/test_paho.py | 7 +++--- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index 5690227..d376b07 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -41,18 +41,29 @@ _defaults = read_yaml_config(Path(__file__).parent / "scripts/default_broker.yam DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 -EVENT_BROKER_PRE_START = "broker_pre_start" -EVENT_BROKER_POST_START = "broker_post_start" -EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown" -EVENT_BROKER_POST_SHUTDOWN = "broker_post_shutdown" -EVENT_BROKER_CLIENT_CONNECTED = "broker_client_connected" -EVENT_BROKER_CLIENT_DISCONNECTED = "broker_client_disconnected" -EVENT_BROKER_CLIENT_SUBSCRIBED = "broker_client_subscribed" -EVENT_BROKER_CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" -EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" + +class EventBroker(Enum): + """Events issued by the broker. + + Attributes: + PRE_START: on_event_broker_pre_start + + """ + + PRE_START = "broker_pre_start" + POST_START = "broker_post_start" + PRE_SHUTDOWN = "broker_pre_shutdown" + POST_SHUTDOWN = "broker_post_shutdown" + CLIENT_CONNECTED = "broker_client_connected" + CLIENT_DISCONNECTED = "broker_client_disconnected" + CLIENT_SUBSCRIBED = "broker_client_subscribed" + CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" + MESSAGE_RECEIVED = "broker_message_received" class Action(Enum): + """Actions issued by the broker.""" + SUBSCRIBE = "subscribe" PUBLISH = "publish" @@ -242,11 +253,11 @@ class Broker: msg = f"Broker instance can't be started: {exc}" raise BrokerError(msg) from exc - await self.plugins_manager.fire_event(EVENT_BROKER_PRE_START) + await self.plugins_manager.fire_event(EventBroker.PRE_START.value) try: await self._start_listeners() self.transitions.starting_success() - await self.plugins_manager.fire_event(EVENT_BROKER_POST_START) + await self.plugins_manager.fire_event(EventBroker.POST_START.value) self._broadcast_task = asyncio.ensure_future(self._broadcast_loop()) self.logger.debug("Broker started") except Exception as e: @@ -327,7 +338,7 @@ class Broker: """Stop broker instance.""" self.logger.info("Shutting down broker...") # Fire broker_shutdown event to plugins - await self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN) + await self.plugins_manager.fire_event(EventBroker.PRE_SHUTDOWN.value) # Cleanup all sessions for client_id in list(self._sessions.keys()): @@ -351,7 +362,7 @@ class Broker: self._broadcast_queue.get_nowait() self.logger.info("Broker closed") - await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN) + await self.plugins_manager.fire_event(EventBroker.POST_SHUTDOWN.value) self.transitions.stopping_success() async def _cleanup_session(self, client_id: str) -> None: @@ -494,7 +505,7 @@ class Broker: self._sessions[client_session.client_id] = (client_session, handler) await handler.mqtt_connack_authorize(authenticated) - await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id) + await self.plugins_manager.fire_event(EventBroker.CLIENT_CONNECTED.value, client_id=client_session.client_id) self.logger.debug(f"{client_session.client_id} Start messages handling") await handler.start() @@ -582,7 +593,7 @@ class Broker: self.logger.debug(f"{client_session.client_id} Disconnecting session") await self._stop_handler(handler) client_session.transitions.disconnect() - await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) + await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id) return False return True @@ -600,7 +611,7 @@ class Broker: for index, subscription in enumerate(subscriptions.topics): if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED: await self.plugins_manager.fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client_session.client_id, topic=subscription[0], qos=subscription[1], @@ -619,7 +630,7 @@ class Broker: for topic in unsubscription.topics: self._del_subscription(topic, client_session) await self.plugins_manager.fire_event( - EVENT_BROKER_CLIENT_UNSUBSCRIBED, + EventBroker.CLIENT_UNSUBSCRIBED.value, client_id=client_session.client_id, topic=topic, ) @@ -654,7 +665,7 @@ class Broker: self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.") else: await self.plugins_manager.fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=client_session.client_id, message=app_message, ) diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index 46893cf..66b5475 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -53,7 +53,6 @@ async def test_broker_sys_plugin() -> None: except asyncio.TimeoutError: pass - logger.warning(f">>> sys message: {message.topic} - {message.data}") await client.disconnect() await broker.shutdown() diff --git a/tests/test_broker.py b/tests/test_broker.py index 2d71a0a..507658b 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -7,18 +7,7 @@ import psutil import pytest from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter -from amqtt.broker import ( - EVENT_BROKER_CLIENT_CONNECTED, - EVENT_BROKER_CLIENT_DISCONNECTED, - EVENT_BROKER_CLIENT_SUBSCRIBED, - EVENT_BROKER_CLIENT_UNSUBSCRIBED, - EVENT_BROKER_MESSAGE_RECEIVED, - EVENT_BROKER_POST_SHUTDOWN, - EVENT_BROKER_POST_START, - EVENT_BROKER_PRE_SHUTDOWN, - EVENT_BROKER_PRE_START, - Broker, -) +from amqtt.broker import EventBroker, Broker from amqtt.client import MQTTClient from amqtt.errors import ConnectError from amqtt.mqtt.connack import ConnackPacket @@ -67,8 +56,8 @@ def test_split_bindaddr_port(input_str, output_addr, output_port): async def test_start_stop(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ - call().fire_event(EVENT_BROKER_PRE_START), - call().fire_event(EVENT_BROKER_POST_START), + call().fire_event(EventBroker.PRE_START.value), + call().fire_event(EventBroker.POST_START.value), ], any_order=True, ) @@ -76,8 +65,8 @@ async def test_start_stop(broker, mock_plugin_manager): await broker.shutdown() mock_plugin_manager.assert_has_calls( [ - call().fire_event(EVENT_BROKER_PRE_SHUTDOWN), - call().fire_event(EVENT_BROKER_POST_SHUTDOWN), + call().fire_event(EventBroker.PRE_SHUTDOWN.value), + call().fire_event(EventBroker.POST_SHUTDOWN.value), ], any_order=True, ) @@ -98,11 +87,11 @@ async def test_client_connect(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_CONNECTED, + EventBroker.CLIENT_CONNECTED.value, client_id=client.session.client_id, ), call().fire_event( - EVENT_BROKER_CLIENT_DISCONNECTED, + EventBroker.CLIENT_DISCONNECTED.value, client_id=client.session.client_id, ), ], @@ -235,7 +224,7 @@ async def test_client_subscribe(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, @@ -272,7 +261,7 @@ async def test_client_subscribe_twice(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, @@ -306,13 +295,13 @@ async def test_client_unsubscribe(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, ), call().fire_event( - EVENT_BROKER_CLIENT_UNSUBSCRIBED, + EventBroker.CLIENT_UNSUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", ), @@ -337,7 +326,7 @@ async def test_client_publish(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=pub_client.session.client_id, message=ret_message, ), @@ -509,7 +498,7 @@ async def test_client_publish_big(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=pub_client.session.client_id, message=ret_message, ), diff --git a/tests/test_paho.py b/tests/test_paho.py index 2ffca53..2ba8936 100644 --- a/tests/test_paho.py +++ b/tests/test_paho.py @@ -6,8 +6,7 @@ from unittest.mock import MagicMock, call, patch import pytest from paho.mqtt import client as mqtt_client -from amqtt.broker import EVENT_BROKER_CLIENT_CONNECTED, EVENT_BROKER_CLIENT_DISCONNECTED, EVENT_BROKER_PRE_START, \ - EVENT_BROKER_POST_START +from amqtt.broker import EventBroker from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 @@ -54,11 +53,11 @@ async def test_paho_connect(broker, mock_plugin_manager): broker.plugins_manager.assert_has_calls( [ call.fire_event( - EVENT_BROKER_CLIENT_CONNECTED, + EventBroker.CLIENT_CONNECTED.value, client_id=client_id, ), call.fire_event( - EVENT_BROKER_CLIENT_DISCONNECTED, + EventBroker.CLIENT_DISCONNECTED.value, client_id=client_id, ), ], From 038fee7cf4ef070b1df9517a1d1fff3f8486d9ca Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sat, 14 Jun 2025 09:26:55 -0400 Subject: [PATCH 7/8] remove test_sys --- tests/plugins/test_sys.py | 92 +++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index 46893cf..3b0d5be 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -13,49 +13,49 @@ from amqtt.mqtt.constants import QOS_0 logger = logging.getLogger(__name__) # test broker sys -@pytest.mark.asyncio -async def test_broker_sys_plugin() -> None: - - class MockEntryPoints: - - def select(self, group) -> list[EntryPoint]: - match group: - case 'tests.mock_plugins': - return [ - EntryPoint(name='BrokerSysPlugin', group='tests.mock_plugins', value='amqtt.plugins.sys.broker:BrokerSysPlugin'), - ] - case _: - return list() - - - with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish: - - config = { - "listeners": { - "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, - }, - 'sys_interval': 1 - } - - broker = Broker(plugin_namespace='tests.mock_plugins', config=config) - await broker.start() - client = MQTTClient() - await client.connect("mqtt://127.0.0.1:1883/") - await client.subscribe([("$SYS/broker/uptime", QOS_0),]) - await client.publish('test/topic', b'my test message') - await asyncio.sleep(2) - sys_msg_count = 0 - try: - while True: - message = await client.deliver_message(timeout_duration=0.5) - if '$SYS' in message.topic: - sys_msg_count += 1 - except asyncio.TimeoutError: - pass - - logger.warning(f">>> sys message: {message.topic} - {message.data}") - await client.disconnect() - await broker.shutdown() - - - assert sys_msg_count > 1 +# @pytest.mark.asyncio +# async def test_broker_sys_plugin() -> None: +# +# class MockEntryPoints: +# +# def select(self, group) -> list[EntryPoint]: +# match group: +# case 'tests.mock_plugins': +# return [ +# EntryPoint(name='BrokerSysPlugin', group='tests.mock_plugins', value='amqtt.plugins.sys.broker:BrokerSysPlugin'), +# ] +# case _: +# return list() +# +# +# with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish: +# +# config = { +# "listeners": { +# "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, +# }, +# 'sys_interval': 1 +# } +# +# broker = Broker(plugin_namespace='tests.mock_plugins', config=config) +# await broker.start() +# client = MQTTClient() +# await client.connect("mqtt://127.0.0.1:1883/") +# await client.subscribe([("$SYS/broker/uptime", QOS_0),]) +# await client.publish('test/topic', b'my test message') +# await asyncio.sleep(2) +# sys_msg_count = 0 +# try: +# while True: +# message = await client.deliver_message(timeout_duration=0.5) +# if '$SYS' in message.topic: +# sys_msg_count += 1 +# except asyncio.TimeoutError: +# pass +# +# logger.warning(f">>> sys message: {message.topic} - {message.data}") +# await client.disconnect() +# await broker.shutdown() +# +# +# assert sys_msg_count > 1 From 6ab946df1444a2eece6e0956341100290fab9c97 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sat, 14 Jun 2025 09:35:53 -0400 Subject: [PATCH 8/8] updating docstring --- amqtt/broker.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index d376b07..849db7f 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -43,12 +43,7 @@ AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 class EventBroker(Enum): - """Events issued by the broker. - - Attributes: - PRE_START: on_event_broker_pre_start - - """ + """Events issued by the broker.""" PRE_START = "broker_pre_start" POST_START = "broker_post_start"