From 29e5a74dbc72f8c26ff764de8d65353dce9a91bf Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 17:03:40 -0400 Subject: [PATCH 1/3] fixes Yakifo/amqtt#225 : updating all samples, adding test coverage for samples. python 3.10 needs to explicitly catch asyncio.TimeoutError. correct 'topic-check' plugin list --- amqtt/mqtt/protocol/handler.py | 6 +- amqtt/plugins/manager.py | 4 +- docs/overrides/base.html | 8 ++ mkdocs.rtd.yml | 1 + samples/broker_acl.py | 44 ++++++++-- samples/broker_simple.py | 32 ++++--- samples/broker_start.py | 48 ++++++++--- samples/broker_taboo.py | 45 ++++++++-- samples/client_keepalive.py | 47 +++++++--- samples/client_publish.py | 52 ++++++------ samples/client_publish_acl.py | 42 ++++----- samples/client_publish_ssl.py | 36 ++++---- samples/client_publish_ws.py | 34 ++++---- samples/client_subscribe.py | 33 ++++---- samples/client_subscribe_acl.py | 36 ++++---- samples/mosquitto.org.crt | 38 +++++---- tests/test_samples.py | 146 ++++++++++++++++++++++++++++++++ 17 files changed, 476 insertions(+), 176 deletions(-) create mode 100644 docs/overrides/base.html create mode 100644 tests/test_samples.py diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index f9a7238..666e7f5 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -153,7 +153,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: # canceling the task is the expected result self.logger.debug("Writer close was cancelled.") - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug("Writer close operation timed out.", exc_info=True) except OSError: self.logger.debug("Writer close failed due to I/O error.", exc_info=True) @@ -321,7 +321,7 @@ class ProtocolHandler(Generic[C]): self._puback_waiters[app_message.packet_id] = waiter try: app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5) - except TimeoutError: + except asyncio.TimeoutError: msg = f"Timeout waiting for PUBACK for packet ID {app_message.packet_id}" self.logger.warning(msg) raise TimeoutError(msg) from None @@ -528,7 +528,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: self.logger.debug("Task cancelled, reader loop ending") break - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug(f"{self.session.client_id} Input stream read timeout") self.handle_read_timeout() except NoDataError: diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index d1beb60..521f0c8 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -87,8 +87,8 @@ class PluginManager(Generic[C]): topic_filter_list = [] if self.app_context.config and "auth" in self.app_context.config: auth_filter_list = self.app_context.config["auth"].get("plugins", []) - if self.app_context.config and "topic" in self.app_context.config: - topic_filter_list = self.app_context.config["topic"].get("plugins", []) + if self.app_context.config and "topic-check" in self.app_context.config: + topic_filter_list = self.app_context.config["topic-check"].get("plugins", []) ep: EntryPoints | list[EntryPoint] = [] if hasattr(entry_points(), "select"): diff --git a/docs/overrides/base.html b/docs/overrides/base.html new file mode 100644 index 0000000..0af326a --- /dev/null +++ b/docs/overrides/base.html @@ -0,0 +1,8 @@ +{% extends "base.html" %} + +{% block outdated %} + You're not viewing the latest version. + + Click here to go to latest. + +{% endblock %} diff --git a/mkdocs.rtd.yml b/mkdocs.rtd.yml index 7f248d9..b4c1262 100644 --- a/mkdocs.rtd.yml +++ b/mkdocs.rtd.yml @@ -56,6 +56,7 @@ nav: theme: name: material logo: assets/amqtt_bw.svg + custom_dir: overrides features: - announce.dismiss - content.action.edit diff --git a/samples/broker_acl.py b/samples/broker_acl.py index c4c740f..7382ffd 100644 --- a/samples/broker_acl.py +++ b/samples/broker_acl.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check acl plugin +""" + logger = logging.getLogger(__name__) config = { @@ -38,15 +42,43 @@ config = { }, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main() -> None: + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/broker_simple.py b/samples/broker_simple.py index 22c4639..751bdf6 100644 --- a/samples/broker_simple.py +++ b/samples/broker_simple.py @@ -1,21 +1,31 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.broker import Broker -broker = Broker() +""" +This sample shows how to run a broker +""" + +formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" +logging.basicConfig(level=logging.INFO, format=formatter) -async def test_coro() -> None: - await broker.start() +async def run_server() -> None: + broker = Broker() + try: + await broker.start() + while True: + await asyncio.sleep(1) + except CancelledError: + await broker.shutdown() +def __main__(): + try: + asyncio.run(run_server()) + except KeyboardInterrupt: + print("Server exiting...") if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - - asyncio.get_event_loop().run_until_complete(test_coro()) - try: - asyncio.get_event_loop().run_forever() - except KeyboardInterrupt: - asyncio.get_event_loop().run_until_complete(broker.shutdown()) + __main__() diff --git a/samples/broker_start.py b/samples/broker_start.py index 1055ff1..578f704 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker without stacktraces on keyboard interrupt +""" + logger = logging.getLogger(__name__) config = { @@ -30,19 +34,41 @@ config = { "topic-check": {"enabled": False}, } -broker = Broker(config) +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -async def test_coro() -> None: - await broker.start() - # await asyncio.sleep(5) - # await broker.shutdown() +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - # formatter = "%(asctime)s :: %(levelname)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() - + __main__() diff --git a/samples/broker_taboo.py b/samples/broker_taboo.py index 4a1107b..7752469 100644 --- a/samples/broker_taboo.py +++ b/samples/broker_taboo.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check taboo plugin +""" + logger = logging.getLogger(__name__) config = { @@ -30,15 +34,44 @@ config = { "topic-check": {"enabled": True, "plugins": ["topic_taboo"]}, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 2e11b9a..1bb960c 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -1,13 +1,12 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.client import MQTTClient -# -# This sample shows a client running idle. -# Meanwhile, keepalive is managed through PING messages sent every 5 seconds -# - +""" +This sample shows how to run an idle client +""" logger = logging.getLogger(__name__) @@ -15,17 +14,39 @@ config = { "keep_alive": 5, "ping_delay": 1, } -C = MQTTClient(config=config) + +async def main() -> None: + client = MQTTClient(config=config) + + try: + await client.connect("mqtt://test.mosquitto.org:1883/") + logger.info("client connected") + await asyncio.sleep(18) + except CancelledError: + pass + + await client.disconnect() -async def test_coro() -> None: - await C.connect("mqtt://test.mosquitto.org:1883/") - await asyncio.sleep(18) +def __main__(): - await C.disconnect() + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping client...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + + loop.close() if __name__ == "__main__": - formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish.py b/samples/client_publish.py index 9b0f4f6..83b1eda 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -4,10 +4,9 @@ import logging from amqtt.client import ConnectError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker using different QOS +""" logger = logging.getLogger(__name__) @@ -21,36 +20,39 @@ config = { } -async def test_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") +async def test_coro1() -> None: + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) - logger.info("messages published") - await C.disconnect() + logger.info("test_coro1 messages published") + await client.disconnect() async def test_coro2() -> None: try: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org:1883/") - await C.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) - logger.info("messages published") - await C.disconnect() + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org:1883/") + await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) + logger.info("test_coro2 messages published") + await client.disconnect() except ConnectError: logger.exception(f"Connection failed", exc_info=True) - asyncio.get_event_loop().stop() +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro1()) + asyncio.run(test_coro2()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_until_complete(test_coro2()) + __main__() diff --git a/samples/client_publish_acl.py b/samples/client_publish_acl.py index 2ecf33e..43148d7 100644 --- a/samples/client_publish_acl.py +++ b/samples/client_publish_acl.py @@ -2,37 +2,41 @@ import asyncio import logging from amqtt.client import ConnectError, MQTTClient +from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker running either `samples/broker_acl.py` + or `samples/broker_taboo.py`. +""" logger = logging.getLogger(__name__) async def test_coro() -> None: try: - C = MQTTClient() - await C.connect("mqtt://0.0.0.0:1883") - await C.publish("data/classified", b"TOP SECRET", qos=0x01) - await C.publish("data/memes", b"REAL FUN", qos=0x01) - await C.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=0x01) - await C.publish( + client = MQTTClient() + await client.connect("mqtt://0.0.0.0:1883") + await client.publish("data/classified", b"TOP SECRET", qos=QOS_1) + await client.publish("data/memes", b"REAL FUN", qos=QOS_1) + await client.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=QOS_1) + await client.publish( "repositories/amqtt/devel", b"THIS NEEDS TO BE CHECKED", - qos=0x01, + qos=QOS_1, ) - await C.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=0x01) + await client.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=QOS_1) logger.info("messages published") - await C.disconnect() + await client.disconnect() except ConnectError as ce: - logger.exception("Connection failed") - asyncio.get_event_loop().stop() + logger.exception("ERROR: Connection failed") +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish_ssl.py b/samples/client_publish_ssl.py index 0f2de13..60c3acc 100644 --- a/samples/client_publish_ssl.py +++ b/samples/client_publish_ssl.py @@ -1,41 +1,45 @@ import asyncio import logging +from pathlib import Path from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure broker. +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, - }, + } } -C = MQTTClient(config=config) -# C = MQTTClient() + +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("mqtts://test.mosquitto.org/", cafile="mosquitto.org.crt") + + await client.connect("mqtts://broker.hivemq.com:8883") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_publish_ws.py b/samples/client_publish_ws.py index 4c0fa51..f9d96c9 100644 --- a/samples/client_publish_ws.py +++ b/samples/client_publish_ws.py @@ -4,39 +4,39 @@ import logging from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure websocket broker +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, - }, - "capath": ".", + } } -C = MQTTClient(config=config) -# C = MQTTClient() +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("wss://test.mosquitto.org:8081/", cafile="mosquitto.org.crt") + await client.connect("wss://test.mosquitto.org:8081/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe.py b/samples/client_subscribe.py index 3d175d6..642e66e 100644 --- a/samples/client_subscribe.py +++ b/samples/client_subscribe.py @@ -4,20 +4,19 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# + +""" +This sample shows how to subscribe to different $SYS topics and how to receive incoming messages +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org/") + + await client.subscribe( [ ("$SYS/broker/uptime", QOS_1), ("$SYS/broker/load/#", QOS_2), @@ -25,16 +24,20 @@ async def uptime_coro() -> None: ) logger.info("Subscribed") try: - for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + for _i in range(1, 10): + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(uptime_coro()) + asyncio.run(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe_acl.py b/samples/client_subscribe_acl.py index 1810b6b..f133a0b 100644 --- a/samples/client_subscribe_acl.py +++ b/samples/client_subscribe_acl.py @@ -4,41 +4,45 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# +""" +Run `samples/broker_acl.py` or `samples/broker_taboo.py` + +This sample shows how to subscribe to different topics, some of which are allowed. +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test:test@0.0.0.0:1883") - # await C.connect('mqtt://0.0.0.0:1883') - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test:test@0.0.0.0:1883") + + result = await client.subscribe( [ + ("$SYS/#", QOS_1), # Topic forbidden when running `broker_acl.py` ("data/memes", QOS_1), # Topic allowed ("data/classified", QOS_1), # Topic forbidden ("repositories/amqtt/master", QOS_1), # Topic allowed - ("repositories/amqtt/devel", QOS_1), # Topic forbidden + ("repositories/amqtt/devel", QOS_1), # Topic forbidden when running `broker_acl.py` ("calendar/amqtt/releases", QOS_1), # Topic allowed ], ) - logger.info("Subscribed") + logger.info(f"Subscribed results: {result}") try: for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/#", "data/memes"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError as ce: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.get_event_loop().run_until_complete(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/mosquitto.org.crt b/samples/mosquitto.org.crt index b8535e8..e76dbd8 100644 --- a/samples/mosquitto.org.crt +++ b/samples/mosquitto.org.crt @@ -1,18 +1,24 @@ -----BEGIN CERTIFICATE----- -MIIC8DCCAlmgAwIBAgIJAOD63PlXjJi8MA0GCSqGSIb3DQEBBQUAMIGQMQswCQYD -VQQGEwJHQjEXMBUGA1UECAwOVW5pdGVkIEtpbmdkb20xDjAMBgNVBAcMBURlcmJ5 -MRIwEAYDVQQKDAlNb3NxdWl0dG8xCzAJBgNVBAsMAkNBMRYwFAYDVQQDDA1tb3Nx -dWl0dG8ub3JnMR8wHQYJKoZIhvcNAQkBFhByb2dlckBhdGNob28ub3JnMB4XDTEy -MDYyOTIyMTE1OVoXDTIyMDYyNzIyMTE1OVowgZAxCzAJBgNVBAYTAkdCMRcwFQYD -VQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwGA1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1v -c3F1aXR0bzELMAkGA1UECwwCQ0ExFjAUBgNVBAMMDW1vc3F1aXR0by5vcmcxHzAd -BgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hvby5vcmcwgZ8wDQYJKoZIhvcNAQEBBQAD -gY0AMIGJAoGBAMYkLmX7SqOT/jJCZoQ1NWdCrr/pq47m3xxyXcI+FLEmwbE3R9vM -rE6sRbP2S89pfrCt7iuITXPKycpUcIU0mtcT1OqxGBV2lb6RaOT2gC5pxyGaFJ+h -A+GIbdYKO3JprPxSBoRponZJvDGEZuM3N7p3S/lRoi7G5wG5mvUmaE5RAgMBAAGj -UDBOMB0GA1UdDgQWBBTad2QneVztIPQzRRGj6ZHKqJTv5jAfBgNVHSMEGDAWgBTa -d2QneVztIPQzRRGj6ZHKqJTv5jAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUA -A4GBAAqw1rK4NlRUCUBLhEFUQasjP7xfFqlVbE2cRy0Rs4o3KS0JwzQVBwG85xge -REyPOFdGdhBY2P1FNRy0MDr6xr+D2ZOwxs63dG1nnAnWZg7qwoLgpZ4fESPD3PkA -1ZgKJc2zbSQ9fCPxt2W3mdVav66c6fsb7els2W2Iz7gERJSX +MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL +BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG +A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU +BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv +by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE +BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES +MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp +dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg +UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW +Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA +s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH +3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo +E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT +MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV +6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC +6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf ++pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK +sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839 +LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE +m/XriWr/Cq4h/JfB7NTsezVslgkBaoU= -----END CERTIFICATE----- diff --git a/tests/test_samples.py b/tests/test_samples.py new file mode 100644 index 0000000..49aa5b4 --- /dev/null +++ b/tests/test_samples.py @@ -0,0 +1,146 @@ +import asyncio +import logging +import signal +import subprocess +from pathlib import Path + +import pytest + +from amqtt.broker import Broker +from samples.client_publish import __main__ as client_publish_main +from samples.client_publish_ssl import __main__ as client_publish_ssl_main +from samples.client_publish_ws import __main__ as client_publish_ws_main +from samples.client_subscribe import __main__ as client_subscribe_main +from samples.client_keepalive import __main__ as client_keepalive_main +from samples.broker_acl import config as broker_acl_config +from samples.broker_taboo import config as broker_taboo_config + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +async def test_broker_acl(): + broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py" + process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_simple(): + broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_start(): + broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" + process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_taboo(): + broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" + process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.timeout(20) +def test_client_keepalive(): + client_keepalive_main() + + +def test_client_publish(): + client_publish_main() + + +def test_client_publish_ssl(): + client_publish_ssl_main() + + +@pytest.mark.asyncio +async def test_client_publish_acl(): + + broker = Broker() + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +def test_client_publish_ws(): + client_publish_ws_main() + + +def test_client_subscribe(): + client_subscribe_main() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_acl(): + broker = Broker(config=broker_acl_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [128, 1, 128, 1, 128, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_taboo(): + broker = Broker(config=broker_taboo_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [1, 1, 128, 1, 1, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() \ No newline at end of file From 9ba7c8019b5a25ea5aaf1ed70d4e91035226e40e Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 18:22:32 -0400 Subject: [PATCH 2/3] test case fix and show debug message on failure --- tests/test_samples.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/test_samples.py b/tests/test_samples.py index 49aa5b4..e4612bf 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -22,10 +22,11 @@ async def test_broker_acl(): broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py" process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -35,10 +36,11 @@ async def test_broker_simple(): broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -48,10 +50,11 @@ async def test_broker_start(): broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -61,9 +64,10 @@ async def test_broker_taboo(): broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -91,9 +95,10 @@ async def test_client_publish_acl(): broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -116,7 +121,7 @@ async def test_client_subscribe_plugin_acl(): broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -135,7 +140,7 @@ async def test_client_subscribe_plugin_taboo(): broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) From e0cb182957282b0ef5175b663cba37bc5e78d77e Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 18:32:35 -0400 Subject: [PATCH 3/3] broker_simple.py has different exit messages depending on python version --- tests/test_samples.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_samples.py b/tests/test_samples.py index e4612bf..1932bc5 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -40,9 +40,10 @@ async def test_broker_simple(): process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) - assert "Broker closed" in stderr.decode("utf-8") - assert "ERROR" not in stderr.decode("utf-8") - assert "Exception" not in stderr.decode("utf-8") + has_broker_closed = "Broker closed" in stderr.decode("utf-8") + has_loop_stopped = "Broadcast loop stopped by exception" in stderr.decode("utf-8") + + assert has_broker_closed or has_loop_stopped, "Broker didn't close correctly." @pytest.mark.asyncio