amqtt/tests/conftest.py

39 wiersze
946 B
Python
Czysty Zwykły widok Historia

2021-03-06 17:58:34 +00:00
import unittest.mock
import pytest
2021-03-27 12:16:42 +00:00
import amqtt.broker
2021-03-06 17:58:34 +00:00
test_config = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
},
"sys_interval": 0,
"auth": {
"allow-anonymous": True,
2021-03-06 17:58:34 +00:00
},
}
@pytest.fixture(scope="function")
def mock_plugin_manager():
2021-03-27 12:16:42 +00:00
with unittest.mock.patch("amqtt.broker.PluginManager") as plugin_manager:
2021-03-06 17:58:34 +00:00
yield plugin_manager
@pytest.fixture(scope="function")
async def broker(mock_plugin_manager):
# just making sure the mock is in place before we start our broker
assert mock_plugin_manager is not None
2021-03-06 17:58:34 +00:00
2021-03-27 12:16:42 +00:00
broker = amqtt.broker.Broker(test_config, plugin_namespace="amqtt.test.plugins")
2021-03-06 17:58:34 +00:00
await broker.start()
assert broker.transitions.is_started()
assert broker._sessions == {}
assert "default" in broker._servers
2021-03-06 17:58:34 +00:00
yield broker
if not broker.transitions.is_stopped():
await broker.shutdown()