amqtt/tests/contrib/test_db_scripts.py

421 wiersze
13 KiB
Python

import asyncio
import logging
import tempfile
from pathlib import Path
import pytest
from passlib.context import CryptContext
from typer.testing import CliRunner
from amqtt.contexts import Action
from amqtt.contrib.auth_db.managers import TopicManager, UserManager
from amqtt.contrib.auth_db.models import PasswordHasher, AllowedTopic
from amqtt.contrib.auth_db.topic_mgr_cli import topic_app
from amqtt.contrib.auth_db.user_mgr_cli import user_app
# Module-level Typer CLI test runner shared by every test below.
runner = CliRunner()
@pytest.fixture
def password_hasher():
    """Yield a ``PasswordHasher`` whose crypt context is set to argon2."""
    hasher = PasswordHasher()
    argon2_context = CryptContext(schemes=["argon2"], deprecated="auto")
    hasher.crypt_context = argon2_context
    yield hasher
@pytest.fixture
def db_file():
    """Yield the path of a not-yet-created SQLite file inside a fresh temp directory.

    The directory is unique per test and is removed, together with anything
    created inside it, when the fixture is torn down.
    """
    # The previous implementation joined the directory with the *absolute*
    # ``NamedTemporaryFile.name``; joining with an absolute path discards the
    # left-hand side, so the database ended up outside ``temp_dir`` and was
    # never cleaned up. A plain relative file name keeps it inside.
    with tempfile.TemporaryDirectory() as temp_dir:
        yield Path(temp_dir) / "test.db"
@pytest.fixture
def db_connection(db_file):
    """Yield a ``sqlite+aiosqlite`` connection URL string pointing at ``db_file``."""
    yield f"sqlite+aiosqlite:///{db_file}"
@pytest.fixture
async def user_manager(password_hasher, db_connection):
    """Yield a ``UserManager`` for the test database after awaiting ``db_sync()``.

    ``password_hasher`` is requested but not referenced in the body; presumably
    it is listed so the hasher configuration is in place first (TODO confirm).
    """
    # NOTE: ``@pytest.mark.asyncio`` was dropped from this fixture: marks have
    # no effect on fixtures and pytest deprecates applying them.
    manager = UserManager(db_connection)
    await manager.db_sync()
    yield manager
@pytest.fixture
async def topic_manager(password_hasher, db_connection):
    """Yield a ``TopicManager`` for the test database after awaiting ``db_sync()``.

    ``password_hasher`` is requested but not referenced in the body; presumably
    it is listed so the hasher configuration is in place first (TODO confirm).
    """
    # NOTE: ``@pytest.mark.asyncio`` was dropped from this fixture: marks have
    # no effect on fixtures and pytest deprecates applying them.
    manager = TopicManager(db_connection)
    await manager.db_sync()
    yield manager
@pytest.mark.parametrize(
    "app,error_msg",
    [
        (user_app, "user cli"),
        (topic_app, "topic cli"),
    ],
)
def test_cli_mgr_no_params(app, error_msg):
    """Invoking either CLI without any arguments should exit with status 0."""
    outcome = runner.invoke(app, [])
    assert outcome.exit_code == 0, f"{outcome.output}"
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_db_type(app, error_msg):
    """Running ``sync`` without a ``-d`` database type should exit with status 2."""
    # Invoke the parametrized ``app``; the test previously hard-coded
    # ``topic_app``, so the user CLI case silently re-tested the topic CLI.
    result = runner.invoke(app, ["sync"])
    assert result.exit_code == 2
@pytest.mark.parametrize(
    "app,error_msg",
    [
        (user_app, "user cli"),
        (topic_app, "topic cli"),
    ],
)
def test_cli_mgr_no_db_username(app, error_msg, caplog):
    """Choosing ``mysql`` without a username should exit 1 and log the username error."""
    cli_args = ["-d", "mysql", "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 1
    assert "DB access requires a username be provided." in caplog.text
@pytest.mark.parametrize(
    "app,error_msg",
    [
        (user_app, "user cli"),
        (topic_app, "topic cli"),
    ],
)
def test_cli_mgr_db_not_installed(app, error_msg, caplog):
    """``sync`` against ``mysql`` should exit 1 with a ``ModuleNotFoundError``."""
    cli_args = ["-d", "mysql", "-u", "mydbname", "sync"]
    with caplog.at_level(logging.INFO):
        # The stdin line presumably answers a database password prompt.
        outcome = runner.invoke(app, cli_args, input="mydbpassword\n")
    assert outcome.exit_code == 1
    assert isinstance(outcome.exception, ModuleNotFoundError)
@pytest.mark.parametrize(
    "app,error_msg",
    [
        (user_app, "user cli"),
        (topic_app, "topic cli"),
    ],
)
def test_cli_mgr_sync(db_file, app, error_msg, caplog):
    """``sync`` against a SQLite file should exit 0 and log the success message."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 0
    assert "Success: database synced." in caplog.text
@pytest.mark.parametrize(
    "app,success_msg",
    [
        (user_app, "authentications"),
        (topic_app, "authorizations"),
    ],
)
def test_topic_empty_list(db_file, topic_manager, caplog, app, success_msg):
    """``list`` with nothing stored should exit 0 and log that no entries exist."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 0
    expected_line = f"No client {success_msg} exist."
    assert expected_line in caplog.text
def test_user_mgr_list_clients(db_file, user_manager, caplog):
    """``list`` should log one INFO record per stored client, in creation order."""
    client_ids = ("client123", "client456")

    async def seed_clients():
        for client_id in client_ids:
            await user_manager.create_user_auth(client_id, "randompassword")

    asyncio.run(seed_clients())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 0

    info_records = [rec for rec in caplog.records if rec.levelname == "INFO"]
    for position, client_id in enumerate(client_ids):
        assert client_id in info_records[position].message
def test_topic_mgr_list_clients(db_file, topic_manager, caplog):
    """``list`` should log one INFO record per client, naming the client and its topic."""
    device_ids = ("device123", "device456")
    granted_actions = (Action.SUBSCRIBE, Action.PUBLISH)

    async def seed_topic_auths():
        # Create every client first, then grant topics, mirroring the intended order.
        for device_id in device_ids:
            await topic_manager.create_topic_auth(device_id)
        for device_id, action in zip(device_ids, granted_actions):
            await topic_manager.add_allowed_topic(device_id, 'my/topic', action)

    asyncio.run(seed_topic_auths())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
    assert outcome.exit_code == 0

    info_records = [rec for rec in caplog.records if rec.levelname == "INFO"]
    for position, device_id in enumerate(device_ids):
        assert device_id in info_records[position].message
        assert 'my/topic' in info_records[position].message
def test_user_mgr_add_auth_missing_param(db_file, user_manager, caplog):
    """``add`` without a client id option should exit with status 2."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 2
def test_add_allowed_topic_missing_param(db_file, topic_manager, caplog):
    """Topic ``add`` without the ``-a`` action option should exit with status 2."""
    db_args = ["-d", "sqlite", "-f", f"{db_file}"]
    add_args = ["add", "-c", "client123", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_args + add_args)
    assert outcome.exit_code == 2
def test_user_mgr_add_auth_missing_password(db_file, user_manager, caplog):
    """``add`` fed a blank password should exit 1, log the error and store nothing."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add", "-c", 'client123']
    with caplog.at_level(logging.INFO):
        # A single space on stdin stands in for the password entry.
        outcome = runner.invoke(user_app, cli_args, input=" \n")
    assert outcome.exit_code == 1
    assert "Error: client password cannot be empty." in caplog.text, caplog.text

    async def fetch_client():
        return await user_manager.get_user_auth('client123')

    # The client must not have been created.
    assert asyncio.run(fetch_client()) is None
def test_user_mgr_add_auth(db_file, user_manager, caplog):
    """``add`` with a client id and password input should exit 0 and store the client."""
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "add", "-c", 'client123']
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="dbpassword\nuserpassword\n")
    assert outcome.exit_code == 0
    assert "Success: created 'client123'" in caplog.text, caplog.text

    async def fetch_client():
        return await user_manager.get_user_auth('client123')

    # The new client must now be retrievable through the manager.
    assert asyncio.run(fetch_client()) is not None
def test_add_allowed_topic(db_file, topic_manager, caplog):
    """Topic ``add`` with ``-a publish`` should exit 0 and put the topic in the publish ACL."""
    db_args = ["-d", "sqlite", "-f", f"{db_file}"]
    add_args = ["add", "-c", "client123", "-a", "publish", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_args + add_args)
    assert outcome.exit_code == 0
    assert "Success: topic 'my/topic' added to publish for 'client123'" in caplog.text

    async def check_topic_stored():
        stored_auth = await topic_manager.get_topic_auth('client123')
        assert stored_auth is not None
        assert AllowedTopic('my/topic') in stored_auth.publish_acl

    asyncio.run(check_topic_stored())
def test_remove_user_auth_mismatch(db_file, user_manager, caplog):
    """``rm`` for a client id that was never created should exit 1 and log the error."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    # "device123" deliberately differs from the stored "client123".
    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "device123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
    assert outcome.exit_code == 1, caplog.text
    assert "Error: client 'device123' does not exist." in caplog.text, outcome.output
def test_remove_allowed_topic_mismatch(db_file, topic_manager, caplog):
    """Removing a topic from the wrong action list should exit 1 and log the error."""
    async def seed_topic_auth():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_topic_auth())

    # The topic was granted for subscribe; removal is attempted for publish.
    db_args = ["-d", "sqlite", "-f", f"{db_file}"]
    rm_args = ["rm", "-c", "device123", "-a", "publish", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_args + rm_args)
    assert outcome.exit_code == 1, caplog.text
    assert "Error: topic 'my/topic' not in the publish allow list for device123." in caplog.text
def test_remove_user_auth_abort(db_file, user_manager, caplog):
    """``rm`` answered with ``N`` should exit 0 and leave the client in place."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        # "N" presumably declines a confirmation prompt.
        outcome = runner.invoke(user_app, cli_args, input="N\n")
    assert outcome.exit_code == 0, caplog.text

    async def fetch_client():
        return await user_manager.get_user_auth('client123')

    assert asyncio.run(fetch_client()) is not None
def test_remove_user_auth_confirm(db_file, user_manager, caplog):
    """``rm`` answered with ``y`` should exit 0, log success and delete the client."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "rm", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        # "y" presumably accepts a confirmation prompt.
        outcome = runner.invoke(user_app, cli_args, input="y\n")
    assert outcome.exit_code == 0, caplog.text
    assert "Success: 'client123' was removed." in caplog.text, outcome.output

    async def fetch_client():
        return await user_manager.get_user_auth('client123')

    assert asyncio.run(fetch_client()) is None
def test_remove_allowed_topic(db_file, topic_manager, caplog):
    """Removing a granted subscribe topic should exit 0 and log the success message."""
    async def seed_topic_auth():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_topic_auth())

    db_args = ["-d", "sqlite", "-f", f"{db_file}"]
    rm_args = ["rm", "-c", "device123", "-a", "subscribe", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_args + rm_args)
    assert outcome.exit_code == 0
    assert "Success: removed topic 'my/topic' from subscribe for 'device123'" in caplog.text, caplog.text
def test_user_mgr_change_password_empty(db_file, user_manager, caplog):
    """``pwd`` fed a blank password should exit 1 and log the empty-password error."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "pwd", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        # A single space on stdin stands in for the new password entry.
        outcome = runner.invoke(user_app, cli_args, input=" \n")
    assert outcome.exit_code == 1, caplog.text
    assert "Error: client password cannot be empty." in caplog.text, caplog.text
def test_user_mgr_change_password(db_file, user_manager, caplog):
    """``pwd`` with a new password should exit 0, log success and change the stored hash."""
    async def seed_client_and_get_hash():
        await user_manager.create_user_auth("client123", "randompassword")
        created = await user_manager.get_user_auth('client123')
        return created._password_hash

    # Remember the hash produced at creation time for the later comparison.
    original_hash = asyncio.run(seed_client_and_get_hash())

    cli_args = ["-d", "sqlite", "-f", f"{db_file}", "pwd", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="myotherpassword\n")
    assert outcome.exit_code == 0, caplog.text
    assert "Success: client 'client123' password updated." in caplog.text, caplog.text

    async def check_hash_changed(previous_hash):
        current = await user_manager.get_user_auth('client123')
        assert current is not None
        assert current._password_hash != previous_hash

    asyncio.run(check_hash_changed(original_hash))
@pytest.mark.asyncio
async def test_user_mgr_cli():
    """The ``user_mgr`` command should answer ``--help`` with exit status 0."""
    cmd = ["user_mgr", "--help"]
    # Run the argument list directly instead of a shell-joined string; a missing
    # executable then raises FileNotFoundError rather than a shell exit code.
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    # communicate() waits for the process to exit and drains both pipes, so no
    # fixed sleep is needed beforehand.
    stdout, stderr = await proc.communicate()
    assert proc.returncode == 0, f"user_mgr error code: {proc.returncode} - {stdout} - {stderr}"
@pytest.mark.asyncio
async def test_topic_mgr_cli():
    """The ``topic_mgr`` command should answer ``--help`` with exit status 0."""
    cmd = ["topic_mgr", "--help"]
    # Run the argument list directly instead of a shell-joined string; a missing
    # executable then raises FileNotFoundError rather than a shell exit code.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() waits for the process to exit and drains both pipes, so no
    # fixed sleep is needed beforehand.
    stdout, stderr = await proc.communicate()
    assert proc.returncode == 0, f"topic_mgr error code: {proc.returncode} - {stdout} - {stderr}"