resolves Yakifo/amqtt#27 Yakifo/amqtt#246 Yakifo/amqtt#123: only retain messages for clients which specify clean session = 1 and messages are QoS 1 & 2. Also, clients which are completely anonymous, specifying no username, have no expectation of message retention.

pull/248/head
Andrew Mirsky 2025-07-01 22:00:18 -04:00
rodzic 1276503748
commit 2ceb2ae43b
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
10 zmienionych plików z 27 dodań i 13 usunięć

4
.gitignore vendored
Wyświetl plik

@ -34,3 +34,7 @@ site/
_build/
.hypothesis/
coverage.xml
#----- generated files -----
*.log
*memray*

Wyświetl plik

@ -29,6 +29,7 @@ from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Sessio
from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
from .events import BrokerEvents
from .mqtt.constants import QOS_1, QOS_2
from .mqtt.disconnect import DisconnectPacket
from .plugins.manager import BaseContext, PluginManager
@ -438,6 +439,7 @@ class Broker:
await self._delete_session(client_session.client_id)
else:
client_session.client_id = gen_client_id()
client_session.parent = 0
# Get session from cache
elif client_session.client_id in self._sessions:
@ -880,7 +882,12 @@ class Broker:
qos = broadcast.get("qos", sub_qos)
# Retain all messages which cannot be broadcasted, due to the session not being connected
if target_session.transitions.state != "connected":
# but only when clean session is false and qos is 1 or 2 [MQTT 3.1.2.4]
# and, if a client used anonymous authentication, there is no expectation that messages should be retained
if (target_session.transitions.state != "connected"
and not target_session.clean_session
and qos in (QOS_1, QOS_2)
and not target_session.is_anonymous):
self.logger.debug(f"Session {target_session.client_id} is not connected, retaining message.")
await self._retain_broadcast_message(broadcast, qos, target_session)
continue

Wyświetl plik

@ -597,7 +597,7 @@ class MQTTClient:
session.cadata = broker_conf.get("cadata")
if cleansession is not None:
broker_conf["cleansession"] = cleansession
broker_conf["cleansession"] = cleansession # noop?
session.clean_session = cleansession
else:
session.clean_session = self.config.get("cleansession", True)

Wyświetl plik

@ -192,7 +192,7 @@ class ConnectPayload(MQTTPayload[ConnectVariableHeader]):
# A Server MAY allow a Client to supply a ClientId that has a length of zero bytes
# [MQTT-3.1.3-6]
payload.client_id = gen_client_id()
# indicator to trow exception in case CLEAN_SESSION_FLAG is set to False
# indicator to throw exception in case CLEAN_SESSION_FLAG is set to False
payload.client_id_is_random = True
# Read will topic, username and password

Wyświetl plik

@ -19,6 +19,7 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
allow_anonymous = self.auth_config.get("allow-anonymous", True) if isinstance(self.auth_config, dict) else True
if allow_anonymous:
self.context.logger.debug("Authentication success: config allows anonymous")
session.is_anonymous = True
return True
if session and session.username:

Wyświetl plik

@ -165,6 +165,10 @@ class PluginManager(Generic[C]):
def _schedule_coro(self, coro: Awaitable[str | bool | None]) -> asyncio.Future[str | bool | None]:
return asyncio.ensure_future(coro)
def _clean_fired_events(self, future: asyncio.Future[Any]) -> None:
with contextlib.suppress(KeyError, ValueError):
self._fired_events.remove(future)
async def fire_event(self, event_name: Events, *, wait: bool = False, **method_kwargs: Any) -> None:
"""Fire an event to plugins.
@ -190,12 +194,7 @@ class PluginManager(Generic[C]):
coro_instance: Awaitable[Any] = call_method(event_awaitable, method_kwargs)
tasks.append(asyncio.ensure_future(coro_instance))
def clean_fired_events(future: asyncio.Future[Any]) -> None:
with contextlib.suppress(KeyError, ValueError):
self._fired_events.remove(future)
tasks[-1].add_done_callback(clean_fired_events)
tasks[-1].add_done_callback(self._clean_fired_events)
self._fired_events.extend(tasks)
if wait and tasks:

Wyświetl plik

@ -4,6 +4,7 @@ ping_delay: 1
default_qos: 0
default_retain: false
auto_reconnect: true
cleansession: true
reconnect_max_interval: 10
reconnect_retries: 2
broker:

Wyświetl plik

@ -151,6 +151,9 @@ class Session:
# Stores PUBLISH messages ID received in order and ready for application process
self.delivered_message_queue: Queue[ApplicationMessage] = Queue()
# identify anonymous client sessions or clients which didn't identify themselves
self.is_anonymous: bool = False
def _init_states(self) -> None:
self.transitions = Machine(states=Session.states, initial="new")
self.transitions.add_transition(

Wyświetl plik

@ -26,8 +26,7 @@ System status report interval in seconds (`broker_sys` plugin)
### `timeout-disconnect-delay` *(int)*
Client disconnect timeout without a keep-alive
Client disconnect timeout without a keep-alive.
### `auth` *(mapping)*

Wyświetl plik

@ -632,7 +632,7 @@ async def test_client_subscribe_publish_dollar_topic_2(broker):
@pytest.mark.asyncio
async def test_client_publish_retain_subscribe(broker):
sub_client = MQTTClient()
sub_client = MQTTClient(client_id='test_client')
await sub_client.connect("mqtt://127.0.0.1", cleansession=False)
ret = await sub_client.subscribe(
[("/qos0", QOS_0), ("/qos1", QOS_1), ("/qos2", QOS_2)],
@ -644,7 +644,7 @@ async def test_client_publish_retain_subscribe(broker):
await _client_publish("/qos0", b"data", QOS_0, retain=True)
await _client_publish("/qos1", b"data", QOS_1, retain=True)
await _client_publish("/qos2", b"data", QOS_2, retain=True)
await sub_client.reconnect()
await sub_client.reconnect(cleansession=False)
for qos in [QOS_0, QOS_1, QOS_2]:
log.debug(f"TEST QOS: {qos}")
message = await sub_client.deliver_message()