amqtt/tests/contrib/test_db_scripts.py

421 wiersze
13 KiB
Python

Plugin: authentication against a relational database (#280) * Yakifo/amqtt#259 : db schema (orm) and authentication of users plugin, initial * expanding to include a simple admin interface * create directory into module * user management ui react app * including research associated with session auth for the admin ui * updating to handle username and acl datagrid * multi cli optional approaches * basic cli functions for manipulating users * Yakifo/amqtt#259 : lint and type cleanup * Yakifo/amqtt#259 : added tests for AuthDBPlugin as well as the UserManager. includes a command line interface for listing, adding, removing and updating users. added documentation. * Yakifo/amqtt#259 : need to install the 'contrib' extra package for the workflows * fixing class name change * fixed incorrect install of optional library for ci. auth db test wasn't shutting down broker, causing subsequent tests to fail" * test case not cleaning up after itself * adding topic authentication to database plugin * renamed command line script * moved auth db ui to a different branch. added a temp directory to the temp file used for sqlite tests * more renaming to leave space to expand to topic management * updating dependencies * renaming scripts * adding topic filter and test cases * Yakifo/amqtt#259 separate plugin for DB topic-based checking. additional topic auth testing Yakifo/amqtt#259 updating uv.lock fixing lint and typing errors handling strenum for 3.10 updating documentation for topic and user auth plugins update user and topic manager scripts, add tests and fix bugs add tests for the user and topic mgr cli client connection timeout should yield a connecterror * adding 'receive' action, added along with the http plugin
2025-07-26 21:04:14 +00:00
import asyncio
import logging
import tempfile
from pathlib import Path
import pytest
from passlib.context import CryptContext
from typer.testing import CliRunner
from amqtt.contexts import Action
from amqtt.contrib.auth_db.managers import TopicManager, UserManager
from amqtt.contrib.auth_db.models import PasswordHasher, AllowedTopic
from amqtt.contrib.auth_db.topic_mgr_cli import topic_app
from amqtt.contrib.auth_db.user_mgr_cli import user_app
# Shared typer CliRunner instance; tests in this module call runner.invoke(...) on it.
runner = CliRunner()
@pytest.fixture
def password_hasher():
    """Yield a ``PasswordHasher`` whose crypt context is an argon2-only ``CryptContext``."""
    argon2_context = CryptContext(schemes=["argon2", ], deprecated="auto")
    hasher = PasswordHasher()
    hasher.crypt_context = argon2_context
    yield hasher
@pytest.fixture
def db_file():
    """Yield the path of a (not yet created) SQLite file inside a fresh temporary directory.

    The previous implementation joined the directory with
    ``NamedTemporaryFile().name``, which is an absolute path; joining an
    absolute segment discards the directory, so the database was written
    outside of it and never removed. Using a plain relative file name keeps
    the database inside the directory, which is deleted when the fixture
    finishes.
    """
    # ignore_cleanup_errors: a database file that is still held open (e.g. on
    # Windows) must not turn fixture teardown into a test error.
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
        yield Path(temp_dir) / "auth_test.db"
@pytest.fixture
def db_connection(db_file):
    """Yield the ``sqlite+aiosqlite`` connection URL that points at ``db_file``."""
    yield f"sqlite+aiosqlite:///{db_file}"
@pytest.fixture
async def user_manager(password_hasher, db_connection):
    """Yield a ``UserManager`` bound to ``db_connection`` after awaiting ``db_sync()``.

    ``password_hasher`` is requested only so that fixture runs first; its
    value is not used here.

    Note: the former ``@pytest.mark.asyncio`` decorator was dropped because
    marks applied to fixtures have no effect and are deprecated by pytest.
    """
    um = UserManager(db_connection)
    await um.db_sync()
    yield um
@pytest.fixture
async def topic_manager(password_hasher, db_connection):
    """Yield a ``TopicManager`` bound to ``db_connection`` after awaiting ``db_sync()``.

    ``password_hasher`` is requested only so that fixture runs first; its
    value is not used here.

    Note: the former ``@pytest.mark.asyncio`` decorator was dropped because
    marks applied to fixtures have no effect and are deprecated by pytest.
    """
    tm = TopicManager(db_connection)
    await tm.db_sync()
    yield tm
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_params(app, error_msg):
    """Invoking either manager CLI without any arguments must exit with code 0."""
    outcome = runner.invoke(app, [])
    assert outcome.exit_code == 0, f"{outcome.output}"
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_db_type(app, error_msg):
    """Running ``sync`` without a database type must exit with code 2.

    The parametrized ``app`` is invoked (previously ``topic_app`` was
    hard-coded, so the user CLI was never actually exercised).
    """
    result = runner.invoke(app, ["sync"])
    assert result.exit_code == 2
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_db_username(app, error_msg, caplog):
    """``-d mysql sync`` without a username must exit 1 and log the missing-username error."""
    cli_args = ["-d", "mysql", "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 1
    assert "DB access requires a username be provided." in caplog.text
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_db_not_installed(app, error_msg, caplog):
    """``sync`` against mysql must exit 1 with a ``ModuleNotFoundError`` as the captured exception."""
    cli_args = ["-d", "mysql", "-u", "mydbname", "sync",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args, input="mydbpassword\n")
    assert outcome.exit_code == 1
    assert isinstance(outcome.exception, ModuleNotFoundError)
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_sync(db_file, app, error_msg, caplog):
    """``sync`` against the sqlite file must exit 0 and log the success message."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 0
    assert "Success: database synced." in caplog.text
@pytest.mark.parametrize("app,success_msg", [
    (user_app, "authentications"),
    (topic_app, "authorizations"),
])
def test_topic_empty_list(db_file, topic_manager, caplog, app, success_msg):
    """``list`` on a synced but empty database must exit 0 and log the 'none exist' message."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 0
    assert f"No client {success_msg} exist." in caplog.text
def test_user_mgr_list_clients(db_file, user_manager, caplog):
    """Create two users, run ``list`` and expect the first two INFO records to name them in order."""

    async def seed_users():
        for client_id in ("client123", "client456"):
            await user_manager.create_user_auth(client_id, "randompassword")

    asyncio.run(seed_users())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 0

    # Only INFO-level records are considered when checking the listing output.
    info_only = [rec for rec in caplog.records if rec.levelname == "INFO"]
    first, second = info_only[0], info_only[1]
    assert 'client123' in first.message
    assert 'client456' in second.message
def test_topic_mgr_list_clients(db_file, topic_manager, caplog):
    """Create two topic auths with one allowed topic each and check the ``list`` output.

    The first two INFO records are expected to mention, in order,
    ``device123`` and ``device456``, each together with ``my/topic``.
    """

    async def seed_topic_auths():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.create_topic_auth('device456')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)
        await topic_manager.add_allowed_topic('device456', 'my/topic', Action.PUBLISH)

    asyncio.run(seed_topic_auths())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 0

    info_only = [rec for rec in caplog.records if rec.levelname == "INFO"]
    first, second = info_only[0], info_only[1]
    assert 'device123' in first.message
    assert 'my/topic' in first.message
    assert 'device456' in second.message
    assert 'my/topic' in second.message
def test_user_mgr_add_auth_missing_param(db_file, user_manager, caplog):
    """``add`` without any further arguments must exit with code 2."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 2
def test_add_allowed_topic_missing_param(db_file, topic_manager, caplog):
    """``add -c client123 my/topic`` (no ``-a`` option given) must exit with code 2."""
    cli_args = [
        "-d", "sqlite",
        "-f", f"{db_file}",
        "add", "-c", "client123", "my/topic",
    ]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 2
def test_user_mgr_add_auth_missing_password(db_file, user_manager, caplog):
    """A whitespace-only password for ``add`` must exit 1, log the error, and create no user."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add", "-c", 'client123']
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input=" \n")
    assert outcome.exit_code == 1
    assert "Error: client password cannot be empty." in caplog.text, caplog.text

    async def confirm_not_created():
        stored = await user_manager.get_user_auth('client123')
        assert stored is None

    asyncio.run(confirm_not_created())
def test_user_mgr_add_auth(db_file, user_manager, caplog):
    """``add -c client123`` with input supplied must exit 0, log success, and store the user."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add", "-c", 'client123']
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="dbpassword\nuserpassword\n")
    assert outcome.exit_code == 0
    assert "Success: created 'client123'" in caplog.text, caplog.text

    async def confirm_created():
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None

    asyncio.run(confirm_created())
def test_add_allowed_topic(db_file, topic_manager, caplog):
    """Adding ``my/topic`` for publish must exit 0, log success, and appear in ``publish_acl``."""
    cli_args = [
        "-d", "sqlite",
        "-f", f"{db_file}",
        "add",
        "-c", "client123",
        "-a", "publish",
        "my/topic",
    ]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 0
    assert "Success: topic 'my/topic' added to publish for 'client123'" in caplog.text

    async def confirm_topic_stored():
        stored = await topic_manager.get_topic_auth('client123')
        assert stored is not None
        assert AllowedTopic('my/topic') in stored.publish_acl

    asyncio.run(confirm_topic_stored())
def test_remove_user_auth_mismatch(db_file, user_manager, caplog):
    """``rm`` for a client id that was never created must exit 1 and log a 'does not exist' error."""

    async def seed_user():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_user())

    # "device123" deliberately differs from the seeded "client123".
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "device123",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 1, caplog.text
    assert "Error: client 'device123' does not exist." in caplog.text, outcome.output
def test_remove_allowed_topic_mismatch(db_file, topic_manager, caplog):
    """Removing a topic from the publish list when it was only allowed for subscribe must fail.

    Expects exit code 1 and the 'not in the publish allow list' error in the log.
    """

    async def seed_topic_auth():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_topic_auth())

    cli_args = [
        "-d", "sqlite",
        "-f", f"{db_file}",
        "rm",
        "-c", "device123",
        "-a", "publish",
        "my/topic",
    ]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 1, caplog.text
    assert "Error: topic 'my/topic' not in the publish allow list for device123." in caplog.text
def test_remove_user_auth_abort(db_file, user_manager, caplog):
    """``rm`` fed ``N`` as input (presumably declining a confirmation) must exit 0 and keep the user."""

    async def seed_user():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_user())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "client123",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="N\n")
    assert outcome.exit_code == 0, caplog.text

    async def confirm_still_present():
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None

    asyncio.run(confirm_still_present())
def test_remove_user_auth_confirm(db_file, user_manager, caplog):
    """``rm`` fed ``y`` as input must exit 0, log the removal, and delete the user."""

    async def seed_user():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_user())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "client123",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="y\n")
    assert outcome.exit_code == 0, caplog.text
    assert "Success: 'client123' was removed." in caplog.text, outcome.output

    async def confirm_gone():
        stored = await user_manager.get_user_auth('client123')
        assert stored is None

    asyncio.run(confirm_gone())
def test_remove_allowed_topic(db_file, topic_manager, caplog):
    """Removing a subscribe topic that was previously allowed must exit 0 and log success."""

    async def seed_topic_auth():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_topic_auth())

    cli_args = [
        "-d", "sqlite",
        "-f", f"{db_file}",
        "rm",
        "-c", "device123",
        "-a", "subscribe",
        "my/topic",
    ]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 0
    assert "Success: removed topic 'my/topic' from subscribe for 'device123'" in caplog.text, caplog.text
def test_user_mgr_change_password_empty(db_file, user_manager, caplog):
    """``pwd`` with a whitespace-only password must exit 1 and log the empty-password error."""

    async def seed_user():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_user())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "pwd", "-c", "client123",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input=" \n")
    assert outcome.exit_code == 1, caplog.text
    assert "Error: client password cannot be empty." in caplog.text, caplog.text
def test_user_mgr_change_password(db_file, user_manager, caplog):
    """``pwd`` with a new password must exit 0, log success, and change the stored hash."""

    async def seed_user_and_get_hash():
        await user_manager.create_user_auth("client123", "randompassword")
        created = await user_manager.get_user_auth('client123')
        return created._password_hash

    # Remember the hash before the CLI runs so it can be compared afterwards.
    previous_hash = asyncio.run(seed_user_and_get_hash())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "pwd", "-c", "client123",]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="myotherpassword\n")
    assert outcome.exit_code == 0, caplog.text
    assert "Success: client 'client123' password updated." in caplog.text, caplog.text

    async def confirm_hash_changed(old_hash):
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None
        assert stored._password_hash != old_hash

    asyncio.run(confirm_hash_changed(previous_hash))
@pytest.mark.asyncio
async def test_user_mgr_cli():
    """Run ``user_mgr --help`` in a subprocess shell and expect exit code 0.

    ``communicate()`` already waits for the process to terminate, so the
    former fixed ``asyncio.sleep(0.2)`` was redundant and has been removed.
    """
    cmd = [
        "user_mgr",
        "--help"]
    proc = await asyncio.create_subprocess_shell(
        " ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    # Drains both pipes and waits for exit; returncode is set afterwards.
    stdout, stderr = await proc.communicate()
    assert proc.returncode == 0, f"user_mgr error code: {proc.returncode} - {stdout} - {stderr}"
@pytest.mark.asyncio
async def test_topic_mgr_cli():
    """Run ``topic_mgr --help`` in a subprocess shell and expect exit code 0.

    ``communicate()`` already waits for the process to terminate, so the
    former fixed ``asyncio.sleep(0.2)`` was redundant and has been removed.
    """
    cmd = [
        "topic_mgr",
        "--help"]
    proc = await asyncio.create_subprocess_shell(
        " ".join(cmd), stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    # Drains both pipes and waits for exit; returncode is set afterwards.
    stdout, stderr = await proc.communicate()
    assert proc.returncode == 0, f"topic_mgr error code: {proc.returncode} - {stdout} - {stderr}"