kopia lustrzana https://github.com/Yakifo/amqtt
81 wiersze
1.9 KiB
Python
81 wiersze
1.9 KiB
Python
import unittest.mock
|
|
import os.path
|
|
|
|
import pytest
|
|
|
|
import amqtt.broker
|
|
|
|
pytest_plugins = ["pytest_logdog"]
|
|
|
|
test_config = {
|
|
"listeners": {
|
|
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
|
|
},
|
|
"sys_interval": 0,
|
|
"auth": {
|
|
"allow-anonymous": True,
|
|
},
|
|
}
|
|
|
|
|
|
test_config_acl = {
|
|
"listeners": {
|
|
"default": {"type": "tcp", "bind": "127.0.0.1:1884", "max_connections": 10},
|
|
},
|
|
"sys_interval": 0,
|
|
"auth": {
|
|
"plugins": ["auth_file"],
|
|
"password-file": os.path.join(
|
|
os.path.dirname(os.path.realpath(__file__)), "plugins", "passwd"
|
|
),
|
|
},
|
|
"topic-check": {
|
|
"enabled": True,
|
|
"plugins": ["topic_acl"],
|
|
"acl": {
|
|
"user1": ["public/#"],
|
|
"user2": ["#"],
|
|
},
|
|
"publish-acl": {"user1": ["public/subtopic/#"]},
|
|
},
|
|
}
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def mock_plugin_manager():
|
|
with unittest.mock.patch("amqtt.broker.PluginManager") as plugin_manager:
|
|
yield plugin_manager
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
async def broker(mock_plugin_manager):
|
|
# just making sure the mock is in place before we start our broker
|
|
assert mock_plugin_manager is not None
|
|
|
|
broker = amqtt.broker.Broker(test_config, plugin_namespace="amqtt.test.plugins")
|
|
await broker.start()
|
|
assert broker.transitions.is_started()
|
|
assert broker._sessions == {}
|
|
assert "default" in broker._servers
|
|
|
|
yield broker
|
|
|
|
if not broker.transitions.is_stopped():
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
async def acl_broker():
|
|
broker = amqtt.broker.Broker(
|
|
test_config_acl, plugin_namespace="amqtt.broker.plugins"
|
|
)
|
|
await broker.start()
|
|
assert broker.transitions.is_started()
|
|
assert broker._sessions == {}
|
|
assert "default" in broker._servers
|
|
|
|
yield broker
|
|
|
|
if not broker.transitions.is_stopped():
|
|
await broker.shutdown()
|