From 1e1e2026d3ab0eb526ba5ad599c16deac3a11a6d Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Fri, 30 May 2025 18:50:31 -0400 Subject: [PATCH] cleaning up plugin interface and documentation --- amqtt/plugins/authentication.py | 31 +++++++++--------- amqtt/plugins/logging_amqtt.py | 13 +++----- amqtt/plugins/manager.py | 7 +++- amqtt/plugins/sys/broker.py | 6 ++-- amqtt/plugins/topic_checking.py | 39 ++++++++++------------ docs/custom_plugins.md | 36 +++++++++++++++++---- docs/packaged_plugins.md | 37 ++++++++++++--------- docs/references/broker_config.md | 48 ++++++++++++++++++++++++---- pyproject.toml | 3 ++ tests/plugins/test_topic_checking.py | 8 ++--- 10 files changed, 145 insertions(+), 83 deletions(-) diff --git a/amqtt/plugins/authentication.py b/amqtt/plugins/authentication.py index c23549e..0e843dc 100644 --- a/amqtt/plugins/authentication.py +++ b/amqtt/plugins/authentication.py @@ -3,35 +3,34 @@ from pathlib import Path from passlib.apps import custom_app_context as pwd_context from amqtt.broker import BrokerContext +from amqtt.plugins.manager import BaseContext, BasePlugin from amqtt.session import Session _PARTS_EXPECTED_LENGTH = 2 # Expected number of parts in a valid line -class BaseAuthPlugin: +class BaseAuthPlugin(BasePlugin): """Base class for authentication plugins.""" - def __init__(self, context: BrokerContext) -> None: - self.context = context + def __init__(self, context: BaseContext) -> None: + super().__init__(context) + self.auth_config = self.context.config.get("auth", None) if self.context.config else None if not self.auth_config: self.context.logger.warning("'auth' section not found in context configuration") - async def authenticate(self, *args: None, **kwargs: Session) -> bool | None: + async def authenticate(self, *, session: Session) -> bool | None: """Logic for session authentication. Args: - *args: positional arguments (not used) - **kwargs: payload from broker - ``` - session: amqtt.session.Session - ``` + session: amqtt.session.Session Returns: - `True` if user is authentication succeed, `False` if user authentication fails - `None` if authentication can't be achieved (then plugin result is then ignored) """ + if not self.auth_config: # auth config section not found self.context.logger.warning("'auth' section not found in context configuration") @@ -42,8 +41,8 @@ class BaseAuthPlugin: class AnonymousAuthPlugin(BaseAuthPlugin): """Authentication plugin allowing anonymous access.""" - async def authenticate(self, *args: None, **kwargs: Session) -> bool: - authenticated = await super().authenticate(*args, **kwargs) + async def authenticate(self, *, session: Session) -> bool: + authenticated = await super().authenticate(session=session) if authenticated: # Default to allowing anonymous allow_anonymous = self.auth_config.get("allow-anonymous", True) if isinstance(self.auth_config, dict) else True @@ -51,7 +50,7 @@ class AnonymousAuthPlugin(BaseAuthPlugin): self.context.logger.debug("Authentication success: config allows anonymous") return True - session: Session | None = kwargs.get("session") + session: Session | None = session if session and session.username: self.context.logger.debug(f"Authentication success: session has username '{session.username}'") return True @@ -62,7 +61,7 @@ class AnonymousAuthPlugin(BaseAuthPlugin): class FileAuthPlugin(BaseAuthPlugin): """Authentication plugin based on a file-stored user database.""" - def __init__(self, context: BrokerContext) -> None: + def __init__(self, context: BaseContext) -> None: super().__init__(context) self._users: dict[str, str] = {} self._read_password_file() @@ -95,11 +94,11 @@ class FileAuthPlugin(BaseAuthPlugin): except Exception: self.context.logger.exception(f"Unexpected error reading password file '{password_file}'") - async def authenticate(self, *args: None, **kwargs: Session) -> bool | None: + async def authenticate(self, *, session: Session) -> bool | None: """Authenticate users based on the file-stored user database.""" - authenticated = await super().authenticate(*args, **kwargs) + authenticated = await super().authenticate(session=session) if authenticated: - session = kwargs.get("session") + if not session: self.context.logger.debug("Authentication failure: no session provided") return False diff --git a/amqtt/plugins/logging_amqtt.py b/amqtt/plugins/logging_amqtt.py index 42124f8..7652cfd 100644 --- a/amqtt/plugins/logging_amqtt.py +++ b/amqtt/plugins/logging_amqtt.py @@ -3,18 +3,16 @@ from functools import partial import logging from typing import TYPE_CHECKING, Any -from amqtt.plugins.manager import BaseContext +from amqtt.plugins.manager import BasePlugin + if TYPE_CHECKING: from amqtt.session import Session -class EventLoggerPlugin: +class EventLoggerPlugin(BasePlugin): """A plugin to log events dynamically based on method names.""" - def __init__(self, context: BaseContext) -> None: - self.context = context - async def log_event(self, *args: Any, **kwargs: Any) -> None: """Log the occurrence of an event.""" event_name = kwargs["event_name"].replace("old", "") @@ -28,12 +26,9 @@ class EventLoggerPlugin: raise AttributeError(msg) -class PacketLoggerPlugin: +class PacketLoggerPlugin(BasePlugin): """A plugin to log MQTT packets sent and received.""" - def __init__(self, context: BaseContext) -> None: - self.context = context - async def on_mqtt_packet_received(self, *args: Any, **kwargs: Any) -> None: """Log an MQTT packet when it is received.""" packet = kwargs.get("packet") diff --git a/amqtt/plugins/manager.py b/amqtt/plugins/manager.py index e1db79a..20350e6 100644 --- a/amqtt/plugins/manager.py +++ b/amqtt/plugins/manager.py @@ -1,4 +1,4 @@ -__all__ = ["BaseContext", "PluginManager", "get_plugin_manager"] +__all__ = ["BaseContext", "PluginManager", "get_plugin_manager", "BasePlugin"] import asyncio from collections.abc import Awaitable, Callable @@ -35,6 +35,11 @@ class BaseContext: self.logger: logging.Logger = _LOGGER self.config: dict[str, Any] | None = None +class BasePlugin: + def __init__(self, context: BaseContext) -> None: + self.context = context + + class PluginManager: """Wraps contextlib Entry point mechanism to provide a basic plugin system. diff --git a/amqtt/plugins/sys/broker.py b/amqtt/plugins/sys/broker.py index 1ecd1ee..6116740 100644 --- a/amqtt/plugins/sys/broker.py +++ b/amqtt/plugins/sys/broker.py @@ -2,6 +2,8 @@ import asyncio from collections import deque # pylint: disable=C0412 from typing import SupportsIndex, SupportsInt # pylint: disable=C0412 +from amqtt.plugins.manager import BasePlugin + try: from collections.abc import Buffer except ImportError: @@ -40,9 +42,9 @@ STAT_CLIENTS_CONNECTED = "clients_connected" STAT_CLIENTS_DISCONNECTED = "clients_disconnected" -class BrokerSysPlugin: +class BrokerSysPlugin(BasePlugin): def __init__(self, context: BrokerContext) -> None: - self.context = context + super().__init__(context) # Broker statistics initialization self._stats: dict[str, int] = {} self._sys_handle: asyncio.Handle | None = None diff --git a/amqtt/plugins/topic_checking.py b/amqtt/plugins/topic_checking.py index e520203..2805078 100644 --- a/amqtt/plugins/topic_checking.py +++ b/amqtt/plugins/topic_checking.py @@ -1,30 +1,27 @@ from typing import Any from amqtt.broker import Action -from amqtt.plugins.manager import BaseContext +from amqtt.plugins.manager import BaseContext, BasePlugin +from amqtt.session import Session -class BaseTopicPlugin: +class BaseTopicPlugin(BasePlugin): """Base class for topic plugins.""" def __init__(self, context: BaseContext) -> None: - self.context = context + super().__init__(context) + self.topic_config: dict[str, Any] | None = self.context.config.get("topic-check", None) if self.context.config else None if self.topic_config is None: self.context.logger.warning("'topic-check' section not found in context configuration") - async def topic_filtering(self, *args: Any, **kwargs: Any) -> bool: + async def topic_filtering(self, *, session: Session = None, topic: str = None, action: Action = None) -> bool: """Logic for filtering out topics. Args: - *args: positional arguments (not used) - - **kwargs: payload from broker - ``` - session: amqtt.session.Session - topic: str - action: amqtt.broker.Action - ``` + session: amqtt.session.Session + topic: str + action: amqtt.broker.Action Returns: bool: `True` if topic is allowed, `False` otherwise @@ -32,7 +29,7 @@ class BaseTopicPlugin: """ if not self.topic_config: # auth config section not found - self.context.logger.warning("'auth' section not found in context configuration") + self.context.logger.warning("'topic-check' section not found in context configuration") return False return True @@ -42,11 +39,9 @@ class TopicTabooPlugin(BaseTopicPlugin): super().__init__(context) self._taboo: list[str] = ["prohibited", "top-secret", "data/classified"] - async def topic_filtering(self, *args: Any, **kwargs: Any) -> bool: - filter_result = await super().topic_filtering(*args, **kwargs) + async def topic_filtering(self, *, session: Session = None, topic: str = None, action: Action = None) -> bool: + filter_result = await super().topic_filtering(session=session, topic=topic, action=action) if filter_result: - session = kwargs.get("session") - topic = kwargs.get("topic") if session and session.username == "admin": return True return not (topic and topic in self._taboo) @@ -74,22 +69,20 @@ class TopicAccessControlListPlugin(BaseTopicPlugin): break return ret - async def topic_filtering(self, *args: Any, **kwargs: Any) -> bool: - filter_result = await super().topic_filtering(*args, **kwargs) + async def topic_filtering(self, *, session: Session = None, topic: str = None, action: Action = None) -> bool: + filter_result = await super().topic_filtering(session=session, topic=topic, action=action) if not filter_result: return False # hbmqtt and older amqtt do not support publish filtering - action = kwargs.get("action") if action == Action.PUBLISH and self.topic_config is not None and "publish-acl" not in self.topic_config: # maintain backward compatibility, assume permitted return True - req_topic = kwargs.get("topic") + req_topic = topic if not req_topic: return False - session = kwargs.get("session") username = session.username if session else None if username is None: username = "anonymous" @@ -100,7 +93,7 @@ class TopicAccessControlListPlugin(BaseTopicPlugin): elif self.topic_config is not None and action == Action.SUBSCRIBE: acl = self.topic_config.get("acl", {}) - allowed_topics = acl.get(username, None) + allowed_topics = acl.get(username, []) if not allowed_topics: return False diff --git a/docs/custom_plugins.md b/docs/custom_plugins.md index fe534d5..9b7df0d 100644 --- a/docs/custom_plugins.md +++ b/docs/custom_plugins.md @@ -1,7 +1,30 @@ # Custom Plugins -Every plugin listed in the `project.entry-points` is loaded and notified of events -by defining any of the following methods: +With the aMQTT Broker plugins framework, one can add additional functionality to the broker without +having to subclass or rewrite any of the core broker logic. To define a custom list of plugins to be loaded, +add this section to your `pyproject.toml`" + +```toml +[project.entry-points."mypackage.mymodule.plugins"] +plugin_alias = "module.submodule.file:ClassName" +``` + +and specify the namespace when instantiating the broker: + +```python +from amqtt.broker import Broker + +broker = Broker(plugin_namespace='mypackage.mymodule.plugins') + +``` + +Each plugin has access to the full configuration file through the provided `BaseContext` and can define +its own variables to configure its behavior. + +::: amqtt.plugins.manager.BasePlugin + +Plugins that are defined in the`project.entry-points` are loaded and notified of events by when the subclass +implements one or more of these methods: - `on_mqtt_packet_sent` - `on_mqtt_packet_received` @@ -18,7 +41,7 @@ by defining any of the following methods: ## Authentication Plugins -Of the plugins listed in `project.entry-points`, plugins can be used to validate client sessions +Of the plugins listed in `project.entry-points`, one or more can be used to validate client sessions by specifying their alias in `auth` > `plugins` section of the config: ```yaml @@ -27,22 +50,23 @@ auth: - plugin_alias_name ``` -These plugins should sub-class from `BaseAuthPlugin` and implement the `authenticate` method. +These plugins should subclass from `BaseAuthPlugin` and implement the `authenticate` method. ::: amqtt.plugins.authentication.BaseAuthPlugin ## Topic Filter Plugins -Of the plugins listed in `project.entry-points`, plugins can be used to validate client sessions +Of the plugins listed in `project.entry-points`, one or more can be used to determine topic access by specifying their alias in `topic-check` > `plugins` section of the config: ```yaml topic-check: + enable: True plugins: - plugin_alias_name ``` -These plugins should sub-class from `BaseTopicPlugin` and implement the `topic_filtering` method. +These plugins should subclass from `BaseTopicPlugin` and implement the `topic_filtering` method. ::: amqtt.plugins.topic_checking.BaseTopicPlugin diff --git a/docs/packaged_plugins.md b/docs/packaged_plugins.md index 5d2237c..c2502e4 100644 --- a/docs/packaged_plugins.md +++ b/docs/packaged_plugins.md @@ -1,12 +1,10 @@ # Existing Plugins With the aMQTT Broker plugins framework, one can add additional functionality without -having to rewrite core logic. The list of plugins that get loaded are specified in `pyproject.toml`; -each plugin can then check the configuration to determine how to behave (including disabling). +having to rewrite core logic. Plugins loaded by default are specified in `pyproject.toml`: -```toml -[project.entry-points."amqtt.broker.plugins"] -plugin_alias = "module.submodule.file:ClassName" +```yaml +--8<-- "pyproject.toml:included" ``` ## auth_anonymous (Auth Plugin) @@ -14,7 +12,7 @@ plugin_alias = "module.submodule.file:ClassName" `amqtt.plugins.authentication:AnonymousAuthPlugin` -**Config Options** +**Configuration** ```yaml auth: @@ -34,7 +32,7 @@ auth: clients are authorized by providing username and password, compared against file -**Config Options** +**Configuration** ```yaml @@ -64,7 +62,6 @@ print(sha512_crypt.hash(passwd)) `amqtt.plugins.topic_checking:TopicTabooPlugin` - Prevents using topics named: `prohibited`, `top-secret`, and `data/classified` **Configuration** @@ -82,6 +79,19 @@ topic-check: **Configuration** +- `acl` *(list)*: determines subscription access; if `publish-acl` is not specified, determine both publish and subscription access. + The list should be a key-value pair, where: +`:[, , ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`). + + +- `publish-acl` *(list)*: determines publish access. This parameter defines the list of access control rules; each item is a key-value pair, where: +`:[, , ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`). + + !!! info "Reserved usernames" + + - The username `admin` is allowed access to all topics. + - The username `anonymous` will control allowed topics, if using the `auth_anonymous` plugin. + ```yaml topic-check: enabled: true @@ -95,20 +105,17 @@ topic-check: - . ``` - - - - - ## Plugin: $SYS +`amqtt.plugins.sys.broker:BrokerSysPlugin` + Publishes, on a periodic basis, statistics about the broker -**Config Options** +**Configuration** - `sys_interval` - int, seconds between updates -### Supported Topics +**Supported Topics** - `$SYS/broker/load/bytes/received` - payload: `data`, int - `$SYS/broker/load/bytes/sent` - payload: `data`, int diff --git a/docs/references/broker_config.md b/docs/references/broker_config.md index 157679d..a2fd26c 100644 --- a/docs/references/broker_config.md +++ b/docs/references/broker_config.md @@ -33,9 +33,30 @@ Client disconnect timeout without a keep-alive Configuration for authentication behaviour: -- `plugins` *(list[string])*: defines the list of plugins which are activated as authentication plugins. Note the plugins must be defined in the `amqtt.broker.plugins` [entry point](https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/#using-package-metadata). -- `allow-anonymous` *(bool)*: used by the internal `amqtt.plugins.authentication.AnonymousAuthPlugin` plugin. This parameter enables (`on`) or disable anonymous connection, i.e. connection without username. -- `password-file` *(string)*: used by the internal `amqtt.plugins.authentication.FileAuthPlugin` plugin. Path to file which includes `username:password` pair, one per line. The password should be encoded using sha-512 with `mkpasswd -m sha-512` or: +- `plugins` *(list[string])*: defines the list of plugins which are activated as authentication plugins. + + !!! note "Entry points" + Plugins used here must first be defined in the `amqtt.broker.plugins` [entry point](https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/#using-package-metadata). + + + !!! danger "Legacy behavior" + if `plugins` is omitted from the `auth` section, all plugins listed in the `amqtt.broker.plugins` entrypoint will be enabled + for authentication, *including allowing anonymous login.* + + `plugins: []` will deny connections from all clients. + +- `allow-anonymous` *(bool)*: `True` will allow anonymous connections. + + *Used by the internal `amqtt.plugins.authentication.AnonymousAuthPlugin` plugin* + + !!! danger "Username only connections" + `False` does not disable the `auth_anonymous` plugin; connections will still be allowed as long as a username is provided. + + If security is required, do not include `auth_anonymous` in the `plugins` list. + + + +- `password-file` *(string)*: Path to file which includes `username:password` pair, one per line. The password should be encoded using sha-512 with `mkpasswd -m sha-512` or: ```python import sys from getpass import getpass @@ -44,6 +65,8 @@ Configuration for authentication behaviour: passwd = input() if not sys.stdin.isatty() else getpass() print(sha512_crypt.hash(passwd)) ``` + + *Used by the internal `amqtt.plugins.authentication.FileAuthPlugin` plugin.* ### `topic-check` *(mapping)* @@ -51,12 +74,23 @@ Configuration for access control policies for publishing and subscribing to topi - `enabled` *(bool)*: Enable access control policies (`true`). `false` will allow clients to publish and subscribe to any topic. - `plugins` *(list[string])*: defines the list of plugins which are activated as access control plugins. Note the plugins must be defined in the `amqtt.broker.plugins` [entry point](https://pythonhosted.org/setuptools/setuptools.html#dynamic-discovery-of-services-and-plugins). -- `acl` *(list)*: used by the internal `amqtt.plugins.topic_acl.TopicAclPlugin` plugin to determine subscription access. This parameter defines the list of access control rules; each item is a key-value pair, where: + +- `acl` *(list)*: plugin to determine subscription access; if `publish-acl` is not specified, determine both publish and subscription access. + The list should be a key-value pair, where: `:[, , ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`). -use `anonymous` username for the list of allowed topics if using the `auth_anonymous` plugin. -- `publish-acl` *(list)*: used by the internal `amqtt.plugins.topic_acl.TopicAclPlugin` plugin to determine publish access. This parameter defines the list of access control rules; each item is a key-value pair, where: + + *used by the `amqtt.plugins.topic_acl.TopicAclPlugin`* + +- `publish-acl` *(list)*: plugin to determine publish access. This parameter defines the list of access control rules; each item is a key-value pair, where: `:[, , ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`). -use `anonymous` username for the list of allowed topics if using the `auth_anonymous` plugin. + + !!! info "Reserved usernames" + + - The username `admin` is allowed access to all topic. + - The username `anonymous` will control allowed topics if using the `auth_anonymous` plugin. + + + *used by the `amqtt.plugins.topic_acl.TopicAclPlugin`* diff --git a/pyproject.toml b/pyproject.toml index 20da2bf..1e99711 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,7 @@ test_plugin = "tests.plugins.test_manager:EmptyTestPlugin" event_plugin = "tests.plugins.test_manager:EventTestPlugin" packet_logger_plugin = "amqtt.plugins.logging_amqtt:PacketLoggerPlugin" +# --8<-- [start:included] [project.entry-points."amqtt.broker.plugins"] event_logger_plugin = "amqtt.plugins.logging_amqtt:EventLoggerPlugin" packet_logger_plugin = "amqtt.plugins.logging_amqtt:PacketLoggerPlugin" @@ -104,6 +105,8 @@ auth_file = "amqtt.plugins.authentication:FileAuthPlugin" topic_taboo = "amqtt.plugins.topic_checking:TopicTabooPlugin" topic_acl = "amqtt.plugins.topic_checking:TopicAccessControlListPlugin" broker_sys = "amqtt.plugins.sys.broker:BrokerSysPlugin" +# --8<-- [end:included] + [project.entry-points."amqtt.client.plugins"] packet_logger_plugin = "amqtt.plugins.logging_amqtt:PacketLoggerPlugin" diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py index 6237be4..477fbdd 100644 --- a/tests/plugins/test_topic_checking.py +++ b/tests/plugins/test_topic_checking.py @@ -29,7 +29,7 @@ async def test_base_no_config(logdog): assert log_records[0].message == "'topic-check' section not found in context configuration" assert log_records[1].levelno == logging.WARNING - assert log_records[1].message == "'auth' section not found in context configuration" + assert log_records[1].message == "'topic-check' section not found in context configuration" assert pile.is_empty() @@ -49,7 +49,7 @@ async def test_base_empty_config(logdog): log_records = list(pile.drain(name="testlog")) assert len(log_records) == 1 assert log_records[0].levelno == logging.WARNING - assert log_records[0].message == "'auth' section not found in context configuration" + assert log_records[0].message == "'topic-check' section not found in context configuration" @pytest.mark.asyncio @@ -106,7 +106,7 @@ async def test_taboo_empty_config(logdog): assert log_records[0].levelno == logging.WARNING assert log_records[0].message == "'topic-check' section not found in context configuration" assert log_records[1].levelno == logging.WARNING - assert log_records[1].message == "'auth' section not found in context configuration" + assert log_records[1].message == "'topic-check' section not found in context configuration" @pytest.mark.asyncio @@ -264,7 +264,7 @@ async def test_taclp_empty_config(logdog): log_records = list(pile.drain(name="testlog")) assert len(log_records) == 2 assert log_records[0].message == "'topic-check' section not found in context configuration" - assert log_records[1].message == "'auth' section not found in context configuration" + assert log_records[1].message == "'topic-check' section not found in context configuration" @pytest.mark.asyncio