Merge pull request #200 from ajmirsky/issues/199

fixes #199 : 'will' messages should only be sent if disconnection without a disconnect packet
pull/214/head^2
Andrew Mirsky 2025-06-14 10:16:17 -04:00 zatwierdzone przez GitHub
commit 5dcf7080f2
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
5 zmienionych plików z 151 dodań i 141 usunięć

Wyświetl plik

@ -28,6 +28,7 @@ from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
from .mqtt.disconnect import DisconnectPacket
from .plugins.manager import BaseContext, PluginManager
_CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]]
@ -534,8 +535,12 @@ class Broker:
)
if disconnect_waiter in done:
connected = await self._handle_disconnect(client_session, handler, disconnect_waiter)
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect())
# handle the disconnection: normal or abnormal result, either way, the client is no longer connected
await self._handle_disconnect(client_session, handler, disconnect_waiter)
connected = False
# no need to reschedule the `disconnect_waiter` since we're exiting the message loop
if subscribe_waiter in done:
await self._handle_subscription(client_session, handler, subscribe_waiter)
@ -565,11 +570,20 @@ class Broker:
client_session: Session,
handler: BrokerProtocolHandler,
disconnect_waiter: asyncio.Future[Any],
) -> bool:
"""Handle client disconnection."""
) -> None:
"""Handle client disconnection.
Args:
client_session (Session): client session
handler (BrokerProtocolHandler): broker protocol handler
disconnect_waiter (asyncio.Future[Any]): future to wait for disconnection
"""
# check the disconnected waiter result
result = disconnect_waiter.result()
self.logger.debug(f"{client_session.client_id} Result from wait_disconnect: {result}")
if result is None:
# if the client disconnects abruptly by sending no message or the message isn't a disconnect packet
if result is None or not isinstance(result, DisconnectPacket):
self.logger.debug(f"Will flag: {client_session.will_flag}")
if client_session.will_flag:
self.logger.debug(
@ -588,12 +602,13 @@ class Broker:
client_session.will_message,
client_session.will_qos,
)
self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler)
client_session.transitions.disconnect()
await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id)
return False
return True
# normal or not, let's end the client's session
self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler)
client_session.transitions.disconnect()
await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id)
async def _handle_subscription(
self,
@ -672,7 +687,6 @@ class Broker:
self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
return True
# TODO: Remove this method, not found it used
async def _init_handler(self, session: Session, reader: ReaderAdapter, writer: WriterAdapter) -> BrokerProtocolHandler:
"""Create a BrokerProtocolHandler and attach to a session."""
handler = BrokerProtocolHandler(self.plugins_manager, loop=self._loop)
@ -695,7 +709,6 @@ class Broker:
- False if user authentication fails
- None if authentication can't be achieved (then plugin result is then ignored)
:param session:
:param listener:
:return:
"""
auth_plugins = None
@ -766,7 +779,6 @@ class Broker:
- False if MQTT client is not allowed to subscribe to the topic
- None if topic filtering can't be achieved (then plugin result is then ignored)
:param session:
:param listener:
:param topic: Topic in which the client wants to subscribe / publish
:param action: What is being done with the topic? subscribe or publish
:return:

Wyświetl plik

@ -152,7 +152,8 @@ class ProtocolHandler:
if self.writer is not None:
await self.writer.close()
except asyncio.CancelledError:
self.logger.debug("Writer close was cancelled.", exc_info=True)
# canceling the task is the expected result
self.logger.debug("Writer close was cancelled.")
except TimeoutError:
self.logger.debug("Writer close operation timed out.", exc_info=True)
except OSError:

Wyświetl plik

@ -110,8 +110,7 @@ def _version(v:bool) -> None:
def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
url: str = typer.Option(None, help="Broker connection URL, *must conform to MQTT or URI scheme: `[mqtt(s)|ws(s)]://<username:password>@HOST:port`*", show_default=False),
config_file: str | None = typer.Option(None, "-c", help="Client configuration file"),
client_id: str | None = typer.Option(None, "-i", help="client identification for mqtt connection. *default: process id and the hostname of the client*"),
max_count: int | None = typer.Option(None, "-n", help="Number of messages to read before ending *default: read indefinitely*"),
client_id: str | None = typer.Option(None, "-i", "--client-id", help="client identification for mqtt connection. *default: process id and the hostname of the client*"), max_count: int | None = typer.Option(None, "-n", help="Number of messages to read before ending *default: read indefinitely*"),
qos: int = typer.Option(0, "--qos", "-q", help="Quality of service (0, 1, or 2)"),
topics: list[str] = typer.Option(..., "-t", help="Topic filter to subscribe, can be used multiple times."), # noqa: B008
keep_alive: int | None = typer.Option(None, "-k", help="Keep alive timeout in seconds"),

Wyświetl plik

@ -4,10 +4,12 @@ import os
import signal
import subprocess
import tempfile
from unittest.mock import patch
import pytest
import yaml
from amqtt.broker import Broker
from amqtt.mqtt.constants import QOS_0
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
@ -158,38 +160,6 @@ async def test_publish_subscribe(broker):
assert sub_proc.returncode == 0
@pytest.mark.asyncio
async def test_pub_sub_retain(broker):
"""Test various pub/sub will retain options."""
# Test publishing with retain flag
pub_proc = subprocess.run(
[
"amqtt_pub",
"--url", "mqtt://127.0.0.1:1884",
"-t", "topic/test",
"-m", "standard message",
"--will-topic", "topic/retain",
"--will-message", "last will message",
"--will-retain",
],
capture_output=True,
)
assert pub_proc.returncode == 0, f"publisher error code: {pub_proc.returncode}\n{pub_proc.stderr}"
logger.debug("publisher succeeded")
# Verify retained message is received by new subscriber
sub_proc = subprocess.run(
[
"amqtt_sub",
"--url", "mqtt://127.0.0.1:1884",
"-t", "topic/retain",
"-n", "1",
],
capture_output=True,
)
assert sub_proc.returncode == 0, f"subscriber error code: {sub_proc.returncode}\n{sub_proc.stderr}"
assert "last will message" in str(sub_proc.stdout)
@pytest.mark.asyncio
async def test_pub_errors(client_config_file):
"""Test error handling in pub/sub tools."""
@ -275,74 +245,3 @@ async def test_pub_client_config(broker, client_config_file):
logger.debug(f"Stderr: {stderr.decode()}")
assert proc.returncode == 0, f"publisher error code: {proc.returncode}"
@pytest.mark.asyncio
async def test_pub_client_config_will(broker, client_config_file):
# verifying client script functionality of will topic (publisher)
# https://github.com/Yakifo/amqtt/issues/159
await asyncio.sleep(1)
client1 = MQTTClient(client_id="client1")
await client1.connect('mqtt://localhost:1884')
await client1.subscribe([
("test/will/topic", QOS_0)
])
cmd = ["amqtt_pub",
"-t", "test/topic",
"-m", "\"test of regular topic\"",
"-c", client_config_file]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")
message = await client1.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic'
assert message.data == b'client ABC has disconnected'
await client1.disconnect()
@pytest.mark.asyncio
@pytest.mark.timeout(20)
async def test_sub_client_config_will(broker, client_config, client_config_file):
# verifying client script functionality of will topic (subscriber)
# https://github.com/Yakifo/amqtt/issues/159
client1 = MQTTClient(client_id="client1")
await client1.connect('mqtt://localhost:1884')
await client1.subscribe([
("test/will/topic", QOS_0)
])
cmd = ["amqtt_sub",
"-t", "test/topic",
"-c", client_config_file,
"-n", "1"]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await asyncio.sleep(2)
# cause `amqtt_sub` to exit after receiving this one message
await client1.publish("test/topic", b'my test message')
# validate the 'will' message was received correctly
message = await client1.deliver_message(timeout_duration=3)
assert message.topic == 'test/will/topic'
assert message.data == b'client ABC has disconnected'
await client1.disconnect()
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")

Wyświetl plik

@ -268,27 +268,121 @@ def client_config():
@pytest.mark.asyncio
async def test_client_publish_will_with_retain(broker_fixture, client_config):
async def test_client_will_with_clean_disconnect(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": False,
"message": "client ABC has disconnected",
"qos": 1
},
}
# verifying client functionality of will topic
# https://github.com/Yakifo/amqtt/issues/159
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect("mqtt://localhost:1883")
client1 = MQTTClient(client_id="client1")
client2 = MQTTClient(client_id="client2")
await client2.connect("mqtt://localhost:1883")
await client2.subscribe(
[
("test/will/topic", QOS_0),
]
)
await client1.disconnect()
await asyncio.sleep(1)
with pytest.raises(asyncio.TimeoutError):
message = await client2.deliver_message(timeout_duration=2)
# if we do get a message, make sure it's not a will message
assert message.topic != "test/will/topic"
await client2.disconnect()
@pytest.mark.asyncio
async def test_client_will_with_abrupt_disconnect(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": False,
"message": "client ABC has disconnected",
"qos": 1
},
}
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect("mqtt://localhost:1883")
client2 = MQTTClient(client_id="client2")
await client2.connect("mqtt://localhost:1883")
await client2.subscribe(
[
("test/will/topic", QOS_0),
]
)
# instead of client.disconnect, call the necessary closing but without sending the disconnect packet
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(1)
message = await client2.deliver_message(timeout_duration=1)
# make sure we receive the will message
assert message.topic == "test/will/topic"
assert message.data == b'client ABC has disconnected'
await client2.disconnect()
@pytest.mark.asyncio
async def test_client_retained_will_with_abrupt_disconnect(broker_fixture):
# verifying client functionality of retained will topic/message
config = {
"will": {
"topic": "test/will/topic",
"retain": True,
"message": "client ABC has disconnected",
"qos": 1
},
}
# first client, connect with retained will message
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect('mqtt://localhost:1883')
await client1.subscribe([
client2 = MQTTClient(client_id="client2")
await client2.connect('mqtt://localhost:1883')
await client2.subscribe([
("test/will/topic", QOS_0)
])
client2 = MQTTClient(client_id="client2", config=client_config)
await client2.connect('mqtt://localhost:1883')
await client2.publish('my/topic', b'my message')
await client2.disconnect()
message = await client1.deliver_message(timeout_duration=1)
# let's abruptly disconnect client1
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(0.5)
# make sure the client which is still connected that we get the 'will' message
message = await client2.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic'
assert message.data == b'client ABC has disconnected'
await client1.disconnect()
await client2.disconnect()
# make sure a client which is connected after client1 disconnected still receives the 'will' message from
client3 = MQTTClient(client_id="client3")
await client3.connect('mqtt://localhost:1883')
await client3.subscribe([
@ -301,21 +395,18 @@ async def test_client_publish_will_with_retain(broker_fixture, client_config):
@pytest.mark.asyncio
async def test_client_with_will_empty_message(broker_fixture):
client_config = {
"broker": {
"uri": "mqtt://localhost:1883"
},
"reconnect_max_interval": 5,
async def test_client_abruptly_disconnecting_with_empty_will_message(broker_fixture):
config = {
"will": {
"topic": "test/will/topic",
"retain": True,
"message": "",
"qos": 0
"qos": 1
},
}
client1 = MQTTClient(client_id="client1", config=client_config)
await client1.connect()
client1 = MQTTClient(client_id="client1", config=config)
await client1.connect('mqtt://localhost:1883')
client2 = MQTTClient(client_id="client2")
await client2.connect('mqtt://localhost:1883')
@ -323,7 +414,15 @@ async def test_client_with_will_empty_message(broker_fixture):
("test/will/topic", QOS_0)
])
await client1.disconnect()
# let's abruptly disconnect client1
await client1.cancel_tasks()
if client1._disconnect_task and not client1._disconnect_task.done():
client1._disconnect_task.cancel()
client1._connected_state.clear()
await client1._handler.stop()
client1.session.transitions.disconnect()
await asyncio.sleep(0.5)
message = await client2.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic'