kopia lustrzana https://github.com/Yakifo/amqtt
fixes #245 : broker shouldn't allow clients to publish to '$' topics
rodzic
0704169929
commit
d41a6f5205
|
@ -172,6 +172,8 @@ class Broker:
|
||||||
self._subscriptions: dict[str, list[tuple[Session, int]]] = {}
|
self._subscriptions: dict[str, list[tuple[Session, int]]] = {}
|
||||||
self._retained_messages: dict[str, RetainedApplicationMessage] = {}
|
self._retained_messages: dict[str, RetainedApplicationMessage] = {}
|
||||||
|
|
||||||
|
self._topic_filter_matchers: dict[str, re.Pattern[str]] = {}
|
||||||
|
|
||||||
# Broadcast queue for outgoing messages
|
# Broadcast queue for outgoing messages
|
||||||
self._broadcast_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
|
self._broadcast_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
|
||||||
self._broadcast_task: asyncio.Task[Any] | None = None
|
self._broadcast_task: asyncio.Task[Any] | None = None
|
||||||
|
@ -671,6 +673,11 @@ class Broker:
|
||||||
f"[MQTT-3.3.2-2] - {client_session.client_id} invalid TOPIC sent in PUBLISH message, closing connection",
|
f"[MQTT-3.3.2-2] - {client_session.client_id} invalid TOPIC sent in PUBLISH message, closing connection",
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
if app_message.topic.startswith("$"):
|
||||||
|
self.logger.warning(
|
||||||
|
f"[MQTT-4.7.2-1] - {client_session.client_id} cannot use a topic with a leading $ character."
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
permitted = await self._topic_filtering(client_session, topic=app_message.topic, action=Action.PUBLISH)
|
permitted = await self._topic_filtering(client_session, topic=app_message.topic, action=Action.PUBLISH)
|
||||||
if not permitted:
|
if not permitted:
|
||||||
|
@ -876,9 +883,6 @@ class Broker:
|
||||||
self.logger.debug(f"Processing broadcast message: {broadcast}")
|
self.logger.debug(f"Processing broadcast message: {broadcast}")
|
||||||
|
|
||||||
for k_filter, subscriptions in self._subscriptions.items():
|
for k_filter, subscriptions in self._subscriptions.items():
|
||||||
if broadcast["topic"].startswith("$") and (k_filter.startswith(("+", "#"))):
|
|
||||||
self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Skip all subscriptions which do not match the topic
|
# Skip all subscriptions which do not match the topic
|
||||||
if not self._matches(broadcast["topic"], k_filter):
|
if not self._matches(broadcast["topic"], k_filter):
|
||||||
|
@ -1003,11 +1007,21 @@ class Broker:
|
||||||
)
|
)
|
||||||
|
|
||||||
def _matches(self, topic: str, a_filter: str) -> bool:
|
def _matches(self, topic: str, a_filter: str) -> bool:
|
||||||
|
if topic.startswith("$") and (a_filter.startswith(("+", "#"))):
|
||||||
|
self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #")
|
||||||
|
return False
|
||||||
|
|
||||||
if "#" not in a_filter and "+" not in a_filter:
|
if "#" not in a_filter and "+" not in a_filter:
|
||||||
# if filter doesn't contain wildcard, return exact match
|
# if filter doesn't contain wildcard, return exact match
|
||||||
return a_filter == topic
|
return a_filter == topic
|
||||||
# else use regex
|
|
||||||
match_pattern = re.compile(re.escape(a_filter).replace("\\#", "?.*").replace("\\+", "[^/]*").lstrip("?"))
|
# else use regex (re.compile is an expensive operation, store the matcher for future use)
|
||||||
|
if a_filter not in self._topic_filter_matchers:
|
||||||
|
self._topic_filter_matchers[a_filter] = re.compile(re.escape(a_filter)
|
||||||
|
.replace("\\#", "?.*")
|
||||||
|
.replace("\\+", "[^/]*")
|
||||||
|
.lstrip("?"))
|
||||||
|
match_pattern = self._topic_filter_matchers[a_filter]
|
||||||
return bool(match_pattern.fullmatch(topic))
|
return bool(match_pattern.fullmatch(topic))
|
||||||
|
|
||||||
def _get_handler(self, session: Session) -> BrokerProtocolHandler | None:
|
def _get_handler(self, session: Session) -> BrokerProtocolHandler | None:
|
||||||
|
|
|
@ -10,10 +10,9 @@ from amqtt.mqtt.constants import QOS_0
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_publish_to_dollar_sign_topics():
|
async def test_publish_to_dollar_sign_topics():
|
||||||
"""Applications cannot use a topic with a leading $ character for their own purposes."""
|
"""Applications cannot use a topic with a leading $ character for their own purposes [MQTT-4.7.2-1]."""
|
||||||
|
|
||||||
cfg = {
|
cfg = {
|
||||||
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
||||||
|
@ -33,12 +32,10 @@ async def test_publish_to_dollar_sign_topics():
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
await c.publish('$MY', b'message should be blocked')
|
await c.publish('$MY', b'message should be blocked')
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
try:
|
|
||||||
msg = await c.deliver_message(timeout_duration=1)
|
with pytest.raises(asyncio.TimeoutError):
|
||||||
logger.debug(f"topic: {msg.topic}")
|
# wait long enough for broker sys plugin to run
|
||||||
assert msg is None
|
_ = await c.deliver_message(timeout_duration=1)
|
||||||
except asyncio.TimeoutError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
await c.disconnect()
|
await c.disconnect()
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
@ -46,7 +43,7 @@ async def test_publish_to_dollar_sign_topics():
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_hash_will_not_receive_dollar():
|
async def test_hash_will_not_receive_dollar():
|
||||||
"""A subscription to “#” will not receive any messages published to a topic beginning with a $"""
|
"""A subscription to “#” will not receive any messages published to a topic beginning with a $ [MQTT-4.7.2-1]."""
|
||||||
|
|
||||||
cfg = {
|
cfg = {
|
||||||
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
||||||
|
@ -67,22 +64,47 @@ async def test_hash_will_not_receive_dollar():
|
||||||
)
|
)
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
try:
|
with pytest.raises(asyncio.TimeoutError):
|
||||||
msg = await c.deliver_message(timeout_duration=5)
|
# wait long enough for broker sys plugin to run
|
||||||
logger.debug(f"topic: {msg.topic}")
|
_ = await c.deliver_message(timeout_duration=5)
|
||||||
assert msg is None
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
await c.disconnect()
|
await c.disconnect()
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
await b.shutdown()
|
await b.shutdown()
|
||||||
|
|
||||||
# A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients
|
|
||||||
|
|
||||||
# A subscription to “$SYS/#” will receive messages published to topics beginning with “$SYS/”
|
@pytest.mark.asyncio
|
||||||
|
async def test_plus_will_not_receive_dollar():
|
||||||
|
"""A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients [MQTT-4.7.2-1]"""
|
||||||
|
# BrokerSysPlugin doesn't use $SYS/monitor/Clients, so this is an equivalent test with $SYS/broker topics
|
||||||
|
|
||||||
# A subscription to “$SYS/monitor/+” will receive messages published to “$SYS/monitor/Clients”
|
cfg = {
|
||||||
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
||||||
|
'plugins': {
|
||||||
|
'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True},
|
||||||
|
'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
# For a Client to receive messages from topics that begin with $SYS/ and from topics that don’t begin with a $, it has to subscribe to both “#” and “$SYS/#”
|
b = Broker(config=cfg)
|
||||||
|
await b.start()
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
c = MQTTClient(config={'auto_reconnect': False})
|
||||||
|
await c.connect()
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
await c.subscribe(
|
||||||
|
[('+/broker/#', QOS_0),
|
||||||
|
('+/broker/time', QOS_0),
|
||||||
|
('+/broker/clients/#', QOS_0),
|
||||||
|
('+/broker/+/maximum', QOS_0)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.TimeoutError):
|
||||||
|
# wait long enough for broker sys plugin to run
|
||||||
|
_ = await c.deliver_message(timeout_duration=5)
|
||||||
|
|
||||||
|
await c.disconnect()
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
await b.shutdown()
|
Ładowanie…
Reference in New Issue