diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py
index 75829c8..e9aec43 100644
--- a/tests/plugins/test_topic_checking.py
+++ b/tests/plugins/test_topic_checking.py
@@ -195,6 +195,9 @@ async def test_taboo_admin_taboo_topic():
 assert len(context.logger.messages) == 0
+# TopicAccessControlListPlugin tests
+
+
 def test_topic_ac_not_match():
 """
 Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match.
@@ -252,3 +255,212 @@ def test_topic_ac_match_hash():
 )
 is True
 )
+
+
+@pytest.mark.asyncio
+async def test_taclp_empty_config():
+ """
+ Check TopicAccessControlListPlugin returns false if topic-check absent.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {}
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering()
+ assert authorised is False
+
+ # Should have printed a couple of warnings
+ assert len(context.logger.messages) == 2
+ assert context.logger.messages[0] == (
+ ("'topic-check' section not found in context configuration",),
+ {},
+ )
+ assert context.logger.messages[1] == (
+ ("'auth' section not found in context configuration",),
+ {},
+ )
+
+
+@pytest.mark.asyncio
+async def test_taclp_true_no_pub_acl():
+ """
+ Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given.
+ (This is for backward-compatibility with existing installations.)
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='publish', session=session, topic='a/topic')
+ assert authorised is True
+
+
+@pytest.mark.asyncio
+async def test_taclp_false_sub_no_topic():
+ """
+ Check TopicAccessControlListPlugin returns false user there is no topic.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'acl': {
+ 'anotheruser': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='subscribe', session=session, topic='')
+ assert authorised is False
+
+
+@pytest.mark.asyncio
+async def test_taclp_false_sub_unknown_user():
+ """
+ Check TopicAccessControlListPlugin returns false user is not listed in ACL.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'acl': {
+ 'anotheruser': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='subscribe', session=session, topic='allowed/topic')
+ assert authorised is False
+
+
+@pytest.mark.asyncio
+async def test_taclp_false_sub_no_permission():
+ """
+ Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'acl': {
+ 'user': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='subscribe', session=session, topic='forbidden/topic')
+ assert authorised is False
+
+
+@pytest.mark.asyncio
+async def test_taclp_true_sub_permission():
+ """
+ Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'acl': {
+ 'user': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='subscribe', session=session, topic='allowed/topic')
+ assert authorised is True
+
+
+@pytest.mark.asyncio
+async def test_taclp_true_pub_permission():
+ """
+ Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'publish-acl': {
+ 'user': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = "user"
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='publish', session=session, topic='allowed/topic')
+ assert authorised is True
+
+
+@pytest.mark.asyncio
+async def test_taclp_true_anon_sub_permission():
+ """
+ Check TopicAccessControlListPlugin handles anonymous users.
+ """
+ context = BaseContext()
+ context.logger = DummyLogger()
+ context.config = {
+ 'topic-check': {
+ 'enabled': True,
+ 'acl': {
+ 'anonymous': [
+ 'allowed/topic',
+ 'another/allowed/topic/#'
+ ]
+ }
+ }
+ }
+
+ session = Session()
+ session.username = None
+
+ plugin = TopicAccessControlListPlugin(context)
+ authorised = await plugin.topic_filtering(action='subscribe', session=session, topic='allowed/topic')
+ assert authorised is True
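Review bot: The publish path is only exercised for allow outcomes in the tests visible in this hunk: `test_taclp_true_no_pub_acl` pins the backward-compatible allow when `publish-acl` is absent and `test_taclp_true_pub_permission` covers a listed topic, but nothing asserts that a publish is refused once `publish-acl` is configured. Because the missing-`publish-acl` case deliberately returns true, a regression that lets that fallback apply when the section exists would not fail any test here. I would add denial cases for an unlisted topic and for a user missing from `publish-acl`, as sketched below; the expected `False` assumes the plugin denies by default once `publish-acl` is present, which should be confirmed against the plugin code outside this diff. Separately, the docstrings of `test_taclp_false_sub_no_topic` and `test_taclp_false_sub_unknown_user` read "returns false user …" and are missing an "if".

```python
@pytest.mark.asyncio
async def test_taclp_false_pub_no_permission():
    """
    Check TopicAccessControlListPlugin returns false if "publish-acl" does not list the topic.
    """
    context = BaseContext()
    context.logger = DummyLogger()
    context.config = {
        'topic-check': {
            'enabled': True,
            'publish-acl': {
                'user': ['allowed/topic']
            }
        }
    }

    session = Session()
    session.username = "user"

    plugin = TopicAccessControlListPlugin(context)
    authorised = await plugin.topic_filtering(action='publish', session=session, topic='forbidden/topic')
    assert authorised is False


@pytest.mark.asyncio
async def test_taclp_false_pub_unknown_user():
    """
    Check TopicAccessControlListPlugin returns false if the user is not listed in "publish-acl".
    """
    context = BaseContext()
    context.logger = DummyLogger()
    context.config = {
        'topic-check': {
            'enabled': True,
            'publish-acl': {
                'anotheruser': ['allowed/topic']
            }
        }
    }

    session = Session()
    session.username = "user"

    plugin = TopicAccessControlListPlugin(context)
    authorised = await plugin.topic_filtering(action='publish', session=session, topic='allowed/topic')
    assert authorised is False
```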