fixes Yakifo/amqtt#199 : cleanly disconnected clients should not send will messages. fixing client tests, adding additional. and correcting the broker's client handler so that normal or abnormal disconnections end the client's session and stop processing messages from the client

pull/200/head
Andrew Mirsky 2025-06-07 23:56:30 -04:00
rodzic 80016d8cca
commit 2093cc5961
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
3 zmienionych plików z 171 dodań i 23 usunięć

Wyświetl plik

@ -28,6 +28,7 @@ from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
from amqtt.utils import format_client_message, gen_client_id, read_yaml_config from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
from .mqtt.disconnect import DisconnectPacket
from .plugins.manager import BaseContext, PluginManager from .plugins.manager import BaseContext, PluginManager
_CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]] _CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]]
@ -525,8 +526,12 @@ class Broker:
) )
if disconnect_waiter in done: if disconnect_waiter in done:
connected = await self._handle_disconnect(client_session, handler, disconnect_waiter) # handle the disconnection: normal or abnormal result, either way, the client is no longer connected
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect()) await self._handle_disconnect(client_session, handler, disconnect_waiter)
connected = False
# no need to reschedule the `disconnect_waiter` since we're exiting the message loop
if subscribe_waiter in done: if subscribe_waiter in done:
await self._handle_subscription(client_session, handler, subscribe_waiter) await self._handle_subscription(client_session, handler, subscribe_waiter)
@ -556,11 +561,20 @@ class Broker:
client_session: Session, client_session: Session,
handler: BrokerProtocolHandler, handler: BrokerProtocolHandler,
disconnect_waiter: asyncio.Future[Any], disconnect_waiter: asyncio.Future[Any],
) -> bool: ) -> None:
"""Handle client disconnection.""" """Handle client disconnection.
Args:
client_session (Session): client session
handler (BrokerProtocolHandler): broker protocol handler
disconnect_waiter (asyncio.Future[Any]): future to wait for disconnection
"""
# check the disconnected waiter result
result = disconnect_waiter.result() result = disconnect_waiter.result()
self.logger.debug(f"{client_session.client_id} Result from wait_disconnect: {result}") self.logger.debug(f"{client_session.client_id} Result from wait_disconnect: {result}")
if result is None: # if the client disconnects abruptly by sending no message or the message isn't a disconnect packet
if result is None or not isinstance(result, DisconnectPacket):
self.logger.debug(f"Will flag: {client_session.will_flag}") self.logger.debug(f"Will flag: {client_session.will_flag}")
if client_session.will_flag: if client_session.will_flag:
self.logger.debug( self.logger.debug(
@ -579,12 +593,13 @@ class Broker:
client_session.will_message, client_session.will_message,
client_session.will_qos, client_session.will_qos,
) )
self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler) # normal or not, let's end the client's session
client_session.transitions.disconnect() self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) await self._stop_handler(handler)
return False client_session.transitions.disconnect()
return True await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id)
async def _handle_subscription( async def _handle_subscription(
self, self,

Wyświetl plik

@ -152,7 +152,8 @@ class ProtocolHandler:
if self.writer is not None: if self.writer is not None:
await self.writer.close() await self.writer.close()
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.debug("Writer close was cancelled.", exc_info=True) # canceling the task is the expected result
self.logger.debug("Writer close was cancelled.")
except TimeoutError: except TimeoutError:
self.logger.debug("Writer close operation timed out.", exc_info=True) self.logger.debug("Writer close operation timed out.", exc_info=True)
except OSError: except OSError:

Wyświetl plik

@ -265,27 +265,121 @@ def client_config():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_client_publish_will_with_retain(broker_fixture, client_config): async def test_client_will_with_clean_disconnect(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": False,
"message": "client ABC has disconnected",
"qos": 1
},
}
# verifying client functionality of will topic client1 = MQTTClient(client_id="client1", config=config)
# https://github.com/Yakifo/amqtt/issues/159 await client1.connect("mqtt://localhost:1883")
client1 = MQTTClient(client_id="client1") client2 = MQTTClient(client_id="client2")
await client2.connect("mqtt://localhost:1883")
await client2.subscribe(
[
("test/will/topic", QOS_0),
]
)
await client1.disconnect()
await asyncio.sleep(1)
with pytest.raises(asyncio.TimeoutError):
message = await client2.deliver_message(timeout_duration=2)
# if we do get a message, make sure it's not a will message
assert message.topic != "test/will/topic"
await client2.disconnect()
@pytest.mark.asyncio
async def test_client_will_with_abrupt_disconnect(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": False,
"message": "client ABC has disconnected",
"qos": 1
},
}
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect("mqtt://localhost:1883")
client2 = MQTTClient(client_id="client2")
await client2.connect("mqtt://localhost:1883")
await client2.subscribe(
[
("test/will/topic", QOS_0),
]
)
# instead of client.disconnect, call the necessary closing but without sending the disconnect packet
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(1)
message = await client2.deliver_message(timeout_duration=1)
# make sure we receive the will message
assert message.topic == "test/will/topic"
assert message.data == b'client ABC has disconnected'
await client2.disconnect()
@pytest.mark.asyncio
async def test_client_retained_will_with_abrupt_disconnect(broker_fixture):
# verifying client functionality of retained will topic/message
config = {
"will": {
"topic": "test/will/topic",
"retain": True,
"message": "client ABC has disconnected",
"qos": 1
},
}
# first client, connect with retained will message
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect('mqtt://localhost:1883') await client1.connect('mqtt://localhost:1883')
await client1.subscribe([
client2 = MQTTClient(client_id="client2")
await client2.connect('mqtt://localhost:1883')
await client2.subscribe([
("test/will/topic", QOS_0) ("test/will/topic", QOS_0)
]) ])
client2 = MQTTClient(client_id="client2", config=client_config)
await client2.connect('mqtt://localhost:1883')
await client2.publish('my/topic', b'my message')
await client2.disconnect()
message = await client1.deliver_message(timeout_duration=1) # let's abruptly disconnect client1
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(0.5)
# make sure the client which is still connected that we get the 'will' message
message = await client2.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic' assert message.topic == 'test/will/topic'
assert message.data == b'client ABC has disconnected' assert message.data == b'client ABC has disconnected'
await client1.disconnect() await client2.disconnect()
# make sure a client which is connected after client1 disconnected still receives the 'will' message from
client3 = MQTTClient(client_id="client3") client3 = MQTTClient(client_id="client3")
await client3.connect('mqtt://localhost:1883') await client3.connect('mqtt://localhost:1883')
await client3.subscribe([ await client3.subscribe([
@ -295,3 +389,41 @@ async def test_client_publish_will_with_retain(broker_fixture, client_config):
assert message3.topic == 'test/will/topic' assert message3.topic == 'test/will/topic'
assert message3.data == b'client ABC has disconnected' assert message3.data == b'client ABC has disconnected'
await client3.disconnect() await client3.disconnect()
@pytest.mark.asyncio
async def test_client_abruptly_disconnecting_with_empty_will_message(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": True,
"message": "",
"qos": 1
},
}
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect('mqtt://localhost:1883')
client2 = MQTTClient(client_id="client2")
await client2.connect('mqtt://localhost:1883')
await client2.subscribe([
("test/will/topic", QOS_0)
])
# let's abruptly disconnect client1
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(0.5)
message = await client2.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic'
assert message.data == b''
await client2.disconnect()