Merge branch 'rc' into issues/154

pull/198/head
Andrew Mirsky 2025-06-14 10:03:09 -04:00
commit 101e32af64
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
5 zmienionych plików z 78 dodań i 57 usunięć

Wyświetl plik

@ -41,18 +41,24 @@ _defaults = read_yaml_config(Path(__file__).parent / "scripts/default_broker.yam
DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} DEFAULT_PORTS = {"tcp": 1883, "ws": 8883}
AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80
EVENT_BROKER_PRE_START = "broker_pre_start"
EVENT_BROKER_POST_START = "broker_post_start" class EventBroker(Enum):
EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown" """Events issued by the broker."""
EVENT_BROKER_POST_SHUTDOWN = "broker_post_shutdown"
EVENT_BROKER_CLIENT_CONNECTED = "broker_client_connected" PRE_START = "broker_pre_start"
EVENT_BROKER_CLIENT_DISCONNECTED = "broker_client_disconnected" POST_START = "broker_post_start"
EVENT_BROKER_CLIENT_SUBSCRIBED = "broker_client_subscribed" PRE_SHUTDOWN = "broker_pre_shutdown"
EVENT_BROKER_CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" POST_SHUTDOWN = "broker_post_shutdown"
EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" CLIENT_CONNECTED = "broker_client_connected"
CLIENT_DISCONNECTED = "broker_client_disconnected"
CLIENT_SUBSCRIBED = "broker_client_subscribed"
CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
MESSAGE_RECEIVED = "broker_message_received"
class Action(Enum): class Action(Enum):
"""Actions issued by the broker."""
SUBSCRIBE = "subscribe" SUBSCRIBE = "subscribe"
PUBLISH = "publish" PUBLISH = "publish"
@ -242,11 +248,11 @@ class Broker:
msg = f"Broker instance can't be started: {exc}" msg = f"Broker instance can't be started: {exc}"
raise BrokerError(msg) from exc raise BrokerError(msg) from exc
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_START) await self.plugins_manager.fire_event(EventBroker.PRE_START.value)
try: try:
await self._start_listeners() await self._start_listeners()
self.transitions.starting_success() self.transitions.starting_success()
await self.plugins_manager.fire_event(EVENT_BROKER_POST_START) await self.plugins_manager.fire_event(EventBroker.POST_START.value)
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop()) self._broadcast_task = asyncio.ensure_future(self._broadcast_loop())
self.logger.debug("Broker started") self.logger.debug("Broker started")
except Exception as e: except Exception as e:
@ -327,7 +333,7 @@ class Broker:
"""Stop broker instance.""" """Stop broker instance."""
self.logger.info("Shutting down broker...") self.logger.info("Shutting down broker...")
# Fire broker_shutdown event to plugins # Fire broker_shutdown event to plugins
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN) await self.plugins_manager.fire_event(EventBroker.PRE_SHUTDOWN.value)
# Cleanup all sessions # Cleanup all sessions
for client_id in list(self._sessions.keys()): for client_id in list(self._sessions.keys()):
@ -351,7 +357,7 @@ class Broker:
self._broadcast_queue.get_nowait() self._broadcast_queue.get_nowait()
self.logger.info("Broker closed") self.logger.info("Broker closed")
await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN) await self.plugins_manager.fire_event(EventBroker.POST_SHUTDOWN.value)
self.transitions.stopping_success() self.transitions.stopping_success()
async def _cleanup_session(self, client_id: str) -> None: async def _cleanup_session(self, client_id: str) -> None:
@ -494,7 +500,7 @@ class Broker:
self._sessions[client_session.client_id] = (client_session, handler) self._sessions[client_session.client_id] = (client_session, handler)
await handler.mqtt_connack_authorize(authenticated) await handler.mqtt_connack_authorize(authenticated)
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id) await self.plugins_manager.fire_event(EventBroker.CLIENT_CONNECTED.value, client_id=client_session.client_id)
self.logger.debug(f"{client_session.client_id} Start messages handling") self.logger.debug(f"{client_session.client_id} Start messages handling")
await handler.start() await handler.start()
@ -582,7 +588,7 @@ class Broker:
self.logger.debug(f"{client_session.client_id} Disconnecting session") self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler) await self._stop_handler(handler)
client_session.transitions.disconnect() client_session.transitions.disconnect()
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id)
return False return False
return True return True
@ -600,7 +606,7 @@ class Broker:
for index, subscription in enumerate(subscriptions.topics): for index, subscription in enumerate(subscriptions.topics):
if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED: if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED:
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
topic=subscription[0], topic=subscription[0],
qos=subscription[1], qos=subscription[1],
@ -619,7 +625,7 @@ class Broker:
for topic in unsubscription.topics: for topic in unsubscription.topics:
self._del_subscription(topic, client_session) self._del_subscription(topic, client_session)
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED, EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
topic=topic, topic=topic,
) )
@ -654,7 +660,7 @@ class Broker:
self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.") self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.")
else: else:
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
message=app_message, message=app_message,
) )

Wyświetl plik

@ -219,7 +219,7 @@ class MQTTClient:
self.logger.debug(f"Reconnecting with session parameters: {self.session}") self.logger.debug(f"Reconnecting with session parameters: {self.session}")
reconnect_max_interval = self.config.get("reconnect_max_interval", 10) reconnect_max_interval = self.config.get("reconnect_max_interval", 10)
reconnect_retries = self.config.get("reconnect_retries", 5) reconnect_retries = self.config.get("reconnect_retries", 2)
nb_attempt = 1 nb_attempt = 1
while True: while True:
@ -232,7 +232,7 @@ class MQTTClient:
except Exception as e: except Exception as e:
self.logger.warning(f"Reconnection attempt failed: {e!r}") self.logger.warning(f"Reconnection attempt failed: {e!r}")
self.logger.debug("", exc_info=True) self.logger.debug("", exc_info=True)
if reconnect_retries < nb_attempt: # reconnect_retries >= 0 and if 0 <= reconnect_retries < nb_attempt:
self.logger.exception("Maximum connection attempts reached. Reconnection aborted.") self.logger.exception("Maximum connection attempts reached. Reconnection aborted.")
self.logger.debug("", exc_info=True) self.logger.debug("", exc_info=True)
msg = "Too many failed attempts" msg = "Too many failed attempts"
@ -470,6 +470,7 @@ class MQTTClient:
reader: StreamReaderAdapter | WebSocketsReader | None = None reader: StreamReaderAdapter | WebSocketsReader | None = None
writer: StreamWriterAdapter | WebSocketsWriter | None = None writer: StreamWriterAdapter | WebSocketsWriter | None = None
self._connected_state.clear() self._connected_state.clear()
# Open connection # Open connection
if scheme in ("mqtt", "mqtts"): if scheme in ("mqtt", "mqtts"):
conn_reader, conn_writer = await asyncio.open_connection( conn_reader, conn_writer = await asyncio.open_connection(
@ -489,11 +490,11 @@ class MQTTClient:
) )
reader = WebSocketsReader(websocket) reader = WebSocketsReader(websocket)
writer = WebSocketsWriter(websocket) writer = WebSocketsWriter(websocket)
elif not self.session.broker_uri:
if reader is None or writer is None: msg = "missing broker uri"
self.session.transitions.disconnect() raise ClientError(msg)
self.logger.warning("reader or writer not initialized") else:
msg = "reader or writer not initialized" msg = f"incorrect scheme defined in uri: '{scheme!r}'"
raise ClientError(msg) raise ClientError(msg)
# Start MQTT protocol # Start MQTT protocol
@ -533,7 +534,7 @@ class MQTTClient:
while self.client_tasks: while self.client_tasks:
task = self.client_tasks.popleft() task = self.client_tasks.popleft()
if not task.done(): if not task.done():
task.cancel() task.cancel(msg="Connection closed.")
self.logger.debug("Monitoring broker disconnection") self.logger.debug("Monitoring broker disconnection")
# Wait for disconnection from broker (like connection lost) # Wait for disconnection from broker (like connection lost)

Wyświetl plik

@ -7,18 +7,7 @@ import psutil
import pytest import pytest
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
from amqtt.broker import ( from amqtt.broker import EventBroker, Broker
EVENT_BROKER_CLIENT_CONNECTED,
EVENT_BROKER_CLIENT_DISCONNECTED,
EVENT_BROKER_CLIENT_SUBSCRIBED,
EVENT_BROKER_CLIENT_UNSUBSCRIBED,
EVENT_BROKER_MESSAGE_RECEIVED,
EVENT_BROKER_POST_SHUTDOWN,
EVENT_BROKER_POST_START,
EVENT_BROKER_PRE_SHUTDOWN,
EVENT_BROKER_PRE_START,
Broker,
)
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.errors import ConnectError from amqtt.errors import ConnectError
from amqtt.mqtt.connack import ConnackPacket from amqtt.mqtt.connack import ConnackPacket
@ -67,8 +56,8 @@ def test_split_bindaddr_port(input_str, output_addr, output_port):
async def test_start_stop(broker, mock_plugin_manager): async def test_start_stop(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event(EVENT_BROKER_PRE_START), call().fire_event(EventBroker.PRE_START.value),
call().fire_event(EVENT_BROKER_POST_START), call().fire_event(EventBroker.POST_START.value),
], ],
any_order=True, any_order=True,
) )
@ -76,8 +65,8 @@ async def test_start_stop(broker, mock_plugin_manager):
await broker.shutdown() await broker.shutdown()
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event(EVENT_BROKER_PRE_SHUTDOWN), call().fire_event(EventBroker.PRE_SHUTDOWN.value),
call().fire_event(EVENT_BROKER_POST_SHUTDOWN), call().fire_event(EventBroker.POST_SHUTDOWN.value),
], ],
any_order=True, any_order=True,
) )
@ -98,11 +87,11 @@ async def test_client_connect(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_CONNECTED, EventBroker.CLIENT_CONNECTED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
), ),
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED, EventBroker.CLIENT_DISCONNECTED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
), ),
], ],
@ -235,7 +224,7 @@ async def test_client_subscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
@ -272,7 +261,7 @@ async def test_client_subscribe_twice(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
@ -306,13 +295,13 @@ async def test_client_unsubscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
), ),
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED, EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
), ),
@ -337,7 +326,7 @@ async def test_client_publish(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id, client_id=pub_client.session.client_id,
message=ret_message, message=ret_message,
), ),
@ -509,7 +498,7 @@ async def test_client_publish_big(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id, client_id=pub_client.session.client_id,
message=ret_message, message=ret_message,
), ),
@ -740,3 +729,16 @@ async def test_broker_broadcast_cancellation(broker):
await _client_publish(topic, data, qos) await _client_publish(topic, data, qos)
message = await asyncio.wait_for(sub_client.deliver_message(), timeout=1) message = await asyncio.wait_for(sub_client.deliver_message(), timeout=1)
assert message assert message
@pytest.mark.asyncio
async def test_broker_socket_open_close(broker):
# check that https://github.com/Yakifo/amqtt/issues/86 is fixed
# mqtt 3.1 requires a connect packet, otherwise the socket connection is rejected
static_connect_packet = b'\x10\x1b\x00\x04MQTT\x04\x02\x00<\x00\x0ftest-client-123'
s = socket.create_connection(("127.0.0.1", 1883))
s.send(static_connect_packet)
await asyncio.sleep(0.1)
s.close()

Wyświetl plik

@ -7,7 +7,7 @@ import pytest
from amqtt.broker import Broker from amqtt.broker import Broker
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.errors import ConnectError from amqtt.errors import ClientError, ConnectError
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
@ -332,9 +332,23 @@ async def test_client_with_will_empty_message(broker_fixture):
await client2.disconnect() await client2.disconnect()
async def test_client_no_auth(): async def test_connect_broken_uri():
config = {"auto_reconnect": False}
client = MQTTClient(config=config)
with pytest.raises(ClientError):
await client.connect('"mqtt://someplace')
@pytest.mark.asyncio
async def test_connect_incorrect_scheme():
config = {"auto_reconnect": False}
client = MQTTClient(config=config)
with pytest.raises(ClientError):
await client.connect('"mq://someplace')
async def test_client_no_auth():
class MockEntryPoints: class MockEntryPoints:
def select(self, group) -> list[EntryPoint]: def select(self, group) -> list[EntryPoint]:
@ -368,4 +382,3 @@ async def test_client_no_auth():
await client.connect("mqtt://127.0.0.1:1883/") await client.connect("mqtt://127.0.0.1:1883/")
await broker.shutdown() await broker.shutdown()

Wyświetl plik

@ -6,8 +6,7 @@ from unittest.mock import MagicMock, call, patch
import pytest import pytest
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from amqtt.broker import EVENT_BROKER_CLIENT_CONNECTED, EVENT_BROKER_CLIENT_DISCONNECTED, EVENT_BROKER_PRE_START, \ from amqtt.broker import EventBroker
EVENT_BROKER_POST_START
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_1, QOS_2
@ -54,11 +53,11 @@ async def test_paho_connect(broker, mock_plugin_manager):
broker.plugins_manager.assert_has_calls( broker.plugins_manager.assert_has_calls(
[ [
call.fire_event( call.fire_event(
EVENT_BROKER_CLIENT_CONNECTED, EventBroker.CLIENT_CONNECTED.value,
client_id=client_id, client_id=client_id,
), ),
call.fire_event( call.fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED, EventBroker.CLIENT_DISCONNECTED.value,
client_id=client_id, client_id=client_id,
), ),
], ],