diff --git a/docs/packaged_plugins.md b/docs/packaged_plugins.md index c2502e4..d3dca4d 100644 --- a/docs/packaged_plugins.md +++ b/docs/packaged_plugins.md @@ -117,22 +117,23 @@ Publishes, on a periodic basis, statistics about the broker **Supported Topics** -- `$SYS/broker/load/bytes/received` - payload: `data`, int -- `$SYS/broker/load/bytes/sent` - payload: `data`, int -- `$SYS/broker/messages/received` - payload: `data`, int -- `$SYS/broker/messages/sent` - payload: `data`, int -- `$SYS/broker/time` - payload: `data`, int (current time, epoch seconds) -- `$SYS/broker/uptime` - payload: `data`, int (seconds since broker start) -- `$SYS/broker/uptime/formatted` - payload: `data`, datetime (start time of broker in UTC) -- `$SYS/broker/clients/connected` - payload: `data`, int -- `$SYS/broker/clients/disconnected` - payload: `data`, int -- `$SYS/broker/clients/maximum` - payload: `data`, int -- `$SYS/broker/clients/total` - payload: `data`, int -- `$SYS/broker/messages/inflight` - payload: `data`, int -- `$SYS/broker/messages/inflight/in` - payload: `data`, int -- `$SYS/broker/messages/inflight/out` - payload: `data`, int -- `$SYS/broker/messages/inflight/stored` - payload: `data`, int -- `$SYS/broker/messages/publish/received` - payload: `data`, int -- `$SYS/broker/messages/publish/sent` - payload: `data`, int -- `$SYS/broker/messages/retained/count` - payload: `data`, int -- `$SYS/broker/messages/subscriptions/count` - payload: `data`, int +- `$SYS/broker/version` - payload: `str` +- `$SYS/broker/load/bytes/received` - payload: `int` +- `$SYS/broker/load/bytes/sent` - payload: `int` +- `$SYS/broker/messages/received` - payload: `int` +- `$SYS/broker/messages/sent` - payload: `int` +- `$SYS/broker/time` - payload: `int` (current time, epoch seconds) +- `$SYS/broker/uptime` - payload: `int` (seconds since broker start) +- `$SYS/broker/uptime/formatted` - payload: `str` (start time of broker in UTC) +- `$SYS/broker/clients/connected` - payload: `int` (current number of connected clients) +- `$SYS/broker/clients/disconnected` - payload: `int` (number of clients that have disconnected) +- `$SYS/broker/clients/maximum` - payload: `int` +- `$SYS/broker/clients/total` - payload: `int` +- `$SYS/broker/messages/inflight` - payload: `int` +- `$SYS/broker/messages/inflight/in` - payload: `int` +- `$SYS/broker/messages/inflight/out` - payload: `int` +- `$SYS/broker/messages/inflight/stored` - payload: `int` +- `$SYS/broker/messages/publish/received` - payload: `int` +- `$SYS/broker/messages/publish/sent` - payload: `int` +- `$SYS/broker/messages/retained/count` - payload: `int` +- `$SYS/broker/messages/subscriptions/count` - payload: `int` diff --git a/tests/plugins/test_plugins.py b/tests/plugins/test_plugins.py index 0e6a0fc..5782ad7 100644 --- a/tests/plugins/test_plugins.py +++ b/tests/plugins/test_plugins.py @@ -1,16 +1,21 @@ +import asyncio import inspect +from functools import partial from importlib.metadata import EntryPoint from logging import getLogger from pathlib import Path from types import ModuleType -from typing import Any +from typing import Any, Callable, Coroutine from unittest.mock import patch import pytest import amqtt.plugins from amqtt.broker import Broker, BrokerContext +from amqtt.client import MQTTClient from amqtt.errors import PluginError, PluginInitError, PluginImportError +from amqtt.events import MQTTEvents, BrokerEvents +from amqtt.mqtt.constants import QOS_0 from amqtt.plugins.base import BasePlugin from amqtt.plugins.manager import BaseContext @@ -125,3 +130,70 @@ async def test_plugin_exception_while_loading() -> None: with pytest.raises(PluginImportError): _ = Broker(plugin_namespace='tests.mock_plugins', config=config) + + +class TestAllEventsPlugin(BasePlugin[BaseContext]): + """A plugin to verify all events get sent to plugins.""" + def __init__(self, context: BaseContext) -> None: + super().__init__(context) + + self.test_flags = { events:False for events in list(MQTTEvents) + list(BrokerEvents)} + + async def call_method(self, event_name: str, **kwargs: Any) -> None: + assert event_name in self.test_flags + self.test_flags[event_name] = True + + def __getattr__(self, name: str) -> Callable[..., Coroutine[Any, Any, None]]: + """Dynamically handle calls to methods starting with 'on_'.""" + + if name.startswith("on_"): + event_name = name.replace('on_', '') + return partial(self.call_method, event_name) + + if name not in ('authenticate', 'topic_filtering'): + pytest.fail(f'unexpected method called: {name}') + +@pytest.mark.asyncio +async def test_all_plugin_events(): + class MockEntryPoints: + + def select(self, group) -> list[EntryPoint]: + match group: + case 'tests.mock_plugins': + return [ + EntryPoint(name='TestAllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:TestAllEventsPlugin'), + ] + case _: + return list() + + # patch the entry points so we can load our test plugin + with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish: + + config = { + "listeners": { + "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, + }, + 'sys_interval': 1 + } + + + broker = Broker(plugin_namespace='tests.mock_plugins', config=config) + + await broker.start() + await asyncio.sleep(2) + + # make sure all expected events get triggered + client = MQTTClient() + await client.connect("mqtt://127.0.0.1:1883/") + await client.subscribe([('my/test/topic', QOS_0),]) + await client.publish('test/topic', b'my test message') + await client.unsubscribe(['my/test/topic',]) + await client.disconnect() + await asyncio.sleep(1) + + # get the plugin so it doesn't get gc on shutdown + test_plugin = broker.plugins_manager.get_plugin('TestAllEventsPlugin') + await broker.shutdown() + await asyncio.sleep(1) + + assert all(test_plugin.test_flags.values()), f'event not received: {[event for event, value in test_plugin.test_flags.items() if not value]}' diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index 19341e2..c01d322 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -11,10 +11,37 @@ from amqtt.mqtt.constants import QOS_0 logger = logging.getLogger(__name__) +all_sys_topics = [ + '$SYS/broker/version', + '$SYS/broker/load/bytes/received', + '$SYS/broker/load/bytes/sent', + '$SYS/broker/messages/received', + '$SYS/broker/messages/sent', + '$SYS/broker/time', + '$SYS/broker/uptime', + '$SYS/broker/uptime/formatted', + '$SYS/broker/clients/connected', + '$SYS/broker/clients/disconnected', + '$SYS/broker/clients/maximum', + '$SYS/broker/clients/total', + '$SYS/broker/messages/inflight', + '$SYS/broker/messages/inflight/in', + '$SYS/broker/messages/inflight/out', + '$SYS/broker/messages/inflight/stored', + '$SYS/broker/messages/publish/received', + '$SYS/broker/messages/publish/sent', + '$SYS/broker/messages/retained/count', + '$SYS/broker/messages/subscriptions/count' +] + + + # test broker sys @pytest.mark.asyncio async def test_broker_sys_plugin() -> None: + sys_topic_flags = {sys_topic:False for sys_topic in all_sys_topics} + class MockEntryPoints: def select(self, group) -> list[EntryPoint]: @@ -40,19 +67,24 @@ async def test_broker_sys_plugin() -> None: await broker.start() client = MQTTClient() await client.connect("mqtt://127.0.0.1:1883/") - await client.subscribe([("$SYS/broker/uptime", QOS_0),]) + await client.subscribe([("$SYS/#", QOS_0),]) await client.publish('test/topic', b'my test message') await asyncio.sleep(2) sys_msg_count = 0 try: - while True: + while sys_msg_count < 30: message = await client.deliver_message(timeout_duration=1) if '$SYS' in message.topic: sys_msg_count += 1 + assert message.topic in sys_topic_flags + sys_topic_flags[message.topic] = True + except asyncio.TimeoutError: - pass + logger.debug(f"TimeoutError after {sys_msg_count} messages") await client.disconnect() await broker.shutdown() assert sys_msg_count > 1 + + assert all(sys_topic_flags.values()), f'topic not received: {[ topic for topic, flag in sys_topic_flags.items() if not flag ]}'