plugins.topic_checking tests: Replace DummyLogger with `logdog` plug-in.

pull/69/head
Stuart Longland 2021-07-04 15:56:34 +10:00 zatwierdzone przez Florian Ludwig
rodzic 5ad48d9129
commit 0760bd7613
1 zmienionych plików z 263 dodań i 254 usunięć

Wyświetl plik

@ -1,4 +1,5 @@
import pytest import pytest
import logging
from amqtt.plugins.manager import BaseContext from amqtt.plugins.manager import BaseContext
from amqtt.plugins.topic_checking import ( from amqtt.plugins.topic_checking import (
@ -10,221 +11,225 @@ from amqtt.plugins.topic_checking import (
from amqtt.session import Session from amqtt.session import Session
class DummyLogger(object):
def __init__(self):
self.messages = []
def warning(self, *args, **kwargs):
self.messages.append((args, kwargs))
# Base plug-in object # Base plug-in object
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_no_config(): async def test_base_no_config(logdog):
""" """
Check BaseTopicPlugin returns false if no topic-check is present. Check BaseTopicPlugin returns false if no topic-check is present.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {} context.logger = logging.getLogger('testlog')
context.config = {}
plugin = BaseTopicPlugin(context) plugin = BaseTopicPlugin(context)
authorised = plugin.topic_filtering() authorised = plugin.topic_filtering()
assert authorised is False assert authorised is False
# Should have printed a couple of warnings # Should have printed a couple of warnings
assert len(context.logger.messages) == 2 log_records = list(pile.drain(name='testlog'))
assert context.logger.messages[0] == ( assert len(log_records) == 2
("'topic-check' section not found in context configuration",), assert log_records[0].levelno == logging.WARN
{}, assert log_records[0].message == "'topic-check' section not found in context configuration"
)
assert context.logger.messages[1] == ( assert log_records[1].levelno == logging.WARN
("'auth' section not found in context configuration",), assert log_records[1].message == "'auth' section not found in context configuration"
{}, assert pile.is_empty()
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_empty_config(): async def test_base_empty_config(logdog):
""" """
Check BaseTopicPlugin returns false if topic-check is empty. Check BaseTopicPlugin returns false if topic-check is empty.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {}}
plugin = BaseTopicPlugin(context) plugin = BaseTopicPlugin(context)
authorised = plugin.topic_filtering() authorised = plugin.topic_filtering()
assert authorised is False assert authorised is False
# Should have printed just one warning # Should have printed just one warning
assert len(context.logger.messages) == 1 log_records = list(pile.drain(name='testlog'))
assert context.logger.messages[0] == ( assert len(log_records) == 1
("'auth' section not found in context configuration",), assert log_records[0].levelno == logging.WARN
{}, assert log_records[0].message == "'auth' section not found in context configuration"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_disabled_config(): async def test_base_disabled_config(logdog):
""" """
Check BaseTopicPlugin returns true if disabled. (it doesn't actually check) Check BaseTopicPlugin returns true if disabled. (it doesn't actually check)
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": False}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": False}}
plugin = BaseTopicPlugin(context) plugin = BaseTopicPlugin(context)
authorised = plugin.topic_filtering() authorised = plugin.topic_filtering()
assert authorised is True assert authorised is True
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_enabled_config(): async def test_base_enabled_config(logdog):
""" """
Check BaseTopicPlugin returns true if enabled. Check BaseTopicPlugin returns true if enabled.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
plugin = BaseTopicPlugin(context) plugin = BaseTopicPlugin(context)
authorised = plugin.topic_filtering() authorised = plugin.topic_filtering()
assert authorised is True assert authorised is True
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
# Taboo plug-in # Taboo plug-in
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_empty_config(): async def test_taboo_empty_config(logdog):
""" """
Check TopicTabooPlugin returns false if topic-check absent. Check TopicTabooPlugin returns false if topic-check absent.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {} context.logger = logging.getLogger('testlog')
context.config = {}
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert (await plugin.topic_filtering()) is False assert (await plugin.topic_filtering()) is False
# Should have printed a couple of warnings # Should have printed a couple of warnings
assert len(context.logger.messages) == 2 log_records = list(pile.drain(name='testlog'))
assert context.logger.messages[0] == ( assert len(log_records) == 2
("'topic-check' section not found in context configuration",), assert log_records[0].levelno == logging.WARN
{}, assert log_records[0].message == "'topic-check' section not found in context configuration"
) assert log_records[1].levelno == logging.WARN
assert context.logger.messages[1] == ( assert log_records[1].message == "'auth' section not found in context configuration"
("'auth' section not found in context configuration",),
{},
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_not_taboo_topic(): async def test_taboo_not_taboo_topic(logdog):
""" """
Check TopicTabooPlugin returns true if checking disabled. Check TopicTabooPlugin returns true if checking disabled.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": False}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": False}}
session = Session() session = Session()
session.username = "anybody" session.username = "anybody"
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert ( assert (
await plugin.topic_filtering(session=session, topic="not/prohibited") await plugin.topic_filtering(session=session, topic="not/prohibited")
) is True ) is True
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_not_taboo_topic(): async def test_taboo_not_taboo_topic(logdog):
""" """
Check TopicTabooPlugin returns true if topic not taboo Check TopicTabooPlugin returns true if topic not taboo
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
session = Session() session = Session()
session.username = "anybody" session.username = "anybody"
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert ( assert (
await plugin.topic_filtering(session=session, topic="not/prohibited") await plugin.topic_filtering(session=session, topic="not/prohibited")
) is True ) is True
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_anon_taboo_topic(): async def test_taboo_anon_taboo_topic(logdog):
""" """
Check TopicTabooPlugin returns false if topic is taboo and session is anonymous. Check TopicTabooPlugin returns false if topic is taboo and session is anonymous.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
session = Session() session = Session()
session.username = "" session.username = ""
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_notadmin_taboo_topic(): async def test_taboo_notadmin_taboo_topic(logdog):
""" """
Check TopicTabooPlugin returns false if topic is taboo and user is not "admin". Check TopicTabooPlugin returns false if topic is taboo and user is not "admin".
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
session = Session() session = Session()
session.username = "notadmin" session.username = "notadmin"
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taboo_admin_taboo_topic(): async def test_taboo_admin_taboo_topic(logdog):
""" """
Check TopicTabooPlugin returns true if topic is taboo and user is "admin". Check TopicTabooPlugin returns true if topic is taboo and user is "admin".
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
session = Session() session = Session()
session.username = "admin" session.username = "admin"
plugin = TopicTabooPlugin(context) plugin = TopicTabooPlugin(context)
assert (await plugin.topic_filtering(session=session, topic="prohibited")) is True assert (await plugin.topic_filtering(session=session, topic="prohibited")) is True
# Should NOT have printed warnings # Should NOT have printed warnings
assert len(context.logger.messages) == 0 log_records = list(pile.drain(name='testlog'))
assert len(log_records) == 0
# TopicAccessControlListPlugin tests # TopicAccessControlListPlugin tests
@ -286,207 +291,211 @@ def test_topic_ac_match_hash():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_empty_config(): async def test_taclp_empty_config(logdog):
""" """
Check TopicAccessControlListPlugin returns false if topic-check absent. Check TopicAccessControlListPlugin returns false if topic-check absent.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {} context.logger = logging.getLogger('testlog')
context.config = {}
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
assert (await plugin.topic_filtering()) is False assert (await plugin.topic_filtering()) is False
# Should have printed a couple of warnings # Should have printed a couple of warnings
assert len(context.logger.messages) == 2 log_records = list(pile.drain(name='testlog'))
assert context.logger.messages[0] == ( assert len(log_records) == 2
("'topic-check' section not found in context configuration",), assert log_records[0].message == "'topic-check' section not found in context configuration"
{}, assert log_records[1].message == "'auth' section not found in context configuration"
)
assert context.logger.messages[1] == (
("'auth' section not found in context configuration",),
{},
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_true_disabled(): async def test_taclp_true_disabled(logdog):
""" """
Check TopicAccessControlListPlugin returns true if topic checking is disabled. Check TopicAccessControlListPlugin returns true if topic checking is disabled.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": False}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": False}}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.publish, session=session, topic="a/topic" action=Action.publish, session=session, topic="a/topic"
) )
assert authorised is True assert authorised is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_true_no_pub_acl(): async def test_taclp_true_no_pub_acl(logdog):
""" """
Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given. Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given.
(This is for backward-compatibility with existing installations.) (This is for backward-compatibility with existing installations.)
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = {"topic-check": {"enabled": True}} context.logger = logging.getLogger('testlog')
context.config = {"topic-check": {"enabled": True}}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.publish, session=session, topic="a/topic" action=Action.publish, session=session, topic="a/topic"
) )
assert authorised is True assert authorised is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_false_sub_no_topic(): async def test_taclp_false_sub_no_topic(logdog):
""" """
Check TopicAccessControlListPlugin returns false user there is no topic. Check TopicAccessControlListPlugin returns false user there is no topic.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.subscribe, session=session, topic="" action=Action.subscribe, session=session, topic=""
) )
assert authorised is False assert authorised is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_false_sub_unknown_user(): async def test_taclp_false_sub_unknown_user(logdog):
""" """
Check TopicAccessControlListPlugin returns false user is not listed in ACL. Check TopicAccessControlListPlugin returns false user is not listed in ACL.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.subscribe, session=session, topic="allowed/topic" action=Action.subscribe, session=session, topic="allowed/topic"
) )
assert authorised is False assert authorised is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_false_sub_no_permission(): async def test_taclp_false_sub_no_permission(logdog):
""" """
Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic. Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.subscribe, session=session, topic="forbidden/topic" action=Action.subscribe, session=session, topic="forbidden/topic"
) )
assert authorised is False assert authorised is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_true_sub_permission(): async def test_taclp_true_sub_permission(logdog):
""" """
Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic. Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.subscribe, session=session, topic="allowed/topic" action=Action.subscribe, session=session, topic="allowed/topic"
) )
assert authorised is True assert authorised is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_true_pub_permission(): async def test_taclp_true_pub_permission(logdog):
""" """
Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action. Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = "user" session.username = "user"
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.publish, session=session, topic="allowed/topic" action=Action.publish, session=session, topic="allowed/topic"
) )
assert authorised is True assert authorised is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_taclp_true_anon_sub_permission(): async def test_taclp_true_anon_sub_permission(logdog):
""" """
Check TopicAccessControlListPlugin handles anonymous users. Check TopicAccessControlListPlugin handles anonymous users.
""" """
context = BaseContext() with logdog() as pile:
context.logger = DummyLogger() context = BaseContext()
context.config = { context.logger = logging.getLogger('testlog')
"topic-check": { context.config = {
"enabled": True, "topic-check": {
"acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]}, "enabled": True,
"acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]},
}
} }
}
session = Session() session = Session()
session.username = None session.username = None
plugin = TopicAccessControlListPlugin(context) plugin = TopicAccessControlListPlugin(context)
authorised = await plugin.topic_filtering( authorised = await plugin.topic_filtering(
action=Action.subscribe, session=session, topic="allowed/topic" action=Action.subscribe, session=session, topic="allowed/topic"
) )
assert authorised is True assert authorised is True