From 2093cc5961105b809d9300053e32d4bb25845db9 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sat, 7 Jun 2025 23:56:30 -0400 Subject: [PATCH] fixes Yakifo/amqtt#199 : cleanly disconnected clients should not send will messages. fixing client tests, adding additional. and correcting the broker's client handler so that normal or abnormal disconnections end the client's session and stop processing messages from the client --- amqtt/broker.py | 37 +++++--- amqtt/mqtt/protocol/handler.py | 3 +- tests/test_client.py | 154 ++++++++++++++++++++++++++++++--- 3 files changed, 171 insertions(+), 23 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index 5690227..6d06ff2 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -28,6 +28,7 @@ from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session from amqtt.utils import format_client_message, gen_client_id, read_yaml_config +from .mqtt.disconnect import DisconnectPacket from .plugins.manager import BaseContext, PluginManager _CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]] @@ -525,8 +526,12 @@ class Broker: ) if disconnect_waiter in done: - connected = await self._handle_disconnect(client_session, handler, disconnect_waiter) - disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect()) + # handle the disconnection: normal or abnormal result, either way, the client is no longer connected + await self._handle_disconnect(client_session, handler, disconnect_waiter) + connected = False + + # no need to reschedule the `disconnect_waiter` since we're exiting the message loop + if subscribe_waiter in done: await self._handle_subscription(client_session, handler, subscribe_waiter) @@ -556,11 +561,20 @@ class Broker: client_session: Session, handler: BrokerProtocolHandler, disconnect_waiter: asyncio.Future[Any], - ) -> bool: - """Handle client disconnection.""" + ) -> None: + """Handle client disconnection. + + Args: + client_session (Session): client session + handler (BrokerProtocolHandler): broker protocol handler + disconnect_waiter (asyncio.Future[Any]): future to wait for disconnection + + """ + # check the disconnected waiter result result = disconnect_waiter.result() self.logger.debug(f"{client_session.client_id} Result from wait_disconnect: {result}") - if result is None: + # if the client disconnects abruptly by sending no message or the message isn't a disconnect packet + if result is None or not isinstance(result, DisconnectPacket): self.logger.debug(f"Will flag: {client_session.will_flag}") if client_session.will_flag: self.logger.debug( @@ -579,12 +593,13 @@ class Broker: client_session.will_message, client_session.will_qos, ) - self.logger.debug(f"{client_session.client_id} Disconnecting session") - await self._stop_handler(handler) - client_session.transitions.disconnect() - await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) - return False - return True + + # normal or not, let's end the client's session + self.logger.debug(f"{client_session.client_id} Disconnecting session") + await self._stop_handler(handler) + client_session.transitions.disconnect() + await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) + async def _handle_subscription( self, diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index 240c520..244bd46 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -152,7 +152,8 @@ class ProtocolHandler: if self.writer is not None: await self.writer.close() except asyncio.CancelledError: - self.logger.debug("Writer close was cancelled.", exc_info=True) + # canceling the task is the expected result + self.logger.debug("Writer close was cancelled.") except TimeoutError: self.logger.debug("Writer close operation timed out.", exc_info=True) except OSError: diff --git a/tests/test_client.py b/tests/test_client.py index 59547ce..ca34f6f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -265,27 +265,121 @@ def client_config(): @pytest.mark.asyncio -async def test_client_publish_will_with_retain(broker_fixture, client_config): +async def test_client_will_with_clean_disconnect(broker_fixture): + config = { + "will": { + "topic": "test/will/topic", + "retain": False, + "message": "client ABC has disconnected", + "qos": 1 + }, + } - # verifying client functionality of will topic - # https://github.com/Yakifo/amqtt/issues/159 + client1 = MQTTClient(client_id="client1", config=config) + await client1.connect("mqtt://localhost:1883") - client1 = MQTTClient(client_id="client1") + client2 = MQTTClient(client_id="client2") + await client2.connect("mqtt://localhost:1883") + await client2.subscribe( + [ + ("test/will/topic", QOS_0), + ] + ) + + await client1.disconnect() + await asyncio.sleep(1) + + with pytest.raises(asyncio.TimeoutError): + message = await client2.deliver_message(timeout_duration=2) + # if we do get a message, make sure it's not a will message + assert message.topic != "test/will/topic" + + await client2.disconnect() + + +@pytest.mark.asyncio +async def test_client_will_with_abrupt_disconnect(broker_fixture): + config = { + "will": { + "topic": "test/will/topic", + "retain": False, + "message": "client ABC has disconnected", + "qos": 1 + }, + } + + client1 = MQTTClient(client_id="client1", config=config) + await client1.connect("mqtt://localhost:1883") + + client2 = MQTTClient(client_id="client2") + await client2.connect("mqtt://localhost:1883") + await client2.subscribe( + [ + ("test/will/topic", QOS_0), + ] + ) + + # instead of client.disconnect, call the necessary closing but without sending the disconnect packet + await client1.cancel_tasks() + if client1._disconnect_task and not client1._disconnect_task.done(): + client1._disconnect_task.cancel() + client1._connected_state.clear() + await client1._handler.stop() + client1.session.transitions.disconnect() + + await asyncio.sleep(1) + + + message = await client2.deliver_message(timeout_duration=1) + # make sure we receive the will message + assert message.topic == "test/will/topic" + assert message.data == b'client ABC has disconnected' + + await client2.disconnect() + + +@pytest.mark.asyncio +async def test_client_retained_will_with_abrupt_disconnect(broker_fixture): + + # verifying client functionality of retained will topic/message + + config = { + "will": { + "topic": "test/will/topic", + "retain": True, + "message": "client ABC has disconnected", + "qos": 1 + }, + } + + # first client, connect with retained will message + client1 = MQTTClient(client_id="client1", config=config) await client1.connect('mqtt://localhost:1883') - await client1.subscribe([ + + client2 = MQTTClient(client_id="client2") + await client2.connect('mqtt://localhost:1883') + await client2.subscribe([ ("test/will/topic", QOS_0) ]) - client2 = MQTTClient(client_id="client2", config=client_config) - await client2.connect('mqtt://localhost:1883') - await client2.publish('my/topic', b'my message') - await client2.disconnect() - message = await client1.deliver_message(timeout_duration=1) + # let's abruptly disconnect client1 + await client1.cancel_tasks() + if client1._disconnect_task and not client1._disconnect_task.done(): + client1._disconnect_task.cancel() + client1._connected_state.clear() + await client1._handler.stop() + client1.session.transitions.disconnect() + + await asyncio.sleep(0.5) + + # make sure the client which is still connected that we get the 'will' message + message = await client2.deliver_message(timeout_duration=1) assert message.topic == 'test/will/topic' assert message.data == b'client ABC has disconnected' - await client1.disconnect() + await client2.disconnect() + # make sure a client which is connected after client1 disconnected still receives the 'will' message from client3 = MQTTClient(client_id="client3") await client3.connect('mqtt://localhost:1883') await client3.subscribe([ @@ -295,3 +389,41 @@ async def test_client_publish_will_with_retain(broker_fixture, client_config): assert message3.topic == 'test/will/topic' assert message3.data == b'client ABC has disconnected' await client3.disconnect() + + +@pytest.mark.asyncio +async def test_client_abruptly_disconnecting_with_empty_will_message(broker_fixture): + + config = { + "will": { + "topic": "test/will/topic", + "retain": True, + "message": "", + "qos": 1 + }, + } + client1 = MQTTClient(client_id="client1", config=config) + await client1.connect('mqtt://localhost:1883') + + client2 = MQTTClient(client_id="client2") + await client2.connect('mqtt://localhost:1883') + await client2.subscribe([ + ("test/will/topic", QOS_0) + ]) + + # let's abruptly disconnect client1 + await client1.cancel_tasks() + if client1._disconnect_task and not client1._disconnect_task.done(): + client1._disconnect_task.cancel() + client1._connected_state.clear() + await client1._handler.stop() + client1.session.transitions.disconnect() + + await asyncio.sleep(0.5) + + message = await client2.deliver_message(timeout_duration=1) + assert message.topic == 'test/will/topic' + assert message.data == b'' + + await client2.disconnect() +