diff --git a/.gitignore b/.gitignore index dd30c86..1d775a3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__ node_modules .vite +*.pem #------- Environment Files ------- .python-version diff --git a/README.md b/README.md index cb8b2fc..99b7639 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ [![MIT licensed](https://img.shields.io/github/license/Yakifo/amqtt?style=plastic)](https://amqtt.readthedocs.io/en/latest/) [![CI](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml/badge.svg?branch=rc)](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml) [![CodeQL](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml) -[![Documentation Status](https://img.shields.io/readthedocs/amqtt?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/en/latest/) -[![](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3) +[![Read the Docs](https://img.shields.io/readthedocs/amqtt/v0.11.0?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/) +[![Discord](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3) ![Python Version](https://img.shields.io/pypi/pyversions/amqtt?style=plastic&logo=python&logoColor=yellow) ![Python Wheel](https://img.shields.io/pypi/wheel/amqtt?style=plastic) [![PyPI](https://img.shields.io/pypi/v/amqtt?style=plastic&logo=python&logoColor=yellow)](https://pypi.org/project/amqtt/) -![docs/assets/amqtt.svg](docs/assets/amqtt.svg) +![docs/assets/amqtt.svg](https://raw.githubusercontent.com/Yakifo/amqtt/refs/tags/v0.11.0/docs/assets/amqtt.svg) `aMQTT` is an open source [MQTT](http://www.mqtt.org) broker and client[^1], natively implemented with Python's [asyncio](https://docs.python.org/3/library/asyncio.html). @@ -50,13 +50,14 @@ Bug reports, patches and suggestions welcome! Just [open an issue](https://githu ## Python Version Compatibility -| Version | hbmqtt compatibility | Supported Python Versions | PyPi Release | -| ------- | -------------------- | ------------------------- | ------------ | -| 0.10.x | yes [^2] | 3.7 - 3.9 | 0.10.1 | -| 0.11.x | no [^3] | 3.10 - 3.13 | 0.11.0 | +| Version | hbmqtt compatibility | Supported Python Versions | +| ------- | -------------------- | ------------------------- | +| 0.10.x | yes [^2] | 3.7 - 3.9 | +| 0.11.x | no [^3] | 3.10 - 3.13 | -For a full feature roadmap, see ... [^1]: Forked from [HBMQTT](https://github.com/beerfactory/hbmqtt) after it was deprecated by the original author. + [^2]: drop-in replacement + [^3]: module renamed and small API differences diff --git a/amqtt/__init__.py b/amqtt/__init__.py index ef244ad..672ae70 100644 --- a/amqtt/__init__.py +++ b/amqtt/__init__.py @@ -1,3 +1,3 @@ """INIT.""" -__version__ = "0.11.0" +__version__ = "0.11.1" diff --git a/amqtt/broker.py b/amqtt/broker.py index 12d88a6..fe949e6 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -3,7 +3,6 @@ from asyncio import CancelledError, futures from collections import deque from collections.abc import Generator import copy -from enum import Enum from functools import partial import logging from pathlib import Path @@ -23,6 +22,7 @@ from amqtt.adapters import ( WebSocketsWriter, WriterAdapter, ) +from amqtt.contexts import Action, BaseContext from amqtt.errors import AMQTTError, BrokerError, MQTTError, NoDataError from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session @@ -30,7 +30,6 @@ from amqtt.utils import format_client_message, gen_client_id, read_yaml_config from .events import BrokerEvents from .mqtt.disconnect import DisconnectPacket -from .plugins.contexts import BaseContext from .plugins.manager import PluginManager _CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]] @@ -45,13 +44,6 @@ DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 -class Action(Enum): - """Actions issued by the broker.""" - - SUBSCRIBE = "subscribe" - PUBLISH = "publish" - - class RetainedApplicationMessage(ApplicationMessage): __slots__ = ("data", "qos", "source_session", "topic") diff --git a/amqtt/client.py b/amqtt/client.py index 9c7ca18..70781a1 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -19,11 +19,11 @@ from amqtt.adapters import ( WebSocketsReader, WebSocketsWriter, ) +from amqtt.contexts import BaseContext from amqtt.errors import ClientError, ConnectError, ProtocolHandlerError from amqtt.mqtt.connack import CONNECTION_ACCEPTED from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.mqtt.protocol.client_handler import ClientProtocolHandler -from amqtt.plugins.contexts import BaseContext from amqtt.plugins.manager import PluginManager from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session from amqtt.utils import gen_client_id, read_yaml_config @@ -457,6 +457,8 @@ class MQTTClient: # if not self._handler: self._handler = ClientProtocolHandler(self.plugins_manager) + connection_timeout = self.config.get("connection_timeout", None) + if secure: sc = ssl.create_default_context( ssl.Purpose.SERVER_AUTH, @@ -464,10 +466,12 @@ class MQTTClient: capath=self.session.capath, cadata=self.session.cadata, ) - if "certfile" in self.config and "keyfile" in self.config: - sc.load_cert_chain(self.config["certfile"], self.config["keyfile"]) + + if "certfile" in self.config: + sc.load_verify_locations(cafile=self.config["certfile"]) if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool): sc.check_hostname = self.config["check_hostname"] + sc.verify_mode = ssl.CERT_REQUIRED kwargs["ssl"] = sc try: @@ -477,21 +481,24 @@ class MQTTClient: # Open connection if scheme in ("mqtt", "mqtts"): - conn_reader, conn_writer = await asyncio.open_connection( - self.session.remote_address, - self.session.remote_port, - **kwargs, - ) + conn_reader, conn_writer = await asyncio.wait_for( + asyncio.open_connection( + self.session.remote_address, + self.session.remote_port, + **kwargs, + ), timeout=connection_timeout) reader = StreamReaderAdapter(conn_reader) writer = StreamWriterAdapter(conn_writer) elif scheme in ("ws", "wss") and self.session.broker_uri: - websocket: ClientConnection = await websockets.connect( - self.session.broker_uri, - subprotocols=[websockets.Subprotocol("mqtt")], - additional_headers=self.additional_headers, - **kwargs, - ) + websocket: ClientConnection = await asyncio.wait_for( + websockets.connect( + self.session.broker_uri, + subprotocols=[websockets.Subprotocol("mqtt")], + additional_headers=self.additional_headers, + **kwargs, + ), timeout=connection_timeout) + reader = WebSocketsReader(websocket) writer = WebSocketsWriter(websocket) elif not self.session.broker_uri: diff --git a/amqtt/codecs_amqtt.py b/amqtt/codecs_amqtt.py index 3e425a3..1db2d9c 100644 --- a/amqtt/codecs_amqtt.py +++ b/amqtt/codecs_amqtt.py @@ -1,4 +1,5 @@ import asyncio +from decimal import ROUND_HALF_UP, Decimal from struct import pack, unpack from amqtt.adapters import ReaderAdapter @@ -139,3 +140,10 @@ def int_to_bytes_str(value: int) -> bytes: :return: bytes array. """ return str(value).encode("utf-8") + + +def float_to_bytes_str(value: float, places:int=3) -> bytes: + """Convert an float value to a bytes array containing the numeric character.""" + quant = Decimal(f"0.{''.join(['0' for i in range(places-1)])}1") + rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP) + return str(rounded).encode("utf-8") diff --git a/amqtt/plugins/contexts.py b/amqtt/contexts.py similarity index 71% rename from amqtt/plugins/contexts.py rename to amqtt/contexts.py index 2e894fd..f62baae 100644 --- a/amqtt/plugins/contexts.py +++ b/amqtt/contexts.py @@ -1,3 +1,4 @@ +from enum import Enum import logging from typing import TYPE_CHECKING, Any @@ -11,3 +12,10 @@ class BaseContext: self.loop: asyncio.AbstractEventLoop | None = None self.logger: logging.Logger = _LOGGER self.config: dict[str, Any] | None = None + + +class Action(Enum): + """Actions issued by the broker.""" + + SUBSCRIBE = "subscribe" + PUBLISH = "publish" diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index cb24d3c..eff191c 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -20,6 +20,7 @@ import logging from typing import Generic, TypeVar, cast from amqtt.adapters import ReaderAdapter, WriterAdapter +from amqtt.contexts import BaseContext from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError from amqtt.events import MQTTEvents from amqtt.mqtt import packet_class @@ -57,7 +58,6 @@ from amqtt.mqtt.suback import SubackPacket from amqtt.mqtt.subscribe import SubscribePacket from amqtt.mqtt.unsuback import UnsubackPacket from amqtt.mqtt.unsubscribe import UnsubscribePacket -from amqtt.plugins.contexts import BaseContext from amqtt.plugins.manager import PluginManager from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session @@ -154,7 +154,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: # canceling the task is the expected result self.logger.debug("Writer close was cancelled.") - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug("Writer close operation timed out.", exc_info=True) except OSError: self.logger.debug("Writer close failed due to I/O error.", exc_info=True) @@ -322,7 +322,7 @@ class ProtocolHandler(Generic[C]): self._puback_waiters[app_message.packet_id] = waiter try: app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5) - except TimeoutError: + except asyncio.TimeoutError: msg = f"Timeout waiting for PUBACK for packet ID {app_message.packet_id}" self.logger.warning(msg) raise TimeoutError(msg) from None @@ -529,7 +529,7 @@ class ProtocolHandler(Generic[C]): except asyncio.CancelledError: self.logger.debug("Task cancelled, reader loop ending") break - except TimeoutError: + except asyncio.TimeoutError: self.logger.debug(f"{self.session.client_id} Input stream read timeout") self.handle_read_timeout() except NoDataError: diff --git a/amqtt/plugins/base.py b/amqtt/plugins/base.py index b9004c6..ae6c9b2 100644 --- a/amqtt/plugins/base.py +++ b/amqtt/plugins/base.py @@ -1,14 +1,10 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar +from typing import Any, Generic, TypeVar -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import Action, BaseContext from amqtt.session import Session C = TypeVar("C", bound=BaseContext) -if TYPE_CHECKING: - from amqtt.broker import Action - class BasePlugin(Generic[C]): """The base from which all plugins should inherit.""" @@ -30,31 +26,6 @@ class BasePlugin(Generic[C]): async def close(self) -> None: """Override if plugin needs to clean up resources upon shutdown.""" - @dataclass - class Config: - """Override to define the configuration and defaults for plugin.""" - - -class BaseAuthPlugin(BasePlugin[BaseContext]): - """Base class for authentication plugins.""" - - def __init__(self, context: BaseContext) -> None: - super().__init__(context) - - self.auth_config: dict[str, Any] | None = self._get_config_section("auth") - - async def authenticate(self, *, session: Session) -> bool | None: - """Logic for session authentication. - - Args: - session: amqtt.session.Session - - Returns: - - `True` if user is authentication succeed, `False` if user authentication fails - - `None` if authentication can't be achieved (then plugin result is then ignored) - - """ - return bool(self.auth_config) class BaseTopicPlugin(BasePlugin[BaseContext]): """Base class for topic plugins.""" @@ -63,9 +34,11 @@ class BaseTopicPlugin(BasePlugin[BaseContext]): super().__init__(context) self.topic_config: dict[str, Any] | None = self._get_config_section("topic-check") + if self.topic_config is None: + self.context.logger.warning("'topic-check' section not found in context configuration") async def topic_filtering( - self, *, session: Session | None = None, topic: str | None = None, action: Optional["Action"] = None + self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None ) -> bool: """Logic for filtering out topics. @@ -78,4 +51,36 @@ class BaseTopicPlugin(BasePlugin[BaseContext]): bool: `True` if topic is allowed, `False` otherwise """ - return bool(self.topic_config) + if not self.topic_config: + # auth config section not found + self.context.logger.warning("'topic-check' section not found in context configuration") + return False + return True + + +class BaseAuthPlugin(BasePlugin[BaseContext]): + """Base class for authentication plugins.""" + + def __init__(self, context: BaseContext) -> None: + super().__init__(context) + + self.auth_config: dict[str, Any] | None = self._get_config_section("auth") + if not self.auth_config: + self.context.logger.warning("'auth' section not found in context configuration") + + async def authenticate(self, *, session: Session) -> bool | None: + """Logic for session authentication. + + Args: + session: amqtt.session.Session + + Returns: + - `True` if user is authentication succeed, `False` if user authentication fails + - `None` if authentication can't be achieved (then plugin result is then ignored) + + """ + if not self.auth_config: + # auth config section not found + self.context.logger.warning("'auth' section not found in context configuration") + return False + return True diff --git a/amqtt/plugins/logging_amqtt.py b/amqtt/plugins/logging_amqtt.py index 5acf10d..e2d3433 100644 --- a/amqtt/plugins/logging_amqtt.py +++ b/amqtt/plugins/logging_amqtt.py @@ -3,11 +3,11 @@ from functools import partial import logging from typing import Any, TypeAlias +from amqtt.contexts import BaseContext from amqtt.events import BrokerEvents from amqtt.mqtt import MQTTPacket from amqtt.mqtt.packet import MQTTFixedHeader, MQTTPayload, MQTTVariableHeader from amqtt.plugins.base import BasePlugin -from amqtt.plugins.contexts import BaseContext from amqtt.session import Session PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index dcd078f..b6a5a8f 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -8,20 +8,17 @@ import copy from importlib.metadata import EntryPoint, EntryPoints, entry_points from inspect import iscoroutinefunction import logging -from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Optional, TypeAlias, TypeVar, cast +from typing import Any, Generic, NamedTuple, Optional, TypeAlias, TypeVar, cast from dacite import Config as DaciteConfig, DaciteError, from_dict +from amqtt.contexts import Action, BaseContext from amqtt.errors import PluginCoroError, PluginImportError, PluginInitError, PluginLoadError from amqtt.events import BrokerEvents, Events, MQTTEvents from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin -from amqtt.plugins.contexts import BaseContext from amqtt.session import Session from amqtt.utils import import_string -if TYPE_CHECKING: - from amqtt.broker import Action - class Plugin(NamedTuple): name: str @@ -109,8 +106,8 @@ class PluginManager(Generic[C]): topic_filter_list = [] if self.app_context.config and "auth" in self.app_context.config: auth_filter_list = self.app_context.config["auth"].get("plugins", []) - if self.app_context.config and "topic" in self.app_context.config: - topic_filter_list = self.app_context.config["topic"].get("plugins", []) + if self.app_context.config and "topic-check" in self.app_context.config: + topic_filter_list = self.app_context.config["topic-check"].get("plugins", []) ep: EntryPoints | list[EntryPoint] = [] if hasattr(entry_points(), "select"): diff --git a/amqtt/plugins/persistence.py b/amqtt/plugins/persistence.py index 8fb1ed3..ee79d33 100644 --- a/amqtt/plugins/persistence.py +++ b/amqtt/plugins/persistence.py @@ -2,7 +2,7 @@ import json import sqlite3 from typing import Any -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext from amqtt.session import Session diff --git a/amqtt/plugins/sys/broker.py b/amqtt/plugins/sys/broker.py index 99d3166..b0ba07f 100644 --- a/amqtt/plugins/sys/broker.py +++ b/amqtt/plugins/sys/broker.py @@ -1,6 +1,8 @@ import asyncio from collections import deque # pylint: disable=C0412 -from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 +from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 + +import psutil from amqtt.plugins.base import BasePlugin from amqtt.session import Session @@ -26,7 +28,7 @@ except ImportError: import amqtt from amqtt.broker import BrokerContext -from amqtt.codecs_amqtt import int_to_bytes_str +from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader DOLLAR_SYS_ROOT = "$SYS/broker/" @@ -40,17 +42,35 @@ STAT_START_TIME = "start_time" STAT_CLIENTS_MAXIMUM = "clients_maximum" STAT_CLIENTS_CONNECTED = "clients_connected" STAT_CLIENTS_DISCONNECTED = "clients_disconnected" +MEMORY_USAGE_MAXIMUM = "memory_maximum" +CPU_USAGE_MAXIMUM = "cpu_usage_maximum" +CPU_USAGE_LAST = "cpu_usage_last" PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] +def val_to_bytes_str(value: Any) -> bytes: + """Convert an int, float or string to byte string.""" + match value: + case int(): + return int_to_bytes_str(value) + case float(): + return float_to_bytes_str(value) + case str(): + return value.encode("utf-8") + case _: + msg = f"Unsupported type {type(value)}" + raise NotImplementedError(msg) + + class BrokerSysPlugin(BasePlugin[BrokerContext]): def __init__(self, context: BrokerContext) -> None: super().__init__(context) # Broker statistics initialization self._stats: dict[str, int] = {} self._sys_handle: asyncio.Handle | None = None + self._current_process = psutil.Process() def _clear_stats(self) -> None: """Initialize broker statistics data structures.""" @@ -64,6 +84,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): STAT_CLIENTS_DISCONNECTED, STAT_PUBLISH_RECEIVED, STAT_PUBLISH_SENT, + MEMORY_USAGE_MAXIMUM, + CPU_USAGE_MAXIMUM ): self._stats[stat] = 0 @@ -127,6 +149,14 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): messages_stored += session.retained_messages_count messages_stored += len(self.context.retained_messages) subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values()) + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected + + cpu_usage = self._current_process.cpu_percent(interval=0) + self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage) + + mem_info_usage = self._current_process.memory_full_info() + mem_size = mem_info_usage.rss / (1024 ** 2) + self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size) # Broadcast updates tasks: deque[asyncio.Task[None]] = deque() @@ -150,9 +180,13 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): "messages/publish/sent": self._stats[STAT_PUBLISH_SENT], "messages/retained/count": len(self.context.retained_messages), "messages/subscriptions/count": subscriptions_count, + "heap/size": mem_size, + "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM], + "cpu/percent": cpu_usage, + "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM], } for stat_name, stat_value in stats.items(): - data: bytes = int_to_bytes_str(stat_value) if isinstance(stat_value, int) else stat_value.encode("utf-8") + data: bytes = val_to_bytes_str(stat_value) tasks.append(self.schedule_broadcast_sys_topic(stat_name, data)) # Wait until broadcasting tasks end diff --git a/amqtt/plugins/topic_checking.py b/amqtt/plugins/topic_checking.py index 5761ab9..dc4d46c 100644 --- a/amqtt/plugins/topic_checking.py +++ b/amqtt/plugins/topic_checking.py @@ -1,8 +1,7 @@ from typing import Any -from amqtt.broker import Action +from amqtt.contexts import Action, BaseContext from amqtt.plugins.base import BaseTopicPlugin -from amqtt.plugins.contexts import BaseContext from amqtt.session import Session diff --git a/docs/changelog.md b/docs/changelog.md index 0765167..4f1e43a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 0.11.1 + +- [PR #226](https://github.com/Yakifo/amqtt/pull/226) Consolidate super classes for plugins +- [PR #227](https://github.com/Yakifo/amqtt/pull/227) Update sample files +- [PR #229](https://github.com/Yakifo/amqtt/pull/229) & [PR #228](https://github.com/Yakifo/amqtt/pull/228) Broken pypi and test.amqtt.io links +- [PR #232](https://github.com/Yakifo/amqtt/pull/234) $SYS additions for cpu & mem. + ## 0.11.0 - upgrades to support python 3.10, 3.11, 3.12 and 3.13 diff --git a/docs/custom_plugins.md b/docs/custom_plugins.md index 08ccbdc..34eb4ec 100644 --- a/docs/custom_plugins.md +++ b/docs/custom_plugins.md @@ -58,7 +58,7 @@ auth: These plugins should subclass from `BaseAuthPlugin` and implement the `authenticate` method. -::: amqtt.plugins.authentication.BaseAuthPlugin +::: amqtt.plugins.base.BaseAuthPlugin ## Topic Filter Plugins @@ -75,4 +75,4 @@ topic-check: These plugins should subclass from `BaseTopicPlugin` and implement the `topic_filtering` method. -::: amqtt.plugins.topic_checking.BaseTopicPlugin +::: amqtt.plugins.base.BaseTopicPlugin diff --git a/docs/overrides/base.html b/docs/overrides/base.html new file mode 100644 index 0000000..0af326a --- /dev/null +++ b/docs/overrides/base.html @@ -0,0 +1,8 @@ +{% extends "base.html" %} + +{% block outdated %} + You're not viewing the latest version. + + Click here to go to latest. + +{% endblock %} diff --git a/docs/references/client_config.md b/docs/references/client_config.md index 24985d1..67f875f 100644 --- a/docs/references/client_config.md +++ b/docs/references/client_config.md @@ -25,6 +25,11 @@ Default retain value to messages published. Defaults to `false`. Enable or disable auto-reconnect if connection with the broker is interrupted. Defaults to `false`. + +### `connect_timeout` *(int)* + +If specified, the number of seconds before a connection times out + ### `reconnect_retries` *(int)* Maximum reconnection retries. Defaults to `2`. Negative value will cause client to reconnect infinitely. @@ -63,8 +68,15 @@ TLS certificates used to verify the broker's authenticity. - `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info. - `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/). - `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates. +- +- +### `certfile` *(string)* +Path to a single file in PEM format containing the certificate as well as any number of CA certificates needed to establish the server certificate's authenticity. +### `check_hostname` *(bool)* + +Bypass ssl host certificate verification, allowing self-signed certificates ## Default Configuration diff --git a/docs_test/src/dashboard/components/MainGrid.tsx b/docs_test/src/dashboard/components/MainGrid.tsx index b27b2ee..e98c298 100644 --- a/docs_test/src/dashboard/components/MainGrid.tsx +++ b/docs_test/src/dashboard/components/MainGrid.tsx @@ -20,6 +20,7 @@ export default function MainGrid() { const [received, setReceived] = useState([]); const [bytesIn, setBytesIn] = useState([]); const [bytesOut, setBytesOut] = useState([]); + const [clientsConnected, setClientsConnected] = useState([]); const [serverStart, setServerStart] = useState(''); const [serverUptime, setServerUptime] = useState(''); @@ -63,6 +64,7 @@ export default function MainGrid() { mqttSubscribe('$SYS/broker/load/bytes/#'); mqttSubscribe('$SYS/broker/uptime/formatted'); mqttSubscribe('$SYS/broker/uptime'); + mqttSubscribe('$SYS/broker/clients/connected'); } }, [isConnected, mqttSubscribe]); @@ -97,6 +99,13 @@ export default function MainGrid() { value: d } setBytesOut(bytesOut => [...bytesOut, newPoint]); + } else if (payload.topic === '$SYS/broker/clients/connected') { + const newPoint: DataPoint = { + timestamp: new Date().toISOString(), + value: d + } + setClientsConnected(clientsConnected => [...clientsConnected, newPoint]); + } else if (payload.topic === '$SYS/broker/uptime/formatted') { const dt = new Date(d + "Z"); setServerStart(dt.toLocaleString()); @@ -138,7 +147,7 @@ export default function MainGrid() {

github: Yakifo/amqtt + href="https://github.com/Yakifo/amqtt">Yakifo/amqtt

PyPi: + + + diff --git a/pyproject.toml b/pyproject.toml index d7609d8..4a2b35f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.13" ] -version = "0.11.0" +version = "0.11.1" requires-python = ">=3.10.0" readme = "README.md" license = { text = "MIT" } @@ -34,6 +34,7 @@ dependencies = [ "PyYAML==6.0.2", # https://pypi.org/project/PyYAML "typer==0.15.4", "dacite>=1.9.2", + "psutil>=7.0.0", ] [dependency-groups] diff --git a/samples/broker_acl.py b/samples/broker_acl.py index c4c740f..7382ffd 100644 --- a/samples/broker_acl.py +++ b/samples/broker_acl.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check acl plugin +""" + logger = logging.getLogger(__name__) config = { @@ -38,15 +42,43 @@ config = { }, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main() -> None: + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/broker_simple.py b/samples/broker_simple.py index 22c4639..751bdf6 100644 --- a/samples/broker_simple.py +++ b/samples/broker_simple.py @@ -1,21 +1,31 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.broker import Broker -broker = Broker() +""" +This sample shows how to run a broker +""" + +formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" +logging.basicConfig(level=logging.INFO, format=formatter) -async def test_coro() -> None: - await broker.start() +async def run_server() -> None: + broker = Broker() + try: + await broker.start() + while True: + await asyncio.sleep(1) + except CancelledError: + await broker.shutdown() +def __main__(): + try: + asyncio.run(run_server()) + except KeyboardInterrupt: + print("Server exiting...") if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - - asyncio.get_event_loop().run_until_complete(test_coro()) - try: - asyncio.get_event_loop().run_forever() - except KeyboardInterrupt: - asyncio.get_event_loop().run_until_complete(broker.shutdown()) + __main__() diff --git a/samples/broker_start.py b/samples/broker_start.py index 1055ff1..578f704 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker without stacktraces on keyboard interrupt +""" + logger = logging.getLogger(__name__) config = { @@ -30,19 +34,41 @@ config = { "topic-check": {"enabled": False}, } -broker = Broker(config) +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -async def test_coro() -> None: - await broker.start() - # await asyncio.sleep(5) - # await broker.shutdown() +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() if __name__ == "__main__": - formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" - # formatter = "%(asctime)s :: %(levelname)s :: %(message)s" - logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() - + __main__() diff --git a/samples/broker_taboo.py b/samples/broker_taboo.py index 4a1107b..7752469 100644 --- a/samples/broker_taboo.py +++ b/samples/broker_taboo.py @@ -4,6 +4,10 @@ import os from amqtt.broker import Broker +""" +This sample shows how to run a broker with the topic check taboo plugin +""" + logger = logging.getLogger(__name__) config = { @@ -30,15 +34,44 @@ config = { "topic-check": {"enabled": True, "plugins": ["topic_taboo"]}, } -broker = Broker(config) + +async def main_loop(): + broker = Broker(config) + try: + await broker.start() + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + await broker.shutdown() -async def test_coro() -> None: - await broker.start() +async def main(): + t = asyncio.create_task(main_loop()) + try: + await t + except asyncio.CancelledError: + pass -if __name__ == "__main__": +def __main__(): + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_forever() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + task = loop.create_task(main()) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received. Stopping server...") + task.cancel() + loop.run_until_complete(task) # Ensure task finishes cleanup + finally: + logger.info("Server stopped.") + loop.close() + +if __name__ == "__main__": + __main__() diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 2e11b9a..13284c2 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -1,13 +1,12 @@ import asyncio import logging +from asyncio import CancelledError from amqtt.client import MQTTClient -# -# This sample shows a client running idle. -# Meanwhile, keepalive is managed through PING messages sent every 5 seconds -# - +""" +This sample shows how to run an idle client +""" logger = logging.getLogger(__name__) @@ -15,17 +14,26 @@ config = { "keep_alive": 5, "ping_delay": 1, } -C = MQTTClient(config=config) + +async def main() -> None: + client = MQTTClient(config=config) + + try: + await client.connect("mqtt://test.mosquitto.org:1883/") + logger.info("client connected") + await asyncio.sleep(15) + except CancelledError: + pass + + await client.disconnect() -async def test_coro() -> None: - await C.connect("mqtt://test.mosquitto.org:1883/") - await asyncio.sleep(18) +def __main__(): - await C.disconnect() + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.run(main()) if __name__ == "__main__": - formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish.py b/samples/client_publish.py index 9b0f4f6..135fb57 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -4,10 +4,9 @@ import logging from amqtt.client import ConnectError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker using different QOS +""" logger = logging.getLogger(__name__) @@ -15,42 +14,45 @@ config = { "will": { "topic": "/will/client", "message": b"Dead or alive", - "qos": 0x01, + "qos": QOS_1, "retain": True, }, } -async def test_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") +async def test_coro1() -> None: + client = MQTTClient() + await client.connect("mqtt://localhost:1883/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) - logger.info("messages published") - await C.disconnect() + logger.info("test_coro1 messages published") + await client.disconnect() async def test_coro2() -> None: try: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org:1883/") - await C.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) - await C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) - logger.info("messages published") - await C.disconnect() + client = MQTTClient(config={'auto_reconnect': False, 'connection_timeout': 1}) + await client.connect("mqtt://localhost:1884/") + await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) + await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) + logger.info("test_coro2 messages published") + await client.disconnect() except ConnectError: - logger.exception(f"Connection failed", exc_info=True) - asyncio.get_event_loop().stop() + logger.info(f"Connection failed", exc_info=True) +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro1()) + asyncio.run(test_coro2()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) - asyncio.get_event_loop().run_until_complete(test_coro2()) + __main__() diff --git a/samples/client_publish_acl.py b/samples/client_publish_acl.py index 2ecf33e..43148d7 100644 --- a/samples/client_publish_acl.py +++ b/samples/client_publish_acl.py @@ -2,37 +2,41 @@ import asyncio import logging from amqtt.client import ConnectError, MQTTClient +from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to broker running either `samples/broker_acl.py` + or `samples/broker_taboo.py`. +""" logger = logging.getLogger(__name__) async def test_coro() -> None: try: - C = MQTTClient() - await C.connect("mqtt://0.0.0.0:1883") - await C.publish("data/classified", b"TOP SECRET", qos=0x01) - await C.publish("data/memes", b"REAL FUN", qos=0x01) - await C.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=0x01) - await C.publish( + client = MQTTClient() + await client.connect("mqtt://0.0.0.0:1883") + await client.publish("data/classified", b"TOP SECRET", qos=QOS_1) + await client.publish("data/memes", b"REAL FUN", qos=QOS_1) + await client.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=QOS_1) + await client.publish( "repositories/amqtt/devel", b"THIS NEEDS TO BE CHECKED", - qos=0x01, + qos=QOS_1, ) - await C.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=0x01) + await client.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=QOS_1) logger.info("messages published") - await C.disconnect() + await client.disconnect() except ConnectError as ce: - logger.exception("Connection failed") - asyncio.get_event_loop().stop() + logger.exception("ERROR: Connection failed") +def __main__(): + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + + asyncio.run(test_coro()) + if __name__ == "__main__": - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" - formatter = "%(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + __main__() diff --git a/samples/client_publish_ssl.py b/samples/client_publish_ssl.py index 0f2de13..528c347 100644 --- a/samples/client_publish_ssl.py +++ b/samples/client_publish_ssl.py @@ -1,41 +1,51 @@ import asyncio import logging +from pathlib import Path from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure broker. + +Use `openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"` to +generate a self-signed certificate for the broker to use. +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, }, + "auto_reconnect": False, + "check_hostname": False, + "certfile": "cert.pem", } -C = MQTTClient(config=config) -# C = MQTTClient() + +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("mqtts://test.mosquitto.org/", cafile="mosquitto.org.crt") + + await client.connect("mqtts://localhost:8883") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_publish_ws.py b/samples/client_publish_ws.py index 4c0fa51..e02c73e 100644 --- a/samples/client_publish_ws.py +++ b/samples/client_publish_ws.py @@ -4,39 +4,39 @@ import logging from amqtt.client import MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to publish messages to broker using different QOS -# Debug outputs shows the message flows -# +""" +This sample shows how to publish messages to secure websocket broker +""" logger = logging.getLogger(__name__) config = { "will": { "topic": "/will/client", - "message": b"Dead or alive", - "qos": 0x01, + "message": "Dead or alive", + "qos": QOS_1, "retain": True, - }, - "capath": ".", + } } -C = MQTTClient(config=config) -# C = MQTTClient() +client = MQTTClient(config=config) async def test_coro() -> None: - await C.connect("wss://test.mosquitto.org:8081/", cafile="mosquitto.org.crt") + await client.connect("ws://localhost:8080/") tasks = [ - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), - asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), + asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)), ] await asyncio.wait(tasks) logger.info("messages published") - await C.disconnect() + await client.disconnect() -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.run(test_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe.py b/samples/client_subscribe.py index 3d175d6..642e66e 100644 --- a/samples/client_subscribe.py +++ b/samples/client_subscribe.py @@ -4,20 +4,19 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1, QOS_2 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# + +""" +This sample shows how to subscribe to different $SYS topics and how to receive incoming messages +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test.mosquitto.org/") - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test.mosquitto.org/") + + await client.subscribe( [ ("$SYS/broker/uptime", QOS_1), ("$SYS/broker/load/#", QOS_2), @@ -25,16 +24,20 @@ async def uptime_coro() -> None: ) logger.info("Subscribed") try: - for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + for _i in range(1, 10): + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) - asyncio.get_event_loop().run_until_complete(uptime_coro()) + asyncio.run(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/client_subscribe_acl.py b/samples/client_subscribe_acl.py index 1810b6b..f133a0b 100644 --- a/samples/client_subscribe_acl.py +++ b/samples/client_subscribe_acl.py @@ -4,41 +4,45 @@ import logging from amqtt.client import ClientError, MQTTClient from amqtt.mqtt.constants import QOS_1 -# -# This sample shows how to subscbribe a topic and receive data from incoming messages -# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned -# by the broker. -# +""" +Run `samples/broker_acl.py` or `samples/broker_taboo.py` + +This sample shows how to subscribe to different topics, some of which are allowed. +""" logger = logging.getLogger(__name__) async def uptime_coro() -> None: - C = MQTTClient() - await C.connect("mqtt://test:test@0.0.0.0:1883") - # await C.connect('mqtt://0.0.0.0:1883') - # Subscribe to '$SYS/broker/uptime' with QOS=1 - await C.subscribe( + client = MQTTClient() + await client.connect("mqtt://test:test@0.0.0.0:1883") + + result = await client.subscribe( [ + ("$SYS/#", QOS_1), # Topic forbidden when running `broker_acl.py` ("data/memes", QOS_1), # Topic allowed ("data/classified", QOS_1), # Topic forbidden ("repositories/amqtt/master", QOS_1), # Topic allowed - ("repositories/amqtt/devel", QOS_1), # Topic forbidden + ("repositories/amqtt/devel", QOS_1), # Topic forbidden when running `broker_acl.py` ("calendar/amqtt/releases", QOS_1), # Topic allowed ], ) - logger.info("Subscribed") + logger.info(f"Subscribed results: {result}") try: for _i in range(1, 100): - await C.deliver_message() - await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"]) + if msg := await client.deliver_message(): + logger.info(f"{msg.topic} >> {msg.data.decode()}") + await client.unsubscribe(["$SYS/#", "data/memes"]) logger.info("UnSubscribed") - await C.disconnect() + await client.disconnect() except ClientError as ce: logger.exception("Client exception") -if __name__ == "__main__": +def __main__(): formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.get_event_loop().run_until_complete(uptime_coro()) + +if __name__ == "__main__": + __main__() diff --git a/samples/mosquitto.org.crt b/samples/mosquitto.org.crt index b8535e8..e76dbd8 100644 --- a/samples/mosquitto.org.crt +++ b/samples/mosquitto.org.crt @@ -1,18 +1,24 @@ -----BEGIN CERTIFICATE----- -MIIC8DCCAlmgAwIBAgIJAOD63PlXjJi8MA0GCSqGSIb3DQEBBQUAMIGQMQswCQYD -VQQGEwJHQjEXMBUGA1UECAwOVW5pdGVkIEtpbmdkb20xDjAMBgNVBAcMBURlcmJ5 -MRIwEAYDVQQKDAlNb3NxdWl0dG8xCzAJBgNVBAsMAkNBMRYwFAYDVQQDDA1tb3Nx -dWl0dG8ub3JnMR8wHQYJKoZIhvcNAQkBFhByb2dlckBhdGNob28ub3JnMB4XDTEy -MDYyOTIyMTE1OVoXDTIyMDYyNzIyMTE1OVowgZAxCzAJBgNVBAYTAkdCMRcwFQYD -VQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwGA1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1v -c3F1aXR0bzELMAkGA1UECwwCQ0ExFjAUBgNVBAMMDW1vc3F1aXR0by5vcmcxHzAd -BgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hvby5vcmcwgZ8wDQYJKoZIhvcNAQEBBQAD -gY0AMIGJAoGBAMYkLmX7SqOT/jJCZoQ1NWdCrr/pq47m3xxyXcI+FLEmwbE3R9vM -rE6sRbP2S89pfrCt7iuITXPKycpUcIU0mtcT1OqxGBV2lb6RaOT2gC5pxyGaFJ+h -A+GIbdYKO3JprPxSBoRponZJvDGEZuM3N7p3S/lRoi7G5wG5mvUmaE5RAgMBAAGj -UDBOMB0GA1UdDgQWBBTad2QneVztIPQzRRGj6ZHKqJTv5jAfBgNVHSMEGDAWgBTa -d2QneVztIPQzRRGj6ZHKqJTv5jAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUA -A4GBAAqw1rK4NlRUCUBLhEFUQasjP7xfFqlVbE2cRy0Rs4o3KS0JwzQVBwG85xge -REyPOFdGdhBY2P1FNRy0MDr6xr+D2ZOwxs63dG1nnAnWZg7qwoLgpZ4fESPD3PkA -1ZgKJc2zbSQ9fCPxt2W3mdVav66c6fsb7els2W2Iz7gERJSX +MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL +BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG +A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU +BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv +by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE +BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES +MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp +dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg +UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW +Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA +s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH +3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo +E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT +MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV +6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC +6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf ++pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK +sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839 +LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE +m/XriWr/Cq4h/JfB7NTsezVslgkBaoU= -----END CERTIFICATE----- diff --git a/tests/plugins/mocks.py b/tests/plugins/mocks.py index 84c1a30..def9920 100644 --- a/tests/plugins/mocks.py +++ b/tests/plugins/mocks.py @@ -2,10 +2,8 @@ import logging from dataclasses import dataclass -from amqtt.broker import Action - from amqtt.plugins.base import BasePlugin, BaseAuthPlugin, BaseTopicPlugin -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext, Action from amqtt.session import Session diff --git a/tests/plugins/test_authentication.py b/tests/plugins/test_authentication.py index 4c42ab5..3cd17eb 100644 --- a/tests/plugins/test_authentication.py +++ b/tests/plugins/test_authentication.py @@ -4,7 +4,7 @@ from pathlib import Path import unittest from amqtt.plugins.authentication import AnonymousAuthPlugin, FileAuthPlugin -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext from amqtt.session import Session formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" diff --git a/tests/plugins/test_manager.py b/tests/plugins/test_manager.py index 42c1424..a081e6f 100644 --- a/tests/plugins/test_manager.py +++ b/tests/plugins/test_manager.py @@ -2,11 +2,11 @@ import asyncio import logging import unittest -from amqtt.broker import Action from amqtt.events import BrokerEvents + from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin from amqtt.plugins.manager import PluginManager -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext, Action from amqtt.session import Session formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" diff --git a/tests/plugins/test_persistence.py b/tests/plugins/test_persistence.py index f0e080a..35aebd3 100644 --- a/tests/plugins/test_persistence.py +++ b/tests/plugins/test_persistence.py @@ -4,7 +4,7 @@ from pathlib import Path import sqlite3 import unittest -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext from amqtt.plugins.persistence import SQLitePlugin from amqtt.session import Session diff --git a/tests/plugins/test_plugins.py b/tests/plugins/test_plugins.py index 801ed04..8ccec5d 100644 --- a/tests/plugins/test_plugins.py +++ b/tests/plugins/test_plugins.py @@ -13,11 +13,11 @@ import pytest import amqtt.plugins from amqtt.broker import Broker, BrokerContext from amqtt.client import MQTTClient -from amqtt.errors import PluginError, PluginInitError, PluginImportError +from amqtt.errors import PluginInitError, PluginImportError from amqtt.events import MQTTEvents, BrokerEvents from amqtt.mqtt.constants import QOS_0 from amqtt.plugins.base import BasePlugin -from amqtt.plugins.contexts import BaseContext +from amqtt.contexts import BaseContext _INVALID_METHOD: str = "invalid_foo" _PLUGIN: str = "Plugin" diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index c01d322..563a313 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -31,7 +31,11 @@ all_sys_topics = [ '$SYS/broker/messages/publish/received', '$SYS/broker/messages/publish/sent', '$SYS/broker/messages/retained/count', - '$SYS/broker/messages/subscriptions/count' + '$SYS/broker/messages/subscriptions/count', + '$SYS/broker/heap/size', + '$SYS/broker/heap/maximum', + '$SYS/broker/cpu/percent', + '$SYS/broker/cpu/maximum', ] diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py index 53d0bc3..30b6996 100644 --- a/tests/plugins/test_topic_checking.py +++ b/tests/plugins/test_topic_checking.py @@ -2,8 +2,9 @@ import logging import pytest -from amqtt.broker import Action, BrokerContext, Broker -from amqtt.plugins.contexts import BaseContext +from amqtt.broker import BrokerContext, Broker + +from amqtt.contexts import BaseContext, Action from amqtt.plugins.topic_checking import TopicAccessControlListPlugin, TopicTabooPlugin from amqtt.plugins.base import BaseTopicPlugin from amqtt.session import Session diff --git a/tests/test_client.py b/tests/test_client.py index 589dafa..4859c4e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -446,6 +446,15 @@ async def test_connect_incorrect_scheme(): await client.connect('"mq://someplace') +@pytest.mark.asyncio +@pytest.mark.timeout(3) +async def test_connect_timeout(): + config = {"auto_reconnect": False, "connection_timeout": 2} + client = MQTTClient(config=config) + with pytest.raises(ClientError): + await client.connect("mqtt://localhost:8888") + + async def test_client_no_auth(): class MockEntryPoints: diff --git a/tests/test_samples.py b/tests/test_samples.py new file mode 100644 index 0000000..d2798da --- /dev/null +++ b/tests/test_samples.py @@ -0,0 +1,242 @@ +import asyncio +import logging +import signal +import subprocess +from pathlib import Path + +import pytest + +from amqtt.broker import Broker +from samples.client_publish import __main__ as client_publish_main +from samples.client_subscribe import __main__ as client_subscribe_main +from samples.client_keepalive import __main__ as client_keepalive_main +from samples.broker_acl import config as broker_acl_config +from samples.broker_taboo import config as broker_taboo_config + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +async def test_broker_acl(): + broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py" + process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(5) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_simple(): + broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(5) + + # Send the interrupt signal + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + has_broker_closed = "Broker closed" in stderr.decode("utf-8") + has_loop_stopped = "Broadcast loop stopped by exception" in stderr.decode("utf-8") + + assert has_broker_closed or has_loop_stopped, "Broker didn't close correctly." + + +@pytest.mark.asyncio +async def test_broker_start(): + broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" + process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(5) + + # Send the interrupt signal to stop broker + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.asyncio +async def test_broker_taboo(): + broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" + process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(5) + + # Send the interrupt signal to stop broker + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + +@pytest.mark.timeout(25) +@pytest.mark.asyncio +async def test_client_keepalive(): + + broker = Broker() + await broker.start() + await asyncio.sleep(2) + + keep_alive_script = Path(__file__).parent.parent / "samples/client_keepalive.py" + process = subprocess.Popen(["python", keep_alive_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_client_publish(): + broker = Broker() + await broker.start() + await asyncio.sleep(2) + + client_publish = Path(__file__).parent.parent / "samples/client_publish.py" + process = subprocess.Popen(["python", client_publish], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + + stdout, stderr = process.communicate() + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + +broker_ssl_config = { + "listeners": { + "default": { + "type": "tcp", + "bind": "0.0.0.0:8883", + "ssl": True, + "certfile": "cert.pem", + "keyfile": "key.pem", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} + +@pytest.mark.asyncio +async def test_client_publish_ssl(): + + # generate a self-signed certificate for this test + cmd = 'openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"' + subprocess.run(cmd, shell=True, capture_output=True, text=True) + + # start a secure broker + broker = Broker(config=broker_ssl_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_client_publish_acl(): + + broker = Broker() + await broker.start() + await asyncio.sleep(2) + + broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(2) + + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + +broker_ws_config = { + "listeners": { + "default": { + "type": "ws", + "bind": "0.0.0.0:8080", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} + +@pytest.mark.asyncio +async def test_client_publish_ws(): + # start a secure broker + broker = Broker(config=broker_ws_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +def test_client_subscribe(): + client_subscribe_main() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_acl(): + broker = Broker(config=broker_acl_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(5) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [128, 1, 128, 1, 128, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_client_subscribe_plugin_taboo(): + broker = Broker(config=broker_taboo_config) + await broker.start() + + broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py" + process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Send the interrupt signal + await asyncio.sleep(5) + process.send_signal(signal.SIGINT) + stdout, stderr = process.communicate() + logger.debug(stderr.decode("utf-8")) + assert "Subscribed results: [1, 1, 128, 1, 1, 1]" in stderr.decode("utf-8") + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() \ No newline at end of file diff --git a/uv.lock b/uv.lock index d1a0dc8..fd26736 100644 --- a/uv.lock +++ b/uv.lock @@ -9,11 +9,12 @@ resolution-markers = [ [[package]] name = "amqtt" -version = "0.11.0" +version = "0.11.1" source = { editable = "." } dependencies = [ { name = "dacite" }, { name = "passlib" }, + { name = "psutil" }, { name = "pyyaml" }, { name = "transitions" }, { name = "typer" }, @@ -69,6 +70,7 @@ requires-dist = [ { name = "coveralls", marker = "extra == 'ci'", specifier = "==4.0.1" }, { name = "dacite", specifier = ">=1.9.2" }, { name = "passlib", specifier = "==1.7.4" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "pyyaml", specifier = "==6.0.2" }, { name = "transitions", specifier = "==0.9.2" }, { name = "typer", specifier = "==0.15.4" },