From 78834006449c2720d19e42489e445ac55c949666 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 11:54:36 -0400 Subject: [PATCH 01/19] fix a broken link on the test.amqtt.io page --- docs_test/src/dashboard/components/MainGrid.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs_test/src/dashboard/components/MainGrid.tsx b/docs_test/src/dashboard/components/MainGrid.tsx index b27b2ee..eccbaea 100644 --- a/docs_test/src/dashboard/components/MainGrid.tsx +++ b/docs_test/src/dashboard/components/MainGrid.tsx @@ -138,7 +138,7 @@ export default function MainGrid() {

github: Yakifo/amqtt + href="https://github.com/Yakifo/amqtt">Yakifo/amqtt

PyPi: Date: Tue, 17 Jun 2025 12:53:37 -0400 Subject: [PATCH 02/19] moving base classes for auth and topic plugins into common file --- amqtt/plugins/authentication.py | 32 +-------------- amqtt/plugins/base.py | 61 ++++++++++++++++++++++++++++ amqtt/plugins/manager.py | 5 +-- amqtt/plugins/topic_checking.py | 33 +-------------- docs/custom_plugins.md | 4 +- tests/plugins/mocks.py | 4 +- tests/plugins/test_manager.py | 3 +- tests/plugins/test_topic_checking.py | 3 +- 8 files changed, 71 insertions(+), 74 deletions(-) diff --git a/amqtt/plugins/authentication.py b/amqtt/plugins/authentication.py index 90f7eb9..954c403 100644 --- a/amqtt/plugins/authentication.py +++ b/amqtt/plugins/authentication.py @@ -1,44 +1,14 @@ from pathlib import Path -from typing import Any from passlib.apps import custom_app_context as pwd_context from amqtt.broker import BrokerContext -from amqtt.plugins.base import BasePlugin -from amqtt.plugins.manager import BaseContext +from amqtt.plugins.base import BaseAuthPlugin from amqtt.session import Session _PARTS_EXPECTED_LENGTH = 2 # Expected number of parts in a valid line -class BaseAuthPlugin(BasePlugin[BaseContext]): - """Base class for authentication plugins.""" - - def __init__(self, context: BaseContext) -> None: - super().__init__(context) - - self.auth_config: dict[str, Any] | None = self._get_config_section("auth") - if not self.auth_config: - self.context.logger.warning("'auth' section not found in context configuration") - - async def authenticate(self, *, session: Session) -> bool | None: - """Logic for session authentication. - - Args: - session: amqtt.session.Session - - Returns: - - `True` if user is authentication succeed, `False` if user authentication fails - - `None` if authentication can't be achieved (then plugin result is then ignored) - - """ - if not self.auth_config: - # auth config section not found - self.context.logger.warning("'auth' section not found in context configuration") - return False - return True - - class AnonymousAuthPlugin(BaseAuthPlugin): """Authentication plugin allowing anonymous access.""" diff --git a/amqtt/plugins/base.py b/amqtt/plugins/base.py index 2d5f644..90ee1b3 100644 --- a/amqtt/plugins/base.py +++ b/amqtt/plugins/base.py @@ -1,6 +1,8 @@ from typing import Any, Generic, TypeVar +from amqtt.broker import Action from amqtt.plugins.manager import BaseContext +from amqtt.session import Session C = TypeVar("C", bound=BaseContext) @@ -24,3 +26,62 @@ class BasePlugin(Generic[C]): async def close(self) -> None: """Override if plugin needs to clean up resources upon shutdown.""" + + +class BaseTopicPlugin(BasePlugin[BaseContext]): + """Base class for topic plugins.""" + + def __init__(self, context: BaseContext) -> None: + super().__init__(context) + + self.topic_config: dict[str, Any] | None = self._get_config_section("topic-check") + if self.topic_config is None: + self.context.logger.warning("'topic-check' section not found in context configuration") + + async def topic_filtering( + self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None + ) -> bool: + """Logic for filtering out topics. + + Args: + session: amqtt.session.Session + topic: str + action: amqtt.broker.Action + + Returns: + bool: `True` if topic is allowed, `False` otherwise + + """ + if not self.topic_config: + # auth config section not found + self.context.logger.warning("'topic-check' section not found in context configuration") + return False + return True + + +class BaseAuthPlugin(BasePlugin[BaseContext]): + """Base class for authentication plugins.""" + + def __init__(self, context: BaseContext) -> None: + super().__init__(context) + + self.auth_config: dict[str, Any] | None = self._get_config_section("auth") + if not self.auth_config: + self.context.logger.warning("'auth' section not found in context configuration") + + async def authenticate(self, *, session: Session) -> bool | None: + """Logic for session authentication. + + Args: + session: amqtt.session.Session + + Returns: + - `True` if user is authentication succeed, `False` if user authentication fails + - `None` if authentication can't be achieved (then plugin result is then ignored) + + """ + if not self.auth_config: + # auth config section not found + self.context.logger.warning("'auth' section not found in context configuration") + return False + return True diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index d1beb60..dd177e7 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -18,9 +18,8 @@ _LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: from amqtt.broker import Action - from amqtt.plugins.authentication import BaseAuthPlugin - from amqtt.plugins.base import BasePlugin - from amqtt.plugins.topic_checking import BaseTopicPlugin + from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin + class Plugin(NamedTuple): name: str diff --git a/amqtt/plugins/topic_checking.py b/amqtt/plugins/topic_checking.py index d92d672..c61e313 100644 --- a/amqtt/plugins/topic_checking.py +++ b/amqtt/plugins/topic_checking.py @@ -1,42 +1,11 @@ from typing import Any from amqtt.broker import Action -from amqtt.plugins.base import BasePlugin +from amqtt.plugins.base import BaseTopicPlugin from amqtt.plugins.manager import BaseContext from amqtt.session import Session -class BaseTopicPlugin(BasePlugin[BaseContext]): - """Base class for topic plugins.""" - - def __init__(self, context: BaseContext) -> None: - super().__init__(context) - - self.topic_config: dict[str, Any] | None = self._get_config_section("topic-check") - if self.topic_config is None: - self.context.logger.warning("'topic-check' section not found in context configuration") - - async def topic_filtering( - self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None - ) -> bool: - """Logic for filtering out topics. - - Args: - session: amqtt.session.Session - topic: str - action: amqtt.broker.Action - - Returns: - bool: `True` if topic is allowed, `False` otherwise - - """ - if not self.topic_config: - # auth config section not found - self.context.logger.warning("'topic-check' section not found in context configuration") - return False - return True - - class TopicTabooPlugin(BaseTopicPlugin): def __init__(self, context: BaseContext) -> None: super().__init__(context) diff --git a/docs/custom_plugins.md b/docs/custom_plugins.md index 08ccbdc..34eb4ec 100644 --- a/docs/custom_plugins.md +++ b/docs/custom_plugins.md @@ -58,7 +58,7 @@ auth: These plugins should subclass from `BaseAuthPlugin` and implement the `authenticate` method. -::: amqtt.plugins.authentication.BaseAuthPlugin +::: amqtt.plugins.base.BaseAuthPlugin ## Topic Filter Plugins @@ -75,4 +75,4 @@ topic-check: These plugins should subclass from `BaseTopicPlugin` and implement the `topic_filtering` method. -::: amqtt.plugins.topic_checking.BaseTopicPlugin +::: amqtt.plugins.base.BaseTopicPlugin diff --git a/tests/plugins/mocks.py b/tests/plugins/mocks.py index dab1229..dce94a6 100644 --- a/tests/plugins/mocks.py +++ b/tests/plugins/mocks.py @@ -4,10 +4,8 @@ from dataclasses import dataclass from amqtt.broker import Action -from amqtt.plugins.base import BasePlugin +from amqtt.plugins.base import BasePlugin, BaseTopicPlugin, BaseAuthPlugin from amqtt.plugins.manager import BaseContext -from amqtt.plugins.topic_checking import BaseTopicPlugin -from amqtt.plugins.authentication import BaseAuthPlugin from amqtt.session import Session diff --git a/tests/plugins/test_manager.py b/tests/plugins/test_manager.py index b9fa575..5164534 100644 --- a/tests/plugins/test_manager.py +++ b/tests/plugins/test_manager.py @@ -4,9 +4,8 @@ import unittest from amqtt.broker import Action from amqtt.events import BrokerEvents -from amqtt.plugins.authentication import BaseAuthPlugin from amqtt.plugins.manager import BaseContext, PluginManager -from amqtt.plugins.topic_checking import BaseTopicPlugin +from amqtt.plugins.base import BaseTopicPlugin, BaseAuthPlugin from amqtt.session import Session formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py index e90f365..ef5c69a 100644 --- a/tests/plugins/test_topic_checking.py +++ b/tests/plugins/test_topic_checking.py @@ -4,7 +4,8 @@ import pytest from amqtt.broker import Action, BrokerContext, Broker from amqtt.plugins.manager import BaseContext -from amqtt.plugins.topic_checking import BaseTopicPlugin, TopicAccessControlListPlugin, TopicTabooPlugin +from amqtt.plugins.topic_checking import TopicAccessControlListPlugin, TopicTabooPlugin +from amqtt.plugins.base import BaseTopicPlugin from amqtt.session import Session # Base plug-in object From 29e5a74dbc72f8c26ff764de8d65353dce9a91bf Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 17:03:40 -0400 Subject: [PATCH 03/19] fixes Yakifo/amqtt#225 : updating all samples, adding test coverage for samples. python 3.10 needs to explicitly catch asyncio.TimeoutError. correct 'topic-check' plugin list --- amqtt/mqtt/protocol/handler.py | 6 +- amqtt/plugins/manager.py | 4 +- docs/overrides/base.html | 8 ++ mkdocs.rtd.yml | 1 + samples/broker_acl.py | 44 ++++++++-- samples/broker_simple.py | 32 ++++--- samples/broker_start.py | 48 ++++++++--- samples/broker_taboo.py | 45 ++++++++-- samples/client_keepalive.py | 47 +++++++--- samples/client_publish.py | 52 ++++++------ samples/client_publish_acl.py | 42 ++++----- samples/client_publish_ssl.py | 36 ++++---- samples/client_publish_ws.py | 34 ++++---- samples/client_subscribe.py | 33 ++++---- samples/client_subscribe_acl.py | 36 ++++---- samples/mosquitto.org.crt | 38 +++++---- tests/test_samples.py | 146 ++++++++++++++++++++++++++++++++ 17 files changed, 476 insertions(+), 176 deletions(-) create mode 100644 docs/overrides/base.html create mode 100644 tests/test_samples.py diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index f9a7238..666e7f5 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -153,7 +153,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: # canceling the task is the expected result self.logger.debug("Writer close was cancelled.") - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug("Writer close operation timed out.", exc_info=True) except OSError: self.logger.debug("Writer close failed due to I/O error.", exc_info=True) @@ -321,7 +321,7 @@ class ProtocolHandler(Generic[C]): self._puback_waiters[app_message.packet_id] = waiter try: app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5) - except TimeoutError: + except asyncio.TimeoutError: msg = f"Timeout waiting for PUBACK for packet ID {app_message.packet_id}" self.logger.warning(msg) raise TimeoutError(msg) from None @@ -528,7 +528,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: self.logger.debug("Task cancelled, reader loop ending") break - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug(f"{self.session.client_id} Input stream read timeout") self.handle_read_timeout() except NoDataError: diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index d1beb60..521f0c8 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -87,8 +87,8 @@ class PluginManager(Generic[C]): topic_filter_list = [] if self.app_context.config and "auth" in self.app_context.config: auth_filter_list = self.app_context.config["auth"].get("plugins", []) - if self.app_context.config and "topic" in self.app_context.config: - topic_filter_list = self.app_context.config["topic"].get("plugins", []) + if self.app_context.config and "topic-check" in self.app_context.config: + topic_filter_list = self.app_context.config["topic-check"].get("plugins", []) ep: EntryPoints | list[EntryPoint] = [] if hasattr(entry_points(), "select"): diff --git a/docs/overrides/base.html b/docs/overrides/base.html new file mode 100644 index 0000000..0af326a --- /dev/null +++ b/docs/overrides/base.html @@ -0,0 +1,8 @@ +{% extends "base.html" %} + +{% block outdated %} + You're not viewing the latest version. + + Click here to go to latest. + +{% endblock %} diff --git a/mkdocs.rtd.yml b/mkdocs.rtd.yml index 7f248d9..b4c1262 100644 --- a/mkdocs.rtd.yml +++ b/mkdocs.rtd.yml @@ -56,6 +56,7 @@ nav: theme: name: material logo: assets/amqtt_bw.svg + custom_dir: overrides features: - announce.dismiss - content.action.edit diff --git a/samples/broker_acl.py b/samples/broker_acl.py index c4c740f..7382ffd 100644 --- a/samples/broker_acl.py +++ b/samples/broker_acl.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check acl plugin +""" + logger = logging.getLogger(__name__) config = { @@ -38,15 +42,43 @@ config = { }, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main() -> None: + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/broker_simple.py b/samples/broker_simple.py index 22c4639..751bdf6 100644 --- a/samples/broker_simple.py +++ b/samples/broker_simple.py @@ -1,21 +1,31 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.broker import Broker -broker = Broker() +""" +This sample shows how to run a broker +""" + +formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" +logging.basicConfig(level=logging.INFO, format=formatter) -async def test_coro() -> None: - await broker.start() +async def run_server() -> None: + broker = Broker() + try: + await broker.start() + while True: + await asyncio.sleep(1) + except CancelledError: + await broker.shutdown() +def __main__(): + try: + asyncio.run(run_server()) + except KeyboardInterrupt: + print("Server exiting...") if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - - asyncio.get_event_loop().run_until_complete(test_coro()) - try: - asyncio.get_event_loop().run_forever() - except KeyboardInterrupt: - asyncio.get_event_loop().run_until_complete(broker.shutdown()) + __main__() diff --git a/samples/broker_start.py b/samples/broker_start.py index 1055ff1..578f704 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker without stacktraces on keyboard interrupt +""" + logger = logging.getLogger(__name__) config = { @@ -30,19 +34,41 @@ config = { "topic-check": {"enabled": False}, } -broker = Broker(config) +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -async def test_coro() -> None: - await broker.start() - # await asyncio.sleep(5) - # await broker.shutdown() +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - # formatter = "%(asctime)s :: %(levelname)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() - + __main__() diff --git a/samples/broker_taboo.py b/samples/broker_taboo.py index 4a1107b..7752469 100644 --- a/samples/broker_taboo.py +++ b/samples/broker_taboo.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check taboo plugin +""" + logger = logging.getLogger(__name__) config = { @@ -30,15 +34,44 @@ config = { "topic-check": {"enabled": True, "plugins": ["topic_taboo"]}, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 2e11b9a..1bb960c 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -1,13 +1,12 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.client import MQTTClient -# -# This sample shows a client running idle. -# Meanwhile, keepalive is managed through PING messages sent every 5 seconds -# - +""" +This sample shows how to run an idle client +""" logger = logging.getLogger(__name__) @@ -15,17 +14,39 @@ config = { "keep_alive": 5, "ping_delay": 1, } -C = MQTTClient(config=config) + +async def main() -> None: + client = MQTTClient(config=config) + + try: + await client.connect("mqtt://test.mosquitto.org:1883/") + logger.info("client connected") + await asyncio.sleep(18) + except CancelledError: + pass + + await client.disconnect() -async def test_coro() -> None: - await C.connect("mqtt://test.mosquitto.org:1883/") - await asyncio.sleep(18) +def __main__(): - await C.disconnect() + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping client...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + + loop.close() if __name__ == "__main__": - formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish.py b/samples/client_publish.py index 9b0f4f6..83b1eda 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -4,10 +4,9 @@ import logging from amqtt.client import ConnectError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker using different QOS +""" logger = logging.getLogger(__name__) @@ -21,36 +20,39 @@ config = { } -async def test_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") +async def test_coro1() -> None: + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) - logger.info("messages published") - await C.disconnect() + logger.info("test_coro1 messages published") + await client.disconnect() async def test_coro2() -> None: try: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org:1883/") - await C.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) - logger.info("messages published") - await C.disconnect() + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org:1883/") + await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) + logger.info("test_coro2 messages published") + await client.disconnect() except ConnectError: logger.exception(f"Connection failed", exc_info=True) - asyncio.get_event_loop().stop() +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro1()) + asyncio.run(test_coro2()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_until_complete(test_coro2()) + __main__() diff --git a/samples/client_publish_acl.py b/samples/client_publish_acl.py index 2ecf33e..43148d7 100644 --- a/samples/client_publish_acl.py +++ b/samples/client_publish_acl.py @@ -2,37 +2,41 @@ import asyncio import logging from amqtt.client import ConnectError, MQTTClient +from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker running either `samples/broker_acl.py` + or `samples/broker_taboo.py`. +""" logger = logging.getLogger(__name__) async def test_coro() -> None: try: - C = MQTTClient() - await C.connect("mqtt://0.0.0.0:1883") - await C.publish("data/classified", b"TOP SECRET", qos=0x01) - await C.publish("data/memes", b"REAL FUN", qos=0x01) - await C.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=0x01) - await C.publish( + client = MQTTClient() + await client.connect("mqtt://0.0.0.0:1883") + await client.publish("data/classified", b"TOP SECRET", qos=QOS_1) + await client.publish("data/memes", b"REAL FUN", qos=QOS_1) + await client.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=QOS_1) + await client.publish( "repositories/amqtt/devel", b"THIS NEEDS TO BE CHECKED", - qos=0x01, + qos=QOS_1, ) - await C.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=0x01) + await client.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=QOS_1) logger.info("messages published") - await C.disconnect() + await client.disconnect() except ConnectError as ce: - logger.exception("Connection failed") - asyncio.get_event_loop().stop() + logger.exception("ERROR: Connection failed") +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish_ssl.py b/samples/client_publish_ssl.py index 0f2de13..60c3acc 100644 --- a/samples/client_publish_ssl.py +++ b/samples/client_publish_ssl.py @@ -1,41 +1,45 @@ import asyncio import logging +from pathlib import Path from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure broker. +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, - }, + } } -C = MQTTClient(config=config) -# C = MQTTClient() + +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("mqtts://test.mosquitto.org/", cafile="mosquitto.org.crt") + + await client.connect("mqtts://broker.hivemq.com:8883") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_publish_ws.py b/samples/client_publish_ws.py index 4c0fa51..f9d96c9 100644 --- a/samples/client_publish_ws.py +++ b/samples/client_publish_ws.py @@ -4,39 +4,39 @@ import logging from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure websocket broker +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, - }, - "capath": ".", + } } -C = MQTTClient(config=config) -# C = MQTTClient() +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("wss://test.mosquitto.org:8081/", cafile="mosquitto.org.crt") + await client.connect("wss://test.mosquitto.org:8081/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe.py b/samples/client_subscribe.py index 3d175d6..642e66e 100644 --- a/samples/client_subscribe.py +++ b/samples/client_subscribe.py @@ -4,20 +4,19 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# + +""" +This sample shows how to subscribe to different $SYS topics and how to receive incoming messages +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org/") + + await client.subscribe( [ ("$SYS/broker/uptime", QOS_1), ("$SYS/broker/load/#", QOS_2), @@ -25,16 +24,20 @@ async def uptime_coro() -> None: ) logger.info("Subscribed") try: - for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + for _i in range(1, 10): + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(uptime_coro()) + asyncio.run(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe_acl.py b/samples/client_subscribe_acl.py index 1810b6b..f133a0b 100644 --- a/samples/client_subscribe_acl.py +++ b/samples/client_subscribe_acl.py @@ -4,41 +4,45 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# +""" +Run `samples/broker_acl.py` or `samples/broker_taboo.py` + +This sample shows how to subscribe to different topics, some of which are allowed. +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test:test@0.0.0.0:1883") - # await C.connect('mqtt://0.0.0.0:1883') - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test:test@0.0.0.0:1883") + + result = await client.subscribe( [ + ("$SYS/#", QOS_1), # Topic forbidden when running `broker_acl.py` ("data/memes", QOS_1), # Topic allowed ("data/classified", QOS_1), # Topic forbidden ("repositories/amqtt/master", QOS_1), # Topic allowed - ("repositories/amqtt/devel", QOS_1), # Topic forbidden + ("repositories/amqtt/devel", QOS_1), # Topic forbidden when running `broker_acl.py` ("calendar/amqtt/releases", QOS_1), # Topic allowed ], ) - logger.info("Subscribed") + logger.info(f"Subscribed results: {result}") try: for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/#", "data/memes"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError as ce: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.get_event_loop().run_until_complete(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/mosquitto.org.crt b/samples/mosquitto.org.crt index b8535e8..e76dbd8 100644 --- a/samples/mosquitto.org.crt +++ b/samples/mosquitto.org.crt @@ -1,18 +1,24 @@ -----BEGIN CERTIFICATE----- -MIIC8DCCAlmgAwIBAgIJAOD63PlXjJi8MA0GCSqGSIb3DQEBBQUAMIGQMQswCQYD -VQQGEwJHQjEXMBUGA1UECAwOVW5pdGVkIEtpbmdkb20xDjAMBgNVBAcMBURlcmJ5 -MRIwEAYDVQQKDAlNb3NxdWl0dG8xCzAJBgNVBAsMAkNBMRYwFAYDVQQDDA1tb3Nx -dWl0dG8ub3JnMR8wHQYJKoZIhvcNAQkBFhByb2dlckBhdGNob28ub3JnMB4XDTEy -MDYyOTIyMTE1OVoXDTIyMDYyNzIyMTE1OVowgZAxCzAJBgNVBAYTAkdCMRcwFQYD -VQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwGA1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1v -c3F1aXR0bzELMAkGA1UECwwCQ0ExFjAUBgNVBAMMDW1vc3F1aXR0by5vcmcxHzAd -BgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hvby5vcmcwgZ8wDQYJKoZIhvcNAQEBBQAD -gY0AMIGJAoGBAMYkLmX7SqOT/jJCZoQ1NWdCrr/pq47m3xxyXcI+FLEmwbE3R9vM -rE6sRbP2S89pfrCt7iuITXPKycpUcIU0mtcT1OqxGBV2lb6RaOT2gC5pxyGaFJ+h -A+GIbdYKO3JprPxSBoRponZJvDGEZuM3N7p3S/lRoi7G5wG5mvUmaE5RAgMBAAGj -UDBOMB0GA1UdDgQWBBTad2QneVztIPQzRRGj6ZHKqJTv5jAfBgNVHSMEGDAWgBTa -d2QneVztIPQzRRGj6ZHKqJTv5jAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUA -A4GBAAqw1rK4NlRUCUBLhEFUQasjP7xfFqlVbE2cRy0Rs4o3KS0JwzQVBwG85xge -REyPOFdGdhBY2P1FNRy0MDr6xr+D2ZOwxs63dG1nnAnWZg7qwoLgpZ4fESPD3PkA -1ZgKJc2zbSQ9fCPxt2W3mdVav66c6fsb7els2W2Iz7gERJSX +MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL +BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG +A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU +BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv +by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE +BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES +MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp +dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg +UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW +Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA +s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH +3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo +E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT +MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV +6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC +6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf ++pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK +sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839 +LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE +m/XriWr/Cq4h/JfB7NTsezVslgkBaoU= -----END CERTIFICATE----- diff --git a/tests/test_samples.py b/tests/test_samples.py new file mode 100644 index 0000000..49aa5b4 --- /dev/null +++ b/tests/test_samples.py @@ -0,0 +1,146 @@ +import asyncio +import logging +import signal +import subprocess +from pathlib import Path + +import pytest + +from amqtt.broker import Broker +from samples.client_publish import __main__ as client_publish_main +from samples.client_publish_ssl import __main__ as client_publish_ssl_main +from samples.client_publish_ws import __main__ as client_publish_ws_main +from samples.client_subscribe import __main__ as client_subscribe_main +from samples.client_keepalive import __main__ as client_keepalive_main +from samples.broker_acl import config as broker_acl_config +from samples.broker_taboo import config as broker_taboo_config + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +async def test_broker_acl(): + broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py" + process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_simple(): + broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_start(): + broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" + process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_taboo(): + broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" + process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.timeout(20) +def test_client_keepalive(): + client_keepalive_main() + + +def test_client_publish(): + client_publish_main() + + +def test_client_publish_ssl(): + client_publish_ssl_main() + + +@pytest.mark.asyncio +async def test_client_publish_acl(): + + broker = Broker() + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +def test_client_publish_ws(): + client_publish_ws_main() + + +def test_client_subscribe(): + client_subscribe_main() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_acl(): + broker = Broker(config=broker_acl_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [128, 1, 128, 1, 128, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_taboo(): + broker = Broker(config=broker_taboo_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(4) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [1, 1, 128, 1, 1, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() \ No newline at end of file From d4cc024719f3419d61f5e7bd6c113dca68757ed1 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 18:12:05 -0400 Subject: [PATCH 04/19] fix readme file image src so it works on pypi --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cb8b2fc..97e6724 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ![Python Wheel](https://img.shields.io/pypi/wheel/amqtt?style=plastic) [![PyPI](https://img.shields.io/pypi/v/amqtt?style=plastic&logo=python&logoColor=yellow)](https://pypi.org/project/amqtt/) -![docs/assets/amqtt.svg](docs/assets/amqtt.svg) +![docs/assets/amqtt.svg](https://raw.githubusercontent.com/Yakifo/amqtt/refs/tags/v0.11.0/docs/assets/amqtt.svg) `aMQTT` is an open source [MQTT](http://www.mqtt.org) broker and client[^1], natively implemented with Python's [asyncio](https://docs.python.org/3/library/asyncio.html). From 9ba7c8019b5a25ea5aaf1ed70d4e91035226e40e Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 18:22:32 -0400 Subject: [PATCH 05/19] test case fix and show debug message on failure --- tests/test_samples.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/test_samples.py b/tests/test_samples.py index 49aa5b4..e4612bf 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -22,10 +22,11 @@ async def test_broker_acl(): broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py" process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -35,10 +36,11 @@ async def test_broker_simple(): broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -48,10 +50,11 @@ async def test_broker_start(): broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() - assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -61,9 +64,10 @@ async def test_broker_taboo(): broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -91,9 +95,10 @@ async def test_client_publish_acl(): broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) assert "ERROR" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8") @@ -116,7 +121,7 @@ async def test_client_subscribe_plugin_acl(): broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -135,7 +140,7 @@ async def test_client_subscribe_plugin_taboo(): broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(4) + await asyncio.sleep(5) process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) From e0cb182957282b0ef5175b663cba37bc5e78d77e Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 18:32:35 -0400 Subject: [PATCH 06/19] broker_simple.py has different exit messages depending on python version --- tests/test_samples.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_samples.py b/tests/test_samples.py index e4612bf..1932bc5 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -40,9 +40,10 @@ async def test_broker_simple(): process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) - assert "Broker closed" in stderr.decode("utf-8") - assert "ERROR" not in stderr.decode("utf-8") - assert "Exception" not in stderr.decode("utf-8") + has_broker_closed = "Broker closed" in stderr.decode("utf-8") + has_loop_stopped = "Broadcast loop stopped by exception" in stderr.decode("utf-8") + + assert has_broker_closed or has_loop_stopped, "Broker didn't close correctly." @pytest.mark.asyncio From 0aa7274069dde7ef8680d953f415d1eeb4025ba3 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Tue, 17 Jun 2025 19:57:06 -0400 Subject: [PATCH 07/19] update the readthedocs shield --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 97e6724..f6684cf 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![MIT licensed](https://img.shields.io/github/license/Yakifo/amqtt?style=plastic)](https://amqtt.readthedocs.io/en/latest/) [![CI](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml/badge.svg?branch=rc)](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml) [![CodeQL](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml) -[![Documentation Status](https://img.shields.io/readthedocs/amqtt?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/en/latest/) +![Read the Docs](https://img.shields.io/readthedocs/amqtt/v0.11.0?style=plastic&logo=readthedocs)(https://amqtt.readthedocs.io/) [![](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3) ![Python Version](https://img.shields.io/pypi/pyversions/amqtt?style=plastic&logo=python&logoColor=yellow) ![Python Wheel](https://img.shields.io/pypi/wheel/amqtt?style=plastic) From 2cbdc86eeaddee99fe9eeed68a2a871f39275295 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sun, 22 Jun 2025 13:03:02 -0400 Subject: [PATCH 08/19] adding number of clients connected --- docs_test/src/dashboard/components/MainGrid.tsx | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs_test/src/dashboard/components/MainGrid.tsx b/docs_test/src/dashboard/components/MainGrid.tsx index b27b2ee..471fb50 100644 --- a/docs_test/src/dashboard/components/MainGrid.tsx +++ b/docs_test/src/dashboard/components/MainGrid.tsx @@ -20,6 +20,7 @@ export default function MainGrid() { const [received, setReceived] = useState([]); const [bytesIn, setBytesIn] = useState([]); const [bytesOut, setBytesOut] = useState([]); + const [clientsConnected, setClientsConnected] = useState([]); const [serverStart, setServerStart] = useState(''); const [serverUptime, setServerUptime] = useState(''); @@ -63,6 +64,7 @@ export default function MainGrid() { mqttSubscribe('$SYS/broker/load/bytes/#'); mqttSubscribe('$SYS/broker/uptime/formatted'); mqttSubscribe('$SYS/broker/uptime'); + mqttSubscribe('$SYS/broker/clients/connected'); } }, [isConnected, mqttSubscribe]); @@ -97,6 +99,13 @@ export default function MainGrid() { value: d } setBytesOut(bytesOut => [...bytesOut, newPoint]); + } else if (payload.topic === '$SYS/broker/clients/connected') { + const newPoint: DataPoint = { + timestamp: new Date().toISOString(), + value: d + } + setClientsConnected(clientsConnected => [...clientsConnected, newPoint]); + } else if (payload.topic === '$SYS/broker/uptime/formatted') { const dt = new Date(d + "Z"); setServerStart(dt.toLocaleString()); @@ -237,6 +246,9 @@ export default function MainGrid() { + + + From 4ee3ece28bd68579b7c7996e0c25d37fcc6d9663 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 17:56:53 -0400 Subject: [PATCH 09/19] fixes Yakifo/amqtt#232 : adding system metrics about the broker to the --- amqtt/codecs_amqtt.py | 8 ++++++++ amqtt/plugins/sys/broker.py | 40 ++++++++++++++++++++++++++++++++++--- pyproject.toml | 1 + tests/plugins/test_sys.py | 6 +++++- uv.lock | 2 ++ 5 files changed, 53 insertions(+), 4 deletions(-) diff --git a/amqtt/codecs_amqtt.py b/amqtt/codecs_amqtt.py index 3e425a3..81b2661 100644 --- a/amqtt/codecs_amqtt.py +++ b/amqtt/codecs_amqtt.py @@ -1,4 +1,5 @@ import asyncio +from decimal import ROUND_HALF_UP, Decimal from struct import pack, unpack from amqtt.adapters import ReaderAdapter @@ -139,3 +140,10 @@ def int_to_bytes_str(value: int) -> bytes: :return: bytes array. """ return str(value).encode("utf-8") + + +def float_to_bytes_str(value: float, places:int=3) -> bytes: + """Convert an float value to a bytes array containing the numeric character.""" + quant = Decimal(f"0.{"".join(['0' for i in range(places-1)])}1") + rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP) + return str(rounded).encode("utf-8") diff --git a/amqtt/plugins/sys/broker.py b/amqtt/plugins/sys/broker.py index 99d3166..b0ba07f 100644 --- a/amqtt/plugins/sys/broker.py +++ b/amqtt/plugins/sys/broker.py @@ -1,6 +1,8 @@ import asyncio from collections import deque # pylint: disable=C0412 -from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 +from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 + +import psutil from amqtt.plugins.base import BasePlugin from amqtt.session import Session @@ -26,7 +28,7 @@ except ImportError: import amqtt from amqtt.broker import BrokerContext -from amqtt.codecs_amqtt import int_to_bytes_str +from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader DOLLAR_SYS_ROOT = "$SYS/broker/" @@ -40,17 +42,35 @@ STAT_START_TIME = "start_time" STAT_CLIENTS_MAXIMUM = "clients_maximum" STAT_CLIENTS_CONNECTED = "clients_connected" STAT_CLIENTS_DISCONNECTED = "clients_disconnected" +MEMORY_USAGE_MAXIMUM = "memory_maximum" +CPU_USAGE_MAXIMUM = "cpu_usage_maximum" +CPU_USAGE_LAST = "cpu_usage_last" PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] +def val_to_bytes_str(value: Any) -> bytes: + """Convert an int, float or string to byte string.""" + match value: + case int(): + return int_to_bytes_str(value) + case float(): + return float_to_bytes_str(value) + case str(): + return value.encode("utf-8") + case _: + msg = f"Unsupported type {type(value)}" + raise NotImplementedError(msg) + + class BrokerSysPlugin(BasePlugin[BrokerContext]): def __init__(self, context: BrokerContext) -> None: super().__init__(context) # Broker statistics initialization self._stats: dict[str, int] = {} self._sys_handle: asyncio.Handle | None = None + self._current_process = psutil.Process() def _clear_stats(self) -> None: """Initialize broker statistics data structures.""" @@ -64,6 +84,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): STAT_CLIENTS_DISCONNECTED, STAT_PUBLISH_RECEIVED, STAT_PUBLISH_SENT, + MEMORY_USAGE_MAXIMUM, + CPU_USAGE_MAXIMUM ): self._stats[stat] = 0 @@ -127,6 +149,14 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): messages_stored += session.retained_messages_count messages_stored += len(self.context.retained_messages) subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values()) + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected + + cpu_usage = self._current_process.cpu_percent(interval=0) + self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage) + + mem_info_usage = self._current_process.memory_full_info() + mem_size = mem_info_usage.rss / (1024 ** 2) + self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size) # Broadcast updates tasks: deque[asyncio.Task[None]] = deque() @@ -150,9 +180,13 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): "messages/publish/sent": self._stats[STAT_PUBLISH_SENT], "messages/retained/count": len(self.context.retained_messages), "messages/subscriptions/count": subscriptions_count, + "heap/size": mem_size, + "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM], + "cpu/percent": cpu_usage, + "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM], } for stat_name, stat_value in stats.items(): - data: bytes = int_to_bytes_str(stat_value) if isinstance(stat_value, int) else stat_value.encode("utf-8") + data: bytes = val_to_bytes_str(stat_value) tasks.append(self.schedule_broadcast_sys_topic(stat_name, data)) # Wait until broadcasting tasks end diff --git a/pyproject.toml b/pyproject.toml index d8a2a61..462cd4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "passlib==1.7.4", # https://pypi.org/project/passlib "PyYAML==6.0.2", # https://pypi.org/project/PyYAML "typer==0.15.4", + "psutil>=7.0.0", ] [dependency-groups] diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index c01d322..563a313 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -31,7 +31,11 @@ all_sys_topics = [ '$SYS/broker/messages/publish/received', '$SYS/broker/messages/publish/sent', '$SYS/broker/messages/retained/count', - '$SYS/broker/messages/subscriptions/count' + '$SYS/broker/messages/subscriptions/count', + '$SYS/broker/heap/size', + '$SYS/broker/heap/maximum', + '$SYS/broker/cpu/percent', + '$SYS/broker/cpu/maximum', ] diff --git a/uv.lock b/uv.lock index 4501155..6dab78d 100644 --- a/uv.lock +++ b/uv.lock @@ -13,6 +13,7 @@ version = "0.11.0" source = { editable = "." } dependencies = [ { name = "passlib" }, + { name = "psutil" }, { name = "pyyaml" }, { name = "transitions" }, { name = "typer" }, @@ -67,6 +68,7 @@ docs = [ requires-dist = [ { name = "coveralls", marker = "extra == 'ci'", specifier = "==4.0.1" }, { name = "passlib", specifier = "==1.7.4" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "pyyaml", specifier = "==6.0.2" }, { name = "transitions", specifier = "==0.9.2" }, { name = "typer", specifier = "==0.15.4" }, From 9b8a1d59a1b5e0f3db0512bc92a071463c17b0ae Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 18:07:51 -0400 Subject: [PATCH 10/19] adjusting syntax to support python 3.10 --- amqtt/codecs_amqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amqtt/codecs_amqtt.py b/amqtt/codecs_amqtt.py index 81b2661..1db2d9c 100644 --- a/amqtt/codecs_amqtt.py +++ b/amqtt/codecs_amqtt.py @@ -144,6 +144,6 @@ def int_to_bytes_str(value: int) -> bytes: def float_to_bytes_str(value: float, places:int=3) -> bytes: """Convert an float value to a bytes array containing the numeric character.""" - quant = Decimal(f"0.{"".join(['0' for i in range(places-1)])}1") + quant = Decimal(f"0.{''.join(['0' for i in range(places-1)])}1") rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP) return str(rounded).encode("utf-8") From 1bb78ef5d2bf13e0f010d4a90f29ea2e1ab0f194 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 18:52:28 -0400 Subject: [PATCH 11/19] release candidate version and changelog updates for 0.11.1 --- amqtt/__init__.py | 2 +- docs/changelog.md | 7 +++++++ pyproject.toml | 2 +- uv.lock | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/amqtt/__init__.py b/amqtt/__init__.py index ef244ad..0eaf8fa 100644 --- a/amqtt/__init__.py +++ b/amqtt/__init__.py @@ -1,3 +1,3 @@ """INIT.""" -__version__ = "0.11.0" +__version__ = "0.11.1rc0" diff --git a/docs/changelog.md b/docs/changelog.md index 0765167..4f1e43a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 0.11.1 + +- [PR #226](https://github.com/Yakifo/amqtt/pull/226) Consolidate super classes for plugins +- [PR #227](https://github.com/Yakifo/amqtt/pull/227) Update sample files +- [PR #229](https://github.com/Yakifo/amqtt/pull/229) & [PR #228](https://github.com/Yakifo/amqtt/pull/228) Broken pypi and test.amqtt.io links +- [PR #232](https://github.com/Yakifo/amqtt/pull/234) $SYS additions for cpu & mem. + ## 0.11.0 - upgrades to support python 3.10, 3.11, 3.12 and 3.13 diff --git a/pyproject.toml b/pyproject.toml index d8a2a61..6af59e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.13" ] -version = "0.11.0" +version = "0.11.1rc0" requires-python = ">=3.10.0" readme = "README.md" license = { text = "MIT" } diff --git a/uv.lock b/uv.lock index 4501155..7c853df 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ [[package]] name = "amqtt" -version = "0.11.0" +version = "0.11.1rc0" source = { editable = "." } dependencies = [ { name = "passlib" }, From 03ea5aab168dada89a12f8af1a56e250d59517ec Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 19:49:50 -0400 Subject: [PATCH 12/19] fixed broken link --- README.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f6684cf..99b7639 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ [![MIT licensed](https://img.shields.io/github/license/Yakifo/amqtt?style=plastic)](https://amqtt.readthedocs.io/en/latest/) [![CI](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml/badge.svg?branch=rc)](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml) [![CodeQL](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml) -![Read the Docs](https://img.shields.io/readthedocs/amqtt/v0.11.0?style=plastic&logo=readthedocs)(https://amqtt.readthedocs.io/) -[![](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3) +[![Read the Docs](https://img.shields.io/readthedocs/amqtt/v0.11.0?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/) +[![Discord](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3) ![Python Version](https://img.shields.io/pypi/pyversions/amqtt?style=plastic&logo=python&logoColor=yellow) ![Python Wheel](https://img.shields.io/pypi/wheel/amqtt?style=plastic) [![PyPI](https://img.shields.io/pypi/v/amqtt?style=plastic&logo=python&logoColor=yellow)](https://pypi.org/project/amqtt/) @@ -50,13 +50,14 @@ Bug reports, patches and suggestions welcome! Just [open an issue](https://githu ## Python Version Compatibility -| Version | hbmqtt compatibility | Supported Python Versions | PyPi Release | -| ------- | -------------------- | ------------------------- | ------------ | -| 0.10.x | yes [^2] | 3.7 - 3.9 | 0.10.1 | -| 0.11.x | no [^3] | 3.10 - 3.13 | 0.11.0 | +| Version | hbmqtt compatibility | Supported Python Versions | +| ------- | -------------------- | ------------------------- | +| 0.10.x | yes [^2] | 3.7 - 3.9 | +| 0.11.x | no [^3] | 3.10 - 3.13 | -For a full feature roadmap, see ... [^1]: Forked from [HBMQTT](https://github.com/beerfactory/hbmqtt) after it was deprecated by the original author. + [^2]: drop-in replacement + [^3]: module renamed and small API differences From bb76dbeec99a05c73eff947d4e7eedfbef4280ab Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 22:23:20 -0400 Subject: [PATCH 13/19] fixing config parameters for MQTTClient and updating docs to match. refining sample scripts to rely less on accessing external broker for successful completion --- .gitignore | 1 + amqtt/client.py | 6 ++- docs/references/client_config.md | 7 +++ samples/client_keepalive.py | 2 +- samples/client_publish_ssl.py | 10 +++- samples/client_publish_ws.py | 2 +- tests/test_samples.py | 85 +++++++++++++++++++++++++++----- 7 files changed, 96 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index dd30c86..1d775a3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__ node_modules .vite +*.pem #------- Environment Files ------- .python-version diff --git a/amqtt/client.py b/amqtt/client.py index 5154a98..dcb61d6 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -463,10 +463,12 @@ class MQTTClient: capath=self.session.capath, cadata=self.session.cadata, ) - if "certfile" in self.config and "keyfile" in self.config: - sc.load_cert_chain(self.config["certfile"], self.config["keyfile"]) + + if "certfile" in self.config: + sc.load_verify_locations(cafile=self.config["certfile"]) if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool): sc.check_hostname = self.config["check_hostname"] + sc.verify_mode = ssl.CERT_REQUIRED kwargs["ssl"] = sc try: diff --git a/docs/references/client_config.md b/docs/references/client_config.md index 24985d1..146b8dd 100644 --- a/docs/references/client_config.md +++ b/docs/references/client_config.md @@ -63,8 +63,15 @@ TLS certificates used to verify the broker's authenticity. - `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info. - `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/). - `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates. +- +- +### `certfile` *(string)* +Path to a single file in PEM format containing the certificate as well as any number of CA certificates needed to establish the server certificate's authenticity. +### `check_hostname` *(bool)* + +Bypass ssl host certificate verification, allowing self-signed certificates ## Default Configuration diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 1bb960c..6f02f17 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -21,7 +21,7 @@ async def main() -> None: try: await client.connect("mqtt://test.mosquitto.org:1883/") logger.info("client connected") - await asyncio.sleep(18) + await asyncio.sleep(15) except CancelledError: pass diff --git a/samples/client_publish_ssl.py b/samples/client_publish_ssl.py index 60c3acc..528c347 100644 --- a/samples/client_publish_ssl.py +++ b/samples/client_publish_ssl.py @@ -7,6 +7,9 @@ from amqtt.mqtt.constants import QOS_1, QOS_2 """ This sample shows how to publish messages to secure broker. + +Use `openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"` to +generate a self-signed certificate for the broker to use. """ logger = logging.getLogger(__name__) @@ -17,7 +20,10 @@ config = { "message": "Dead or alive", "qos": QOS_1, "retain": True, - } + }, + "auto_reconnect": False, + "check_hostname": False, + "certfile": "cert.pem", } client = MQTTClient(config=config) @@ -25,7 +31,7 @@ client = MQTTClient(config=config) async def test_coro() -> None: - await client.connect("mqtts://broker.hivemq.com:8883") + await client.connect("mqtts://localhost:8883") tasks = [ asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), diff --git a/samples/client_publish_ws.py b/samples/client_publish_ws.py index f9d96c9..e02c73e 100644 --- a/samples/client_publish_ws.py +++ b/samples/client_publish_ws.py @@ -22,7 +22,7 @@ client = MQTTClient(config=config) async def test_coro() -> None: - await client.connect("wss://test.mosquitto.org:8081/") + await client.connect("ws://localhost:8080/") tasks = [ asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), diff --git a/tests/test_samples.py b/tests/test_samples.py index 1932bc5..8a6e37e 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -8,8 +8,6 @@ import pytest from amqtt.broker import Broker from samples.client_publish import __main__ as client_publish_main -from samples.client_publish_ssl import __main__ as client_publish_ssl_main -from samples.client_publish_ws import __main__ as client_publish_ws_main from samples.client_subscribe import __main__ as client_subscribe_main from samples.client_keepalive import __main__ as client_keepalive_main from samples.broker_acl import config as broker_acl_config @@ -35,8 +33,9 @@ async def test_broker_acl(): async def test_broker_simple(): broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -50,8 +49,9 @@ async def test_broker_simple(): async def test_broker_start(): broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal to stop broker process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -64,8 +64,9 @@ async def test_broker_start(): async def test_broker_taboo(): broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal to stop broker process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -82,9 +83,43 @@ def test_client_keepalive(): def test_client_publish(): client_publish_main() +broker_ssl_config = { + "listeners": { + "default": { + "type": "tcp", + "bind": "0.0.0.0:8883", + "ssl": True, + "certfile": "cert.pem", + "keyfile": "key.pem", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} -def test_client_publish_ssl(): - client_publish_ssl_main() +@pytest.mark.asyncio +async def test_client_publish_ssl(): + + # generate a self-signed certificate for this test + cmd = 'openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"' + subprocess.run(cmd, shell=True, capture_output=True, text=True) + + # start a secure broker + broker = Broker(config=broker_ssl_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() @pytest.mark.asyncio @@ -92,12 +127,13 @@ async def test_client_publish_acl(): broker = Broker() await broker.start() + await asyncio.sleep(2) broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(5) - process.send_signal(signal.SIGINT) + await asyncio.sleep(2) + stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) assert "ERROR" not in stderr.decode("utf-8") @@ -105,9 +141,36 @@ async def test_client_publish_acl(): await broker.shutdown() +broker_ws_config = { + "listeners": { + "default": { + "type": "ws", + "bind": "0.0.0.0:8080", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} -def test_client_publish_ws(): - client_publish_ws_main() +@pytest.mark.asyncio +async def test_client_publish_ws(): + # start a secure broker + broker = Broker(config=broker_ws_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() def test_client_subscribe(): From e44321f179e5cbec7bc1b30e5aefc8dfd2bd01e4 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 12:37:38 -0400 Subject: [PATCH 14/19] additional clean up of samples and validation --- samples/client_keepalive.py | 15 +------------- samples/client_publish.py | 8 ++++---- tests/plugins/test_plugins.py | 6 +++--- tests/test_samples.py | 37 ++++++++++++++++++++++++++++++----- 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 6f02f17..13284c2 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -32,21 +32,8 @@ def __main__(): formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.run(main()) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - task = loop.create_task(main()) - - try: - loop.run_until_complete(task) - except KeyboardInterrupt: - logger.info("KeyboardInterrupt received. Stopping client...") - task.cancel() - loop.run_until_complete(task) # Ensure task finishes cleanup - finally: - - loop.close() if __name__ == "__main__": __main__() diff --git a/samples/client_publish.py b/samples/client_publish.py index 83b1eda..cd10ed1 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -14,7 +14,7 @@ config = { "will": { "topic": "/will/client", "message": b"Dead or alive", - "qos": 0x01, + "qos": QOS_1, "retain": True, }, } @@ -22,7 +22,7 @@ config = { async def test_coro1() -> None: client = MQTTClient() - await client.connect("mqtt://test.mosquitto.org/") + await client.connect("mqtt://localhost:1883/") tasks = [ asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), @@ -35,8 +35,8 @@ async def test_coro1() -> None: async def test_coro2() -> None: try: - client = MQTTClient() - await client.connect("mqtt://test.mosquitto.org:1883/") + client = MQTTClient(config={'auto_connect': False}) + await client.connect("mqtt://localhost:1884/") await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) diff --git a/tests/plugins/test_plugins.py b/tests/plugins/test_plugins.py index 5782ad7..bda597d 100644 --- a/tests/plugins/test_plugins.py +++ b/tests/plugins/test_plugins.py @@ -132,7 +132,7 @@ async def test_plugin_exception_while_loading() -> None: _ = Broker(plugin_namespace='tests.mock_plugins', config=config) -class TestAllEventsPlugin(BasePlugin[BaseContext]): +class AllEventsPlugin(BasePlugin[BaseContext]): """A plugin to verify all events get sent to plugins.""" def __init__(self, context: BaseContext) -> None: super().__init__(context) @@ -161,7 +161,7 @@ async def test_all_plugin_events(): match group: case 'tests.mock_plugins': return [ - EntryPoint(name='TestAllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:TestAllEventsPlugin'), + EntryPoint(name='AllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:AllEventsPlugin'), ] case _: return list() @@ -192,7 +192,7 @@ async def test_all_plugin_events(): await asyncio.sleep(1) # get the plugin so it doesn't get gc on shutdown - test_plugin = broker.plugins_manager.get_plugin('TestAllEventsPlugin') + test_plugin = broker.plugins_manager.get_plugin('AllEventsPlugin') await broker.shutdown() await asyncio.sleep(1) diff --git a/tests/test_samples.py b/tests/test_samples.py index 8a6e37e..d2798da 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -75,13 +75,40 @@ async def test_broker_taboo(): assert "Exception" not in stderr.decode("utf-8") -@pytest.mark.timeout(20) -def test_client_keepalive(): - client_keepalive_main() +@pytest.mark.timeout(25) +@pytest.mark.asyncio +async def test_client_keepalive(): + + broker = Broker() + await broker.start() + await asyncio.sleep(2) + + keep_alive_script = Path(__file__).parent.parent / "samples/client_keepalive.py" + process = subprocess.Popen(["python", keep_alive_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() -def test_client_publish(): - client_publish_main() +@pytest.mark.asyncio +async def test_client_publish(): + broker = Broker() + await broker.start() + await asyncio.sleep(2) + + client_publish = Path(__file__).parent.parent / "samples/client_publish.py" + process = subprocess.Popen(["python", client_publish], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() broker_ssl_config = { "listeners": { From 1608edff91aa3e14b188c3088971528f15326543 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 13:00:40 -0400 Subject: [PATCH 15/19] add setting for connection timeout --- amqtt/client.py | 21 +++++++++++++-------- docs/references/client_config.md | 5 +++++ tests/test_client.py | 9 +++++++++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/amqtt/client.py b/amqtt/client.py index 5154a98..a11c5af 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -456,6 +456,8 @@ class MQTTClient: # if not self._handler: self._handler = ClientProtocolHandler(self.plugins_manager) + connection_timeout = self.config.get('connection_timeout', None) + if secure: sc = ssl.create_default_context( ssl.Purpose.SERVER_AUTH, @@ -476,21 +478,24 @@ class MQTTClient: # Open connection if scheme in ("mqtt", "mqtts"): - conn_reader, conn_writer = await asyncio.open_connection( + conn_reader, conn_writer = await asyncio.wait_for( + asyncio.open_connection( self.session.remote_address, self.session.remote_port, **kwargs, - ) + ), timeout=connection_timeout) reader = StreamReaderAdapter(conn_reader) writer = StreamWriterAdapter(conn_writer) elif scheme in ("ws", "wss") and self.session.broker_uri: - websocket: ClientConnection = await websockets.connect( - self.session.broker_uri, - subprotocols=[websockets.Subprotocol("mqtt")], - additional_headers=self.additional_headers, - **kwargs, - ) + websocket: ClientConnection = await asyncio.wait_for( + websockets.connect( + self.session.broker_uri, + subprotocols=[websockets.Subprotocol("mqtt")], + additional_headers=self.additional_headers, + **kwargs, + ), timeout=connection_timeout) + reader = WebSocketsReader(websocket) writer = WebSocketsWriter(websocket) elif not self.session.broker_uri: diff --git a/docs/references/client_config.md b/docs/references/client_config.md index 24985d1..e061ac5 100644 --- a/docs/references/client_config.md +++ b/docs/references/client_config.md @@ -25,6 +25,11 @@ Default retain value to messages published. Defaults to `false`. Enable or disable auto-reconnect if connection with the broker is interrupted. Defaults to `false`. + +### `connect_timeout` *(int)* + +If specified, the number of seconds before a connection times out + ### `reconnect_retries` *(int)* Maximum reconnection retries. Defaults to `2`. Negative value will cause client to reconnect infinitely. diff --git a/tests/test_client.py b/tests/test_client.py index 8c426d7..28f5c49 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -446,6 +446,15 @@ async def test_connect_incorrect_scheme(): await client.connect('"mq://someplace') +@pytest.mark.asyncio +@pytest.mark.timeout(3) +async def test_connect_timeout(): + config = {"auto_reconnect": False, "connection_timeout": 2} + client = MQTTClient(config=config) + with pytest.raises(ClientError): + await client.connect("mqtt://localhost:8888") + + async def test_client_no_auth(): class MockEntryPoints: From 311dec77748643c96bc19685ed39a190aa1377c5 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 13:08:24 -0400 Subject: [PATCH 16/19] lint fixing --- amqtt/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/amqtt/client.py b/amqtt/client.py index a11c5af..c01e347 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -456,7 +456,7 @@ class MQTTClient: # if not self._handler: self._handler = ClientProtocolHandler(self.plugins_manager) - connection_timeout = self.config.get('connection_timeout', None) + connection_timeout = self.config.get("connection_timeout", None) if secure: sc = ssl.create_default_context( @@ -480,10 +480,10 @@ class MQTTClient: if scheme in ("mqtt", "mqtts"): conn_reader, conn_writer = await asyncio.wait_for( asyncio.open_connection( - self.session.remote_address, - self.session.remote_port, - **kwargs, - ), timeout=connection_timeout) + self.session.remote_address, + self.session.remote_port, + **kwargs, + ), timeout=connection_timeout) reader = StreamReaderAdapter(conn_reader) writer = StreamWriterAdapter(conn_writer) From ecad0ce15bc5cd34248be75e7bd37d0aae8836fc Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 13:33:13 -0400 Subject: [PATCH 17/19] updating sample to demonstrate failure --- samples/client_publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/client_publish.py b/samples/client_publish.py index cd10ed1..135fb57 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -35,7 +35,7 @@ async def test_coro1() -> None: async def test_coro2() -> None: try: - client = MQTTClient(config={'auto_connect': False}) + client = MQTTClient(config={'auto_reconnect': False, 'connection_timeout': 1}) await client.connect("mqtt://localhost:1884/") await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) @@ -43,7 +43,7 @@ async def test_coro2() -> None: logger.info("test_coro2 messages published") await client.disconnect() except ConnectError: - logger.exception(f"Connection failed", exc_info=True) + logger.info(f"Connection failed", exc_info=True) def __main__(): From f0754a8ef7c8b468f422e36dd2ddd1f01e2aabc5 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 14:30:30 -0400 Subject: [PATCH 18/19] release 0.11.1 --- amqtt/__init__.py | 2 +- pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/amqtt/__init__.py b/amqtt/__init__.py index 0eaf8fa..672ae70 100644 --- a/amqtt/__init__.py +++ b/amqtt/__init__.py @@ -1,3 +1,3 @@ """INIT.""" -__version__ = "0.11.1rc0" +__version__ = "0.11.1" diff --git a/pyproject.toml b/pyproject.toml index 6edc91f..6856f53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.13" ] -version = "0.11.1rc0" +version = "0.11.1" requires-python = ">=3.10.0" readme = "README.md" license = { text = "MIT" } diff --git a/uv.lock b/uv.lock index 8471502..18071eb 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ [[package]] name = "amqtt" -version = "0.11.1rc0" +version = "0.11.1" source = { editable = "." } dependencies = [ { name = "passlib" }, From 1276503748ba75eb696e4cfd9e611f3a058346af Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Thu, 26 Jun 2025 14:37:48 -0400 Subject: [PATCH 19/19] fix mkdoc configuration issue --- mkdocs.rtd.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/mkdocs.rtd.yml b/mkdocs.rtd.yml index b4c1262..7f248d9 100644 --- a/mkdocs.rtd.yml +++ b/mkdocs.rtd.yml @@ -56,7 +56,6 @@ nav: theme: name: material logo: assets/amqtt_bw.svg - custom_dir: overrides features: - announce.dismiss - content.action.edit