amqtt/tests/conftest.py

120 wiersze
3.0 KiB
Python

import logging
from pathlib import Path
import tempfile
from typing import Any
import unittest.mock
import urllib.request
import pytest
from amqtt.broker import Broker
log = logging.getLogger(__name__)
pytest_plugins = ["pytest_logdog"]
test_config = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
"ws": {"type": "ws", "bind": "127.0.0.1:8080", "max_connections": 10},
"wss": {"type": "ws", "bind": "127.0.0.1:8081", "max_connections": 10},
},
"sys_interval": 0,
"auth": {
"allow-anonymous": True,
},
}
test_config_acl: dict[str, int | dict[str, Any]] = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1884", "max_connections": 10},
},
"sys_interval": 0,
"auth": {
"plugins": ["auth_file"],
"password-file": Path(__file__).resolve().parent / "plugins" / "passwd",
},
"topic-check": {
"enabled": True,
"plugins": ["topic_acl"],
"acl": {
"user1": ["public/#"],
"user2": ["#"],
},
"publish-acl": {"user1": ["public/subtopic/#"]},
},
}
@pytest.fixture
def mock_plugin_manager():
with unittest.mock.patch("amqtt.broker.PluginManager") as plugin_manager:
yield plugin_manager
@pytest.fixture
async def broker_fixture():
broker = Broker(test_config, plugin_namespace="amqtt.test.plugins")
await broker.start()
assert broker.transitions.is_started()
assert broker._sessions == {}
assert "default" in broker._servers
yield broker
if not broker.transitions.is_stopped():
await broker.shutdown()
@pytest.fixture
async def broker(mock_plugin_manager):
# just making sure the mock is in place before we start our broker
assert mock_plugin_manager is not None
broker = Broker(test_config, plugin_namespace="amqtt.test.plugins")
await broker.start()
assert broker.transitions.is_started()
assert broker._sessions == {}
assert "default" in broker._servers
yield broker
if not broker.transitions.is_stopped():
await broker.shutdown()
@pytest.fixture
async def acl_broker():
broker = Broker(
test_config_acl,
plugin_namespace="amqtt.broker.plugins",
)
await broker.start()
assert broker.transitions.is_started()
assert broker._sessions == {}
assert "default" in broker._servers
yield broker
if not broker.transitions.is_stopped():
await broker.shutdown()
@pytest.fixture(scope="module")
def ca_file_fixture():
temp_dir = Path(tempfile.mkdtemp(prefix="amqtt-test-"))
url = "http://test.mosquitto.org/ssl/mosquitto.org.crt"
ca_file = temp_dir / "mosquitto.org.crt"
urllib.request.urlretrieve(url, str(ca_file))
log.info(f"Stored mosquitto cert at {ca_file}")
# Yield the CA file path for tests
yield ca_file
# Cleanup after the tests
if temp_dir.exists():
for file in temp_dir.iterdir():
file.unlink()
temp_dir.rmdir()