kopia lustrzana https://github.com/Yakifo/amqtt
421 wiersze
13 KiB
Python
421 wiersze
13 KiB
Python
import asyncio
|
|
import logging
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from passlib.context import CryptContext
|
|
from typer.testing import CliRunner
|
|
|
|
from amqtt.contexts import Action
|
|
from amqtt.contrib.auth_db.managers import TopicManager, UserManager
|
|
from amqtt.contrib.auth_db.models import PasswordHasher, AllowedTopic
|
|
from amqtt.contrib.auth_db.topic_mgr_cli import topic_app
|
|
from amqtt.contrib.auth_db.user_mgr_cli import user_app
|
|
|
|
# Module-wide Typer test runner; the tests in this file call ``runner.invoke``
# on the user/topic manager CLI apps and inspect the returned result.
runner = CliRunner()
|
|
|
|
@pytest.fixture
def password_hasher():
    """Yield a ``PasswordHasher`` whose crypt context is set to the argon2 scheme."""
    hasher = PasswordHasher()
    argon2_context = CryptContext(schemes=["argon2"], deprecated="auto")
    hasher.crypt_context = argon2_context
    yield hasher
|
|
|
|
|
|
@pytest.fixture
def db_file():
    """Yield the path of a not-yet-created SQLite file inside a fresh temp directory.

    The previous implementation joined the temp directory with the *absolute*
    name of a ``NamedTemporaryFile``. Joining a path with an absolute path
    discards the left-hand side, so the database ended up outside the temporary
    directory and was never removed. A fixed relative file name keeps the
    database inside ``temp_dir`` so it is deleted together with the directory
    when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Each test gets its own directory, so a constant file name is unique enough.
        yield Path(temp_dir) / "amqtt_test.db"
|
|
|
|
|
|
@pytest.fixture
def db_connection(db_file):
    """Yield a ``sqlite+aiosqlite`` connection URL pointing at ``db_file``."""
    yield f"sqlite+aiosqlite:///{db_file}"
|
|
|
|
|
|
@pytest.fixture
async def user_manager(password_hasher, db_connection):
    """Yield a ``UserManager`` bound to the test database after syncing it.

    ``password_hasher`` is requested only as a fixture dependency; it is not
    used directly here (presumably so the hasher is configured before the
    manager is used -- confirm against ``PasswordHasher``).

    The ``@pytest.mark.asyncio`` decorator was removed: marks applied to
    fixture functions have no effect and are deprecated by pytest.
    """
    um = UserManager(db_connection)
    await um.db_sync()
    yield um
|
|
|
|
|
|
@pytest.fixture
async def topic_manager(password_hasher, db_connection):
    """Yield a ``TopicManager`` bound to the test database after syncing it.

    ``password_hasher`` is requested only as a fixture dependency; it is not
    used directly here (presumably so the hasher is configured before the
    manager is used -- confirm against ``PasswordHasher``).

    The ``@pytest.mark.asyncio`` decorator was removed: marks applied to
    fixture functions have no effect and are deprecated by pytest.
    """
    tm = TopicManager(db_connection)
    await tm.db_sync()
    yield tm
|
|
|
|
|
|
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_params(app, error_msg):
    """Each manager CLI exits with code 0 when invoked without arguments."""
    no_arguments = []
    outcome = runner.invoke(app, no_arguments)

    assert outcome.exit_code == 0, f"{outcome.output}"
|
|
|
|
|
|
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_db_type(app, error_msg):
    """Running ``sync`` without a database type exits with code 2.

    The parametrized ``app`` is now the one invoked; previously ``topic_app``
    was hard-coded, so the ``user_app`` case re-tested the topic CLI instead.
    """
    result = runner.invoke(app, ["sync"])
    assert result.exit_code == 2, f"{error_msg}: {result.output}"
|
|
|
|
|
|
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_no_db_username(app, error_msg, caplog):
    """Selecting ``mysql`` without a username exits with 1 and logs the reason."""
    cli_args = ["-d", "mysql", "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
        assert outcome.exit_code == 1
        assert "DB access requires a username be provided." in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_db_not_installed(app, error_msg, caplog):
    """``sync`` against ``mysql`` exits with 1 and surfaces ``ModuleNotFoundError``."""
    cli_args = ["-d", "mysql", "-u", "mydbname", "sync"]
    with caplog.at_level(logging.INFO):
        # The text fed on stdin answers the CLI's prompt.
        outcome = runner.invoke(app, cli_args, input="mydbpassword\n")
        assert outcome.exit_code == 1
        assert isinstance(outcome.exception, ModuleNotFoundError)
|
|
|
|
|
|
@pytest.mark.parametrize("app,error_msg", [
    (user_app, "user cli"),
    (topic_app, "topic cli"),
])
def test_cli_mgr_sync(db_file, app, error_msg, caplog):
    """``sync`` on a SQLite file exits with 0 and logs the success message."""
    cli_args = ["-d", "sqlite", "-f", str(db_file), "sync"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
        assert outcome.exit_code == 0
        assert "Success: database synced." in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize("app,success_msg", [
    (user_app, "authentications"),
    (topic_app, "authorizations"),
])
def test_topic_empty_list(db_file, topic_manager, caplog, app, success_msg):
    """``list`` on an empty database exits with 0 and logs that nothing exists."""
    cli_args = ["-d", "sqlite", "-f", str(db_file), "list"]
    expected_log = f"No client {success_msg} exist."
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(app, cli_args)
        assert outcome.exit_code == 0
        assert expected_log in caplog.text
|
|
|
|
|
|
def test_user_mgr_list_clients(db_file, user_manager, caplog):
    """``list`` logs one INFO record per stored client, in creation order."""
    expected_clients = ("client123", "client456")

    async def seed_clients():
        for client_id in expected_clients:
            await user_manager.create_user_auth(client_id, "randompassword")

    asyncio.run(seed_clients())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
        assert outcome.exit_code == 0

        info_records = [rec for rec in caplog.records if rec.levelname == "INFO"]

        # Index explicitly so a missing record raises instead of being skipped.
        for position, client_id in enumerate(expected_clients):
            assert client_id in info_records[position].message
|
|
|
|
|
|
def test_topic_mgr_list_clients(db_file, topic_manager, caplog):
    """``list`` logs one INFO record per client, each mentioning its topic."""
    async def seed_topic_auths():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.create_topic_auth('device456')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)
        await topic_manager.add_allowed_topic('device456', 'my/topic', Action.PUBLISH)

    asyncio.run(seed_topic_auths())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "list"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, cli_args)
        assert outcome.exit_code == 0

        info_records = [rec for rec in caplog.records if rec.levelname == "INFO"]

        # Index explicitly so a missing record raises instead of being skipped.
        for position, device_id in enumerate(('device123', 'device456')):
            logged = info_records[position].message
            assert device_id in logged
            assert 'my/topic' in logged
|
|
|
|
|
|
def test_user_mgr_add_auth_missing_param(db_file, user_manager, caplog):
    """``add`` without any client option exits with code 2."""
    cli_args = ["-d", "sqlite", "-f", str(db_file), "add"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
        assert outcome.exit_code == 2
|
|
|
|
|
|
def test_add_allowed_topic_missing_param(db_file, topic_manager, caplog):
    """``add`` with a client and topic but no ``-a`` action exits with code 2."""
    db_options = ["-d", "sqlite", "-f", str(db_file)]
    add_command = ["add", "-c", "client123", "my/topic"]

    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_options + add_command)
        assert outcome.exit_code == 2
|
|
|
|
|
|
def test_user_mgr_add_auth_missing_password(db_file, user_manager, caplog):
    """A blank password on ``add`` exits with 1, logs an error, and stores no user."""
    cli_args = ["-d", "sqlite", "-f", str(db_file), "add", "-c", 'client123']
    with caplog.at_level(logging.INFO):
        # A single space followed by newline is what gets typed at the prompt.
        outcome = runner.invoke(user_app, cli_args, input=" \n")
        assert outcome.exit_code == 1
        assert "Error: client password cannot be empty." in caplog.text, caplog.text

    async def check_not_created():
        stored = await user_manager.get_user_auth('client123')
        assert stored is None

    asyncio.run(check_not_created())
|
|
|
|
|
|
def test_user_mgr_add_auth(db_file, user_manager, caplog):
    """``add`` with typed input exits with 0, logs success, and stores the user."""
    cli_args = ["-d", "sqlite", "-f", str(db_file), "add", "-c", 'client123']
    typed_input = "dbpassword\nuserpassword\n"
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input=typed_input)
        assert outcome.exit_code == 0
        assert "Success: created 'client123'" in caplog.text, caplog.text

    async def check_created():
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None

    asyncio.run(check_created())
|
|
|
|
|
|
def test_add_allowed_topic(db_file, topic_manager, caplog):
    """``add -a publish`` exits with 0, logs success, and records the topic."""
    db_options = ["-d", "sqlite", "-f", str(db_file)]
    add_command = ["add", "-c", "client123", "-a", "publish", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_options + add_command)
        assert outcome.exit_code == 0
        assert "Success: topic 'my/topic' added to publish for 'client123'" in caplog.text

    async def check_topic_added():
        stored = await topic_manager.get_topic_auth('client123')
        assert stored is not None
        assert AllowedTopic('my/topic') in stored.publish_acl

    asyncio.run(check_topic_added())
|
|
|
|
|
|
def test_remove_user_auth_mismatch(db_file, user_manager, caplog):
    """``rm`` for a client id that was never created exits with 1 and logs an error."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    # "device123" deliberately differs from the seeded "client123".
    cli_args = ["-d", "sqlite", "-f", str(db_file), "rm", "-c", "device123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args)
        assert outcome.exit_code == 1, caplog.text
        assert "Error: client 'device123' does not exist." in caplog.text, outcome.output
|
|
|
|
|
|
def test_remove_allowed_topic_mismatch(db_file, topic_manager, caplog):
    """``rm -a publish`` on a topic only allowed for subscribe exits with 1."""
    async def seed_subscribe_topic():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_subscribe_topic())

    db_options = ["-d", "sqlite", "-f", str(db_file)]
    rm_command = ["rm", "-c", "device123", "-a", "publish", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_options + rm_command)
        assert outcome.exit_code == 1, caplog.text
        assert "Error: topic 'my/topic' not in the publish allow list for device123." in caplog.text
|
|
|
|
|
|
def test_remove_user_auth_abort(db_file, user_manager, caplog):
    """Answering ``N`` to ``rm`` exits with 0 and leaves the user in place."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "rm", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="N\n")
        assert outcome.exit_code == 0, caplog.text

    async def check_still_present():
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None

    asyncio.run(check_still_present())
|
|
|
|
|
|
def test_remove_user_auth_confirm(db_file, user_manager, caplog):
    """Answering ``y`` to ``rm`` exits with 0, logs success, and deletes the user."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "rm", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="y\n")
        assert outcome.exit_code == 0, caplog.text
        assert "Success: 'client123' was removed." in caplog.text, outcome.output

    async def check_removed():
        stored = await user_manager.get_user_auth('client123')
        assert stored is None

    asyncio.run(check_removed())
|
|
|
|
|
|
def test_remove_allowed_topic(db_file, topic_manager, caplog):
    """``rm -a subscribe`` on a seeded subscribe topic exits with 0 and logs success."""
    async def seed_subscribe_topic():
        await topic_manager.create_topic_auth('device123')
        await topic_manager.add_allowed_topic('device123', 'my/topic', Action.SUBSCRIBE)

    asyncio.run(seed_subscribe_topic())

    db_options = ["-d", "sqlite", "-f", str(db_file)]
    rm_command = ["rm", "-c", "device123", "-a", "subscribe", "my/topic"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(topic_app, db_options + rm_command)
        assert outcome.exit_code == 0
        assert "Success: removed topic 'my/topic' from subscribe for 'device123'" in caplog.text, caplog.text
|
|
|
|
|
|
def test_user_mgr_change_password_empty(db_file, user_manager, caplog):
    """A blank password on ``pwd`` exits with 1 and logs the empty-password error."""
    async def seed_client():
        await user_manager.create_user_auth("client123", "randompassword")

    asyncio.run(seed_client())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "pwd", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        # A single space followed by newline is what gets typed at the prompt.
        outcome = runner.invoke(user_app, cli_args, input=" \n")
        assert outcome.exit_code == 1, caplog.text
        assert "Error: client password cannot be empty." in caplog.text, caplog.text
|
|
|
|
|
|
def test_user_mgr_change_password(db_file, user_manager, caplog):
    """``pwd`` with a new password exits with 0 and changes the stored hash."""
    async def seed_client_and_get_hash():
        await user_manager.create_user_auth("client123", "randompassword")
        created = await user_manager.get_user_auth('client123')
        return created._password_hash

    # Remember the hash produced at creation time to compare against later.
    original_hash = asyncio.run(seed_client_and_get_hash())

    cli_args = ["-d", "sqlite", "-f", str(db_file), "pwd", "-c", "client123"]
    with caplog.at_level(logging.INFO):
        outcome = runner.invoke(user_app, cli_args, input="myotherpassword\n")
        assert outcome.exit_code == 0, caplog.text
        assert "Success: client 'client123' password updated." in caplog.text, caplog.text

    async def check_hash_changed(previous_hash):
        stored = await user_manager.get_user_auth('client123')
        assert stored is not None
        assert stored._password_hash != previous_hash

    asyncio.run(check_hash_changed(original_hash))
|
|
|
|
@pytest.mark.asyncio
async def test_user_mgr_cli():
    """The ``user_mgr`` command answers ``--help`` with exit code 0.

    The command is launched through the shell as a real subprocess.
    ``communicate()`` reads both pipes to EOF and waits for the process to
    exit, so the fixed 0.2 s sleep that used to precede it was removed: it
    only slowed the test down and did nothing for synchronisation.
    """
    cmd = [
        "user_mgr",
        "--help"]

    proc = await asyncio.create_subprocess_shell(
        " ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()

    assert proc.returncode == 0, f"user_mgr error code: {proc.returncode} - {stdout} - {stderr}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_topic_mgr_cli():
    """The ``topic_mgr`` command answers ``--help`` with exit code 0.

    The command is launched through the shell as a real subprocess.
    ``communicate()`` reads both pipes to EOF and waits for the process to
    exit, so the fixed 0.2 s sleep that used to precede it was removed: it
    only slowed the test down and did nothing for synchronisation.
    """
    cmd = [
        "topic_mgr",
        "--help"]

    proc = await asyncio.create_subprocess_shell(
        " ".join(cmd), stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()

    assert proc.returncode == 0, f"topic_mgr error code: {proc.returncode} - {stdout} - {stderr}"
|
|
|