adding comments to plugin manager code

pull/252/head
Andrew Mirsky 2025-07-04 17:07:17 -04:00
rodzic 6a45eeb533
commit 2684ffa7b0
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
1 zmienionych plików z 30 dodań i 3 usunięć

Wyświetl plik

@ -82,13 +82,32 @@ class PluginManager(Generic[C]):
return self.context
def _load_plugins(self, namespace: str | None = None) -> None:
"""Load plugins from entrypoint or config dictionary.
config style is now recommended; entrypoint has been deprecated
Example:
config = {
'listeners':...,
'plugins': {
'myproject.myfile.MyPlugin': {}
}
"""
if self.app_context.config and self.app_context.config.get("plugins", None) is not None:
# plugins loaded directly from config dictionary
if "auth" in self.app_context.config:
self.logger.warning("Loading plugins from config will ignore 'auth' section of config")
if "topic-check" in self.app_context.config:
self.logger.warning("Loading plugins from config will ignore 'topic-check' section of config")
plugins_config: list[Any] | dict[str, Any] = self.app_context.config.get("plugins", [])
# if the config was generated from yaml, the plugins maybe a list instead of a dictionary; transform before loading
#
# plugins:
# - myproject.myfile.MyPlugin:
if isinstance(plugins_config, list):
plugins_info: dict[str, Any] = {}
for plugin_config in plugins_config:
@ -100,7 +119,7 @@ class PluginManager(Generic[C]):
else:
plugins_info.update(plugin_config)
self._load_str_plugins(plugins_info)
if isinstance(plugins_config, dict):
elif isinstance(plugins_config, dict):
self._load_str_plugins(plugins_config)
else:
if not namespace:
@ -116,6 +135,7 @@ class PluginManager(Generic[C]):
self._load_ep_plugins(namespace)
# for all the loaded plugins, find all event callbacks
for plugin in self._plugins:
for event in list(BrokerEvents) + list(MQTTEvents):
if awaitable := getattr(plugin, f"on_{event}", None):
@ -126,7 +146,7 @@ class PluginManager(Generic[C]):
self._event_plugin_callbacks[event].append(awaitable)
def _load_ep_plugins(self, namespace:str) -> None:
"""Load plugins from `pyproject.toml` entrypoints. Deprecated."""
self.logger.debug(f"Loading plugins for namespace {namespace}")
auth_filter_list = []
topic_filter_list = []
@ -156,6 +176,7 @@ class PluginManager(Generic[C]):
self.logger.debug(f" Plugin {item.name} ready")
def _load_ep_plugin(self, ep: EntryPoint) -> Plugin | None:
"""Load plugins from `pyproject.toml` entrypoints. Deprecated."""
try:
self.logger.debug(f" Loading plugin {ep!s}")
plugin = ep.load()
@ -178,6 +199,7 @@ class PluginManager(Generic[C]):
def _load_str_plugins(self, plugins_info: dict[str, Any]) -> None:
self.logger.info("Loading plugins from config")
# legacy had a filtering 'enabled' flag, even if plugins were loaded/listed
self._is_topic_filtering_enabled = True
self._is_auth_filtering_enabled = True
for plugin_path, plugin_config in plugins_info.items():
@ -185,6 +207,7 @@ class PluginManager(Generic[C]):
plugin = self._load_str_plugin(plugin_path, plugin_config)
self._plugins.append(plugin)
# make sure that authenticate and topic filtering plugins have the appropriate async signature
if isinstance(plugin, BaseAuthPlugin):
if not iscoroutinefunction(plugin.authenticate):
msg = f"Auth plugin {plugin_path} has non-async authenticate method."
@ -197,7 +220,7 @@ class PluginManager(Generic[C]):
self._topic_plugins.append(plugin)
def _load_str_plugin(self, plugin_path: str, plugin_cfg: dict[str, Any] | None = None) -> "BasePlugin[C]":
"""Load plugin from string dotted path: mymodule.myfile.MyPlugin."""
try:
plugin_class: Any = import_string(plugin_path)
except ImportError as ep:
@ -211,6 +234,8 @@ class PluginManager(Generic[C]):
plugin_context = copy.copy(self.app_context)
plugin_context.logger = self.logger.getChild(plugin_class.__name__)
try:
# populate the config based on the inner dataclass called `Config`
# use `dacite` package to type check
plugin_context.config = from_dict(data_class=plugin_class.Config,
data=plugin_cfg or {},
config=DaciteConfig(strict=True))
@ -231,6 +256,8 @@ class PluginManager(Generic[C]):
def get_plugin(self, name: str) -> Optional["BasePlugin[C]"]:
"""Get a plugin by its name from the plugins loaded for the current namespace.
Only used for testing purposes to verify plugin loading correctly.
:param name:
:return:
"""