# amqtt/tests/conftest.py
#
# 130 lines
# 3.5 KiB
# Python

import logging
from pathlib import Path
import tempfile
from typing import Any
2021-03-06 17:58:34 +00:00
import unittest.mock
import urllib.request
2021-03-06 17:58:34 +00:00
import pytest
from amqtt.broker import Broker
from amqtt.contexts import BaseContext
from amqtt.plugins.base import BasePlugin
log = logging.getLogger(__name__)
2021-03-06 17:58:34 +00:00
pytest_plugins = ["pytest_logdog"]
2021-03-06 17:58:34 +00:00
test_config = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 15},
"ws": {"type": "ws", "bind": "127.0.0.1:8080", "max_connections": 15},
"wss": {"type": "ws", "bind": "127.0.0.1:8081", "max_connections": 15},
},
"sys_interval": 0,
"auth": {
"allow-anonymous": True,
}
2021-03-06 17:58:34 +00:00
}
test_config_acl: dict[str, int | dict[str, Any]] = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1884", "max_connections": 10},
},
"sys_interval": 0,
"auth": {
"plugins": ["auth_file"],
"password-file": Path(__file__).resolve().parent / "plugins" / "passwd",
},
"topic-check": {
"enabled": True,
"plugins": ["topic_acl"],
"acl": {
2021-07-08 00:23:24 +00:00
"user1": ["public/#"],
"user2": ["#"],
},
2021-07-08 00:23:24 +00:00
"publish-acl": {"user1": ["public/subtopic/#"]},
},
}
@pytest.fixture
def mock_plugin_manager():
    """Replace ``amqtt.broker.PluginManager`` with a mock while a test runs.

    The instance returned by the mocked class reports topic filtering as
    disabled, and its ``map_plugin_auth`` is an ``AsyncMock`` that resolves to
    a one-entry mapping of a ``BasePlugin`` to ``True``.

    Yields:
        The mock object standing in for the ``PluginManager`` class.
    """
    patcher = unittest.mock.patch("amqtt.broker.PluginManager")
    with patcher as manager_cls:
        manager = manager_cls.return_value
        # disable topic filtering when using the mock manager
        manager.is_topic_filtering_enabled.return_value = False
        # allow any connection when using the mock manager
        auth_result = {BasePlugin(BaseContext()): True}
        manager.map_plugin_auth = unittest.mock.AsyncMock(return_value=auth_result)
        yield manager_cls
@pytest.fixture
async def broker_fixture():
    """Start a broker built from ``test_config`` and hand it to the test.

    After ``start()`` the fixture checks that the broker reports the started
    state, has no sessions, and exposes the ``"default"`` server.

    Yields:
        The started ``Broker`` instance.
    """
    broker = Broker(test_config, plugin_namespace="amqtt.test.plugins")
    await broker.start()
    try:
        assert broker.transitions.is_started()
        assert broker._sessions == {}
        assert "default" in broker._servers
        yield broker
    finally:
        # Runs on normal teardown and also when one of the start-up checks
        # above fails, so a started broker is never left running. The test may
        # already have stopped the broker itself, hence the state check.
        if not broker.transitions.is_stopped():
            await broker.shutdown()
@pytest.fixture
async def broker(mock_plugin_manager):
    """Start a broker built from ``test_config`` with the plugin manager mocked.

    Args:
        mock_plugin_manager: The ``mock_plugin_manager`` fixture; requested so
            the patch is active before the broker is constructed.

    Yields:
        The started ``Broker`` instance.
    """
    # just making sure the mock is in place before we start our broker
    assert mock_plugin_manager is not None

    broker = Broker(test_config, plugin_namespace="amqtt.test.plugins")
    await broker.start()
    try:
        assert broker.transitions.is_started()
        assert broker._sessions == {}
        assert "default" in broker._servers
        yield broker
    finally:
        # Runs on normal teardown and also when one of the start-up checks
        # above fails, so a started broker is never left running. The test may
        # already have stopped the broker itself, hence the state check.
        if not broker.transitions.is_stopped():
            await broker.shutdown()
@pytest.fixture
async def acl_broker():
    """Start a broker built from ``test_config_acl`` and hand it to the test.

    Uses the ``amqtt.broker.plugins`` plugin namespace. After ``start()`` the
    fixture checks that the broker reports the started state, has no sessions,
    and exposes the ``"default"`` server.

    Yields:
        The started ``Broker`` instance.
    """
    broker = Broker(
        test_config_acl,
        plugin_namespace="amqtt.broker.plugins",
    )
    await broker.start()
    try:
        assert broker.transitions.is_started()
        assert broker._sessions == {}
        assert "default" in broker._servers
        yield broker
    finally:
        # Runs on normal teardown and also when one of the start-up checks
        # above fails, so a started broker is never left running. The test may
        # already have stopped the broker itself, hence the state check.
        if not broker.transitions.is_stopped():
            await broker.shutdown()
@pytest.fixture(scope="module")
def ca_file_fixture():
    """Download the mosquitto.org CA certificate into a temporary directory.

    The file is fetched once per test module and stored as
    ``mosquitto.org.crt`` inside a directory whose name starts with
    ``amqtt-test-``.

    Yields:
        ``Path`` of the downloaded certificate file.
    """
    # NOTE(review): the certificate is fetched over plain HTTP — consider
    # whether an HTTPS URL should be used instead.
    url = "http://test.mosquitto.org/ssl/mosquitto.org.crt"

    # The context manager removes the directory and everything in it on
    # teardown, and also when the download below raises, so no temporary
    # directory is left behind on failure.
    with tempfile.TemporaryDirectory(prefix="amqtt-test-") as temp_dir:
        ca_file = Path(temp_dir) / "mosquitto.org.crt"
        urllib.request.urlretrieve(url, str(ca_file))
        log.info(f"Stored mosquitto cert at {ca_file}")

        # Yield the CA file path for tests
        yield ca_file