Merge branch '0.11.3-rc.1' into session_persistence

pull/256/head
Andrew Mirsky 2025-08-07 19:00:55 -04:00
commit 833fa07b0a
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
18 zmienionych plików z 237 dodań i 165 usunięć

Wyświetl plik

@ -140,7 +140,7 @@ class Broker:
"""MQTT 3.1.1 compliant broker implementation. """MQTT 3.1.1 compliant broker implementation.
Args: Args:
config: dictionary of configuration options (see [broker configuration](broker_config.md)). config: `BrokerConfig` or dictionary of equivalent structure options (see [broker configuration](broker_config.md)).
loop: asyncio loop. defaults to `asyncio.new_event_loop()`. loop: asyncio loop. defaults to `asyncio.new_event_loop()`.
plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`. plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`.
@ -750,7 +750,8 @@ class Broker:
"""Stop a running handler and detach if from the session.""" """Stop a running handler and detach if from the session."""
try: try:
await handler.stop() await handler.stop()
except Exception: # a failure in stopping a handler shouldn't cause the broker to fail
except asyncio.QueueEmpty:
self.logger.exception("Failed to stop handler") self.logger.exception("Failed to stop handler")
async def _authenticate(self, session: Session, _: ListenerConfig) -> bool: async def _authenticate(self, session: Session, _: ListenerConfig) -> bool:
@ -913,7 +914,8 @@ class Broker:
task.result() task.result()
except CancelledError: except CancelledError:
self.logger.info(f"Task has been cancelled: {task}") self.logger.info(f"Task has been cancelled: {task}")
except Exception: # if a task fails, don't want it to cause the broker to fail
except Exception: # pylint: disable=W0718
self.logger.exception(f"Task failed and will be skipped: {task}") self.logger.exception(f"Task failed and will be skipped: {task}")
run_broadcast_task = asyncio.ensure_future(self._run_broadcast(running_tasks)) run_broadcast_task = asyncio.ensure_future(self._run_broadcast(running_tasks))

Wyświetl plik

@ -75,18 +75,15 @@ def mqtt_connected(func: _F) -> _F:
class MQTTClient: class MQTTClient:
"""MQTT client implementation. """MQTT client implementation, providing an API for connecting to a broker and send/receive messages using the MQTT protocol.
MQTTClient instances provides API for connecting to a broker and send/receive
messages using the MQTT protocol.
Args: Args:
client_id: MQTT client ID to use when connecting to the broker. If none, client_id: MQTT client ID to use when connecting to the broker. If none,
it will be generated randomly by `amqtt.utils.gen_client_id` it will be generated randomly by `amqtt.utils.gen_client_id`
config: dictionary of configuration options (see [client configuration](client_config.md)). config: `ClientConfig` or dictionary of equivalent structure options (see [client configuration](client_config.md)).
Raises: Raises:
PluginImportError: if importing a plugin from configuration PluginImportError: if importing a plugin from configuration fails
PluginInitError: if initialization plugin fails PluginInitError: if initialization plugin fails
""" """
@ -159,7 +156,8 @@ class MQTTClient:
except asyncio.CancelledError as e: except asyncio.CancelledError as e:
msg = "Future or Task was cancelled" msg = "Future or Task was cancelled"
raise ConnectError(msg) from e raise ConnectError(msg) from e
except Exception as e: # no matter the failure mode, still try to reconnect
except Exception as e: # pylint: disable=W0718
self.logger.warning(f"Connection failed: {e!r}") self.logger.warning(f"Connection failed: {e!r}")
if not self.config.get("auto_reconnect", False): if not self.config.get("auto_reconnect", False):
raise raise
@ -233,7 +231,8 @@ class MQTTClient:
except asyncio.CancelledError as e: except asyncio.CancelledError as e:
msg = "Future or Task was cancelled" msg = "Future or Task was cancelled"
raise ConnectError(msg) from e raise ConnectError(msg) from e
except Exception as e: # no matter the failure mode, still try to reconnect
except Exception as e: # pylint: disable=W0718
self.logger.warning(f"Reconnection attempt failed: {e!r}") self.logger.warning(f"Reconnection attempt failed: {e!r}")
self.logger.debug("", exc_info=True) self.logger.debug("", exc_info=True)
if 0 <= reconnect_retries < nb_attempt: if 0 <= reconnect_retries < nb_attempt:

Wyświetl plik

@ -154,9 +154,9 @@ class BrokerConfig(Dictable):
listeners: dict[Literal["default"] | str, ListenerConfig] = field(default_factory=default_listeners) # noqa: PYI051 listeners: dict[Literal["default"] | str, ListenerConfig] = field(default_factory=default_listeners) # noqa: PYI051
"""Network of listeners used by the services. a 'default' named listener is required; if another listener """Network of listeners used by the services. a 'default' named listener is required; if another listener
does not set a value, the 'default' settings are applied. See does not set a value, the 'default' settings are applied. See
[ListenerConfig](#amqtt.contexts.ListenerConfig) for more information.""" [`ListenerConfig`](broker_config.md#amqtt.contexts.ListenerConfig) for more information."""
sys_interval: int | None = None sys_interval: int | None = None
"""*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](#sys-topics) """*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](../plugins/packaged_plugins.md#sys-topics)
for recommended configuration.*""" for recommended configuration.*"""
timeout_disconnect_delay: int | None = 0 timeout_disconnect_delay: int | None = 0
"""Client disconnect timeout without a keep-alive.""" """Client disconnect timeout without a keep-alive."""
@ -164,17 +164,17 @@ class BrokerConfig(Dictable):
"""Seconds for an inactive session to be retained.""" """Seconds for an inactive session to be retained."""
auth: dict[str, Any] | None = None auth: dict[str, Any] | None = None
"""*Deprecated field used to config EntryPoint-loaded plugins. See """*Deprecated field used to config EntryPoint-loaded plugins. See
[`AnonymousAuthPlugin`](#anonymous-auth-plugin) and [`AnonymousAuthPlugin`](../plugins/packaged_plugins.md#anonymous-auth-plugin) and
[`FileAuthPlugin`](#password-file-auth-plugin) for recommended configuration.*""" [`FileAuthPlugin`](../plugins/packaged_plugins.md#password-file-auth-plugin) for recommended configuration.*"""
topic_check: dict[str, Any] | None = None topic_check: dict[str, Any] | None = None
"""*Deprecated field used to config EntryPoint-loaded plugins. See """*Deprecated field used to config EntryPoint-loaded plugins. See
[`TopicTabooPlugin`](#taboo-topic-plugin) and [`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and
[`TopicACLPlugin`](#acl-topic-plugin) for recommended configuration method.*""" [`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*"""
plugins: dict[str, Any] | list[str | dict[str,Any]] | None = field(default_factory=default_broker_plugins) plugins: dict[str, Any] | list[str | dict[str,Any]] | None = field(default_factory=default_broker_plugins)
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin` """The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin`
or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See
[Plugins](http://localhost:8000/custom_plugins/) for more information. `list[str | dict[str,Any]]` is not [custom plugins](../plugins/custom_plugins.md) for more information. `list[str | dict[str,Any]]` is deprecated but available
recommended but available to support legacy use cases.""" to support legacy use cases."""
def __post_init__(self) -> None: def __post_init__(self) -> None:
"""Check config for errors and transform fields for easier use.""" """Check config for errors and transform fields for easier use."""
@ -327,16 +327,16 @@ class ClientConfig(Dictable):
"""Specify the topics and what flags should be set for messages published to them.""" """Specify the topics and what flags should be set for messages published to them."""
broker: ConnectionConfig | None = field(default_factory=ConnectionConfig) broker: ConnectionConfig | None = field(default_factory=ConnectionConfig)
"""Configuration for connecting to the broker. See """Configuration for connecting to the broker. See
[ConnectionConfig](#amqtt.contexts.ConnectionConfig) for more information.""" [`ConnectionConfig`](client_config.md#amqtt.contexts.ConnectionConfig) for more information."""
plugins: dict[str, Any] | list[dict[str, Any]] | None = field(default_factory=default_client_plugins) plugins: dict[str, Any] | list[dict[str, Any]] | None = field(default_factory=default_client_plugins)
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`; the value is """The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`; the value is
a dictionary of configuration options for that plugin. See [Plugins](http://localhost:8000/custom_plugins/) a dictionary of configuration options for that plugin. See [custom plugins](../plugins/custom_plugins.md) for
for more information.""" more information. `list[str | dict[str,Any]]` is deprecated but available to support legacy use cases."""
check_hostname: bool | None = True check_hostname: bool | None = True
"""If establishing a secure connection, should the hostname of the certificate be verified.""" """If establishing a secure connection, should the hostname of the certificate be verified."""
will: WillConfig | None = None will: WillConfig | None = None
"""Message, topic and flags that should be sent to if the client disconnects. See """Message, topic and flags that should be sent to if the client disconnects. See
[WillConfig](#amqtt.contexts.WillConfig)""" [`WillConfig`](client_config.md#amqtt.contexts.WillConfig) for more information."""
def __post__init__(self) -> None: def __post__init__(self) -> None:
"""Check config for errors and transform fields for easier use.""" """Check config for errors and transform fields for easier use."""

Wyświetl plik

@ -20,7 +20,7 @@ def default_hash_scheme() -> list[str]:
return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"] return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"]
class UserAuthDBPlugin(BaseAuthPlugin, BaseTopicPlugin): class UserAuthDBPlugin(BaseAuthPlugin):
def __init__(self, context: BrokerContext) -> None: def __init__(self, context: BrokerContext) -> None:
super().__init__(context) super().__init__(context)

Wyświetl plik

@ -15,7 +15,7 @@ from aiohttp import ClientResponse, ClientSession, FormData
from amqtt.broker import BrokerContext from amqtt.broker import BrokerContext
from amqtt.contexts import Action from amqtt.contexts import Action
from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin
from amqtt.session import Session from amqtt.session import Session
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -48,7 +48,31 @@ HTTP_4xx_MIN = 400
HTTP_4xx_MAX = 499 HTTP_4xx_MAX = 499
class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin): @dataclass
class HttpConfig:
"""Configuration for the HTTP Auth & ACL Plugin."""
host: str
"""hostname of the server for the auth & acl check"""
port: int
"""port of the server for the auth & acl check"""
request_method: RequestMethod = RequestMethod.GET
"""send the request as a GET, POST or PUT"""
params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details
"""send the request with `JSON` or `FORM` data. *additional details below*"""
response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details
"""expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*"""
with_tls: bool = False
"""http or https"""
user_agent: str = "amqtt"
"""the 'User-Agent' header sent along with the request"""
superuser_uri: str | None = None
"""URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported"""
timeout: int = 5
"""duration, in seconds, to wait for the HTTP server to respond"""
class AuthHttpPlugin(BasePlugin[BrokerContext]):
def __init__(self, context: BrokerContext) -> None: def __init__(self, context: BrokerContext) -> None:
super().__init__(context) super().__init__(context)
@ -113,10 +137,23 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin):
def get_url(self, uri: str) -> str: def get_url(self, uri: str) -> str:
return f"{'https' if self.config.with_tls else 'http'}://{self.config.host}:{self.config.port}{uri}" return f"{'https' if self.config.with_tls else 'http'}://{self.config.host}:{self.config.port}{uri}"
class UserAuthHttpPlugin(AuthHttpPlugin, BaseAuthPlugin):
async def authenticate(self, *, session: Session) -> bool | None: async def authenticate(self, *, session: Session) -> bool | None:
d = {"username": session.username, "password": session.password, "client_id": session.client_id} d = {"username": session.username, "password": session.password, "client_id": session.client_id}
return await self._send_request(self.get_url(self.config.user_uri), d) return await self._send_request(self.get_url(self.config.user_uri), d)
@dataclass
class Config(HttpConfig):
"""Configuration for the HTTP Auth Plugin."""
user_uri: str = "/user"
"""URI of the auth check."""
class TopicAuthHttpPlugin(AuthHttpPlugin, BaseTopicPlugin):
async def topic_filtering(self, *, async def topic_filtering(self, *,
session: Session | None = None, session: Session | None = None,
topic: str | None = None, topic: str | None = None,
@ -136,28 +173,8 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin):
return await self._send_request(self.get_url(self.config.topic_uri), d) return await self._send_request(self.get_url(self.config.topic_uri), d)
@dataclass @dataclass
class Config: class Config(HttpConfig):
"""Configuration for the HTTP Auth & ACL Plugin.""" """Configuration for the HTTP Topic Plugin."""
host: str topic_uri: str = "/acl"
"""hostname of the server for the auth & acl check""" """URI of the topic check."""
port: int
"""port of the server for the auth & acl check"""
user_uri: str
"""URI of the topic check (e.g. '/user')"""
topic_uri: str
"""URI of the topic check (e.g. '/acl')"""
request_method: RequestMethod = RequestMethod.GET
"""send the request as a GET, POST or PUT"""
params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details
"""send the request with `JSON` or `FORM` data. *additional details below*"""
response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details
"""expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*"""
with_tls: bool = False
"""http or https"""
user_agent: str = "amqtt"
"""the 'User-Agent' header sent along with the request"""
superuser_uri: str | None = None
"""URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported"""
timeout: int = 5
"""duration, in seconds, to wait for the HTTP server to respond"""

Wyświetl plik

@ -535,7 +535,7 @@ class ProtocolHandler(Generic[C]):
self.handle_read_timeout() self.handle_read_timeout()
except NoDataError: except NoDataError:
self.logger.debug(f"{self.session.client_id} No data available") self.logger.debug(f"{self.session.client_id} No data available")
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001, pylint: disable=W0718
self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}") self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
break break
while running_tasks: while running_tasks:

Wyświetl plik

@ -37,9 +37,10 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
@dataclass @dataclass
class Config: class Config:
"""Allow empty username.""" """Configuration for AnonymousAuthPlugin."""
allow_anonymous: bool = field(default=True) allow_anonymous: bool = field(default=True)
"""Allow all anonymous authentication (even with _no_ username)."""
class FileAuthPlugin(BaseAuthPlugin): class FileAuthPlugin(BaseAuthPlugin):
@ -78,7 +79,7 @@ class FileAuthPlugin(BaseAuthPlugin):
self.context.logger.warning(f"Password file '{password_file}' not found") self.context.logger.warning(f"Password file '{password_file}' not found")
except ValueError: except ValueError:
self.context.logger.exception(f"Malformed password file '{password_file}'") self.context.logger.exception(f"Malformed password file '{password_file}'")
except Exception: except OSError:
self.context.logger.exception(f"Unexpected error reading password file '{password_file}'") self.context.logger.exception(f"Unexpected error reading password file '{password_file}'")
async def authenticate(self, *, session: Session) -> bool | None: async def authenticate(self, *, session: Session) -> bool | None:
@ -107,6 +108,7 @@ class FileAuthPlugin(BaseAuthPlugin):
@dataclass @dataclass
class Config: class Config:
"""Path to the properly encoded password file.""" """Configuration for FileAuthPlugin."""
password_file: str | Path | None = None password_file: str | Path | None = None
"""Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512."""

Wyświetl plik

@ -52,7 +52,7 @@ class MessageInput:
with Path(self.file).open(encoding="utf-8") as f: with Path(self.file).open(encoding="utf-8") as f:
for line in f: for line in f:
yield line.encode(encoding="utf-8") yield line.encode(encoding="utf-8")
except Exception: except (FileNotFoundError, OSError):
logger.exception(f"Failed to read file '{self.file}'") logger.exception(f"Failed to read file '{self.file}'")
if self.lines: if self.lines:
for line in sys.stdin: for line in sys.stdin:

Wyświetl plik

@ -1,7 +1,7 @@
# Relational Database for Authentication and Authorization # Relational Database for Authentication and Authorization
- `amqtt.contrib.auth_db.AuthUserDBPlugin` (authentication) verify a client's ability to connect to broker - `amqtt.contrib.auth_db.UserAuthDBPlugin` (authentication) verify a client's ability to connect to broker
- `amqtt.contrib.auth_db.AuthTopicDBPlugin` (authorization) determine a client's access to topics - `amqtt.contrib.auth_db.TopicAuthDBPlugin` (authorization) determine a client's access to topics
Relational database access is supported using SQLAlchemy so MySQL, MariaDB, Postgres and SQLite support is available. Relational database access is supported using SQLAlchemy so MySQL, MariaDB, Postgres and SQLite support is available.

Wyświetl plik

@ -1,9 +1,16 @@
# Authentication & Authorization via external HTTP server # Authentication & Authorization via external HTTP server
`amqtt.contrib.http.HttpAuthPlugin`
If clients accessing the broker are managed by another application, it can implement API endpoints If clients accessing the broker are managed by another application, it can implement API endpoints
that respond with information about client authenticated and topic-level authorization. that respond with information about client authentication and/or topic-level authorization.
- `amqtt.contrib.http.UserAuthHttpPlugin` (client authentication)
- `amqtt.contrib.http.TopicAuthHttpPlugin` (topic authorization)
Configuration of these plugins is identical (except for the uri name) so that they can be used independently, if desired.
## User Auth
See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`.
!!! info "browser-based mqtt over websockets" !!! info "browser-based mqtt over websockets"
@ -17,9 +24,9 @@ that respond with information about client authenticated and topic-level authori
??? info "recipe for authentication" ??? info "recipe for authentication"
Provide the client id and username when webpage is initially rendered or passed to the mqtt initialization from stored Provide the client id and username when webpage is initially rendered or passed to the mqtt initialization from stored
cookies. If application is secure, the user's password will be hashed should be encrypted and cannot be used to cookies. If application is secure, the user's password will already be stored as a hashed value and, therefore, cannot
authenticate a client. Instead, the application should create an encrypted password (eg jwt) which the server be used in this context to authenticate a client. Instead, the application should create its own encrypted key (eg jwt)
can then verify when the broker contacts the application. which the server can then verify when the broker contacts the application.
??? example "mqtt in javascript" ??? example "mqtt in javascript"
Example initialization of mqtt in javascript: Example initialization of mqtt in javascript:
@ -35,9 +42,18 @@ that respond with information about client authenticated and topic-level authori
try { try {
const clientMqtt = await mqtt.connect(url, options); const clientMqtt = await mqtt.connect(url, options);
See the 'Request and Response Modes' section below for details on `params_mode` and `response_mode`. ::: amqtt.contrib.http.UserAuthHttpPlugin.Config
options:
show_source: false
heading_level: 4
extra:
class_style: "simple"
::: amqtt.contrib.http.HttpAuthPlugin.Config ## Topic ACL
See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`.
::: amqtt.contrib.http.TopicAuthHttpPlugin.Config
options: options:
show_source: false show_source: false
heading_level: 4 heading_level: 4
@ -45,7 +61,8 @@ See the 'Request and Response Modes' section below for details on `params_mode`
class_style: "simple" class_style: "simple"
[//]: # (manually creating the heading so it doesn't show in the sidebar ToC) [//]: # (manually creating the heading so it doesn't show in the sidebar ToC)
<h2>Params and Response Modes</h2> [](){#request-response-modes}
<h2>Request and Response Modes</h2>
Each URI endpoint will receive different information in order to determine authentication and authorization; Each URI endpoint will receive different information in order to determine authentication and authorization;
format will depend on `params_mode` configuration attribute (`json` or `form`).: format will depend on `params_mode` configuration attribute (`json` or `form`).:
@ -56,10 +73,6 @@ format will depend on `params_mode` configuration attribute (`json` or `form`).:
- password *(str)* - password *(str)*
- client_id *(str)* - client_id *(str)*
*For superuser validation, the request will contain:*
- username *(str)*
*For acl check, the request will contain:* *For acl check, the request will contain:*
- username *(str)* - username *(str)*
@ -69,23 +82,25 @@ format will depend on `params_mode` configuration attribute (`json` or `form`).:
All endpoints should respond with the following, dependent on `response_mode` configuration attribute: All endpoints should respond with the following, dependent on `response_mode` configuration attribute:
*In `status` mode: *In `status` mode:*
- status code: 2xx (granted) or 4xx(denied) or 5xx (noop) - status code: 2xx (granted) or 4xx(denied) or 5xx (noop)
!!! note "5xx response" !!! note "5xx response"
**noop** (no operation): plugin will not participate in the filtering operation and will defer to another **noop** (no operation): plugin will not participate in the operation and will defer to another
topic filtering plugin to determine access. if there is no other topic filtering plugin, access will be denied. plugin to determine access. if there is no other auth/filtering plugin, access will be denied.
*In `json` mode:* *In `json` mode:*
- status code: 2xx - status code: 2xx
- content-type: application/json - content-type: application/json
- response: {'ok': True } (granted), {'ok': False, 'error': 'optional error message' } (denied) or { 'error': 'optional error message' } (noop) - response: {'ok': True } (granted)
or {'ok': False, 'error': 'optional error message' } (denied)
or { 'error': 'optional error message' } (noop)
!!! note "excluded 'ok' key" !!! note "excluded 'ok' key"
**noop** (no operation): plugin will not participate in the filtering operation and will defer to another **noop** (no operation): plugin will not participate in the operation and will defer to another
topic filtering plugin to determine access. if there is no other topic filtering plugin, access will be denied. plugin to determine access. if there is no other auth/filtering plugin, access will be denied.
*In `text` mode:* *In `text` mode:*

Wyświetl plik

@ -31,7 +31,7 @@ and configured for the broker:
## Client ## Client
By default, the `PacketLoggerPlugin` is activated and configured for the client: By default, the `PacketLoggerPlugin` is activated and configured for the client:
```yaml ```yaml
--8<-- "amqtt/scripts/default_client.yaml" --8<-- "amqtt/scripts/default_client.yaml"
@ -43,13 +43,13 @@ By default, the `PacketLoggerPlugin` is activated and configured for the clien
`amqtt.plugins.authentication.AnonymousAuthPlugin` `amqtt.plugins.authentication.AnonymousAuthPlugin`
**Configuration** Authentication plugin allowing anonymous access.
```yaml ::: amqtt.plugins.authentication.AnonymousAuthPlugin.Config
plugins: options:
amqtt.plugins.authentication.AnonymousAuthPlugin: heading_level: 4
allow_anonymous: false extra:
``` class_style: "simple"
!!! danger !!! danger
even if `allow_anonymous` is set to `false`, the plugin will still allow access if a username is provided by the client even if `allow_anonymous` is set to `false`, the plugin will still allow access if a username is provided by the client
@ -69,15 +69,14 @@ plugins:
`amqtt.plugins.authentication.FileAuthPlugin` `amqtt.plugins.authentication.FileAuthPlugin`
clients are authorized by providing username and password, compared against file Authentication plugin based on a file-stored user database.
**Configuration** ::: amqtt.plugins.authentication.FileAuthPlugin.Config
options:
heading_level: 4
extra:
class_style: "simple"
```yaml
plugins:
amqtt.plugins.authentication.FileAuthPlugin:
password_file: /path/to/password_file
```
??? warning "EntryPoint-style configuration is deprecated" ??? warning "EntryPoint-style configuration is deprecated"
```yaml ```yaml

Wyświetl plik

@ -21,16 +21,5 @@ The `amqtt.broker` module provides the following key methods in the `Broker` cla
- `start()`: Starts the broker and begins serving - `start()`: Starts the broker and begins serving
- `shutdown()`: Gracefully shuts down the broker - `shutdown()`: Gracefully shuts down the broker
### Broker configuration
The `Broker` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
used by the included broker script: [broker configuration](broker_config.md)
::: amqtt.broker.Broker ::: amqtt.broker.Broker
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.

Wyświetl plik

@ -129,15 +129,4 @@ amqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed
Both coroutines have the same results except that `test_coro2()` manages messages flow in parallel which may be more efficient. Both coroutines have the same results except that `test_coro2()` manages messages flow in parallel which may be more efficient.
### Client configuration
The `MQTTClient` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
used by the included broker script: [client configuration](client_config.md)
::: amqtt.client.MQTTClient ::: amqtt.client.MQTTClient
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.

Wyświetl plik

@ -2,20 +2,26 @@
This document describes `aMQTT` common API both used by [MQTT Client](client.md) and [Broker](broker.md). This document describes `aMQTT` common API both used by [MQTT Client](client.md) and [Broker](broker.md).
## Reference ## ApplicationMessage
### ApplicationMessage ::: amqtt.session.ApplicationMessage
options:
heading_level: 3
The `amqtt.session` module provides the following message classes: ## IncomingApplicationMessage
#### ApplicationMessage Represents messages received from MQTT clients.
Base class for MQTT application messages. ::: amqtt.session.IncomingApplicationMessage
options:
heading_level: 3
#### IncomingApplicationMessage
Inherits from ApplicationMessage. Represents messages received from MQTT clients. ## OutgoingApplicationMessage
#### OutgoingApplicationMessage
Inherits from ApplicationMessage. Represents messages to be sent to MQTT clients. Inherits from ApplicationMessage. Represents messages to be sent to MQTT clients.
::: amqtt.session.OutgoingApplicationMessage
options:
heading_level: 3

Wyświetl plik

@ -37,6 +37,9 @@ nav:
- Broker: references/broker.md - Broker: references/broker.md
- Client: references/client.md - Client: references/client.md
- Common: references/common.md - Common: references/common.md
- Configuration:
- Broker: references/broker_config.md
- Client: references/client_config.md
- Plugins: - Plugins:
- Packaged: plugins/packaged_plugins.md - Packaged: plugins/packaged_plugins.md
- Custom: plugins/custom_plugins.md - Custom: plugins/custom_plugins.md
@ -44,9 +47,6 @@ nav:
- plugins/contrib.md - plugins/contrib.md
- Database Auth: plugins/auth_db.md - Database Auth: plugins/auth_db.md
- HTTP Auth: plugins/http.md - HTTP Auth: plugins/http.md
- Configuration:
- Broker: references/broker_config.md
- Client: references/client_config.md
- Reference: - Reference:
- Containerization: docker.md - Containerization: docker.md
- Support: support.md - Support: support.md

Wyświetl plik

@ -243,6 +243,7 @@ jobs = 2
ignore = ["tests"] ignore = ["tests"]
fail-on = ["I"] fail-on = ["I"]
max-line-length = 130 max-line-length = 130
ignore-paths = ["amqtt/plugins/persistence.py"]
[tool.pylint.BASIC] [tool.pylint.BASIC]
# Good variable names which should always be accepted, separated by a comma. # Good variable names which should always be accepted, separated by a comma.
@ -253,7 +254,6 @@ good-names = ["i", "j", "k", "e", "ex", "f", "_", "T", "x", "y", "id", "tg"]
# duplicate-code - unavoidable # duplicate-code - unavoidable
# too-many-* - are not enforced for the sake of readability # too-many-* - are not enforced for the sake of readability
disable = [ disable = [
"broad-exception-caught", # TODO: improve later
"duplicate-code", "duplicate-code",
"fixme", "fixme",
"invalid-name", "invalid-name",

Wyświetl plik

@ -8,7 +8,7 @@ from aiohttp.web import Response
from amqtt.broker import BrokerContext, Broker from amqtt.broker import BrokerContext, Broker
from amqtt.contexts import Action from amqtt.contexts import Action
from amqtt.contrib.http import HttpAuthPlugin, ParamsMode, ResponseMode, RequestMethod from amqtt.contrib.http import UserAuthHttpPlugin, TopicAuthHttpPlugin, ParamsMode, ResponseMode, RequestMethod
from amqtt.session import Session from amqtt.session import Session
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -91,73 +91,98 @@ def test_server_up_and_down(http_server):
pass pass
class TestKind(Enum): class TypeOfHttpTest(Enum):
AUTH = auto() AUTH = auto()
ACL = auto() ACL = auto()
def generate_use_cases(): def generate_use_cases(kind: TypeOfHttpTest):
# generate all variations of the plugin's configuration options for both auth and topic # generate all variations of the plugin's configuration options for both auth and topic
# e.g. (TestKind.AUTH, '/user/json', RequestMethod.GET, ParamsMode.JSON, ResponseMode.JSON, 'json', 'json', True), # e.g. (TestKind.AUTH, '/user/json', RequestMethod.GET, ParamsMode.JSON, ResponseMode.JSON, 'json', 'json', True),
cases: list[tuple[str, str, RequestMethod, ParamsMode, ResponseMode, str, str, bool]] = [] cases: list[tuple[str, str, RequestMethod, ParamsMode, ResponseMode, str, str, bool]] = []
for kind in TestKind:
root_url = 'user' if kind == TestKind.AUTH else 'acl'
for request in RequestMethod:
for params in ParamsMode:
for response in ResponseMode:
url = f'/{root_url}/json' if params == ParamsMode.JSON else f'/{root_url}/form'
for is_authenticated in [True, False, None]:
if is_authenticated is None:
pwd = 'i_am_null'
elif is_authenticated:
pwd = f'{response.value}'
else:
pwd = f'not{response.value}'
if response == ResponseMode.TEXT and is_authenticated is None: root_url = 'user' if kind == TypeOfHttpTest.AUTH else 'acl'
is_authenticated = False for request in RequestMethod:
for params in ParamsMode:
for response in ResponseMode:
url = f'/{root_url}/json' if params == ParamsMode.JSON else f'/{root_url}/form'
for is_authenticated in [True, False, None]:
if is_authenticated is None:
pwd = 'i_am_null'
elif is_authenticated:
pwd = f'{response.value}'
else:
pwd = f'not{response.value}'
case = (kind, url, request, params, response, response.value, f"{pwd}", is_authenticated) if response == ResponseMode.TEXT and is_authenticated is None:
cases.append(case) is_authenticated = False
case = (kind, url, request, params, response, response.value, f"{pwd}", is_authenticated)
cases.append(case)
return cases return cases
def test_generated_use_cases(): def test_generated_use_cases():
cases = generate_use_cases() cases = generate_use_cases(kind=TypeOfHttpTest.AUTH)
assert len(cases) == 108 assert len(cases) == 54
@pytest.mark.parametrize("kind,url,request_method,params_mode,response_mode,username,matcher,is_allowed", @pytest.mark.parametrize("kind,url,request_method,params_mode,response_mode,username,matcher,is_allowed",
generate_use_cases()) generate_use_cases(TypeOfHttpTest.AUTH))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_request_acl_response(empty_broker, http_server, kind, url, async def test_request_auth_response(empty_broker, http_server, kind, url,
request_method, params_mode, response_mode, request_method, params_mode, response_mode,
username, matcher, is_allowed): username, matcher, is_allowed):
context = BrokerContext(broker=empty_broker) context = BrokerContext(broker=empty_broker)
context.config = HttpAuthPlugin.Config( context.config = UserAuthHttpPlugin.Config(
host="127.0.0.1", host="127.0.0.1",
port=8080, port=8080,
user_uri=url, user_uri=url,
request_method=request_method,
params_mode=params_mode,
response_mode=response_mode,
)
http_acl = UserAuthHttpPlugin(context)
logger.warning(f'kind is {kind}')
session = Session()
session.client_id = "my_client_id"
session.username = username
session.password = matcher
assert await http_acl.authenticate(session=session) == is_allowed
await http_acl.on_broker_pre_shutdown()
@pytest.mark.parametrize("kind,url,request_method,params_mode,response_mode,username,matcher,is_allowed",
generate_use_cases(TypeOfHttpTest.ACL))
@pytest.mark.asyncio
async def test_request_topic_response(empty_broker, http_server, kind, url,
request_method, params_mode, response_mode,
username, matcher, is_allowed):
context = BrokerContext(broker=empty_broker)
context.config = TopicAuthHttpPlugin.Config(
host="127.0.0.1",
port=8080,
topic_uri=url, topic_uri=url,
request_method=request_method, request_method=request_method,
params_mode=params_mode, params_mode=params_mode,
response_mode=response_mode, response_mode=response_mode,
) )
http_acl = HttpAuthPlugin(context) http_acl = TopicAuthHttpPlugin(context)
logger.warning(f'kind is {kind}')
if kind == TestKind.ACL: s = Session()
s = Session() s.username = username
s.username = username s.client_id = matcher
s.client_id = matcher t = 'my/topic'
t = 'my/topic' a = Action.PUBLISH
a = Action.PUBLISH assert await http_acl.topic_filtering(session=s, topic=t, action=a) == is_allowed
assert await http_acl.topic_filtering(session=s, topic=t, action=a) == is_allowed
else:
session = Session()
session.client_id = "my_client_id"
session.username = username
session.password = matcher
assert await http_acl.authenticate(session=session) == is_allowed
await http_acl.on_broker_pre_shutdown() await http_acl.on_broker_pre_shutdown()
# if kind == TestKind.ACL:
# else:

Wyświetl plik

@ -934,6 +934,35 @@ async def test_broker_socket_open_close(broker):
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
s.close() s.close()
std_legacy_config = {
"listeners": {
"default": {
"type": "tcp",
"bind": f"127.0.0.1:1883",
}
},
"sys_interval": 10,
"auth": {
"allow-anonymous": True,
"plugins": ["auth_anonymous"],
},
"topic-check": {"enabled": False},
}
@pytest.mark.asyncio
async def test_broker_with_legacy_config():
broker = Broker(config=std_legacy_config)
await broker.start()
await asyncio.sleep(2)
mqtt_client = MQTTClient(config={'auto_reconnect': False})
await mqtt_client.connect()
await broker.shutdown()
legacy_config_empty_auth_plugin_list = { legacy_config_empty_auth_plugin_list = {
"listeners": { "listeners": {