amqtt/tests/conftest.py

130 wiersze
3.5 KiB
Python

import logging
from pathlib import Path
import tempfile
from typing import Any
import unittest.mock
import urllib.request
import pytest
from amqtt.broker import Broker
from amqtt.contexts import BaseContext
from amqtt.plugins.base import BasePlugin
# Module-level logger named after this module; used by the fixtures below.
log = logging.getLogger(__name__)
# Plugins pytest should load for this test package (declared through the
# conventional module-level `pytest_plugins` name).
pytest_plugins = ["pytest_logdog"]
# Broker configuration used by the `broker_fixture` and `broker` fixtures:
# three loopback listeners (one TCP, two websocket) and anonymous
# authentication allowed.
test_config = {
    "listeners": {
        "default": {
            "type": "tcp",
            "bind": "127.0.0.1:1883",
            "max_connections": 15,
        },
        "ws": {
            "type": "ws",
            "bind": "127.0.0.1:8080",
            "max_connections": 15,
        },
        # NOTE(review): "wss" is declared with type "ws" and carries no TLS
        # settings here — confirm this is intended.
        "wss": {
            "type": "ws",
            "bind": "127.0.0.1:8081",
            "max_connections": 15,
        },
    },
    "sys_interval": 0,
    "auth": {"allow-anonymous": True},
}
test_config_acl: dict[str, int | dict[str, Any]] = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1884", "max_connections": 10},
},
"sys_interval": 0,
"auth": {
"plugins": ["auth_file"],
"password-file": Path(__file__).resolve().parent / "plugins" / "passwd",
},
"topic-check": {
"enabled": True,
"plugins": ["topic_acl"],
"acl": {
"user1": ["public/#"],
"user2": ["#"],
},
"publish-acl": {"user1": ["public/subtopic/#"]},
},
}
@pytest.fixture
def mock_plugin_manager():
    """Patch ``amqtt.broker.PluginManager`` with a mock and yield the patched class.

    The mocked manager instance reports topic filtering as disabled, and its
    ``map_plugin_auth`` is an ``AsyncMock`` returning a single plugin mapped
    to ``True``. The patch stays active until the fixture is torn down.
    """
    patcher = unittest.mock.patch("amqtt.broker.PluginManager")
    with patcher as manager_cls:
        manager = manager_cls.return_value
        # disable topic filtering when using the mock manager
        manager.is_topic_filtering_enabled.return_value = False
        # allow any connection when using the mock manager
        auth_result = {BasePlugin(BaseContext()): True}
        manager.map_plugin_auth = unittest.mock.AsyncMock(return_value=auth_result)
        yield manager_cls
@pytest.fixture
async def broker_fixture():
    """Start a broker built from ``test_config`` and yield it to the test.

    The broker uses the ``amqtt.test.plugins`` plugin namespace. After start-up
    the fixture checks that the broker reports itself started, has no sessions
    and exposes the ``default`` listener. On teardown the broker is shut down
    unless the test already stopped it.
    """
    broker = Broker(test_config, plugin_namespace="amqtt.test.plugins")
    await broker.start()
    try:
        assert broker.transitions.is_started()
        assert broker._sessions == {}
        assert "default" in broker._servers
        yield broker
    finally:
        # Also runs when one of the sanity checks above fails, so a broker
        # that was started is never left running for the following tests.
        if not broker.transitions.is_stopped():
            await broker.shutdown()
@pytest.fixture
async def broker(mock_plugin_manager):
    """Start a broker from ``test_config`` with the plugin manager mocked out.

    Depends on ``mock_plugin_manager`` so the patch is active before the
    broker is constructed. After start-up the fixture checks that the broker
    reports itself started, has no sessions and exposes the ``default``
    listener. On teardown the broker is shut down unless the test already
    stopped it.
    """
    # just making sure the mock is in place before we start our broker
    assert mock_plugin_manager is not None
    instance = Broker(test_config, plugin_namespace="amqtt.test.plugins")
    await instance.start()
    try:
        assert instance.transitions.is_started()
        assert instance._sessions == {}
        assert "default" in instance._servers
        yield instance
    finally:
        # Also runs when one of the sanity checks above fails, so a broker
        # that was started is never left running for the following tests.
        if not instance.transitions.is_stopped():
            await instance.shutdown()
@pytest.fixture
async def acl_broker():
    """Start a broker built from ``test_config_acl`` and yield it to the test.

    The broker uses the ``amqtt.broker.plugins`` plugin namespace. After
    start-up the fixture checks that the broker reports itself started, has no
    sessions and exposes the ``default`` listener. On teardown the broker is
    shut down unless the test already stopped it.
    """
    broker = Broker(
        test_config_acl,
        plugin_namespace="amqtt.broker.plugins",
    )
    await broker.start()
    try:
        assert broker.transitions.is_started()
        assert broker._sessions == {}
        assert "default" in broker._servers
        yield broker
    finally:
        # Also runs when one of the sanity checks above fails, so a broker
        # that was started is never left running for the following tests.
        if not broker.transitions.is_stopped():
            await broker.shutdown()
@pytest.fixture(scope="module")
def ca_file_fixture():
    """Download the mosquitto.org CA certificate and yield the path to it.

    The certificate is stored in a fresh temporary directory (prefix
    ``amqtt-test-``) once per test module. The directory and everything in it
    is removed on teardown, and also when the download itself fails.

    Yields:
        Path: location of the downloaded ``mosquitto.org.crt`` file.
    """
    # NOTE(review): the certificate is fetched over plain HTTP, so its content
    # is not authenticated in transit — confirm this is acceptable for tests.
    url = "http://test.mosquitto.org/ssl/mosquitto.org.crt"
    # The context manager removes the directory tree on every exit path,
    # including a failed download and directories containing sub-folders.
    with tempfile.TemporaryDirectory(prefix="amqtt-test-") as temp_dir:
        ca_file = Path(temp_dir) / "mosquitto.org.crt"
        urllib.request.urlretrieve(url, str(ca_file))
        log.info(f"Stored mosquitto cert at {ca_file}")
        # Yield the CA file path for tests
        yield ca_file