From 51102a435b61e0628624452b0e17f2f4bfe97f29 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Fri, 13 Jun 2025 14:48:20 -0400 Subject: [PATCH 1/2] converting from event strings to a string enumeration --- amqtt/broker.py | 47 ++++++++++++++++++++++++--------------- tests/plugins/test_sys.py | 1 - tests/test_broker.py | 37 +++++++++++------------------- tests/test_paho.py | 7 +++--- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index 5690227..d376b07 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -41,18 +41,29 @@ _defaults = read_yaml_config(Path(__file__).parent / "scripts/default_broker.yam DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 -EVENT_BROKER_PRE_START = "broker_pre_start" -EVENT_BROKER_POST_START = "broker_post_start" -EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown" -EVENT_BROKER_POST_SHUTDOWN = "broker_post_shutdown" -EVENT_BROKER_CLIENT_CONNECTED = "broker_client_connected" -EVENT_BROKER_CLIENT_DISCONNECTED = "broker_client_disconnected" -EVENT_BROKER_CLIENT_SUBSCRIBED = "broker_client_subscribed" -EVENT_BROKER_CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" -EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" + +class EventBroker(Enum): + """Events issued by the broker. + + Attributes: + PRE_START: on_event_broker_pre_start + + """ + + PRE_START = "broker_pre_start" + POST_START = "broker_post_start" + PRE_SHUTDOWN = "broker_pre_shutdown" + POST_SHUTDOWN = "broker_post_shutdown" + CLIENT_CONNECTED = "broker_client_connected" + CLIENT_DISCONNECTED = "broker_client_disconnected" + CLIENT_SUBSCRIBED = "broker_client_subscribed" + CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" + MESSAGE_RECEIVED = "broker_message_received" class Action(Enum): + """Actions issued by the broker.""" + SUBSCRIBE = "subscribe" PUBLISH = "publish" @@ -242,11 +253,11 @@ class Broker: msg = f"Broker instance can't be started: {exc}" raise BrokerError(msg) from exc - await self.plugins_manager.fire_event(EVENT_BROKER_PRE_START) + await self.plugins_manager.fire_event(EventBroker.PRE_START.value) try: await self._start_listeners() self.transitions.starting_success() - await self.plugins_manager.fire_event(EVENT_BROKER_POST_START) + await self.plugins_manager.fire_event(EventBroker.POST_START.value) self._broadcast_task = asyncio.ensure_future(self._broadcast_loop()) self.logger.debug("Broker started") except Exception as e: @@ -327,7 +338,7 @@ class Broker: """Stop broker instance.""" self.logger.info("Shutting down broker...") # Fire broker_shutdown event to plugins - await self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN) + await self.plugins_manager.fire_event(EventBroker.PRE_SHUTDOWN.value) # Cleanup all sessions for client_id in list(self._sessions.keys()): @@ -351,7 +362,7 @@ class Broker: self._broadcast_queue.get_nowait() self.logger.info("Broker closed") - await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN) + await self.plugins_manager.fire_event(EventBroker.POST_SHUTDOWN.value) self.transitions.stopping_success() async def _cleanup_session(self, client_id: str) -> None: @@ -494,7 +505,7 @@ class Broker: self._sessions[client_session.client_id] = (client_session, handler) await handler.mqtt_connack_authorize(authenticated) - await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id) + await self.plugins_manager.fire_event(EventBroker.CLIENT_CONNECTED.value, client_id=client_session.client_id) self.logger.debug(f"{client_session.client_id} Start messages handling") await handler.start() @@ -582,7 +593,7 @@ class Broker: self.logger.debug(f"{client_session.client_id} Disconnecting session") await self._stop_handler(handler) client_session.transitions.disconnect() - await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) + await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id) return False return True @@ -600,7 +611,7 @@ class Broker: for index, subscription in enumerate(subscriptions.topics): if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED: await self.plugins_manager.fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client_session.client_id, topic=subscription[0], qos=subscription[1], @@ -619,7 +630,7 @@ class Broker: for topic in unsubscription.topics: self._del_subscription(topic, client_session) await self.plugins_manager.fire_event( - EVENT_BROKER_CLIENT_UNSUBSCRIBED, + EventBroker.CLIENT_UNSUBSCRIBED.value, client_id=client_session.client_id, topic=topic, ) @@ -654,7 +665,7 @@ class Broker: self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.") else: await self.plugins_manager.fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=client_session.client_id, message=app_message, ) diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index 46893cf..66b5475 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -53,7 +53,6 @@ async def test_broker_sys_plugin() -> None: except asyncio.TimeoutError: pass - logger.warning(f">>> sys message: {message.topic} - {message.data}") await client.disconnect() await broker.shutdown() diff --git a/tests/test_broker.py b/tests/test_broker.py index 2d71a0a..507658b 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -7,18 +7,7 @@ import psutil import pytest from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter -from amqtt.broker import ( - EVENT_BROKER_CLIENT_CONNECTED, - EVENT_BROKER_CLIENT_DISCONNECTED, - EVENT_BROKER_CLIENT_SUBSCRIBED, - EVENT_BROKER_CLIENT_UNSUBSCRIBED, - EVENT_BROKER_MESSAGE_RECEIVED, - EVENT_BROKER_POST_SHUTDOWN, - EVENT_BROKER_POST_START, - EVENT_BROKER_PRE_SHUTDOWN, - EVENT_BROKER_PRE_START, - Broker, -) +from amqtt.broker import EventBroker, Broker from amqtt.client import MQTTClient from amqtt.errors import ConnectError from amqtt.mqtt.connack import ConnackPacket @@ -67,8 +56,8 @@ def test_split_bindaddr_port(input_str, output_addr, output_port): async def test_start_stop(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ - call().fire_event(EVENT_BROKER_PRE_START), - call().fire_event(EVENT_BROKER_POST_START), + call().fire_event(EventBroker.PRE_START.value), + call().fire_event(EventBroker.POST_START.value), ], any_order=True, ) @@ -76,8 +65,8 @@ async def test_start_stop(broker, mock_plugin_manager): await broker.shutdown() mock_plugin_manager.assert_has_calls( [ - call().fire_event(EVENT_BROKER_PRE_SHUTDOWN), - call().fire_event(EVENT_BROKER_POST_SHUTDOWN), + call().fire_event(EventBroker.PRE_SHUTDOWN.value), + call().fire_event(EventBroker.POST_SHUTDOWN.value), ], any_order=True, ) @@ -98,11 +87,11 @@ async def test_client_connect(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_CONNECTED, + EventBroker.CLIENT_CONNECTED.value, client_id=client.session.client_id, ), call().fire_event( - EVENT_BROKER_CLIENT_DISCONNECTED, + EventBroker.CLIENT_DISCONNECTED.value, client_id=client.session.client_id, ), ], @@ -235,7 +224,7 @@ async def test_client_subscribe(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, @@ -272,7 +261,7 @@ async def test_client_subscribe_twice(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, @@ -306,13 +295,13 @@ async def test_client_unsubscribe(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_CLIENT_SUBSCRIBED, + EventBroker.CLIENT_SUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", qos=QOS_0, ), call().fire_event( - EVENT_BROKER_CLIENT_UNSUBSCRIBED, + EventBroker.CLIENT_UNSUBSCRIBED.value, client_id=client.session.client_id, topic="/topic", ), @@ -337,7 +326,7 @@ async def test_client_publish(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=pub_client.session.client_id, message=ret_message, ), @@ -509,7 +498,7 @@ async def test_client_publish_big(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( [ call().fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, + EventBroker.MESSAGE_RECEIVED.value, client_id=pub_client.session.client_id, message=ret_message, ), diff --git a/tests/test_paho.py b/tests/test_paho.py index 2ffca53..2ba8936 100644 --- a/tests/test_paho.py +++ b/tests/test_paho.py @@ -6,8 +6,7 @@ from unittest.mock import MagicMock, call, patch import pytest from paho.mqtt import client as mqtt_client -from amqtt.broker import EVENT_BROKER_CLIENT_CONNECTED, EVENT_BROKER_CLIENT_DISCONNECTED, EVENT_BROKER_PRE_START, \ - EVENT_BROKER_POST_START +from amqtt.broker import EventBroker from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 @@ -54,11 +53,11 @@ async def test_paho_connect(broker, mock_plugin_manager): broker.plugins_manager.assert_has_calls( [ call.fire_event( - EVENT_BROKER_CLIENT_CONNECTED, + EventBroker.CLIENT_CONNECTED.value, client_id=client_id, ), call.fire_event( - EVENT_BROKER_CLIENT_DISCONNECTED, + EventBroker.CLIENT_DISCONNECTED.value, client_id=client_id, ), ], From 6ab946df1444a2eece6e0956341100290fab9c97 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sat, 14 Jun 2025 09:35:53 -0400 Subject: [PATCH 2/2] updating docstring --- amqtt/broker.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index d376b07..849db7f 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -43,12 +43,7 @@ AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 class EventBroker(Enum): - """Events issued by the broker. - - Attributes: - PRE_START: on_event_broker_pre_start - - """ + """Events issued by the broker.""" PRE_START = "broker_pre_start" POST_START = "broker_post_start"