diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py index 3dfcb44..2e4f80f 100644 --- a/tests/plugins/test_topic_checking.py +++ b/tests/plugins/test_topic_checking.py @@ -1,4 +1,5 @@ import pytest +import logging from amqtt.plugins.manager import BaseContext from amqtt.plugins.topic_checking import ( @@ -10,221 +11,225 @@ from amqtt.plugins.topic_checking import ( from amqtt.session import Session -class DummyLogger(object): - def __init__(self): - self.messages = [] - - def warning(self, *args, **kwargs): - self.messages.append((args, kwargs)) - - # Base plug-in object @pytest.mark.asyncio -async def test_base_no_config(): +async def test_base_no_config(logdog): """ Check BaseTopicPlugin returns false if no topic-check is present. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {} - plugin = BaseTopicPlugin(context) - authorised = plugin.topic_filtering() - assert authorised is False + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is False - # Should have printed a couple of warnings - assert len(context.logger.messages) == 2 - assert context.logger.messages[0] == ( - ("'topic-check' section not found in context configuration",), - {}, - ) - assert context.logger.messages[1] == ( - ("'auth' section not found in context configuration",), - {}, - ) + # Should have printed a couple of warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 2 + assert log_records[0].levelno == logging.WARN + assert log_records[0].message == "'topic-check' section not found in context configuration" + + assert log_records[1].levelno == logging.WARN + assert log_records[1].message == "'auth' section not found in context configuration" + assert pile.is_empty() @pytest.mark.asyncio -async def test_base_empty_config(): +async def test_base_empty_config(logdog): """ Check BaseTopicPlugin returns false if topic-check is empty. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {}} - plugin = BaseTopicPlugin(context) - authorised = plugin.topic_filtering() - assert authorised is False + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is False - # Should have printed just one warning - assert len(context.logger.messages) == 1 - assert context.logger.messages[0] == ( - ("'auth' section not found in context configuration",), - {}, - ) + # Should have printed just one warning + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 1 + assert log_records[0].levelno == logging.WARN + assert log_records[0].message == "'auth' section not found in context configuration" @pytest.mark.asyncio -async def test_base_disabled_config(): +async def test_base_disabled_config(logdog): """ Check BaseTopicPlugin returns true if disabled. (it doesn't actually check) """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": False}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": False}} - plugin = BaseTopicPlugin(context) - authorised = plugin.topic_filtering() - assert authorised is True + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is True - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 @pytest.mark.asyncio -async def test_base_enabled_config(): +async def test_base_enabled_config(logdog): """ Check BaseTopicPlugin returns true if enabled. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - plugin = BaseTopicPlugin(context) - authorised = plugin.topic_filtering() - assert authorised is True + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is True - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 # Taboo plug-in @pytest.mark.asyncio -async def test_taboo_empty_config(): +async def test_taboo_empty_config(logdog): """ Check TopicTabooPlugin returns false if topic-check absent. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {} - plugin = TopicTabooPlugin(context) - assert (await plugin.topic_filtering()) is False + plugin = TopicTabooPlugin(context) + assert (await plugin.topic_filtering()) is False - # Should have printed a couple of warnings - assert len(context.logger.messages) == 2 - assert context.logger.messages[0] == ( - ("'topic-check' section not found in context configuration",), - {}, - ) - assert context.logger.messages[1] == ( - ("'auth' section not found in context configuration",), - {}, - ) + # Should have printed a couple of warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 2 + assert log_records[0].levelno == logging.WARN + assert log_records[0].message == "'topic-check' section not found in context configuration" + assert log_records[1].levelno == logging.WARN + assert log_records[1].message == "'auth' section not found in context configuration" @pytest.mark.asyncio -async def test_taboo_not_taboo_topic(): +async def test_taboo_not_taboo_topic(logdog): """ Check TopicTabooPlugin returns true if checking disabled. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": False}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": False}} - session = Session() - session.username = "anybody" + session = Session() + session.username = "anybody" - plugin = TopicTabooPlugin(context) - assert ( - await plugin.topic_filtering(session=session, topic="not/prohibited") - ) is True + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="not/prohibited") + ) is True - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 @pytest.mark.asyncio -async def test_taboo_not_taboo_topic(): +async def test_taboo_not_taboo_topic(logdog): """ Check TopicTabooPlugin returns true if topic not taboo """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - session = Session() - session.username = "anybody" + session = Session() + session.username = "anybody" - plugin = TopicTabooPlugin(context) - assert ( - await plugin.topic_filtering(session=session, topic="not/prohibited") - ) is True + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="not/prohibited") + ) is True - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 @pytest.mark.asyncio -async def test_taboo_anon_taboo_topic(): +async def test_taboo_anon_taboo_topic(logdog): """ Check TopicTabooPlugin returns false if topic is taboo and session is anonymous. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - session = Session() - session.username = "" + session = Session() + session.username = "" - plugin = TopicTabooPlugin(context) - assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False + plugin = TopicTabooPlugin(context) + assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 @pytest.mark.asyncio -async def test_taboo_notadmin_taboo_topic(): +async def test_taboo_notadmin_taboo_topic(logdog): """ Check TopicTabooPlugin returns false if topic is taboo and user is not "admin". """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - session = Session() - session.username = "notadmin" + session = Session() + session.username = "notadmin" - plugin = TopicTabooPlugin(context) - assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False + plugin = TopicTabooPlugin(context) + assert (await plugin.topic_filtering(session=session, topic="prohibited")) is False - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 @pytest.mark.asyncio -async def test_taboo_admin_taboo_topic(): +async def test_taboo_admin_taboo_topic(logdog): """ Check TopicTabooPlugin returns true if topic is taboo and user is "admin". """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - session = Session() - session.username = "admin" + session = Session() + session.username = "admin" - plugin = TopicTabooPlugin(context) - assert (await plugin.topic_filtering(session=session, topic="prohibited")) is True + plugin = TopicTabooPlugin(context) + assert (await plugin.topic_filtering(session=session, topic="prohibited")) is True - # Should NOT have printed warnings - assert len(context.logger.messages) == 0 + # Should NOT have printed warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 0 # TopicAccessControlListPlugin tests @@ -286,207 +291,211 @@ def test_topic_ac_match_hash(): @pytest.mark.asyncio -async def test_taclp_empty_config(): +async def test_taclp_empty_config(logdog): """ Check TopicAccessControlListPlugin returns false if topic-check absent. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {} - plugin = TopicAccessControlListPlugin(context) - assert (await plugin.topic_filtering()) is False + plugin = TopicAccessControlListPlugin(context) + assert (await plugin.topic_filtering()) is False - # Should have printed a couple of warnings - assert len(context.logger.messages) == 2 - assert context.logger.messages[0] == ( - ("'topic-check' section not found in context configuration",), - {}, - ) - assert context.logger.messages[1] == ( - ("'auth' section not found in context configuration",), - {}, - ) + # Should have printed a couple of warnings + log_records = list(pile.drain(name='testlog')) + assert len(log_records) == 2 + assert log_records[0].message == "'topic-check' section not found in context configuration" + assert log_records[1].message == "'auth' section not found in context configuration" @pytest.mark.asyncio -async def test_taclp_true_disabled(): +async def test_taclp_true_disabled(logdog): """ Check TopicAccessControlListPlugin returns true if topic checking is disabled. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": False}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": False}} - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.publish, session=session, topic="a/topic" - ) - assert authorised is True + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="a/topic" + ) + assert authorised is True @pytest.mark.asyncio -async def test_taclp_true_no_pub_acl(): +async def test_taclp_true_no_pub_acl(logdog): """ Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given. (This is for backward-compatibility with existing installations.) """ - context = BaseContext() - context.logger = DummyLogger() - context.config = {"topic-check": {"enabled": True}} + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = {"topic-check": {"enabled": True}} - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.publish, session=session, topic="a/topic" - ) - assert authorised is True + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="a/topic" + ) + assert authorised is True @pytest.mark.asyncio -async def test_taclp_false_sub_no_topic(): +async def test_taclp_false_sub_no_topic(logdog): """ Check TopicAccessControlListPlugin returns false user there is no topic. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.subscribe, session=session, topic="" - ) - assert authorised is False + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="" + ) + assert authorised is False @pytest.mark.asyncio -async def test_taclp_false_sub_unknown_user(): +async def test_taclp_false_sub_unknown_user(logdog): """ Check TopicAccessControlListPlugin returns false user is not listed in ACL. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.subscribe, session=session, topic="allowed/topic" - ) - assert authorised is False + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is False @pytest.mark.asyncio -async def test_taclp_false_sub_no_permission(): +async def test_taclp_false_sub_no_permission(logdog): """ Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.subscribe, session=session, topic="forbidden/topic" - ) - assert authorised is False + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="forbidden/topic" + ) + assert authorised is False @pytest.mark.asyncio -async def test_taclp_true_sub_permission(): +async def test_taclp_true_sub_permission(logdog): """ Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.subscribe, session=session, topic="allowed/topic" - ) - assert authorised is True + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is True @pytest.mark.asyncio -async def test_taclp_true_pub_permission(): +async def test_taclp_true_pub_permission(logdog): """ Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = "user" + session = Session() + session.username = "user" - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.publish, session=session, topic="allowed/topic" - ) - assert authorised is True + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="allowed/topic" + ) + assert authorised is True @pytest.mark.asyncio -async def test_taclp_true_anon_sub_permission(): +async def test_taclp_true_anon_sub_permission(logdog): """ Check TopicAccessControlListPlugin handles anonymous users. """ - context = BaseContext() - context.logger = DummyLogger() - context.config = { - "topic-check": { - "enabled": True, - "acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]}, + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger('testlog') + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]}, + } } - } - session = Session() - session.username = None + session = Session() + session.username = None - plugin = TopicAccessControlListPlugin(context) - authorised = await plugin.topic_filtering( - action=Action.subscribe, session=session, topic="allowed/topic" - ) - assert authorised is True + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is True