kopia lustrzana https://github.com/Yakifo/amqtt
320 wiersze
7.6 KiB
Python
320 wiersze
7.6 KiB
Python
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from amqtt.broker import Broker
|
|
from yaml import CLoader as Loader
|
|
from dacite import from_dict, Config, UnexpectedDataError
|
|
|
|
from amqtt.client import MQTTClient
|
|
from amqtt.errors import PluginLoadError, ConnectError, PluginCoroError
|
|
from amqtt.mqtt.constants import QOS_0
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
plugin_config = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestSimplePlugin:
|
|
- tests.plugins.mocks.TestConfigPlugin:
|
|
option1: 1
|
|
option2: bar
|
|
"""
|
|
|
|
|
|
plugin_invalid_config_one = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestSimplePlugin:
|
|
option1: 1
|
|
option2: bar
|
|
"""
|
|
|
|
plugin_invalid_config_two = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestConfigPlugin:
|
|
"""
|
|
|
|
plugin_coro_error_config = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestCoroErrorPlugin:
|
|
"""
|
|
|
|
plugin_config_auth = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestAuthPlugin:
|
|
"""
|
|
|
|
plugin_config_no_auth = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestNoAuthPlugin:
|
|
"""
|
|
|
|
|
|
plugin_config_topic = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- amqtt.plugins.authentication.AnonymousAuthPlugin
|
|
- tests.plugins.mocks.TestAllowTopicPlugin:
|
|
"""
|
|
|
|
|
|
plugin_config_topic_block = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- amqtt.plugins.authentication.AnonymousAuthPlugin
|
|
- tests.plugins.mocks.TestBlockTopicPlugin:
|
|
"""
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_config_extra_fields():
|
|
|
|
cfg: dict[str, Any] = yaml.load(plugin_invalid_config_one, Loader=Loader)
|
|
|
|
with pytest.raises(PluginLoadError):
|
|
_ = Broker(config=cfg)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_config_missing_fields():
|
|
cfg: dict[str, Any] = yaml.load(plugin_invalid_config_one, Loader=Loader)
|
|
|
|
with pytest.raises(PluginLoadError):
|
|
_ = Broker(config=cfg)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alternate_plugin_load():
|
|
|
|
cfg: dict[str, Any] = yaml.load(plugin_config, Loader=Loader)
|
|
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_coro_error_plugin_load():
|
|
|
|
cfg: dict[str, Any] = yaml.load(plugin_coro_error_config, Loader=Loader)
|
|
|
|
with pytest.raises(PluginCoroError):
|
|
_ = Broker(config=cfg)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_auth_plugin_load():
|
|
cfg: dict[str, Any] = yaml.load(plugin_config_auth, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
await asyncio.sleep(0.5)
|
|
|
|
client1 = MQTTClient()
|
|
await client1.connect()
|
|
await client1.publish('my/topic', b'my message')
|
|
await client1.disconnect()
|
|
|
|
await asyncio.sleep(0.5)
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_auth_plugin_load():
|
|
cfg: dict[str, Any] = yaml.load(plugin_config_no_auth, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
await asyncio.sleep(0.5)
|
|
|
|
client1 = MQTTClient(config={'auto_reconnect': False})
|
|
with pytest.raises(ConnectError):
|
|
await client1.connect()
|
|
|
|
await asyncio.sleep(0.5)
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_allow_topic_plugin_load():
|
|
cfg: dict[str, Any] = yaml.load(plugin_config_topic, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
await asyncio.sleep(0.5)
|
|
|
|
client2 = MQTTClient(config={'auto_reconnect': False})
|
|
await client2.connect()
|
|
await client2.subscribe([
|
|
('my/topic', QOS_0)
|
|
])
|
|
|
|
client1 = MQTTClient(config={'auto_reconnect': True})
|
|
await client1.connect()
|
|
await client1.publish('my/topic', b'my message')
|
|
|
|
message = await client2.deliver_message(timeout_duration=1)
|
|
assert message.topic == 'my/topic'
|
|
assert message.data == b'my message'
|
|
|
|
await client2.disconnect()
|
|
await client1.disconnect()
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_block_topic_plugin_load():
|
|
cfg: dict[str, Any] = yaml.load(plugin_config_topic_block, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
await asyncio.sleep(0.5)
|
|
|
|
client2 = MQTTClient(config={'auto_reconnect': False})
|
|
await client2.connect()
|
|
await client2.subscribe([
|
|
('my/topic', QOS_0)
|
|
])
|
|
|
|
client1 = MQTTClient(config={'auto_reconnect': True})
|
|
await client1.connect()
|
|
await client1.publish('my/topic', b'my message')
|
|
|
|
with pytest.raises(asyncio.TimeoutError):
|
|
message = await client2.deliver_message(timeout_duration=1)
|
|
logger.debug(f"msg received: {message.topic} >> {message.data}")
|
|
|
|
await client2.disconnect()
|
|
await client1.disconnect()
|
|
|
|
await broker.shutdown()
|
|
|
|
plugin_yaml_list_config_one = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestSimplePlugin:
|
|
- tests.plugins.mocks.TestConfigPlugin:
|
|
option1: 1
|
|
option2: bar
|
|
option3: 3
|
|
"""
|
|
|
|
plugin_yaml_list_config_two = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
- tests.plugins.mocks.TestSimplePlugin
|
|
- tests.plugins.mocks.TestConfigPlugin:
|
|
option1: 1
|
|
option2: bar
|
|
option3: 3
|
|
"""
|
|
|
|
plugin_yaml_dict_config = """---
|
|
listeners:
|
|
default:
|
|
type: tcp
|
|
bind: 0.0.0.0:1883
|
|
plugins:
|
|
tests.plugins.mocks.TestSimplePlugin:
|
|
tests.plugins.mocks.TestConfigPlugin:
|
|
option1: 1
|
|
option2: bar
|
|
option3: 3
|
|
"""
|
|
|
|
plugin_empty_dict_config = {
|
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
|
'plugins': {
|
|
'tests.plugins.mocks.TestSimplePlugin': {},
|
|
}
|
|
}
|
|
|
|
plugin_dict_option_config = {
|
|
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
|
|
'plugins': {
|
|
'tests.plugins.mocks.TestConfigPlugin': {'option1': 1, 'option2': 'bar', 'option3': 3}
|
|
}
|
|
}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_yaml_list_config():
|
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_one, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
|
|
await asyncio.sleep(0.5)
|
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
|
assert getattr(plugin.context.config, 'option3', None) == 3
|
|
|
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_list_config_two, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
|
|
await asyncio.sleep(0.5)
|
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
|
assert getattr(plugin.context.config, 'option3', None) == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_yaml_dict_config():
|
|
cfg: dict[str, Any] = yaml.load(plugin_yaml_dict_config, Loader=Loader)
|
|
broker = Broker(config=cfg)
|
|
|
|
await asyncio.sleep(0.5)
|
|
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_empty_dict_config():
|
|
broker = Broker(config=plugin_empty_dict_config)
|
|
|
|
await asyncio.sleep(0.5)
|
|
assert broker.plugins_manager.get_plugin('TestSimplePlugin') is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_option_dict_config():
|
|
broker = Broker(config=plugin_dict_option_config)
|
|
|
|
await asyncio.sleep(0.5)
|
|
plugin = broker.plugins_manager.get_plugin('TestConfigPlugin')
|
|
assert getattr(plugin.context.config, 'option1', None) == 1
|
|
assert getattr(plugin.context.config, 'option3', None) == 3
|