amqtt/tests/plugins/test_config.py

320 wiersze
7.6 KiB
Python

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
import pytest
import yaml
from amqtt.broker import Broker
from yaml import CLoader as Loader
from dacite import from_dict, Config, UnexpectedDataError
from amqtt.client import MQTTClient
from amqtt.errors import PluginLoadError, ConnectError, PluginCoroError
from amqtt.mqtt.constants import QOS_0
logger = logging.getLogger(__name__)
plugin_config = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestSimplePlugin:
- tests.plugins.mocks.TestConfigPlugin:
option1: 1
option2: bar
"""
plugin_invalid_config_one = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestSimplePlugin:
option1: 1
option2: bar
"""
plugin_invalid_config_two = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestConfigPlugin:
"""
plugin_coro_error_config = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestCoroErrorPlugin:
"""
plugin_config_auth = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestAuthPlugin:
"""
plugin_config_no_auth = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestNoAuthPlugin:
"""
plugin_config_topic = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- amqtt.plugins.authentication.AnonymousAuthPlugin
- tests.plugins.mocks.TestAllowTopicPlugin:
"""
plugin_config_topic_block = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- amqtt.plugins.authentication.AnonymousAuthPlugin
- tests.plugins.mocks.TestBlockTopicPlugin:
"""
@pytest.mark.asyncio
async def test_plugin_config_extra_fields():
cfg: dict[str, Any] = yaml.load(plugin_invalid_config_one, Loader=Loader)
with pytest.raises(PluginLoadError):
_ = Broker(config=cfg)
@pytest.mark.asyncio
async def test_plugin_config_missing_fields():
cfg: dict[str, Any] = yaml.load(plugin_invalid_config_one, Loader=Loader)
with pytest.raises(PluginLoadError):
_ = Broker(config=cfg)
@pytest.mark.asyncio
async def test_alternate_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_config, Loader=Loader)
broker = Broker(config=cfg)
await broker.start()
await broker.shutdown()
@pytest.mark.asyncio
async def test_coro_error_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_coro_error_config, Loader=Loader)
with pytest.raises(PluginCoroError):
_ = Broker(config=cfg)
@pytest.mark.asyncio
async def test_auth_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_config_auth, Loader=Loader)
broker = Broker(config=cfg)
await broker.start()
await asyncio.sleep(0.5)
client1 = MQTTClient()
await client1.connect()
await client1.publish('my/topic', b'my message')
await client1.disconnect()
await asyncio.sleep(0.5)
await broker.shutdown()
@pytest.mark.asyncio
async def test_no_auth_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_config_no_auth, Loader=Loader)
broker = Broker(config=cfg)
await broker.start()
await asyncio.sleep(0.5)
client1 = MQTTClient(config={'auto_reconnect': False})
with pytest.raises(ConnectError):
await client1.connect()
await asyncio.sleep(0.5)
await broker.shutdown()
@pytest.mark.asyncio
async def test_allow_topic_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_config_topic, Loader=Loader)
broker = Broker(config=cfg)
await broker.start()
await asyncio.sleep(0.5)
client2 = MQTTClient(config={'auto_reconnect': False})
await client2.connect()
await client2.subscribe([
('my/topic', QOS_0)
])
client1 = MQTTClient(config={'auto_reconnect': True})
await client1.connect()
await client1.publish('my/topic', b'my message')
message = await client2.deliver_message(timeout_duration=1)
assert message.topic == 'my/topic'
assert message.data == b'my message'
await client2.disconnect()
await client1.disconnect()
await broker.shutdown()
@pytest.mark.asyncio
async def test_block_topic_plugin_load():
cfg: dict[str, Any] = yaml.load(plugin_config_topic_block, Loader=Loader)
broker = Broker(config=cfg)
await broker.start()
await asyncio.sleep(0.5)
client2 = MQTTClient(config={'auto_reconnect': False})
await client2.connect()
await client2.subscribe([
('my/topic', QOS_0)
])
client1 = MQTTClient(config={'auto_reconnect': True})
await client1.connect()
await client1.publish('my/topic', b'my message')
with pytest.raises(asyncio.TimeoutError):
message = await client2.deliver_message(timeout_duration=1)
logger.debug(f"msg received: {message.topic} >> {message.data}")
await client2.disconnect()
await client1.disconnect()
await broker.shutdown()
plugin_yaml_list_config_one = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestSimplePlugin:
- tests.plugins.mocks.TestConfigPlugin:
option1: 1
option2: bar
option3: 3
"""
plugin_yaml_list_config_two = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
- tests.plugins.mocks.TestSimplePlugin
- tests.plugins.mocks.TestConfigPlugin:
option1: 1
option2: bar
option3: 3
"""
plugin_yaml_dict_config = """---
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
plugins:
tests.plugins.mocks.TestSimplePlugin:
tests.plugins.mocks.TestConfigPlugin:
option1: 1
option2: bar
option3: 3
"""
plugin_empty_dict_config = {
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
'plugins': {
'tests.plugins.mocks.TestSimplePlugin': {},
}
}
plugin_dict_option_config = {
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
'plugins': {
'tests.plugins.mocks.TestConfigPlugin': {'option1': 1, 'option2': 'bar', 'option3': 3}
}
}
@pytest.mark.asyncio
async def test_plugin_yaml_list_config():
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_one, Loader=Loader)
broker = Broker(config=cfg)
await asyncio.sleep(0.5)
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
assert getattr(plugin.context.config, 'option1', None) == 1
assert getattr(plugin.context.config, 'option3', None) == 3
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_two, Loader=Loader)
broker = Broker(config=cfg)
await asyncio.sleep(0.5)
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
assert getattr(plugin.context.config, 'option1', None) == 1
assert getattr(plugin.context.config, 'option3', None) == 3
@pytest.mark.asyncio
async def test_plugin_yaml_dict_config():
cfg: dict[str, Any] = yaml.load(plugin_yaml_dict_config, Loader=Loader)
broker = Broker(config=cfg)
await asyncio.sleep(0.5)
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
@pytest.mark.asyncio
async def test_plugin_empty_dict_config():
broker = Broker(config=plugin_empty_dict_config)
await asyncio.sleep(0.5)
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
@pytest.mark.asyncio
async def test_plugin_option_dict_config():
broker = Broker(config=plugin_dict_option_config)
await asyncio.sleep(0.5)
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
assert getattr(plugin.context.config, 'option1', None) == 1
assert getattr(plugin.context.config, 'option3', None) == 3