kopia lustrzana https://github.com/Yakifo/amqtt
Merge pull request #252 from ajmirsky/plugin_config_error_case_checking
updated samples; plugin config consistency (yaml and python dict)pull/257/head
commit
57597dfea4
|
@ -58,7 +58,10 @@ class FileAuthPlugin(BaseAuthPlugin):
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with Path(password_file).open(mode="r", encoding="utf-8") as file:
|
file = password_file
|
||||||
|
if isinstance(file, str):
|
||||||
|
file = Path(file)
|
||||||
|
with file.open(mode="r", encoding="utf-8") as file:
|
||||||
self.context.logger.debug(f"Reading user database from {password_file}")
|
self.context.logger.debug(f"Reading user database from {password_file}")
|
||||||
for _line in file:
|
for _line in file:
|
||||||
line = _line.strip()
|
line = _line.strip()
|
||||||
|
@ -106,4 +109,4 @@ class FileAuthPlugin(BaseAuthPlugin):
|
||||||
class Config:
|
class Config:
|
||||||
"""Path to the properly encoded password file."""
|
"""Path to the properly encoded password file."""
|
||||||
|
|
||||||
password_file: str | None = None
|
password_file: str | Path | None = None
|
||||||
|
|
|
@ -82,14 +82,45 @@ class PluginManager(Generic[C]):
|
||||||
return self.context
|
return self.context
|
||||||
|
|
||||||
def _load_plugins(self, namespace: str | None = None) -> None:
|
def _load_plugins(self, namespace: str | None = None) -> None:
|
||||||
|
"""Load plugins from entrypoint or config dictionary.
|
||||||
|
|
||||||
|
config style is now recommended; entrypoint has been deprecated
|
||||||
|
Example:
|
||||||
|
config = {
|
||||||
|
'listeners':...,
|
||||||
|
'plugins': {
|
||||||
|
'myproject.myfile.MyPlugin': {}
|
||||||
|
}
|
||||||
|
"""
|
||||||
if self.app_context.config and self.app_context.config.get("plugins", None) is not None:
|
if self.app_context.config and self.app_context.config.get("plugins", None) is not None:
|
||||||
|
# plugins loaded directly from config dictionary
|
||||||
|
|
||||||
|
|
||||||
if "auth" in self.app_context.config:
|
if "auth" in self.app_context.config:
|
||||||
self.logger.warning("Loading plugins from config will ignore 'auth' section of config")
|
self.logger.warning("Loading plugins from config will ignore 'auth' section of config")
|
||||||
if "topic-check" in self.app_context.config:
|
if "topic-check" in self.app_context.config:
|
||||||
self.logger.warning("Loading plugins from config will ignore 'topic-check' section of config")
|
self.logger.warning("Loading plugins from config will ignore 'topic-check' section of config")
|
||||||
|
|
||||||
plugin_list: list[Any] = self.app_context.config.get("plugins", [])
|
plugins_config: list[Any] | dict[str, Any] = self.app_context.config.get("plugins", [])
|
||||||
self._load_str_plugins(plugin_list)
|
|
||||||
|
# if the config was generated from yaml, the plugins maybe a list instead of a dictionary; transform before loading
|
||||||
|
#
|
||||||
|
# plugins:
|
||||||
|
# - myproject.myfile.MyPlugin:
|
||||||
|
|
||||||
|
if isinstance(plugins_config, list):
|
||||||
|
plugins_info: dict[str, Any] = {}
|
||||||
|
for plugin_config in plugins_config:
|
||||||
|
if isinstance(plugin_config, str):
|
||||||
|
plugins_info.update({plugin_config: {}})
|
||||||
|
elif not isinstance(plugin_config, dict):
|
||||||
|
msg = "malformed 'plugins' configuration"
|
||||||
|
raise PluginLoadError(msg)
|
||||||
|
else:
|
||||||
|
plugins_info.update(plugin_config)
|
||||||
|
self._load_str_plugins(plugins_info)
|
||||||
|
elif isinstance(plugins_config, dict):
|
||||||
|
self._load_str_plugins(plugins_config)
|
||||||
else:
|
else:
|
||||||
if not namespace:
|
if not namespace:
|
||||||
msg = "Namespace needs to be provided for EntryPoint plugin definitions"
|
msg = "Namespace needs to be provided for EntryPoint plugin definitions"
|
||||||
|
@ -104,6 +135,7 @@ class PluginManager(Generic[C]):
|
||||||
|
|
||||||
self._load_ep_plugins(namespace)
|
self._load_ep_plugins(namespace)
|
||||||
|
|
||||||
|
# for all the loaded plugins, find all event callbacks
|
||||||
for plugin in self._plugins:
|
for plugin in self._plugins:
|
||||||
for event in list(BrokerEvents) + list(MQTTEvents):
|
for event in list(BrokerEvents) + list(MQTTEvents):
|
||||||
if awaitable := getattr(plugin, f"on_{event}", None):
|
if awaitable := getattr(plugin, f"on_{event}", None):
|
||||||
|
@ -114,7 +146,7 @@ class PluginManager(Generic[C]):
|
||||||
self._event_plugin_callbacks[event].append(awaitable)
|
self._event_plugin_callbacks[event].append(awaitable)
|
||||||
|
|
||||||
def _load_ep_plugins(self, namespace:str) -> None:
|
def _load_ep_plugins(self, namespace:str) -> None:
|
||||||
|
"""Load plugins from `pyproject.toml` entrypoints. Deprecated."""
|
||||||
self.logger.debug(f"Loading plugins for namespace {namespace}")
|
self.logger.debug(f"Loading plugins for namespace {namespace}")
|
||||||
auth_filter_list = []
|
auth_filter_list = []
|
||||||
topic_filter_list = []
|
topic_filter_list = []
|
||||||
|
@ -144,6 +176,7 @@ class PluginManager(Generic[C]):
|
||||||
self.logger.debug(f" Plugin {item.name} ready")
|
self.logger.debug(f" Plugin {item.name} ready")
|
||||||
|
|
||||||
def _load_ep_plugin(self, ep: EntryPoint) -> Plugin | None:
|
def _load_ep_plugin(self, ep: EntryPoint) -> Plugin | None:
|
||||||
|
"""Load plugins from `pyproject.toml` entrypoints. Deprecated."""
|
||||||
try:
|
try:
|
||||||
self.logger.debug(f" Loading plugin {ep!s}")
|
self.logger.debug(f" Loading plugin {ep!s}")
|
||||||
plugin = ep.load()
|
plugin = ep.load()
|
||||||
|
@ -163,40 +196,31 @@ class PluginManager(Generic[C]):
|
||||||
self.logger.debug(f"Plugin init failed: {ep!r}", exc_info=True)
|
self.logger.debug(f"Plugin init failed: {ep!r}", exc_info=True)
|
||||||
raise PluginInitError(ep) from e
|
raise PluginInitError(ep) from e
|
||||||
|
|
||||||
def _load_str_plugins(self, plugin_list: list[Any]) -> None:
|
def _load_str_plugins(self, plugins_info: dict[str, Any]) -> None:
|
||||||
|
|
||||||
self.logger.info("Loading plugins from config")
|
self.logger.info("Loading plugins from config")
|
||||||
|
# legacy had a filtering 'enabled' flag, even if plugins were loaded/listed
|
||||||
self._is_topic_filtering_enabled = True
|
self._is_topic_filtering_enabled = True
|
||||||
self._is_auth_filtering_enabled = True
|
self._is_auth_filtering_enabled = True
|
||||||
for plugin_info in plugin_list:
|
for plugin_path, plugin_config in plugins_info.items():
|
||||||
|
|
||||||
if isinstance(plugin_info, dict):
|
|
||||||
if len(plugin_info.keys()) > 1:
|
|
||||||
msg = f"config file should have only one key: {plugin_info.keys()}"
|
|
||||||
raise ValueError(msg)
|
|
||||||
plugin_path = next(iter(plugin_info.keys()))
|
|
||||||
plugin_cfg = plugin_info[plugin_path]
|
|
||||||
plugin = self._load_str_plugin(plugin_path, plugin_cfg)
|
|
||||||
elif isinstance(plugin_info, str):
|
|
||||||
plugin = self._load_str_plugin(plugin_info, {})
|
|
||||||
else:
|
|
||||||
msg = "Unexpected entry in plugins config"
|
|
||||||
raise PluginLoadError(msg)
|
|
||||||
|
|
||||||
|
plugin = self._load_str_plugin(plugin_path, plugin_config)
|
||||||
self._plugins.append(plugin)
|
self._plugins.append(plugin)
|
||||||
|
|
||||||
|
# make sure that authenticate and topic filtering plugins have the appropriate async signature
|
||||||
if isinstance(plugin, BaseAuthPlugin):
|
if isinstance(plugin, BaseAuthPlugin):
|
||||||
if not iscoroutinefunction(plugin.authenticate):
|
if not iscoroutinefunction(plugin.authenticate):
|
||||||
msg = f"Auth plugin {plugin_info} has non-async authenticate method."
|
msg = f"Auth plugin {plugin_path} has non-async authenticate method."
|
||||||
raise PluginCoroError(msg)
|
raise PluginCoroError(msg)
|
||||||
self._auth_plugins.append(plugin)
|
self._auth_plugins.append(plugin)
|
||||||
if isinstance(plugin, BaseTopicPlugin):
|
if isinstance(plugin, BaseTopicPlugin):
|
||||||
if not iscoroutinefunction(plugin.topic_filtering):
|
if not iscoroutinefunction(plugin.topic_filtering):
|
||||||
msg = f"Topic plugin {plugin_info} has non-async topic_filtering method."
|
msg = f"Topic plugin {plugin_path} has non-async topic_filtering method."
|
||||||
raise PluginCoroError(msg)
|
raise PluginCoroError(msg)
|
||||||
self._topic_plugins.append(plugin)
|
self._topic_plugins.append(plugin)
|
||||||
|
|
||||||
def _load_str_plugin(self, plugin_path: str, plugin_cfg: dict[str, Any] | None = None) -> "BasePlugin[C]":
|
def _load_str_plugin(self, plugin_path: str, plugin_cfg: dict[str, Any] | None = None) -> "BasePlugin[C]":
|
||||||
|
"""Load plugin from string dotted path: mymodule.myfile.MyPlugin."""
|
||||||
try:
|
try:
|
||||||
plugin_class: Any = import_string(plugin_path)
|
plugin_class: Any = import_string(plugin_path)
|
||||||
except ImportError as ep:
|
except ImportError as ep:
|
||||||
|
@ -210,6 +234,8 @@ class PluginManager(Generic[C]):
|
||||||
plugin_context = copy.copy(self.app_context)
|
plugin_context = copy.copy(self.app_context)
|
||||||
plugin_context.logger = self.logger.getChild(plugin_class.__name__)
|
plugin_context.logger = self.logger.getChild(plugin_class.__name__)
|
||||||
try:
|
try:
|
||||||
|
# populate the config based on the inner dataclass called `Config`
|
||||||
|
# use `dacite` package to type check
|
||||||
plugin_context.config = from_dict(data_class=plugin_class.Config,
|
plugin_context.config = from_dict(data_class=plugin_class.Config,
|
||||||
data=plugin_cfg or {},
|
data=plugin_cfg or {},
|
||||||
config=DaciteConfig(strict=True))
|
config=DaciteConfig(strict=True))
|
||||||
|
@ -230,6 +256,8 @@ class PluginManager(Generic[C]):
|
||||||
def get_plugin(self, name: str) -> Optional["BasePlugin[C]"]:
|
def get_plugin(self, name: str) -> Optional["BasePlugin[C]"]:
|
||||||
"""Get a plugin by its name from the plugins loaded for the current namespace.
|
"""Get a plugin by its name from the plugins loaded for the current namespace.
|
||||||
|
|
||||||
|
Only used for testing purposes to verify plugin loading correctly.
|
||||||
|
|
||||||
:param name:
|
:param name:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -240,4 +240,4 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
class Config:
|
class Config:
|
||||||
"""Configuration struct for plugin."""
|
"""Configuration struct for plugin."""
|
||||||
|
|
||||||
sys_interval: int = 0
|
sys_interval: int = 20
|
||||||
|
|
|
@ -4,9 +4,9 @@ listeners:
|
||||||
type: tcp
|
type: tcp
|
||||||
bind: 0.0.0.0:1883
|
bind: 0.0.0.0:1883
|
||||||
plugins:
|
plugins:
|
||||||
- amqtt.plugins.logging_amqtt.EventLoggerPlugin:
|
amqtt.plugins.logging_amqtt.EventLoggerPlugin:
|
||||||
- amqtt.plugins.logging_amqtt.PacketLoggerPlugin:
|
amqtt.plugins.logging_amqtt.PacketLoggerPlugin:
|
||||||
- amqtt.plugins.authentication.AnonymousAuthPlugin:
|
amqtt.plugins.authentication.AnonymousAuthPlugin:
|
||||||
allow_anonymous: true
|
allow_anonymous: true
|
||||||
- amqtt.plugins.sys.broker.BrokerSysPlugin:
|
amqtt.plugins.sys.broker.BrokerSysPlugin:
|
||||||
sys_interval: 20
|
sys_interval: 20
|
|
@ -10,4 +10,4 @@ reconnect_retries: 2
|
||||||
broker:
|
broker:
|
||||||
uri: "mqtt://127.0.0.1"
|
uri: "mqtt://127.0.0.1"
|
||||||
plugins:
|
plugins:
|
||||||
- amqtt.plugins.logging_amqtt.PacketLoggerPlugin:
|
amqtt.plugins.logging_amqtt.PacketLoggerPlugin:
|
||||||
|
|
|
@ -40,9 +40,9 @@ dictionary passed to the `Broker` or `MQTTClient`).
|
||||||
...
|
...
|
||||||
...
|
...
|
||||||
plugins:
|
plugins:
|
||||||
- module.submodule.file.OneClassName:
|
module.submodule.file.OneClassName:
|
||||||
- module.submodule.file.TwoClassName:
|
module.submodule.file.TwoClassName:
|
||||||
option1: 123
|
option1: 123
|
||||||
```
|
```
|
||||||
|
|
||||||
??? warning "Deprecated: activating plugins using `EntryPoints`"
|
??? warning "Deprecated: activating plugins using `EntryPoints`"
|
||||||
|
|
|
@ -47,11 +47,10 @@ By default, the `PacketLoggerPlugin` is activated and configured for the clien
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
plugins:
|
plugins:
|
||||||
- ...
|
.
|
||||||
- amqtt.plugins.authentication.AnonymousAuthPlugin:
|
.
|
||||||
allow_anonymous: false
|
amqtt.plugins.authentication.AnonymousAuthPlugin:
|
||||||
- ...
|
allow_anonymous: false
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
!!! danger
|
!!! danger
|
||||||
|
@ -78,10 +77,8 @@ clients are authorized by providing username and password, compared against file
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
plugins:
|
plugins:
|
||||||
- ...
|
amqtt.plugins.authentication.FileAuthPlugin:
|
||||||
- amqtt.plugins.authentication.FileAuthPlugin:
|
password_file: /path/to/password_file
|
||||||
password_file: /path/to/password_file
|
|
||||||
- ...
|
|
||||||
```
|
```
|
||||||
|
|
||||||
??? warning "EntryPoint-style configuration is deprecated"
|
??? warning "EntryPoint-style configuration is deprecated"
|
||||||
|
@ -119,9 +116,7 @@ Prevents using topics named: `prohibited`, `top-secret`, and `data/classified`
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
plugins:
|
plugins:
|
||||||
- ...
|
amqtt.plugins.topic_checking.TopicTabooPlugin:
|
||||||
- amqtt.plugins.topic_checking.TopicTabooPlugin:
|
|
||||||
- ...
|
|
||||||
```
|
```
|
||||||
|
|
||||||
??? warning "EntryPoint-style configuration is deprecated"
|
??? warning "EntryPoint-style configuration is deprecated"
|
||||||
|
@ -139,13 +134,13 @@ plugins:
|
||||||
|
|
||||||
**Configuration**
|
**Configuration**
|
||||||
|
|
||||||
- `acl` *(mapping)*: determines subscription access; if `publish-acl` is not specified, determine both publish and subscription access.
|
- `acl` *(mapping)*: determines subscription access
|
||||||
The list should be a key-value pair, where:
|
The list should be a key-value pair, where:
|
||||||
`<username>:[<topic1>, <topic2>, ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`).
|
`<username>:[<topic1>, <topic2>, ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`).
|
||||||
|
|
||||||
|
|
||||||
- `publish-acl` *(mapping)*: determines publish access. This parameter defines the list of access control rules; each item is a key-value pair, where:
|
- `publish-acl` *(mapping)*: determines publish access. If absent, no restrictions are placed on client publishing.
|
||||||
`<username>:[<topic1>, <topic2>, ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`).
|
`<username>:[<topic1>, <topic2>, ...]` *(string, list[string])*: username of the client followed by a list of allowed topics (wildcards are supported: `#`, `+`).
|
||||||
|
|
||||||
!!! info "Reserved usernames"
|
!!! info "Reserved usernames"
|
||||||
|
|
||||||
|
@ -154,13 +149,13 @@ plugins:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
plugins:
|
plugins:
|
||||||
- ...
|
amqtt.plugins.topic_checking.TopicAccessControlListPlugin:
|
||||||
- amqtt.plugins.topic_checking.TopicAccessControlListPlugin:
|
acl:
|
||||||
publish_acl:
|
- username: ["list", "of", "allowed", "topics", "for", "subscribing"]
|
||||||
- username: ["list", "of", "allowed", "topics", "for", "publishing"]
|
- .
|
||||||
acl:
|
publish_acl:
|
||||||
- username: ["list", "of", "allowed", "topics", "for", "subscribing"]
|
- username: ["list", "of", "allowed", "topics", "for", "publishing"]
|
||||||
- ...
|
- .
|
||||||
```
|
```
|
||||||
|
|
||||||
??? warning "EntryPoint-style configuration is deprecated"
|
??? warning "EntryPoint-style configuration is deprecated"
|
||||||
|
@ -185,13 +180,12 @@ Publishes, on a periodic basis, statistics about the broker
|
||||||
|
|
||||||
**Configuration**
|
**Configuration**
|
||||||
|
|
||||||
- `sys_interval` - int, seconds between updates
|
- `sys_interval` - int, seconds between updates (default: 20)
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
plugins:
|
plugins:
|
||||||
- ...
|
amqtt.plugins.sys.broker.BrokerSysPlugin:
|
||||||
- amqtt.plugins.sys.broker.BrokerSysPlugin:
|
sys_interval: 20 # int, seconds between updates
|
||||||
sys_interval: 20 # int, seconds between updates
|
|
||||||
- ...
|
|
||||||
```
|
```
|
||||||
|
|
||||||
**Supported Topics**
|
**Supported Topics**
|
||||||
|
@ -231,6 +225,11 @@ This plugin issues log messages when [broker and mqtt events](custom_plugins.md#
|
||||||
- info level messages for `client connected` and `client disconnected`
|
- info level messages for `client connected` and `client disconnected`
|
||||||
- debug level for all others
|
- debug level for all others
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
plugins:
|
||||||
|
amqtt.plugins.logging_amqtt.EventLoggerPlugin:
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
### Packet Logger
|
### Packet Logger
|
||||||
|
|
||||||
|
@ -239,3 +238,7 @@ This plugin issues log messages when [broker and mqtt events](custom_plugins.md#
|
||||||
This plugin issues debug-level messages for [mqtt events](custom_plugins.md#client-and-broker): `on_mqtt_packet_sent`
|
This plugin issues debug-level messages for [mqtt events](custom_plugins.md#client-and-broker): `on_mqtt_packet_sent`
|
||||||
and `on_mqtt_packet_received`.
|
and `on_mqtt_packet_received`.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
plugins:
|
||||||
|
amqtt.plugins.logging_amqtt.PacketLoggerPlugin:
|
||||||
|
```
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from amqtt.broker import Broker
|
from amqtt.broker import Broker
|
||||||
|
|
||||||
|
@ -22,24 +23,20 @@ config = {
|
||||||
"max_connections": 10,
|
"max_connections": 10,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"sys_interval": 10,
|
"plugins": {
|
||||||
"auth": {
|
'amqtt.plugins.authentication.AnonymousAuthPlugin': { 'allow_anonymous': True},
|
||||||
"allow-anonymous": True,
|
'amqtt.plugins.authentication.FileAuthPlugin': {
|
||||||
"password-file": os.path.join(
|
'password_file': Path(__file__).parent / 'passwd',
|
||||||
os.path.dirname(os.path.realpath(__file__)),
|
|
||||||
"passwd",
|
|
||||||
),
|
|
||||||
"plugins": ["auth_file", "auth_anonymous"],
|
|
||||||
},
|
|
||||||
"topic-check": {
|
|
||||||
"enabled": True,
|
|
||||||
"plugins": ["topic_acl"],
|
|
||||||
"acl": {
|
|
||||||
# username: [list of allowed topics]
|
|
||||||
"test": ["repositories/+/master", "calendar/#", "data/memes"],
|
|
||||||
"anonymous": [],
|
|
||||||
},
|
},
|
||||||
},
|
'amqtt.plugins.sys.broker.BrokerSysPlugin': { "sys_interval": 10},
|
||||||
|
'amqtt.plugins.topic_checking.TopicAccessControlListPlugin': {
|
||||||
|
'acl': {
|
||||||
|
# username: [list of allowed topics]
|
||||||
|
"test": ["repositories/+/master", "calendar/#", "data/memes"],
|
||||||
|
"anonymous": [],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from amqtt.broker import Broker
|
from amqtt.broker import Broker
|
||||||
|
|
||||||
|
@ -22,16 +23,13 @@ config = {
|
||||||
"max_connections": 10,
|
"max_connections": 10,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"sys_interval": 10,
|
"plugins": {
|
||||||
"auth": {
|
'amqtt.plugins.authentication.AnonymousAuthPlugin': { 'allow_anonymous': True},
|
||||||
"allow-anonymous": True,
|
'amqtt.plugins.authentication.FileAuthPlugin': {
|
||||||
"password-file": os.path.join(
|
'password_file': Path(__file__).parent / 'passwd',
|
||||||
os.path.dirname(os.path.realpath(__file__)),
|
},
|
||||||
"passwd",
|
'amqtt.plugins.sys.broker.BrokerSysPlugin': { "sys_interval": 10},
|
||||||
),
|
}
|
||||||
"plugins": ["auth_file", "auth_anonymous"],
|
|
||||||
},
|
|
||||||
"topic-check": {"enabled": False},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async def main_loop():
|
async def main_loop():
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from amqtt.broker import Broker
|
from amqtt.broker import Broker
|
||||||
|
|
||||||
|
@ -22,16 +23,14 @@ config = {
|
||||||
"max_connections": 10,
|
"max_connections": 10,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"sys_interval": 10,
|
"plugins": {
|
||||||
"auth": {
|
'amqtt.plugins.authentication.AnonymousAuthPlugin': {'allow_anonymous': True},
|
||||||
"allow-anonymous": True,
|
'amqtt.plugins.authentication.FileAuthPlugin': {
|
||||||
"password-file": os.path.join(
|
'password_file': Path(__file__).parent / 'passwd',
|
||||||
os.path.dirname(os.path.realpath(__file__)),
|
},
|
||||||
"passwd",
|
'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 10},
|
||||||
),
|
'amqtt.plugins.topic_checking.TopicTabooPlugin': {},
|
||||||
"plugins": ["auth_file", "auth_anonymous"],
|
}
|
||||||
},
|
|
||||||
"topic-check": {"enabled": True, "plugins": ["topic_taboo"]},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ class TestConfigPlugin(BasePlugin):
|
||||||
class Config:
|
class Config:
|
||||||
option1: int
|
option1: int
|
||||||
option2: str
|
option2: str
|
||||||
|
option3: int = 20
|
||||||
|
|
||||||
|
|
||||||
class TestCoroErrorPlugin(BaseAuthPlugin):
|
class TestCoroErrorPlugin(BaseAuthPlugin):
|
||||||
|
|
|
@ -219,3 +219,101 @@ async def test_block_topic_plugin_load():
|
||||||
await client1.disconnect()
|
await client1.disconnect()
|
||||||
|
|
||||||
await broker.shutdown()
|
await broker.shutdown()
|
||||||
|
|
||||||
|
plugin_yaml_list_config_one = """---
|
||||||
|
listeners:
|
||||||
|
default:
|
||||||
|
type: tcp
|
||||||
|
bind: 0.0.0.0:1883
|
||||||
|
plugins:
|
||||||
|
- tests.plugins.mocks.TestSimplePlugin:
|
||||||
|
- tests.plugins.mocks.TestConfigPlugin:
|
||||||
|
option1: 1
|
||||||
|
option2: bar
|
||||||
|
option3: 3
|
||||||
|
"""
|
||||||
|
|
||||||
|
plugin_yaml_list_config_two = """---
|
||||||
|
listeners:
|
||||||
|
default:
|
||||||
|
type: tcp
|
||||||
|
bind: 0.0.0.0:1883
|
||||||
|
plugins:
|
||||||
|
- tests.plugins.mocks.TestSimplePlugin
|
||||||
|
- tests.plugins.mocks.TestConfigPlugin:
|
||||||
|
option1: 1
|
||||||
|
option2: bar
|
||||||
|
option3: 3
|
||||||
|
"""
|
||||||
|
|
||||||
|
plugin_yaml_dict_config = """---
|
||||||
|
listeners:
|
||||||
|
default:
|
||||||
|
type: tcp
|
||||||
|
bind: 0.0.0.0:1883
|
||||||
|
plugins:
|
||||||
|
tests.plugins.mocks.TestSimplePlugin:
|
||||||
|
tests.plugins.mocks.TestConfigPlugin:
|
||||||
|
option1: 1
|
||||||
|
option2: bar
|
||||||
|
option3: 3
|
||||||
|
"""
|
||||||
|
|
||||||
|
plugin_empty_dict_config = {
|
||||||
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
||||||
|
'plugins': {
|
||||||
|
'tests.plugins.mocks.TestSimplePlugin': {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin_dict_option_config = {
|
||||||
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
||||||
|
'plugins': {
|
||||||
|
'tests.plugins.mocks.TestConfigPlugin': {'option1': 1, 'option2': 'bar', 'option3': 3}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_plugin_yaml_list_config():
|
||||||
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_one, Loader=Loader)
|
||||||
|
broker = Broker(config=cfg)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
||||||
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
||||||
|
assert getattr(plugin.context.config, 'option3', None) == 3
|
||||||
|
|
||||||
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_two, Loader=Loader)
|
||||||
|
broker = Broker(config=cfg)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
||||||
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
||||||
|
assert getattr(plugin.context.config, 'option3', None) == 3
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_plugin_yaml_dict_config():
|
||||||
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_dict_config, Loader=Loader)
|
||||||
|
broker = Broker(config=cfg)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_plugin_empty_dict_config():
|
||||||
|
broker = Broker(config=plugin_empty_dict_config)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_plugin_option_dict_config():
|
||||||
|
broker = Broker(config=plugin_dict_option_config)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
||||||
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
||||||
|
assert getattr(plugin.context.config, 'option3', None) == 3
|
||||||
|
|
Ładowanie…
Reference in New Issue