# amqtt/tests/plugins/test_authentication.py
#
# 130 lines
# 4.5 KiB
# Python

import asyncio
import logging
from pathlib import Path
import unittest
import pytest
from amqtt.plugins.authentication import AnonymousAuthPlugin, FileAuthPlugin
from amqtt.contexts import BaseContext
from amqtt.plugins.base import BaseAuthPlugin
from amqtt.session import Session
# Configure root logging at import time: DEBUG level, with each record showing
# timestamp, logger name, source file and line, level name and message.
formatter = (
    "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} "
    "%(levelname)s - %(message)s"
)
logging.basicConfig(format=formatter, level=logging.DEBUG)
@pytest.mark.asyncio
async def test_base_no_config(logdog):
    """Check BaseAuthPlugin refuses authentication when no 'auth' config is present.

    A ``BaseAuthPlugin`` is built from a context whose configuration is an
    empty dict. ``authenticate`` must return ``False`` and exactly one
    WARNING record about the missing 'auth' section must be logged on the
    "testlog" logger.

    Args:
        logdog: Fixture whose context manager yields a "pile" of captured
            log records (presumably provided by the pytest-logdog plugin).
    """
    with logdog() as pile:
        context = BaseContext()
        context.logger = logging.getLogger("testlog")
        context.config = {}

        plugin = BaseAuthPlugin(context)
        s = Session()
        authorised = await plugin.authenticate(session=s)
        assert authorised is False

        # Warning messages are only generated if using deprecated plugin
        # configuration on initial load.
        log_records = list(pile.drain(name="testlog"))
        assert len(log_records) == 1
        assert log_records[0].levelno == logging.WARNING
        assert log_records[0].message == "'auth' section not found in context configuration"
class TestAnonymousAuthPlugin(unittest.TestCase):
    """Exercise ``AnonymousAuthPlugin.authenticate`` with different configurations."""

    def setUp(self) -> None:
        """Create a dedicated event loop for the test and schedule its closure."""
        self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
        # Close the loop once the test finishes (pass or fail); an event loop
        # that is never closed leaks its resources and triggers a
        # ResourceWarning when it is garbage-collected.
        self.addCleanup(self.loop.close)

    def _authenticate(self, config: object, username: str) -> object:
        """Run the plugin's ``authenticate`` for a session with *username*.

        Args:
            config: Value assigned to ``context.config`` (a dict with an
                "auth" section, or an ``AnonymousAuthPlugin.Config`` instance).
            username: Username set on the session before authenticating.

        Returns:
            Whatever ``AnonymousAuthPlugin.authenticate`` returns.
        """
        context = BaseContext()
        context.logger = logging.getLogger(__name__)
        context.config = config
        session = Session()
        session.username = username
        auth_plugin = AnonymousAuthPlugin(context)
        return self.loop.run_until_complete(auth_plugin.authenticate(session=session))

    def test_allow_anonymous_dict_config(self) -> None:
        """An empty username is accepted when the dict config allows anonymous access."""
        ret = self._authenticate({"auth": {"allow-anonymous": True}}, "")
        assert ret

    def test_allow_anonymous_dataclass_config(self) -> None:
        """An empty username is accepted when the dataclass config allows anonymous access."""
        ret = self._authenticate(AnonymousAuthPlugin.Config(allow_anonymous=True), "")
        assert ret

    def test_disallow_anonymous(self) -> None:
        """An empty username is rejected when anonymous access is disabled."""
        ret = self._authenticate({"auth": {"allow-anonymous": False}}, "")
        assert not ret

    def test_allow_nonanonymous(self) -> None:
        """A session with a username is accepted even when anonymous access is disabled."""
        ret = self._authenticate({"auth": {"allow-anonymous": False}}, "test")
        assert ret
class TestFileAuthPlugin(unittest.TestCase):
    """Exercise ``FileAuthPlugin.authenticate`` against the ``passwd`` file beside this module."""

    def setUp(self) -> None:
        """Create a dedicated event loop for the test and schedule its closure."""
        self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
        # Close the loop once the test finishes (pass or fail); an event loop
        # that is never closed leaks its resources and triggers a
        # ResourceWarning when it is garbage-collected.
        self.addCleanup(self.loop.close)

    def _authenticate(self, username: str, password: str) -> object:
        """Run the plugin's ``authenticate`` for the given credentials.

        The plugin is configured with the ``passwd`` file located in the same
        directory as this test module.

        Args:
            username: Username set on the session.
            password: Password set on the session.

        Returns:
            Whatever ``FileAuthPlugin.authenticate`` returns.
        """
        context = BaseContext()
        context.logger = logging.getLogger(__name__)
        context.config = {
            "auth": {
                "password-file": Path(__file__).parent / "passwd",
            },
        }
        session = Session()
        session.username = username
        session.password = password
        auth_plugin = FileAuthPlugin(context)
        return self.loop.run_until_complete(auth_plugin.authenticate(session=session))

    def test_allow(self) -> None:
        """The credentials "user" / "test" are accepted."""
        ret = self._authenticate("user", "test")
        assert ret

    def test_wrong_password(self) -> None:
        """The username "user" with an incorrect password is rejected."""
        ret = self._authenticate("user", "wrong password")
        assert not ret

    def test_unknown_password(self) -> None:
        """Credentials for the username "some user" are rejected.

        NOTE(review): despite the test name, the username differs from the one
        used in the other tests as well as the password, so this presumably
        covers a user absent from the password file -- confirm against
        ``passwd``.
        """
        ret = self._authenticate("some user", "some password")
        assert not ret