kopia lustrzana https://github.com/Yakifo/amqtt
Merge branch '0.11.3-rc.1' into plugin_shadows
commit
2eed8c848f
|
@ -124,7 +124,7 @@ class Broker:
|
|||
"""MQTT 3.1.1 compliant broker implementation.
|
||||
|
||||
Args:
|
||||
config: dictionary of configuration options (see [broker configuration](broker_config.md)).
|
||||
config: `BrokerConfig` or dictionary of equivalent structure options (see [broker configuration](broker_config.md)).
|
||||
loop: asyncio loop. defaults to `asyncio.new_event_loop()`.
|
||||
plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`.
|
||||
|
||||
|
@ -724,7 +724,8 @@ class Broker:
|
|||
"""Stop a running handler and detach if from the session."""
|
||||
try:
|
||||
await handler.stop()
|
||||
except Exception:
|
||||
# a failure in stopping a handler shouldn't cause the broker to fail
|
||||
except asyncio.QueueEmpty:
|
||||
self.logger.exception("Failed to stop handler")
|
||||
|
||||
async def _authenticate(self, session: Session, _: ListenerConfig) -> bool:
|
||||
|
@ -874,7 +875,8 @@ class Broker:
|
|||
task.result()
|
||||
except CancelledError:
|
||||
self.logger.info(f"Task has been cancelled: {task}")
|
||||
except Exception:
|
||||
# if a task fails, don't want it to cause the broker to fail
|
||||
except Exception: # pylint: disable=W0718
|
||||
self.logger.exception(f"Task failed and will be skipped: {task}")
|
||||
|
||||
run_broadcast_task = asyncio.ensure_future(self._run_broadcast(running_tasks))
|
||||
|
|
|
@ -75,18 +75,15 @@ def mqtt_connected(func: _F) -> _F:
|
|||
|
||||
|
||||
class MQTTClient:
|
||||
"""MQTT client implementation.
|
||||
|
||||
MQTTClient instances provides API for connecting to a broker and send/receive
|
||||
messages using the MQTT protocol.
|
||||
"""MQTT client implementation, providing an API for connecting to a broker and send/receive messages using the MQTT protocol.
|
||||
|
||||
Args:
|
||||
client_id: MQTT client ID to use when connecting to the broker. If none,
|
||||
it will be generated randomly by `amqtt.utils.gen_client_id`
|
||||
config: dictionary of configuration options (see [client configuration](client_config.md)).
|
||||
config: `ClientConfig` or dictionary of equivalent structure options (see [client configuration](client_config.md)).
|
||||
|
||||
Raises:
|
||||
PluginImportError: if importing a plugin from configuration
|
||||
PluginImportError: if importing a plugin from configuration fails
|
||||
PluginInitError: if initialization plugin fails
|
||||
|
||||
"""
|
||||
|
@ -159,7 +156,8 @@ class MQTTClient:
|
|||
except asyncio.CancelledError as e:
|
||||
msg = "Future or Task was cancelled"
|
||||
raise ConnectError(msg) from e
|
||||
except Exception as e:
|
||||
# no matter the failure mode, still try to reconnect
|
||||
except Exception as e: # pylint: disable=W0718
|
||||
self.logger.warning(f"Connection failed: {e!r}")
|
||||
if not self.config.get("auto_reconnect", False):
|
||||
raise
|
||||
|
@ -233,7 +231,8 @@ class MQTTClient:
|
|||
except asyncio.CancelledError as e:
|
||||
msg = "Future or Task was cancelled"
|
||||
raise ConnectError(msg) from e
|
||||
except Exception as e:
|
||||
# no matter the failure mode, still try to reconnect
|
||||
except Exception as e: # pylint: disable=W0718
|
||||
self.logger.warning(f"Reconnection attempt failed: {e!r}")
|
||||
self.logger.debug("", exc_info=True)
|
||||
if 0 <= reconnect_retries < nb_attempt:
|
||||
|
|
|
@ -154,9 +154,9 @@ class BrokerConfig(Dictable):
|
|||
listeners: dict[Literal["default"] | str, ListenerConfig] = field(default_factory=default_listeners) # noqa: PYI051
|
||||
"""Network of listeners used by the services. a 'default' named listener is required; if another listener
|
||||
does not set a value, the 'default' settings are applied. See
|
||||
[ListenerConfig](#amqtt.contexts.ListenerConfig) for more information."""
|
||||
[`ListenerConfig`](broker_config.md#amqtt.contexts.ListenerConfig) for more information."""
|
||||
sys_interval: int | None = None
|
||||
"""*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](#sys-topics)
|
||||
"""*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](../plugins/packaged_plugins.md#sys-topics)
|
||||
for recommended configuration.*"""
|
||||
timeout_disconnect_delay: int | None = 0
|
||||
"""Client disconnect timeout without a keep-alive."""
|
||||
|
@ -164,17 +164,17 @@ class BrokerConfig(Dictable):
|
|||
"""Seconds for an inactive session to be retained."""
|
||||
auth: dict[str, Any] | None = None
|
||||
"""*Deprecated field used to config EntryPoint-loaded plugins. See
|
||||
[`AnonymousAuthPlugin`](#anonymous-auth-plugin) and
|
||||
[`FileAuthPlugin`](#password-file-auth-plugin) for recommended configuration.*"""
|
||||
[`AnonymousAuthPlugin`](../plugins/packaged_plugins.md#anonymous-auth-plugin) and
|
||||
[`FileAuthPlugin`](../plugins/packaged_plugins.md#password-file-auth-plugin) for recommended configuration.*"""
|
||||
topic_check: dict[str, Any] | None = None
|
||||
"""*Deprecated field used to config EntryPoint-loaded plugins. See
|
||||
[`TopicTabooPlugin`](#taboo-topic-plugin) and
|
||||
[`TopicACLPlugin`](#acl-topic-plugin) for recommended configuration method.*"""
|
||||
[`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and
|
||||
[`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*"""
|
||||
plugins: dict[str, Any] | list[str | dict[str,Any]] | None = field(default_factory=default_broker_plugins)
|
||||
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin`
|
||||
or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See
|
||||
[Plugins](http://localhost:8000/custom_plugins/) for more information. `list[str | dict[str,Any]]` is not
|
||||
recommended but available to support legacy use cases."""
|
||||
[custom plugins](../plugins/custom_plugins.md) for more information. `list[str | dict[str,Any]]` is deprecated but available
|
||||
to support legacy use cases."""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Check config for errors and transform fields for easier use."""
|
||||
|
@ -327,16 +327,16 @@ class ClientConfig(Dictable):
|
|||
"""Specify the topics and what flags should be set for messages published to them."""
|
||||
broker: ConnectionConfig | None = field(default_factory=ConnectionConfig)
|
||||
"""Configuration for connecting to the broker. See
|
||||
[ConnectionConfig](#amqtt.contexts.ConnectionConfig) for more information."""
|
||||
[`ConnectionConfig`](client_config.md#amqtt.contexts.ConnectionConfig) for more information."""
|
||||
plugins: dict[str, Any] | list[dict[str, Any]] | None = field(default_factory=default_client_plugins)
|
||||
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`; the value is
|
||||
a dictionary of configuration options for that plugin. See [Plugins](http://localhost:8000/custom_plugins/)
|
||||
for more information."""
|
||||
a dictionary of configuration options for that plugin. See [custom plugins](../plugins/custom_plugins.md) for
|
||||
more information. `list[str | dict[str,Any]]` is deprecated but available to support legacy use cases."""
|
||||
check_hostname: bool | None = True
|
||||
"""If establishing a secure connection, should the hostname of the certificate be verified."""
|
||||
will: WillConfig | None = None
|
||||
"""Message, topic and flags that should be sent to if the client disconnects. See
|
||||
[WillConfig](#amqtt.contexts.WillConfig)"""
|
||||
[`WillConfig`](client_config.md#amqtt.contexts.WillConfig) for more information."""
|
||||
|
||||
def __post__init__(self) -> None:
|
||||
"""Check config for errors and transform fields for easier use."""
|
||||
|
|
|
@ -20,7 +20,7 @@ def default_hash_scheme() -> list[str]:
|
|||
return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"]
|
||||
|
||||
|
||||
class UserAuthDBPlugin(BaseAuthPlugin, BaseTopicPlugin):
|
||||
class UserAuthDBPlugin(BaseAuthPlugin):
|
||||
|
||||
def __init__(self, context: BrokerContext) -> None:
|
||||
super().__init__(context)
|
||||
|
|
|
@ -15,7 +15,7 @@ from aiohttp import ClientResponse, ClientSession, FormData
|
|||
|
||||
from amqtt.broker import BrokerContext
|
||||
from amqtt.contexts import Action
|
||||
from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin
|
||||
from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin
|
||||
from amqtt.session import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -48,7 +48,31 @@ HTTP_4xx_MIN = 400
|
|||
HTTP_4xx_MAX = 499
|
||||
|
||||
|
||||
class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin):
|
||||
@dataclass
|
||||
class HttpConfig:
|
||||
"""Configuration for the HTTP Auth & ACL Plugin."""
|
||||
|
||||
host: str
|
||||
"""hostname of the server for the auth & acl check"""
|
||||
port: int
|
||||
"""port of the server for the auth & acl check"""
|
||||
request_method: RequestMethod = RequestMethod.GET
|
||||
"""send the request as a GET, POST or PUT"""
|
||||
params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details
|
||||
"""send the request with `JSON` or `FORM` data. *additional details below*"""
|
||||
response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details
|
||||
"""expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*"""
|
||||
with_tls: bool = False
|
||||
"""http or https"""
|
||||
user_agent: str = "amqtt"
|
||||
"""the 'User-Agent' header sent along with the request"""
|
||||
superuser_uri: str | None = None
|
||||
"""URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported"""
|
||||
timeout: int = 5
|
||||
"""duration, in seconds, to wait for the HTTP server to respond"""
|
||||
|
||||
|
||||
class AuthHttpPlugin(BasePlugin[BrokerContext]):
|
||||
|
||||
def __init__(self, context: BrokerContext) -> None:
|
||||
super().__init__(context)
|
||||
|
@ -113,10 +137,23 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin):
|
|||
def get_url(self, uri: str) -> str:
|
||||
return f"{'https' if self.config.with_tls else 'http'}://{self.config.host}:{self.config.port}{uri}"
|
||||
|
||||
|
||||
class UserAuthHttpPlugin(AuthHttpPlugin, BaseAuthPlugin):
|
||||
|
||||
async def authenticate(self, *, session: Session) -> bool | None:
|
||||
d = {"username": session.username, "password": session.password, "client_id": session.client_id}
|
||||
return await self._send_request(self.get_url(self.config.user_uri), d)
|
||||
|
||||
@dataclass
|
||||
class Config(HttpConfig):
|
||||
"""Configuration for the HTTP Auth Plugin."""
|
||||
|
||||
user_uri: str = "/user"
|
||||
"""URI of the auth check."""
|
||||
|
||||
|
||||
class TopicAuthHttpPlugin(AuthHttpPlugin, BaseTopicPlugin):
|
||||
|
||||
async def topic_filtering(self, *,
|
||||
session: Session | None = None,
|
||||
topic: str | None = None,
|
||||
|
@ -136,28 +173,8 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin):
|
|||
return await self._send_request(self.get_url(self.config.topic_uri), d)
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Configuration for the HTTP Auth & ACL Plugin."""
|
||||
class Config(HttpConfig):
|
||||
"""Configuration for the HTTP Topic Plugin."""
|
||||
|
||||
host: str
|
||||
"""hostname of the server for the auth & acl check"""
|
||||
port: int
|
||||
"""port of the server for the auth & acl check"""
|
||||
user_uri: str
|
||||
"""URI of the topic check (e.g. '/user')"""
|
||||
topic_uri: str
|
||||
"""URI of the topic check (e.g. '/acl')"""
|
||||
request_method: RequestMethod = RequestMethod.GET
|
||||
"""send the request as a GET, POST or PUT"""
|
||||
params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details
|
||||
"""send the request with `JSON` or `FORM` data. *additional details below*"""
|
||||
response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details
|
||||
"""expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*"""
|
||||
with_tls: bool = False
|
||||
"""http or https"""
|
||||
user_agent: str = "amqtt"
|
||||
"""the 'User-Agent' header sent along with the request"""
|
||||
superuser_uri: str | None = None
|
||||
"""URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported"""
|
||||
timeout: int = 5
|
||||
"""duration, in seconds, to wait for the HTTP server to respond"""
|
||||
topic_uri: str = "/acl"
|
||||
"""URI of the topic check."""
|
||||
|
|
|
@ -535,7 +535,7 @@ class ProtocolHandler(Generic[C]):
|
|||
self.handle_read_timeout()
|
||||
except NoDataError:
|
||||
self.logger.debug(f"{self.session.client_id} No data available")
|
||||
except Exception as e: # noqa: BLE001
|
||||
except Exception as e: # noqa: BLE001, pylint: disable=W0718
|
||||
self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
|
||||
break
|
||||
while running_tasks:
|
||||
|
|
|
@ -37,9 +37,10 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
|
|||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Allow empty username."""
|
||||
"""Configuration for AnonymousAuthPlugin."""
|
||||
|
||||
allow_anonymous: bool = field(default=True)
|
||||
"""Allow all anonymous authentication (even with _no_ username)."""
|
||||
|
||||
|
||||
class FileAuthPlugin(BaseAuthPlugin):
|
||||
|
@ -78,7 +79,7 @@ class FileAuthPlugin(BaseAuthPlugin):
|
|||
self.context.logger.warning(f"Password file '{password_file}' not found")
|
||||
except ValueError:
|
||||
self.context.logger.exception(f"Malformed password file '{password_file}'")
|
||||
except Exception:
|
||||
except OSError:
|
||||
self.context.logger.exception(f"Unexpected error reading password file '{password_file}'")
|
||||
|
||||
async def authenticate(self, *, session: Session) -> bool | None:
|
||||
|
@ -107,6 +108,7 @@ class FileAuthPlugin(BaseAuthPlugin):
|
|||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Path to the properly encoded password file."""
|
||||
"""Configuration for FileAuthPlugin."""
|
||||
|
||||
password_file: str | Path | None = None
|
||||
"""Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512."""
|
||||
|
|
|
@ -52,7 +52,7 @@ class MessageInput:
|
|||
with Path(self.file).open(encoding="utf-8") as f:
|
||||
for line in f:
|
||||
yield line.encode(encoding="utf-8")
|
||||
except Exception:
|
||||
except (FileNotFoundError, OSError):
|
||||
logger.exception(f"Failed to read file '{self.file}'")
|
||||
if self.lines:
|
||||
for line in sys.stdin:
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# Relational Database for Authentication and Authorization
|
||||
|
||||
- `amqtt.contrib.auth_db.AuthUserDBPlugin` (authentication) verify a client's ability to connect to broker
|
||||
- `amqtt.contrib.auth_db.AuthTopicDBPlugin` (authorization) determine a client's access to topics
|
||||
- `amqtt.contrib.auth_db.UserAuthDBPlugin` (authentication) verify a client's ability to connect to broker
|
||||
- `amqtt.contrib.auth_db.TopicAuthDBPlugin` (authorization) determine a client's access to topics
|
||||
|
||||
Relational database access is supported using SQLAlchemy so MySQL, MariaDB, Postgres and SQLite support is available.
|
||||
|
||||
|
|
|
@ -1,9 +1,16 @@
|
|||
# Authentication & Authorization via external HTTP server
|
||||
|
||||
`amqtt.contrib.http.HttpAuthPlugin`
|
||||
|
||||
If clients accessing the broker are managed by another application, it can implement API endpoints
|
||||
that respond with information about client authenticated and topic-level authorization.
|
||||
that respond with information about client authentication and/or topic-level authorization.
|
||||
|
||||
- `amqtt.contrib.http.UserAuthHttpPlugin` (client authentication)
|
||||
- `amqtt.contrib.http.TopicAuthHttpPlugin` (topic authorization)
|
||||
|
||||
Configuration of these plugins is identical (except for the uri name) so that they can be used independently, if desired.
|
||||
|
||||
## User Auth
|
||||
|
||||
See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`.
|
||||
|
||||
!!! info "browser-based mqtt over websockets"
|
||||
|
||||
|
@ -17,9 +24,9 @@ that respond with information about client authenticated and topic-level authori
|
|||
|
||||
??? info "recipe for authentication"
|
||||
Provide the client id and username when webpage is initially rendered or passed to the mqtt initialization from stored
|
||||
cookies. If application is secure, the user's password will be hashed should be encrypted and cannot be used to
|
||||
authenticate a client. Instead, the application should create an encrypted password (eg jwt) which the server
|
||||
can then verify when the broker contacts the application.
|
||||
cookies. If application is secure, the user's password will already be stored as a hashed value and, therefore, cannot
|
||||
be used in this context to authenticate a client. Instead, the application should create its own encrypted key (eg jwt)
|
||||
which the server can then verify when the broker contacts the application.
|
||||
|
||||
??? example "mqtt in javascript"
|
||||
Example initialization of mqtt in javascript:
|
||||
|
@ -35,9 +42,18 @@ that respond with information about client authenticated and topic-level authori
|
|||
try {
|
||||
const clientMqtt = await mqtt.connect(url, options);
|
||||
|
||||
See the 'Request and Response Modes' section below for details on `params_mode` and `response_mode`.
|
||||
::: amqtt.contrib.http.UserAuthHttpPlugin.Config
|
||||
options:
|
||||
show_source: false
|
||||
heading_level: 4
|
||||
extra:
|
||||
class_style: "simple"
|
||||
|
||||
::: amqtt.contrib.http.HttpAuthPlugin.Config
|
||||
## Topic ACL
|
||||
|
||||
See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`.
|
||||
|
||||
::: amqtt.contrib.http.TopicAuthHttpPlugin.Config
|
||||
options:
|
||||
show_source: false
|
||||
heading_level: 4
|
||||
|
@ -45,7 +61,8 @@ See the 'Request and Response Modes' section below for details on `params_mode`
|
|||
class_style: "simple"
|
||||
|
||||
[//]: # (manually creating the heading so it doesn't show in the sidebar ToC)
|
||||
<h2>Params and Response Modes</h2>
|
||||
[](){#request-response-modes}
|
||||
<h2>Request and Response Modes</h2>
|
||||
|
||||
Each URI endpoint will receive different information in order to determine authentication and authorization;
|
||||
format will depend on `params_mode` configuration attribute (`json` or `form`).:
|
||||
|
@ -56,10 +73,6 @@ format will depend on `params_mode` configuration attribute (`json` or `form`).:
|
|||
- password *(str)*
|
||||
- client_id *(str)*
|
||||
|
||||
*For superuser validation, the request will contain:*
|
||||
|
||||
- username *(str)*
|
||||
|
||||
*For acl check, the request will contain:*
|
||||
|
||||
- username *(str)*
|
||||
|
@ -69,23 +82,25 @@ format will depend on `params_mode` configuration attribute (`json` or `form`).:
|
|||
|
||||
All endpoints should respond with the following, dependent on `response_mode` configuration attribute:
|
||||
|
||||
*In `status` mode:
|
||||
*In `status` mode:*
|
||||
|
||||
- status code: 2xx (granted) or 4xx(denied) or 5xx (noop)
|
||||
|
||||
!!! note "5xx response"
|
||||
**noop** (no operation): plugin will not participate in the filtering operation and will defer to another
|
||||
topic filtering plugin to determine access. if there is no other topic filtering plugin, access will be denied.
|
||||
**noop** (no operation): plugin will not participate in the operation and will defer to another
|
||||
plugin to determine access. if there is no other auth/filtering plugin, access will be denied.
|
||||
|
||||
*In `json` mode:*
|
||||
|
||||
- status code: 2xx
|
||||
- content-type: application/json
|
||||
- response: {'ok': True } (granted), {'ok': False, 'error': 'optional error message' } (denied) or { 'error': 'optional error message' } (noop)
|
||||
- response: {'ok': True } (granted)
|
||||
or {'ok': False, 'error': 'optional error message' } (denied)
|
||||
or { 'error': 'optional error message' } (noop)
|
||||
|
||||
!!! note "excluded 'ok' key"
|
||||
**noop** (no operation): plugin will not participate in the filtering operation and will defer to another
|
||||
topic filtering plugin to determine access. if there is no other topic filtering plugin, access will be denied.
|
||||
**noop** (no operation): plugin will not participate in the operation and will defer to another
|
||||
plugin to determine access. if there is no other auth/filtering plugin, access will be denied.
|
||||
|
||||
*In `text` mode:*
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ and configured for the broker:
|
|||
|
||||
## Client
|
||||
|
||||
By default, the `PacketLoggerPlugin` is activated and configured for the client:
|
||||
By default, the `PacketLoggerPlugin` is activated and configured for the client:
|
||||
|
||||
```yaml
|
||||
--8<-- "amqtt/scripts/default_client.yaml"
|
||||
|
@ -43,15 +43,14 @@ By default, the `PacketLoggerPlugin` is activated and configured for the clien
|
|||
|
||||
`amqtt.plugins.authentication.AnonymousAuthPlugin`
|
||||
|
||||
**Configuration**
|
||||
Authentication plugin allowing anonymous access.
|
||||
|
||||
::: amqtt.plugins.authentication.AnonymousAuthPlugin.Config
|
||||
options:
|
||||
heading_level: 4
|
||||
extra:
|
||||
class_style: "simple"
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
.
|
||||
.
|
||||
amqtt.plugins.authentication.AnonymousAuthPlugin:
|
||||
allow_anonymous: false
|
||||
```
|
||||
|
||||
!!! danger
|
||||
even if `allow_anonymous` is set to `false`, the plugin will still allow access if a username is provided by the client
|
||||
|
@ -71,15 +70,14 @@ plugins:
|
|||
|
||||
`amqtt.plugins.authentication.FileAuthPlugin`
|
||||
|
||||
clients are authorized by providing username and password, compared against file
|
||||
Authentication plugin based on a file-stored user database.
|
||||
|
||||
**Configuration**
|
||||
::: amqtt.plugins.authentication.FileAuthPlugin.Config
|
||||
options:
|
||||
heading_level: 4
|
||||
extra:
|
||||
class_style: "simple"
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
amqtt.plugins.authentication.FileAuthPlugin:
|
||||
password_file: /path/to/password_file
|
||||
```
|
||||
|
||||
??? warning "EntryPoint-style configuration is deprecated"
|
||||
```yaml
|
||||
|
|
|
@ -21,16 +21,5 @@ The `amqtt.broker` module provides the following key methods in the `Broker` cla
|
|||
- `start()`: Starts the broker and begins serving
|
||||
- `shutdown()`: Gracefully shuts down the broker
|
||||
|
||||
### Broker configuration
|
||||
|
||||
The `Broker` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
|
||||
|
||||
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
|
||||
used by the included broker script: [broker configuration](broker_config.md)
|
||||
|
||||
|
||||
|
||||
::: amqtt.broker.Broker
|
||||
|
||||
|
||||
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.
|
||||
|
|
|
@ -129,15 +129,4 @@ amqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed
|
|||
|
||||
Both coroutines have the same results except that `test_coro2()` manages messages flow in parallel which may be more efficient.
|
||||
|
||||
|
||||
### Client configuration
|
||||
|
||||
The `MQTTClient` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
|
||||
|
||||
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
|
||||
used by the included broker script: [client configuration](client_config.md)
|
||||
|
||||
::: amqtt.client.MQTTClient
|
||||
|
||||
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.
|
||||
|
||||
|
|
|
@ -2,20 +2,26 @@
|
|||
|
||||
This document describes `aMQTT` common API both used by [MQTT Client](client.md) and [Broker](broker.md).
|
||||
|
||||
## Reference
|
||||
## ApplicationMessage
|
||||
|
||||
### ApplicationMessage
|
||||
::: amqtt.session.ApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
The `amqtt.session` module provides the following message classes:
|
||||
## IncomingApplicationMessage
|
||||
|
||||
#### ApplicationMessage
|
||||
Represents messages received from MQTT clients.
|
||||
|
||||
Base class for MQTT application messages.
|
||||
::: amqtt.session.IncomingApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
#### IncomingApplicationMessage
|
||||
|
||||
Inherits from ApplicationMessage. Represents messages received from MQTT clients.
|
||||
|
||||
#### OutgoingApplicationMessage
|
||||
## OutgoingApplicationMessage
|
||||
|
||||
Inherits from ApplicationMessage. Represents messages to be sent to MQTT clients.
|
||||
|
||||
::: amqtt.session.OutgoingApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
|
|
|
@ -37,6 +37,9 @@ nav:
|
|||
- Broker: references/broker.md
|
||||
- Client: references/client.md
|
||||
- Common: references/common.md
|
||||
- Configuration:
|
||||
- Broker: references/broker_config.md
|
||||
- Client: references/client_config.md
|
||||
- Plugins:
|
||||
- Packaged: plugins/packaged_plugins.md
|
||||
- Custom: plugins/custom_plugins.md
|
||||
|
@ -45,9 +48,6 @@ nav:
|
|||
- Database Auth: plugins/auth_db.md
|
||||
- HTTP Auth: plugins/http.md
|
||||
- Shadows: plugins/shadows.md
|
||||
- Configuration:
|
||||
- Broker: references/broker_config.md
|
||||
- Client: references/client_config.md
|
||||
- Reference:
|
||||
- Containerization: docker.md
|
||||
- Support: support.md
|
||||
|
|
|
@ -243,6 +243,7 @@ jobs = 2
|
|||
ignore = ["tests"]
|
||||
fail-on = ["I"]
|
||||
max-line-length = 130
|
||||
ignore-paths = ["amqtt/plugins/persistence.py"]
|
||||
|
||||
[tool.pylint.BASIC]
|
||||
# Good variable names which should always be accepted, separated by a comma.
|
||||
|
@ -253,7 +254,6 @@ good-names = ["i", "j", "k", "e", "ex", "f", "_", "T", "x", "y", "id", "tg"]
|
|||
# duplicate-code - unavoidable
|
||||
# too-many-* - are not enforced for the sake of readability
|
||||
disable = [
|
||||
"broad-exception-caught", # TODO: improve later
|
||||
"duplicate-code",
|
||||
"fixme",
|
||||
"invalid-name",
|
||||
|
|
|
@ -8,7 +8,7 @@ from aiohttp.web import Response
|
|||
|
||||
from amqtt.broker import BrokerContext, Broker
|
||||
from amqtt.contexts import Action
|
||||
from amqtt.contrib.http import HttpAuthPlugin, ParamsMode, ResponseMode, RequestMethod
|
||||
from amqtt.contrib.http import UserAuthHttpPlugin, TopicAuthHttpPlugin, ParamsMode, ResponseMode, RequestMethod
|
||||
from amqtt.session import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -91,73 +91,98 @@ def test_server_up_and_down(http_server):
|
|||
pass
|
||||
|
||||
|
||||
class TestKind(Enum):
|
||||
class TypeOfHttpTest(Enum):
|
||||
AUTH = auto()
|
||||
ACL = auto()
|
||||
|
||||
|
||||
def generate_use_cases():
|
||||
def generate_use_cases(kind: TypeOfHttpTest):
|
||||
# generate all variations of the plugin's configuration options for both auth and topic
|
||||
# e.g. (TestKind.AUTH, '/user/json', RequestMethod.GET, ParamsMode.JSON, ResponseMode.JSON, 'json', 'json', True),
|
||||
|
||||
cases: list[tuple[str, str, RequestMethod, ParamsMode, ResponseMode, str, str, bool]] = []
|
||||
for kind in TestKind:
|
||||
root_url = 'user' if kind == TestKind.AUTH else 'acl'
|
||||
for request in RequestMethod:
|
||||
for params in ParamsMode:
|
||||
for response in ResponseMode:
|
||||
url = f'/{root_url}/json' if params == ParamsMode.JSON else f'/{root_url}/form'
|
||||
for is_authenticated in [True, False, None]:
|
||||
if is_authenticated is None:
|
||||
pwd = 'i_am_null'
|
||||
elif is_authenticated:
|
||||
pwd = f'{response.value}'
|
||||
else:
|
||||
pwd = f'not{response.value}'
|
||||
|
||||
if response == ResponseMode.TEXT and is_authenticated is None:
|
||||
is_authenticated = False
|
||||
root_url = 'user' if kind == TypeOfHttpTest.AUTH else 'acl'
|
||||
for request in RequestMethod:
|
||||
for params in ParamsMode:
|
||||
for response in ResponseMode:
|
||||
url = f'/{root_url}/json' if params == ParamsMode.JSON else f'/{root_url}/form'
|
||||
for is_authenticated in [True, False, None]:
|
||||
if is_authenticated is None:
|
||||
pwd = 'i_am_null'
|
||||
elif is_authenticated:
|
||||
pwd = f'{response.value}'
|
||||
else:
|
||||
pwd = f'not{response.value}'
|
||||
|
||||
case = (kind, url, request, params, response, response.value, f"{pwd}", is_authenticated)
|
||||
cases.append(case)
|
||||
if response == ResponseMode.TEXT and is_authenticated is None:
|
||||
is_authenticated = False
|
||||
|
||||
case = (kind, url, request, params, response, response.value, f"{pwd}", is_authenticated)
|
||||
cases.append(case)
|
||||
return cases
|
||||
|
||||
def test_generated_use_cases():
|
||||
cases = generate_use_cases()
|
||||
assert len(cases) == 108
|
||||
cases = generate_use_cases(kind=TypeOfHttpTest.AUTH)
|
||||
assert len(cases) == 54
|
||||
|
||||
|
||||
@pytest.mark.parametrize("kind,url,request_method,params_mode,response_mode,username,matcher,is_allowed",
|
||||
generate_use_cases())
|
||||
generate_use_cases(TypeOfHttpTest.AUTH))
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_acl_response(empty_broker, http_server, kind, url,
|
||||
async def test_request_auth_response(empty_broker, http_server, kind, url,
|
||||
request_method, params_mode, response_mode,
|
||||
username, matcher, is_allowed):
|
||||
|
||||
context = BrokerContext(broker=empty_broker)
|
||||
context.config = HttpAuthPlugin.Config(
|
||||
context.config = UserAuthHttpPlugin.Config(
|
||||
host="127.0.0.1",
|
||||
port=8080,
|
||||
user_uri=url,
|
||||
request_method=request_method,
|
||||
params_mode=params_mode,
|
||||
response_mode=response_mode,
|
||||
)
|
||||
http_acl = UserAuthHttpPlugin(context)
|
||||
logger.warning(f'kind is {kind}')
|
||||
|
||||
session = Session()
|
||||
session.client_id = "my_client_id"
|
||||
session.username = username
|
||||
session.password = matcher
|
||||
assert await http_acl.authenticate(session=session) == is_allowed
|
||||
|
||||
await http_acl.on_broker_pre_shutdown()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("kind,url,request_method,params_mode,response_mode,username,matcher,is_allowed",
|
||||
generate_use_cases(TypeOfHttpTest.ACL))
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_topic_response(empty_broker, http_server, kind, url,
|
||||
request_method, params_mode, response_mode,
|
||||
username, matcher, is_allowed):
|
||||
context = BrokerContext(broker=empty_broker)
|
||||
context.config = TopicAuthHttpPlugin.Config(
|
||||
host="127.0.0.1",
|
||||
port=8080,
|
||||
topic_uri=url,
|
||||
request_method=request_method,
|
||||
params_mode=params_mode,
|
||||
response_mode=response_mode,
|
||||
)
|
||||
http_acl = HttpAuthPlugin(context)
|
||||
logger.warning(f'kind is {kind}')
|
||||
if kind == TestKind.ACL:
|
||||
s = Session()
|
||||
s.username = username
|
||||
s.client_id = matcher
|
||||
t = 'my/topic'
|
||||
a = Action.PUBLISH
|
||||
assert await http_acl.topic_filtering(session=s, topic=t, action=a) == is_allowed
|
||||
else:
|
||||
session = Session()
|
||||
session.client_id = "my_client_id"
|
||||
session.username = username
|
||||
session.password = matcher
|
||||
assert await http_acl.authenticate(session=session) == is_allowed
|
||||
http_acl = TopicAuthHttpPlugin(context)
|
||||
|
||||
s = Session()
|
||||
s.username = username
|
||||
s.client_id = matcher
|
||||
t = 'my/topic'
|
||||
a = Action.PUBLISH
|
||||
assert await http_acl.topic_filtering(session=s, topic=t, action=a) == is_allowed
|
||||
|
||||
await http_acl.on_broker_pre_shutdown()
|
||||
|
||||
|
||||
|
||||
# if kind == TestKind.ACL:
|
||||
|
||||
# else:
|
|
@ -934,6 +934,35 @@ async def test_broker_socket_open_close(broker):
|
|||
await asyncio.sleep(0.1)
|
||||
s.close()
|
||||
|
||||
std_legacy_config = {
|
||||
"listeners": {
|
||||
"default": {
|
||||
"type": "tcp",
|
||||
"bind": f"127.0.0.1:1883",
|
||||
}
|
||||
},
|
||||
"sys_interval": 10,
|
||||
"auth": {
|
||||
"allow-anonymous": True,
|
||||
"plugins": ["auth_anonymous"],
|
||||
},
|
||||
"topic-check": {"enabled": False},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broker_with_legacy_config():
|
||||
|
||||
broker = Broker(config=std_legacy_config)
|
||||
|
||||
await broker.start()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
mqtt_client = MQTTClient(config={'auto_reconnect': False})
|
||||
await mqtt_client.connect()
|
||||
|
||||
await broker.shutdown()
|
||||
|
||||
|
||||
legacy_config_empty_auth_plugin_list = {
|
||||
"listeners": {
|
||||
|
|
Ładowanie…
Reference in New Issue