converting from event strings to a string enumeration

pull/213/head
Andrew Mirsky 2025-06-13 14:48:20 -04:00
rodzic 24f99fdc2d
commit 51102a435b
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
4 zmienionych plików z 45 dodań i 47 usunięć

Wyświetl plik

@ -41,18 +41,29 @@ _defaults = read_yaml_config(Path(__file__).parent / "scripts/default_broker.yam
DEFAULT_PORTS = {"tcp": 1883, "ws": 8883}
AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80
EVENT_BROKER_PRE_START = "broker_pre_start"
EVENT_BROKER_POST_START = "broker_post_start"
EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown"
EVENT_BROKER_POST_SHUTDOWN = "broker_post_shutdown"
EVENT_BROKER_CLIENT_CONNECTED = "broker_client_connected"
EVENT_BROKER_CLIENT_DISCONNECTED = "broker_client_disconnected"
EVENT_BROKER_CLIENT_SUBSCRIBED = "broker_client_subscribed"
EVENT_BROKER_CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received"
class EventBroker(Enum):
"""Events issued by the broker.
Attributes:
PRE_START: on_event_broker_pre_start
"""
PRE_START = "broker_pre_start"
POST_START = "broker_post_start"
PRE_SHUTDOWN = "broker_pre_shutdown"
POST_SHUTDOWN = "broker_post_shutdown"
CLIENT_CONNECTED = "broker_client_connected"
CLIENT_DISCONNECTED = "broker_client_disconnected"
CLIENT_SUBSCRIBED = "broker_client_subscribed"
CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
MESSAGE_RECEIVED = "broker_message_received"
class Action(Enum):
"""Actions issued by the broker."""
SUBSCRIBE = "subscribe"
PUBLISH = "publish"
@ -242,11 +253,11 @@ class Broker:
msg = f"Broker instance can't be started: {exc}"
raise BrokerError(msg) from exc
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
await self.plugins_manager.fire_event(EventBroker.PRE_START.value)
try:
await self._start_listeners()
self.transitions.starting_success()
await self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
await self.plugins_manager.fire_event(EventBroker.POST_START.value)
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop())
self.logger.debug("Broker started")
except Exception as e:
@ -327,7 +338,7 @@ class Broker:
"""Stop broker instance."""
self.logger.info("Shutting down broker...")
# Fire broker_shutdown event to plugins
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN)
await self.plugins_manager.fire_event(EventBroker.PRE_SHUTDOWN.value)
# Cleanup all sessions
for client_id in list(self._sessions.keys()):
@ -351,7 +362,7 @@ class Broker:
self._broadcast_queue.get_nowait()
self.logger.info("Broker closed")
await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
await self.plugins_manager.fire_event(EventBroker.POST_SHUTDOWN.value)
self.transitions.stopping_success()
async def _cleanup_session(self, client_id: str) -> None:
@ -494,7 +505,7 @@ class Broker:
self._sessions[client_session.client_id] = (client_session, handler)
await handler.mqtt_connack_authorize(authenticated)
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id)
await self.plugins_manager.fire_event(EventBroker.CLIENT_CONNECTED.value, client_id=client_session.client_id)
self.logger.debug(f"{client_session.client_id} Start messages handling")
await handler.start()
@ -582,7 +593,7 @@ class Broker:
self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler)
client_session.transitions.disconnect()
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id)
await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id)
return False
return True
@ -600,7 +611,7 @@ class Broker:
for index, subscription in enumerate(subscriptions.topics):
if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED:
await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED,
EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client_session.client_id,
topic=subscription[0],
qos=subscription[1],
@ -619,7 +630,7 @@ class Broker:
for topic in unsubscription.topics:
self._del_subscription(topic, client_session)
await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED,
EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client_session.client_id,
topic=topic,
)
@ -654,7 +665,7 @@ class Broker:
self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.")
else:
await self.plugins_manager.fire_event(
EVENT_BROKER_MESSAGE_RECEIVED,
EventBroker.MESSAGE_RECEIVED.value,
client_id=client_session.client_id,
message=app_message,
)

Wyświetl plik

@ -53,7 +53,6 @@ async def test_broker_sys_plugin() -> None:
except asyncio.TimeoutError:
pass
logger.warning(f">>> sys message: {message.topic} - {message.data}")
await client.disconnect()
await broker.shutdown()

Wyświetl plik

@ -7,18 +7,7 @@ import psutil
import pytest
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
from amqtt.broker import (
EVENT_BROKER_CLIENT_CONNECTED,
EVENT_BROKER_CLIENT_DISCONNECTED,
EVENT_BROKER_CLIENT_SUBSCRIBED,
EVENT_BROKER_CLIENT_UNSUBSCRIBED,
EVENT_BROKER_MESSAGE_RECEIVED,
EVENT_BROKER_POST_SHUTDOWN,
EVENT_BROKER_POST_START,
EVENT_BROKER_PRE_SHUTDOWN,
EVENT_BROKER_PRE_START,
Broker,
)
from amqtt.broker import EventBroker, Broker
from amqtt.client import MQTTClient
from amqtt.errors import ConnectError
from amqtt.mqtt.connack import ConnackPacket
@ -67,8 +56,8 @@ def test_split_bindaddr_port(input_str, output_addr, output_port):
async def test_start_stop(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(EVENT_BROKER_PRE_START),
call().fire_event(EVENT_BROKER_POST_START),
call().fire_event(EventBroker.PRE_START.value),
call().fire_event(EventBroker.POST_START.value),
],
any_order=True,
)
@ -76,8 +65,8 @@ async def test_start_stop(broker, mock_plugin_manager):
await broker.shutdown()
mock_plugin_manager.assert_has_calls(
[
call().fire_event(EVENT_BROKER_PRE_SHUTDOWN),
call().fire_event(EVENT_BROKER_POST_SHUTDOWN),
call().fire_event(EventBroker.PRE_SHUTDOWN.value),
call().fire_event(EventBroker.POST_SHUTDOWN.value),
],
any_order=True,
)
@ -98,11 +87,11 @@ async def test_client_connect(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_CLIENT_CONNECTED,
EventBroker.CLIENT_CONNECTED.value,
client_id=client.session.client_id,
),
call().fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED,
EventBroker.CLIENT_DISCONNECTED.value,
client_id=client.session.client_id,
),
],
@ -235,7 +224,7 @@ async def test_client_subscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED,
EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
@ -272,7 +261,7 @@ async def test_client_subscribe_twice(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED,
EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
@ -306,13 +295,13 @@ async def test_client_unsubscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED,
EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
),
call().fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED,
EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client.session.client_id,
topic="/topic",
),
@ -337,7 +326,7 @@ async def test_client_publish(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED,
EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id,
message=ret_message,
),
@ -509,7 +498,7 @@ async def test_client_publish_big(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED,
EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id,
message=ret_message,
),

Wyświetl plik

@ -6,8 +6,7 @@ from unittest.mock import MagicMock, call, patch
import pytest
from paho.mqtt import client as mqtt_client
from amqtt.broker import EVENT_BROKER_CLIENT_CONNECTED, EVENT_BROKER_CLIENT_DISCONNECTED, EVENT_BROKER_PRE_START, \
EVENT_BROKER_POST_START
from amqtt.broker import EventBroker
from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2
@ -54,11 +53,11 @@ async def test_paho_connect(broker, mock_plugin_manager):
broker.plugins_manager.assert_has_calls(
[
call.fire_event(
EVENT_BROKER_CLIENT_CONNECTED,
EventBroker.CLIENT_CONNECTED.value,
client_id=client_id,
),
call.fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED,
EventBroker.CLIENT_DISCONNECTED.value,
client_id=client_id,
),
],