From d41a6f5205071b8e7aa28eed8a5ab377fd959826 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Fri, 4 Jul 2025 21:32:03 -0400 Subject: [PATCH] fixes #245 : broker shouldn't allow clients to publish to '$' topics --- amqtt/broker.py | 24 ++++++-- .../{test_dollar.py => test_dollar_topics.py} | 60 +++++++++++++------ 2 files changed, 60 insertions(+), 24 deletions(-) rename tests/{test_dollar.py => test_dollar_topics.py} (51%) diff --git a/amqtt/broker.py b/amqtt/broker.py index 6f311ae..e9740f6 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -172,6 +172,8 @@ class Broker: self._subscriptions: dict[str, list[tuple[Session, int]]] = {} self._retained_messages: dict[str, RetainedApplicationMessage] = {} + self._topic_filter_matchers: dict[str, re.Pattern[str]] = {} + # Broadcast queue for outgoing messages self._broadcast_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue() self._broadcast_task: asyncio.Task[Any] | None = None @@ -671,6 +673,11 @@ class Broker: f"[MQTT-3.3.2-2] - {client_session.client_id} invalid TOPIC sent in PUBLISH message, closing connection", ) return False + if app_message.topic.startswith("$"): + self.logger.warning( + f"[MQTT-4.7.2-1] - {client_session.client_id} cannot use a topic with a leading $ character." + ) + return False permitted = await self._topic_filtering(client_session, topic=app_message.topic, action=Action.PUBLISH) if not permitted: @@ -876,9 +883,6 @@ class Broker: self.logger.debug(f"Processing broadcast message: {broadcast}") for k_filter, subscriptions in self._subscriptions.items(): - if broadcast["topic"].startswith("$") and (k_filter.startswith(("+", "#"))): - self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #") - continue # Skip all subscriptions which do not match the topic if not self._matches(broadcast["topic"], k_filter): @@ -1003,11 +1007,21 @@ class Broker: ) def _matches(self, topic: str, a_filter: str) -> bool: + if topic.startswith("$") and (a_filter.startswith(("+", "#"))): + self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #") + return False + if "#" not in a_filter and "+" not in a_filter: # if filter doesn't contain wildcard, return exact match return a_filter == topic - # else use regex - match_pattern = re.compile(re.escape(a_filter).replace("\\#", "?.*").replace("\\+", "[^/]*").lstrip("?")) + + # else use regex (re.compile is an expensive operation, store the matcher for future use) + if a_filter not in self._topic_filter_matchers: + self._topic_filter_matchers[a_filter] = re.compile(re.escape(a_filter) + .replace("\\#", "?.*") + .replace("\\+", "[^/]*") + .lstrip("?")) + match_pattern = self._topic_filter_matchers[a_filter] return bool(match_pattern.fullmatch(topic)) def _get_handler(self, session: Session) -> BrokerProtocolHandler | None: diff --git a/tests/test_dollar.py b/tests/test_dollar_topics.py similarity index 51% rename from tests/test_dollar.py rename to tests/test_dollar_topics.py index d79fbc1..374f548 100644 --- a/tests/test_dollar.py +++ b/tests/test_dollar_topics.py @@ -10,10 +10,9 @@ from amqtt.mqtt.constants import QOS_0 logger = logging.getLogger(__name__) - @pytest.mark.asyncio async def test_publish_to_dollar_sign_topics(): - """Applications cannot use a topic with a leading $ character for their own purposes.""" + """Applications cannot use a topic with a leading $ character for their own purposes [MQTT-4.7.2-1].""" cfg = { 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, @@ -33,12 +32,10 @@ async def test_publish_to_dollar_sign_topics(): await asyncio.sleep(0.1) await c.publish('$MY', b'message should be blocked') await asyncio.sleep(0.1) - try: - msg = await c.deliver_message(timeout_duration=1) - logger.debug(f"topic: {msg.topic}") - assert msg is None - except asyncio.TimeoutError: - pass + + with pytest.raises(asyncio.TimeoutError): + # wait long enough for broker sys plugin to run + _ = await c.deliver_message(timeout_duration=1) await c.disconnect() await asyncio.sleep(0.1) @@ -46,7 +43,7 @@ async def test_publish_to_dollar_sign_topics(): @pytest.mark.asyncio async def test_hash_will_not_receive_dollar(): - """A subscription to “#” will not receive any messages published to a topic beginning with a $""" + """A subscription to “#” will not receive any messages published to a topic beginning with a $ [MQTT-4.7.2-1].""" cfg = { 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, @@ -67,22 +64,47 @@ async def test_hash_will_not_receive_dollar(): ) await asyncio.sleep(0.1) - try: - msg = await c.deliver_message(timeout_duration=5) - logger.debug(f"topic: {msg.topic}") - assert msg is None - except asyncio.TimeoutError: - pass + with pytest.raises(asyncio.TimeoutError): + # wait long enough for broker sys plugin to run + _ = await c.deliver_message(timeout_duration=5) await c.disconnect() await asyncio.sleep(0.1) await b.shutdown() -# A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients -# A subscription to “$SYS/#” will receive messages published to topics beginning with “$SYS/” +@pytest.mark.asyncio +async def test_plus_will_not_receive_dollar(): + """A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients [MQTT-4.7.2-1]""" + # BrokerSysPlugin doesn't use $SYS/monitor/Clients, so this is an equivalent test with $SYS/broker topics -# A subscription to “$SYS/monitor/+” will receive messages published to “$SYS/monitor/Clients” + cfg = { + 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, + 'plugins': { + 'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True}, + 'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2} + } + } -# For a Client to receive messages from topics that begin with $SYS/ and from topics that don’t begin with a $, it has to subscribe to both “#” and “$SYS/#” + b = Broker(config=cfg) + await b.start() + await asyncio.sleep(0.1) + c = MQTTClient(config={'auto_reconnect': False}) + await c.connect() + await asyncio.sleep(0.1) + await c.subscribe( + [('+/broker/#', QOS_0), + ('+/broker/time', QOS_0), + ('+/broker/clients/#', QOS_0), + ('+/broker/+/maximum', QOS_0) + ] + ) + await asyncio.sleep(0.1) + with pytest.raises(asyncio.TimeoutError): + # wait long enough for broker sys plugin to run + _ = await c.deliver_message(timeout_duration=5) + + await c.disconnect() + await asyncio.sleep(0.1) + await b.shutdown()