kopia lustrzana https://github.com/Yakifo/amqtt
Merge branch '0.11.3-rc.1' into plugin_ldap
commit
2afbf0f643
|
@ -124,7 +124,7 @@ class Broker:
|
|||
"""MQTT 3.1.1 compliant broker implementation.
|
||||
|
||||
Args:
|
||||
config: dictionary of configuration options (see [broker configuration](broker_config.md)).
|
||||
config: `BrokerConfig` or dictionary of equivalent structure options (see [broker configuration](broker_config.md)).
|
||||
loop: asyncio loop. defaults to `asyncio.new_event_loop()`.
|
||||
plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`.
|
||||
|
||||
|
@ -716,7 +716,8 @@ class Broker:
|
|||
"""Stop a running handler and detach if from the session."""
|
||||
try:
|
||||
await handler.stop()
|
||||
except Exception:
|
||||
# a failure in stopping a handler shouldn't cause the broker to fail
|
||||
except asyncio.QueueEmpty:
|
||||
self.logger.exception("Failed to stop handler")
|
||||
|
||||
async def _authenticate(self, session: Session, _: ListenerConfig) -> bool:
|
||||
|
@ -866,7 +867,8 @@ class Broker:
|
|||
task.result()
|
||||
except CancelledError:
|
||||
self.logger.info(f"Task has been cancelled: {task}")
|
||||
except Exception:
|
||||
# if a task fails, don't want it to cause the broker to fail
|
||||
except Exception: # pylint: disable=W0718
|
||||
self.logger.exception(f"Task failed and will be skipped: {task}")
|
||||
|
||||
run_broadcast_task = asyncio.ensure_future(self._run_broadcast(running_tasks))
|
||||
|
|
|
@ -75,18 +75,15 @@ def mqtt_connected(func: _F) -> _F:
|
|||
|
||||
|
||||
class MQTTClient:
|
||||
"""MQTT client implementation.
|
||||
|
||||
MQTTClient instances provides API for connecting to a broker and send/receive
|
||||
messages using the MQTT protocol.
|
||||
"""MQTT client implementation, providing an API for connecting to a broker and send/receive messages using the MQTT protocol.
|
||||
|
||||
Args:
|
||||
client_id: MQTT client ID to use when connecting to the broker. If none,
|
||||
it will be generated randomly by `amqtt.utils.gen_client_id`
|
||||
config: dictionary of configuration options (see [client configuration](client_config.md)).
|
||||
config: `ClientConfig` or dictionary of equivalent structure options (see [client configuration](client_config.md)).
|
||||
|
||||
Raises:
|
||||
PluginImportError: if importing a plugin from configuration
|
||||
PluginImportError: if importing a plugin from configuration fails
|
||||
PluginInitError: if initialization plugin fails
|
||||
|
||||
"""
|
||||
|
@ -159,7 +156,8 @@ class MQTTClient:
|
|||
except asyncio.CancelledError as e:
|
||||
msg = "Future or Task was cancelled"
|
||||
raise ConnectError(msg) from e
|
||||
except Exception as e:
|
||||
# no matter the failure mode, still try to reconnect
|
||||
except Exception as e: # pylint: disable=W0718
|
||||
self.logger.warning(f"Connection failed: {e!r}")
|
||||
if not self.config.get("auto_reconnect", False):
|
||||
raise
|
||||
|
@ -233,7 +231,8 @@ class MQTTClient:
|
|||
except asyncio.CancelledError as e:
|
||||
msg = "Future or Task was cancelled"
|
||||
raise ConnectError(msg) from e
|
||||
except Exception as e:
|
||||
# no matter the failure mode, still try to reconnect
|
||||
except Exception as e: # pylint: disable=W0718
|
||||
self.logger.warning(f"Reconnection attempt failed: {e!r}")
|
||||
self.logger.debug("", exc_info=True)
|
||||
if 0 <= reconnect_retries < nb_attempt:
|
||||
|
|
|
@ -154,9 +154,9 @@ class BrokerConfig(Dictable):
|
|||
listeners: dict[Literal["default"] | str, ListenerConfig] = field(default_factory=default_listeners) # noqa: PYI051
|
||||
"""Network of listeners used by the services. a 'default' named listener is required; if another listener
|
||||
does not set a value, the 'default' settings are applied. See
|
||||
[ListenerConfig](#amqtt.contexts.ListenerConfig) for more information."""
|
||||
[`ListenerConfig`](broker_config.md#amqtt.contexts.ListenerConfig) for more information."""
|
||||
sys_interval: int | None = None
|
||||
"""*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](#sys-topics)
|
||||
"""*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](../plugins/packaged_plugins.md#sys-topics)
|
||||
for recommended configuration.*"""
|
||||
timeout_disconnect_delay: int | None = 0
|
||||
"""Client disconnect timeout without a keep-alive."""
|
||||
|
@ -164,17 +164,17 @@ class BrokerConfig(Dictable):
|
|||
"""Seconds for an inactive session to be retained."""
|
||||
auth: dict[str, Any] | None = None
|
||||
"""*Deprecated field used to config EntryPoint-loaded plugins. See
|
||||
[`AnonymousAuthPlugin`](#anonymous-auth-plugin) and
|
||||
[`FileAuthPlugin`](#password-file-auth-plugin) for recommended configuration.*"""
|
||||
[`AnonymousAuthPlugin`](../plugins/packaged_plugins.md#anonymous-auth-plugin) and
|
||||
[`FileAuthPlugin`](../plugins/packaged_plugins.md#password-file-auth-plugin) for recommended configuration.*"""
|
||||
topic_check: dict[str, Any] | None = None
|
||||
"""*Deprecated field used to config EntryPoint-loaded plugins. See
|
||||
[`TopicTabooPlugin`](#taboo-topic-plugin) and
|
||||
[`TopicACLPlugin`](#acl-topic-plugin) for recommended configuration method.*"""
|
||||
[`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and
|
||||
[`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*"""
|
||||
plugins: dict[str, Any] | list[str | dict[str,Any]] | None = field(default_factory=default_broker_plugins)
|
||||
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin`
|
||||
or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See
|
||||
[Plugins](http://localhost:8000/custom_plugins/) for more information. `list[str | dict[str,Any]]` is not
|
||||
recommended but available to support legacy use cases."""
|
||||
[custom plugins](../plugins/custom_plugins.md) for more information. `list[str | dict[str,Any]]` is deprecated but available
|
||||
to support legacy use cases."""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Check config for errors and transform fields for easier use."""
|
||||
|
@ -327,16 +327,16 @@ class ClientConfig(Dictable):
|
|||
"""Specify the topics and what flags should be set for messages published to them."""
|
||||
broker: ConnectionConfig | None = field(default_factory=ConnectionConfig)
|
||||
"""Configuration for connecting to the broker. See
|
||||
[ConnectionConfig](#amqtt.contexts.ConnectionConfig) for more information."""
|
||||
[`ConnectionConfig`](client_config.md#amqtt.contexts.ConnectionConfig) for more information."""
|
||||
plugins: dict[str, Any] | list[dict[str, Any]] | None = field(default_factory=default_client_plugins)
|
||||
"""The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`; the value is
|
||||
a dictionary of configuration options for that plugin. See [Plugins](http://localhost:8000/custom_plugins/)
|
||||
for more information."""
|
||||
a dictionary of configuration options for that plugin. See [custom plugins](../plugins/custom_plugins.md) for
|
||||
more information. `list[str | dict[str,Any]]` is deprecated but available to support legacy use cases."""
|
||||
check_hostname: bool | None = True
|
||||
"""If establishing a secure connection, should the hostname of the certificate be verified."""
|
||||
will: WillConfig | None = None
|
||||
"""Message, topic and flags that should be sent to if the client disconnects. See
|
||||
[WillConfig](#amqtt.contexts.WillConfig)"""
|
||||
[`WillConfig`](client_config.md#amqtt.contexts.WillConfig) for more information."""
|
||||
|
||||
def __post__init__(self) -> None:
|
||||
"""Check config for errors and transform fields for easier use."""
|
||||
|
|
|
@ -535,7 +535,7 @@ class ProtocolHandler(Generic[C]):
|
|||
self.handle_read_timeout()
|
||||
except NoDataError:
|
||||
self.logger.debug(f"{self.session.client_id} No data available")
|
||||
except Exception as e: # noqa: BLE001
|
||||
except Exception as e: # noqa: BLE001, pylint: disable=W0718
|
||||
self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
|
||||
break
|
||||
while running_tasks:
|
||||
|
|
|
@ -37,9 +37,10 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
|
|||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Allow empty username."""
|
||||
"""Configuration for AnonymousAuthPlugin."""
|
||||
|
||||
allow_anonymous: bool = field(default=True)
|
||||
"""Allow all anonymous authentication (even with _no_ username)."""
|
||||
|
||||
|
||||
class FileAuthPlugin(BaseAuthPlugin):
|
||||
|
@ -78,7 +79,7 @@ class FileAuthPlugin(BaseAuthPlugin):
|
|||
self.context.logger.warning(f"Password file '{password_file}' not found")
|
||||
except ValueError:
|
||||
self.context.logger.exception(f"Malformed password file '{password_file}'")
|
||||
except Exception:
|
||||
except OSError:
|
||||
self.context.logger.exception(f"Unexpected error reading password file '{password_file}'")
|
||||
|
||||
async def authenticate(self, *, session: Session) -> bool | None:
|
||||
|
@ -107,6 +108,7 @@ class FileAuthPlugin(BaseAuthPlugin):
|
|||
|
||||
@dataclass
|
||||
class Config:
|
||||
"""Path to the properly encoded password file."""
|
||||
"""Configuration for FileAuthPlugin."""
|
||||
|
||||
password_file: str | Path | None = None
|
||||
"""Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512."""
|
||||
|
|
|
@ -52,7 +52,7 @@ class MessageInput:
|
|||
with Path(self.file).open(encoding="utf-8") as f:
|
||||
for line in f:
|
||||
yield line.encode(encoding="utf-8")
|
||||
except Exception:
|
||||
except (FileNotFoundError, OSError):
|
||||
logger.exception(f"Failed to read file '{self.file}'")
|
||||
if self.lines:
|
||||
for line in sys.stdin:
|
||||
|
|
|
@ -17,9 +17,9 @@ that respond with information about client authenticated and topic-level authori
|
|||
|
||||
??? info "recipe for authentication"
|
||||
Provide the client id and username when webpage is initially rendered or passed to the mqtt initialization from stored
|
||||
cookies. If application is secure, the user's password will be hashed should be encrypted and cannot be used to
|
||||
authenticate a client. Instead, the application should create an encrypted password (eg jwt) which the server
|
||||
can then verify when the broker contacts the application.
|
||||
cookies. If application is secure, the user's password will already be stored as a hashed value and, therefore, cannot
|
||||
be used in this context to authenticate a client. Instead, the application should create its own encrypted key (eg jwt)
|
||||
which the server can then verify when the broker contacts the application.
|
||||
|
||||
??? example "mqtt in javascript"
|
||||
Example initialization of mqtt in javascript:
|
||||
|
@ -35,7 +35,7 @@ that respond with information about client authenticated and topic-level authori
|
|||
try {
|
||||
const clientMqtt = await mqtt.connect(url, options);
|
||||
|
||||
See the 'Request and Response Modes' section below for details on `params_mode` and `response_mode`.
|
||||
See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`.
|
||||
|
||||
::: amqtt.contrib.http.HttpAuthPlugin.Config
|
||||
options:
|
||||
|
@ -45,7 +45,8 @@ See the 'Request and Response Modes' section below for details on `params_mode`
|
|||
class_style: "simple"
|
||||
|
||||
[//]: # (manually creating the heading so it doesn't show in the sidebar ToC)
|
||||
<h2>Params and Response Modes</h2>
|
||||
[](){#request-response-modes}
|
||||
<h2>Request and Response Modes</h2>
|
||||
|
||||
Each URI endpoint will receive different information in order to determine authentication and authorization;
|
||||
format will depend on `params_mode` configuration attribute (`json` or `form`).:
|
||||
|
@ -69,7 +70,7 @@ format will depend on `params_mode` configuration attribute (`json` or `form`).:
|
|||
|
||||
All endpoints should respond with the following, dependent on `response_mode` configuration attribute:
|
||||
|
||||
*In `status` mode:
|
||||
*In `status` mode:*
|
||||
|
||||
- status code: 2xx (granted) or 4xx(denied) or 5xx (noop)
|
||||
|
||||
|
@ -81,7 +82,9 @@ All endpoints should respond with the following, dependent on `response_mode` co
|
|||
|
||||
- status code: 2xx
|
||||
- content-type: application/json
|
||||
- response: {'ok': True } (granted), {'ok': False, 'error': 'optional error message' } (denied) or { 'error': 'optional error message' } (noop)
|
||||
- response: {'ok': True } (granted)
|
||||
or {'ok': False, 'error': 'optional error message' } (denied)
|
||||
or { 'error': 'optional error message' } (noop)
|
||||
|
||||
!!! note "excluded 'ok' key"
|
||||
**noop** (no operation): plugin will not participate in the filtering operation and will defer to another
|
||||
|
|
|
@ -31,7 +31,7 @@ and configured for the broker:
|
|||
|
||||
## Client
|
||||
|
||||
By default, the `PacketLoggerPlugin` is activated and configured for the client:
|
||||
By default, the `PacketLoggerPlugin` is activated and configured for the client:
|
||||
|
||||
```yaml
|
||||
--8<-- "amqtt/scripts/default_client.yaml"
|
||||
|
@ -43,15 +43,14 @@ By default, the `PacketLoggerPlugin` is activated and configured for the clien
|
|||
|
||||
`amqtt.plugins.authentication.AnonymousAuthPlugin`
|
||||
|
||||
**Configuration**
|
||||
Authentication plugin allowing anonymous access.
|
||||
|
||||
::: amqtt.plugins.authentication.AnonymousAuthPlugin.Config
|
||||
options:
|
||||
heading_level: 4
|
||||
extra:
|
||||
class_style: "simple"
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
.
|
||||
.
|
||||
amqtt.plugins.authentication.AnonymousAuthPlugin:
|
||||
allow_anonymous: false
|
||||
```
|
||||
|
||||
!!! danger
|
||||
even if `allow_anonymous` is set to `false`, the plugin will still allow access if a username is provided by the client
|
||||
|
@ -71,15 +70,14 @@ plugins:
|
|||
|
||||
`amqtt.plugins.authentication.FileAuthPlugin`
|
||||
|
||||
clients are authorized by providing username and password, compared against file
|
||||
Authentication plugin based on a file-stored user database.
|
||||
|
||||
**Configuration**
|
||||
::: amqtt.plugins.authentication.FileAuthPlugin.Config
|
||||
options:
|
||||
heading_level: 4
|
||||
extra:
|
||||
class_style: "simple"
|
||||
|
||||
```yaml
|
||||
plugins:
|
||||
amqtt.plugins.authentication.FileAuthPlugin:
|
||||
password_file: /path/to/password_file
|
||||
```
|
||||
|
||||
??? warning "EntryPoint-style configuration is deprecated"
|
||||
```yaml
|
||||
|
|
|
@ -21,16 +21,5 @@ The `amqtt.broker` module provides the following key methods in the `Broker` cla
|
|||
- `start()`: Starts the broker and begins serving
|
||||
- `shutdown()`: Gracefully shuts down the broker
|
||||
|
||||
### Broker configuration
|
||||
|
||||
The `Broker` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
|
||||
|
||||
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
|
||||
used by the included broker script: [broker configuration](broker_config.md)
|
||||
|
||||
|
||||
|
||||
::: amqtt.broker.Broker
|
||||
|
||||
|
||||
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.
|
||||
|
|
|
@ -129,15 +129,4 @@ amqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed
|
|||
|
||||
Both coroutines have the same results except that `test_coro2()` manages messages flow in parallel which may be more efficient.
|
||||
|
||||
|
||||
### Client configuration
|
||||
|
||||
The `MQTTClient` class's `__init__` method accepts a `config` parameter which allows setup of default and custom behaviors.
|
||||
|
||||
Details on the `config` parameter structure is a dictionary whose structure is identical to yaml formatted file[^1]
|
||||
used by the included broker script: [client configuration](client_config.md)
|
||||
|
||||
::: amqtt.client.MQTTClient
|
||||
|
||||
[^1]: See [PyYAML](http://pyyaml.org/wiki/PyYAMLDocumentation) for loading YAML files as Python dict.
|
||||
|
||||
|
|
|
@ -2,20 +2,26 @@
|
|||
|
||||
This document describes `aMQTT` common API both used by [MQTT Client](client.md) and [Broker](broker.md).
|
||||
|
||||
## Reference
|
||||
## ApplicationMessage
|
||||
|
||||
### ApplicationMessage
|
||||
::: amqtt.session.ApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
The `amqtt.session` module provides the following message classes:
|
||||
## IncomingApplicationMessage
|
||||
|
||||
#### ApplicationMessage
|
||||
Represents messages received from MQTT clients.
|
||||
|
||||
Base class for MQTT application messages.
|
||||
::: amqtt.session.IncomingApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
#### IncomingApplicationMessage
|
||||
|
||||
Inherits from ApplicationMessage. Represents messages received from MQTT clients.
|
||||
|
||||
#### OutgoingApplicationMessage
|
||||
## OutgoingApplicationMessage
|
||||
|
||||
Inherits from ApplicationMessage. Represents messages to be sent to MQTT clients.
|
||||
|
||||
::: amqtt.session.OutgoingApplicationMessage
|
||||
options:
|
||||
heading_level: 3
|
||||
|
||||
|
|
|
@ -37,6 +37,9 @@ nav:
|
|||
- Broker: references/broker.md
|
||||
- Client: references/client.md
|
||||
- Common: references/common.md
|
||||
- Configuration:
|
||||
- Broker: references/broker_config.md
|
||||
- Client: references/client_config.md
|
||||
- Plugins:
|
||||
- Packaged: plugins/packaged_plugins.md
|
||||
- Custom: plugins/custom_plugins.md
|
||||
|
@ -45,9 +48,6 @@ nav:
|
|||
- Database Auth: plugins/auth_db.md
|
||||
- HTTP Auth: plugins/http.md
|
||||
- LDAP Auth: plugins/ldap.md
|
||||
- Configuration:
|
||||
- Broker: references/broker_config.md
|
||||
- Client: references/client_config.md
|
||||
- Reference:
|
||||
- Containerization: docker.md
|
||||
- Support: support.md
|
||||
|
|
|
@ -242,6 +242,7 @@ jobs = 2
|
|||
ignore = ["tests"]
|
||||
fail-on = ["I"]
|
||||
max-line-length = 130
|
||||
ignore-paths = ["amqtt/plugins/persistence.py"]
|
||||
|
||||
[tool.pylint.BASIC]
|
||||
# Good variable names which should always be accepted, separated by a comma.
|
||||
|
@ -252,7 +253,6 @@ good-names = ["i", "j", "k", "e", "ex", "f", "_", "T", "x", "y", "id", "tg"]
|
|||
# duplicate-code - unavoidable
|
||||
# too-many-* - are not enforced for the sake of readability
|
||||
disable = [
|
||||
"broad-exception-caught", # TODO: improve later
|
||||
"duplicate-code",
|
||||
"fixme",
|
||||
"invalid-name",
|
||||
|
|
|
@ -934,6 +934,35 @@ async def test_broker_socket_open_close(broker):
|
|||
await asyncio.sleep(0.1)
|
||||
s.close()
|
||||
|
||||
std_legacy_config = {
|
||||
"listeners": {
|
||||
"default": {
|
||||
"type": "tcp",
|
||||
"bind": f"127.0.0.1:1883",
|
||||
}
|
||||
},
|
||||
"sys_interval": 10,
|
||||
"auth": {
|
||||
"allow-anonymous": True,
|
||||
"plugins": ["auth_anonymous"],
|
||||
},
|
||||
"topic-check": {"enabled": False},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broker_with_legacy_config():
|
||||
|
||||
broker = Broker(config=std_legacy_config)
|
||||
|
||||
await broker.start()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
mqtt_client = MQTTClient(config={'auto_reconnect': False})
|
||||
await mqtt_client.connect()
|
||||
|
||||
await broker.shutdown()
|
||||
|
||||
|
||||
legacy_config_empty_auth_plugin_list = {
|
||||
"listeners": {
|
||||
|
|
Ładowanie…
Reference in New Issue