kopia lustrzana https://github.com/Yakifo/amqtt
added tests for plugin events and broker sys topics
rodzic
4984927ea5
commit
3d6e7a8364
|
@ -117,22 +117,23 @@ Publishes, on a periodic basis, statistics about the broker
|
|||
|
||||
**Supported Topics**
|
||||
|
||||
- `$SYS/broker/load/bytes/received` - payload: `data`, int
|
||||
- `$SYS/broker/load/bytes/sent` - payload: `data`, int
|
||||
- `$SYS/broker/messages/received` - payload: `data`, int
|
||||
- `$SYS/broker/messages/sent` - payload: `data`, int
|
||||
- `$SYS/broker/time` - payload: `data`, int (current time, epoch seconds)
|
||||
- `$SYS/broker/uptime` - payload: `data`, int (seconds since broker start)
|
||||
- `$SYS/broker/uptime/formatted` - payload: `data`, datetime (start time of broker in UTC)
|
||||
- `$SYS/broker/clients/connected` - payload: `data`, int
|
||||
- `$SYS/broker/clients/disconnected` - payload: `data`, int
|
||||
- `$SYS/broker/clients/maximum` - payload: `data`, int
|
||||
- `$SYS/broker/clients/total` - payload: `data`, int
|
||||
- `$SYS/broker/messages/inflight` - payload: `data`, int
|
||||
- `$SYS/broker/messages/inflight/in` - payload: `data`, int
|
||||
- `$SYS/broker/messages/inflight/out` - payload: `data`, int
|
||||
- `$SYS/broker/messages/inflight/stored` - payload: `data`, int
|
||||
- `$SYS/broker/messages/publish/received` - payload: `data`, int
|
||||
- `$SYS/broker/messages/publish/sent` - payload: `data`, int
|
||||
- `$SYS/broker/messages/retained/count` - payload: `data`, int
|
||||
- `$SYS/broker/messages/subscriptions/count` - payload: `data`, int
|
||||
- `$SYS/broker/version` - payload: `str`
|
||||
- `$SYS/broker/load/bytes/received` - payload: `int`
|
||||
- `$SYS/broker/load/bytes/sent` - payload: `int`
|
||||
- `$SYS/broker/messages/received` - payload: `int`
|
||||
- `$SYS/broker/messages/sent` - payload: `int`
|
||||
- `$SYS/broker/time` - payload: `int` (current time, epoch seconds)
|
||||
- `$SYS/broker/uptime` - payload: `int` (seconds since broker start)
|
||||
- `$SYS/broker/uptime/formatted` - payload: `str` (start time of broker in UTC)
|
||||
- `$SYS/broker/clients/connected` - payload: `int` (current number of connected clients)
|
||||
- `$SYS/broker/clients/disconnected` - payload: `int` (number of clients that have disconnected)
|
||||
- `$SYS/broker/clients/maximum` - payload: `int`
|
||||
- `$SYS/broker/clients/total` - payload: `int`
|
||||
- `$SYS/broker/messages/inflight` - payload: `int`
|
||||
- `$SYS/broker/messages/inflight/in` - payload: `int`
|
||||
- `$SYS/broker/messages/inflight/out` - payload: `int`
|
||||
- `$SYS/broker/messages/inflight/stored` - payload: `int`
|
||||
- `$SYS/broker/messages/publish/received` - payload: `int`
|
||||
- `$SYS/broker/messages/publish/sent` - payload: `int`
|
||||
- `$SYS/broker/messages/retained/count` - payload: `int`
|
||||
- `$SYS/broker/messages/subscriptions/count` - payload: `int`
|
||||
|
|
|
@ -1,16 +1,21 @@
|
|||
import asyncio
|
||||
import inspect
|
||||
from functools import partial
|
||||
from importlib.metadata import EntryPoint
|
||||
from logging import getLogger
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
from typing import Any, Callable, Coroutine
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
import amqtt.plugins
|
||||
from amqtt.broker import Broker, BrokerContext
|
||||
from amqtt.client import MQTTClient
|
||||
from amqtt.errors import PluginError, PluginInitError, PluginImportError
|
||||
from amqtt.events import MQTTEvents, BrokerEvents
|
||||
from amqtt.mqtt.constants import QOS_0
|
||||
from amqtt.plugins.base import BasePlugin
|
||||
from amqtt.plugins.manager import BaseContext
|
||||
|
||||
|
@ -125,3 +130,70 @@ async def test_plugin_exception_while_loading() -> None:
|
|||
|
||||
with pytest.raises(PluginImportError):
|
||||
_ = Broker(plugin_namespace='tests.mock_plugins', config=config)
|
||||
|
||||
|
||||
class TestAllEventsPlugin(BasePlugin[BaseContext]):
|
||||
"""A plugin to verify all events get sent to plugins."""
|
||||
def __init__(self, context: BaseContext) -> None:
|
||||
super().__init__(context)
|
||||
|
||||
self.test_flags = { events:False for events in list(MQTTEvents) + list(BrokerEvents)}
|
||||
|
||||
async def call_method(self, event_name: str, **kwargs: Any) -> None:
|
||||
assert event_name in self.test_flags
|
||||
self.test_flags[event_name] = True
|
||||
|
||||
def __getattr__(self, name: str) -> Callable[..., Coroutine[Any, Any, None]]:
|
||||
"""Dynamically handle calls to methods starting with 'on_'."""
|
||||
|
||||
if name.startswith("on_"):
|
||||
event_name = name.replace('on_', '')
|
||||
return partial(self.call_method, event_name)
|
||||
|
||||
if name not in ('authenticate', 'topic_filtering'):
|
||||
pytest.fail(f'unexpected method called: {name}')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_all_plugin_events():
|
||||
class MockEntryPoints:
|
||||
|
||||
def select(self, group) -> list[EntryPoint]:
|
||||
match group:
|
||||
case 'tests.mock_plugins':
|
||||
return [
|
||||
EntryPoint(name='TestAllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:TestAllEventsPlugin'),
|
||||
]
|
||||
case _:
|
||||
return list()
|
||||
|
||||
# patch the entry points so we can load our test plugin
|
||||
with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish:
|
||||
|
||||
config = {
|
||||
"listeners": {
|
||||
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
|
||||
},
|
||||
'sys_interval': 1
|
||||
}
|
||||
|
||||
|
||||
broker = Broker(plugin_namespace='tests.mock_plugins', config=config)
|
||||
|
||||
await broker.start()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# make sure all expected events get triggered
|
||||
client = MQTTClient()
|
||||
await client.connect("mqtt://127.0.0.1:1883/")
|
||||
await client.subscribe([('my/test/topic', QOS_0),])
|
||||
await client.publish('test/topic', b'my test message')
|
||||
await client.unsubscribe(['my/test/topic',])
|
||||
await client.disconnect()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# get the plugin so it doesn't get gc on shutdown
|
||||
test_plugin = broker.plugins_manager.get_plugin('TestAllEventsPlugin')
|
||||
await broker.shutdown()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
assert all(test_plugin.test_flags.values()), f'event not received: {[event for event, value in test_plugin.test_flags.items() if not value]}'
|
||||
|
|
|
@ -11,10 +11,37 @@ from amqtt.mqtt.constants import QOS_0
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
all_sys_topics = [
|
||||
'$SYS/broker/version',
|
||||
'$SYS/broker/load/bytes/received',
|
||||
'$SYS/broker/load/bytes/sent',
|
||||
'$SYS/broker/messages/received',
|
||||
'$SYS/broker/messages/sent',
|
||||
'$SYS/broker/time',
|
||||
'$SYS/broker/uptime',
|
||||
'$SYS/broker/uptime/formatted',
|
||||
'$SYS/broker/clients/connected',
|
||||
'$SYS/broker/clients/disconnected',
|
||||
'$SYS/broker/clients/maximum',
|
||||
'$SYS/broker/clients/total',
|
||||
'$SYS/broker/messages/inflight',
|
||||
'$SYS/broker/messages/inflight/in',
|
||||
'$SYS/broker/messages/inflight/out',
|
||||
'$SYS/broker/messages/inflight/stored',
|
||||
'$SYS/broker/messages/publish/received',
|
||||
'$SYS/broker/messages/publish/sent',
|
||||
'$SYS/broker/messages/retained/count',
|
||||
'$SYS/broker/messages/subscriptions/count'
|
||||
]
|
||||
|
||||
|
||||
|
||||
# test broker sys
|
||||
@pytest.mark.asyncio
|
||||
async def test_broker_sys_plugin() -> None:
|
||||
|
||||
sys_topic_flags = {sys_topic:False for sys_topic in all_sys_topics}
|
||||
|
||||
class MockEntryPoints:
|
||||
|
||||
def select(self, group) -> list[EntryPoint]:
|
||||
|
@ -40,19 +67,24 @@ async def test_broker_sys_plugin() -> None:
|
|||
await broker.start()
|
||||
client = MQTTClient()
|
||||
await client.connect("mqtt://127.0.0.1:1883/")
|
||||
await client.subscribe([("$SYS/broker/uptime", QOS_0),])
|
||||
await client.subscribe([("$SYS/#", QOS_0),])
|
||||
await client.publish('test/topic', b'my test message')
|
||||
await asyncio.sleep(2)
|
||||
sys_msg_count = 0
|
||||
try:
|
||||
while True:
|
||||
while sys_msg_count < 30:
|
||||
message = await client.deliver_message(timeout_duration=1)
|
||||
if '$SYS' in message.topic:
|
||||
sys_msg_count += 1
|
||||
assert message.topic in sys_topic_flags
|
||||
sys_topic_flags[message.topic] = True
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
logger.debug(f"TimeoutError after {sys_msg_count} messages")
|
||||
|
||||
await client.disconnect()
|
||||
await broker.shutdown()
|
||||
|
||||
assert sys_msg_count > 1
|
||||
|
||||
assert all(sys_topic_flags.values()), f'topic not received: {[ topic for topic, flag in sys_topic_flags.items() if not flag ]}'
|
||||
|
|
Ładowanie…
Reference in New Issue