import logging

import pytest

from amqtt.broker import BrokerContext, Broker
from amqtt.contexts import BaseContext, Action
from amqtt.plugins.topic_checking import TopicAccessControlListPlugin, TopicTabooPlugin
from amqtt.plugins.base import BaseTopicPlugin
from amqtt.session import Session

logger = logging.getLogger(__name__)

# Name of the logger handed to every plugin context; log assertions drain this name.
_TEST_LOGGER_NAME = "testlog"

# Warning the plugins emit when the configuration has no usable 'topic-check' section.
_MISSING_SECTION_WARNING = "'topic-check' section not found in context configuration"


# Shared helpers


def _make_context(config, context=None):
    """Return a plugin context wired to the test logger and the given config.

    :param config: value assigned to ``context.config`` (legacy dict or plugin ``Config``).
    :param context: optional pre-built context; a fresh ``BaseContext`` is used when omitted.
    """
    ctx = BaseContext() if context is None else context
    ctx.logger = logging.getLogger(_TEST_LOGGER_NAME)
    ctx.config = config
    return ctx


def _make_session(username):
    """Return a new ``Session`` whose ``username`` is set to *username*."""
    session = Session()
    session.username = username
    return session


def _assert_missing_section_warning(pile):
    """Assert the test logger emitted exactly the missing-'topic-check' warning."""
    # Warning messages are only generated if using deprecated plugin configuration on initial load
    log_records = list(pile.drain(name=_TEST_LOGGER_NAME))
    assert len(log_records) == 1
    assert log_records[0].levelno == logging.WARNING
    assert log_records[0].message == _MISSING_SECTION_WARNING


def _assert_no_log_records(pile):
    """Assert the test logger emitted nothing (i.e. no warnings were printed)."""
    log_records = list(pile.drain(name=_TEST_LOGGER_NAME))
    assert not log_records


async def _taboo_filter(config, username, topic):
    """Run ``TopicTabooPlugin.topic_filtering`` for *username* on *topic* under *config*."""
    context = _make_context(config)
    session = _make_session(username)
    plugin = TopicTabooPlugin(context)
    return await plugin.topic_filtering(session=session, topic=topic)


async def _acl_filter(config, username, action, topic):
    """Run ``TopicAccessControlListPlugin.topic_filtering`` for the given request."""
    context = _make_context(config)
    session = _make_session(username)
    plugin = TopicAccessControlListPlugin(context)
    return await plugin.topic_filtering(
        action=action,
        session=session,
        topic=topic,
    )


# Base plug-in object


@pytest.mark.asyncio
async def test_base_no_config(logdog):
    """Check BaseTopicPlugin returns false if no topic-check is present."""
    with logdog() as pile:
        plugin = BaseTopicPlugin(_make_context({}))
        authorised = await plugin.topic_filtering()
        assert authorised is False

        _assert_missing_section_warning(pile)


@pytest.mark.asyncio
async def test_base_empty_config(logdog):
    """Check BaseTopicPlugin returns false if topic-check is empty."""
    with logdog() as pile:
        # This case deliberately uses a BrokerContext rather than a bare BaseContext.
        broker = Broker()
        context = _make_context({"topic-check": {}}, BrokerContext(broker))

        plugin = BaseTopicPlugin(context)
        authorised = await plugin.topic_filtering()
        assert authorised is False

        _assert_missing_section_warning(pile)


@pytest.mark.asyncio
async def test_base_disabled_config(logdog):
    """Check BaseTopicPlugin returns true if disabled.

    (it doesn't actually check).
    """
    with logdog() as pile:
        plugin = BaseTopicPlugin(_make_context({"topic-check": {"enabled": False}}))
        authorised = await plugin.topic_filtering()
        assert authorised is True

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


@pytest.mark.asyncio
async def test_base_enabled_config(logdog):
    """Check BaseTopicPlugin returns true if enabled."""
    with logdog() as pile:
        plugin = BaseTopicPlugin(_make_context({"topic-check": {"enabled": True}}))
        authorised = await plugin.topic_filtering()
        assert authorised is True

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


# Taboo plug-in


@pytest.mark.asyncio
async def test_taboo_empty_config(logdog):
    """Check TopicTabooPlugin returns false if topic-check absent."""
    with logdog() as pile:
        plugin = TopicTabooPlugin(_make_context({}))
        assert (await plugin.topic_filtering()) is False

        _assert_missing_section_warning(pile)


@pytest.mark.asyncio
async def test_taboo_disabled(logdog):
    """Check TopicTabooPlugin returns true if checking disabled."""
    with logdog() as pile:
        config = {"topic-check": {"enabled": False}}
        assert (await _taboo_filter(config, "anybody", "not/prohibited")) is True

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


@pytest.mark.parametrize("test_config", [
    ({"topic-check": {"enabled": True}}),
    (TopicTabooPlugin.Config())
])
@pytest.mark.asyncio
async def test_taboo_not_taboo_topic(logdog, test_config):
    """Check TopicTabooPlugin returns true if topic not taboo."""
    with logdog() as pile:
        assert (await _taboo_filter(test_config, "anybody", "not/prohibited")) is True

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


@pytest.mark.parametrize("test_config", [
    ({"topic-check": {"enabled": True}}),
    (TopicTabooPlugin.Config())
])
@pytest.mark.asyncio
async def test_taboo_anon_taboo_topic(logdog, test_config):
    """Check TopicTabooPlugin returns false if topic is taboo and session is anonymous."""
    with logdog() as pile:
        assert (await _taboo_filter(test_config, "", "prohibited")) is False

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


@pytest.mark.parametrize("test_config", [
    ({"topic-check": {"enabled": True}}),
    (TopicTabooPlugin.Config())
])
@pytest.mark.asyncio
async def test_taboo_notadmin_taboo_topic(logdog, test_config):
    """Check TopicTabooPlugin returns false if topic is taboo and user is not "admin"."""
    with logdog() as pile:
        assert (await _taboo_filter(test_config, "notadmin", "prohibited")) is False

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


@pytest.mark.parametrize("test_config", [
    ({"topic-check": {"enabled": True}}),
    (TopicTabooPlugin.Config())
])
@pytest.mark.asyncio
async def test_taboo_admin_taboo_topic(logdog, test_config):
    """Check TopicTabooPlugin returns true if topic is taboo and user is "admin"."""
    with logdog() as pile:
        assert (await _taboo_filter(test_config, "admin", "prohibited")) is True

        # Should NOT have printed warnings
        _assert_no_log_records(pile)


# TopicAccessControlListPlugin tests


def test_topic_ac_not_match():
    """Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match."""
    assert TopicAccessControlListPlugin.topic_ac("a/topic/to/match", "a/topic/to/notmatch") is False


def test_topic_ac_not_match_longer_acl():
    """Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match and ACL topic is longer."""
    assert TopicAccessControlListPlugin.topic_ac("topic", "topic/is/longer") is False


def test_topic_ac_not_match_longer_rq():
    """Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match and RQ topic is longer."""
    assert TopicAccessControlListPlugin.topic_ac("topic/is/longer", "topic") is False


def test_topic_ac_match_exact():
    """Test TopicAccessControlListPlugin.topic_ac returns true if topics match exactly."""
    assert TopicAccessControlListPlugin.topic_ac("exact/topic", "exact/topic") is True


def test_topic_ac_match_plus():
    """Test TopicAccessControlListPlugin.topic_ac correctly handles '+' wildcard."""
    assert (
        TopicAccessControlListPlugin.topic_ac(
            "a/topic/anything/value",
            "a/topic/+/value",
        )
        is True
    )


def test_topic_ac_match_hash():
    """Test TopicAccessControlListPlugin.topic_ac correctly handles '#' wildcard."""
    assert (
        TopicAccessControlListPlugin.topic_ac(
            "topic/prefix/and/suffix",
            "topic/prefix/#",
        )
        is True
    )


@pytest.mark.asyncio
async def test_taclp_empty_config(logdog):
    """Check TopicAccessControlListPlugin returns false if topic-check absent."""
    with logdog() as pile:
        plugin = TopicAccessControlListPlugin(_make_context({}))
        assert (await plugin.topic_filtering()) is False

        _assert_missing_section_warning(pile)


@pytest.mark.asyncio
async def test_taclp_true_disabled(logdog):
    """Check TopicAccessControlListPlugin returns true if topic checking is disabled."""
    config = {"topic-check": {"enabled": False}}
    authorised = await _acl_filter(config, "user", Action.PUBLISH, "a/topic")
    assert authorised is True


@pytest.mark.parametrize("test_config", [
    ({"topic-check": {"enabled": True}}),
    (TopicAccessControlListPlugin.Config())
])
@pytest.mark.asyncio
async def test_taclp_true_no_pub_acl(logdog, test_config):
    """Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given.

    (This is for backward-compatibility with existing installations.).
    """
    authorised = await _acl_filter(test_config, "user", Action.PUBLISH, "a/topic")
    assert authorised is True


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        acl={"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_false_sub_no_topic(logdog, test_config):
    """Check TopicAccessControlListPlugin returns false if there is no topic."""
    authorised = await _acl_filter(test_config, "user", Action.SUBSCRIBE, "")
    assert authorised is False


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        acl={"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_false_sub_unknown_user(logdog, test_config):
    """Check TopicAccessControlListPlugin returns false if the user is not listed in the ACL."""
    authorised = await _acl_filter(test_config, "user", Action.SUBSCRIBE, "allowed/topic")
    assert authorised is False


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        acl={"user": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_false_sub_no_permission(logdog, test_config):
    """Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic."""
    authorised = await _acl_filter(test_config, "user", Action.SUBSCRIBE, "forbidden/topic")
    assert authorised is False


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        acl={"user": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_true_sub_permission(logdog, test_config):
    """Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic."""
    authorised = await _acl_filter(test_config, "user", Action.SUBSCRIBE, "allowed/topic")
    assert authorised is True


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        publish_acl={"user": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_true_pub_permission(logdog, test_config):
    """Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action."""
    authorised = await _acl_filter(test_config, "user", Action.PUBLISH, "allowed/topic")
    assert authorised is True


@pytest.mark.parametrize("test_config", [
    ({
        "topic-check": {
            "enabled": True,
            "acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]},
        },
    }),
    (TopicAccessControlListPlugin.Config(
        acl={"anonymous": ["allowed/topic", "another/allowed/topic/#"]}
    ))
])
@pytest.mark.asyncio
async def test_taclp_true_anon_sub_permission(logdog, test_config):
    """Check TopicAccessControlListPlugin handles anonymous users."""
    # A username of None is the anonymous case exercised here.
    authorised = await _acl_filter(test_config, None, Action.SUBSCRIBE, "allowed/topic")
    assert authorised is True