# amqtt/tests/plugins/test_topic_checking.py
#
# File-viewer metadata: 506 lines, 16 KiB, Python

import logging
import pytest
from amqtt.broker import BrokerContext, Broker
from amqtt.contexts import BaseContext, Action
from amqtt.plugins.topic_checking import TopicAccessControlListPlugin, TopicTabooPlugin
from amqtt.plugins.base import BaseTopicPlugin
from amqtt.session import Session
# Module-level logger named after this module. None of the tests visible in
# this section reference it; they use the dedicated "testlog" logger instead.
logger = logging.getLogger(__name__)
# Base plug-in object
@pytest.mark.asyncio
async def test_base_no_config(logdog):
    """BaseTopicPlugin denies filtering and logs one warning when 'topic-check' is absent."""
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {}

        base_plugin = BaseTopicPlugin(ctx)
        assert (await base_plugin.topic_filtering()) is False

    # Original note: warning messages are only generated if the deprecated
    # plugin configuration is used on initial load.
    captured = list(pile.drain(name="testlog"))
    assert len(captured) == 1
    warning = captured[0]
    assert warning.levelno == logging.WARNING
    assert warning.message == "'topic-check' section not found in context configuration"
@pytest.mark.asyncio
async def test_base_empty_config(logdog):
    """BaseTopicPlugin denies filtering and logs one warning when 'topic-check' is empty."""
    with logdog() as pile:
        broker_instance = Broker()
        ctx = BrokerContext(broker_instance)
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {"topic-check": {}}

        base_plugin = BaseTopicPlugin(ctx)
        verdict = await base_plugin.topic_filtering()
        assert verdict is False

    # Original note: warning messages are only generated if the deprecated
    # plugin configuration is used on initial load.
    captured = list(pile.drain(name="testlog"))
    assert len(captured) == 1
    warning = captured[0]
    assert warning.levelno == logging.WARNING
    assert warning.message == "'topic-check' section not found in context configuration"
@pytest.mark.asyncio
async def test_base_disabled_config(logdog):
    """BaseTopicPlugin authorises the request when topic checking is disabled.

    Per the original note, no actual check is performed in this case.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {"topic-check": {"enabled": False}}

        base_plugin = BaseTopicPlugin(ctx)
        assert (await base_plugin.topic_filtering()) is True

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
@pytest.mark.asyncio
async def test_base_enabled_config(logdog):
    """BaseTopicPlugin authorises the request when topic checking is enabled."""
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {"topic-check": {"enabled": True}}

        base_plugin = BaseTopicPlugin(ctx)
        assert (await base_plugin.topic_filtering()) is True

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
# Taboo plug-in
@pytest.mark.asyncio
async def test_taboo_empty_config(logdog):
    """TopicTabooPlugin denies filtering and logs one warning when 'topic-check' is absent."""
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {}

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering()
        assert verdict is False

    # Original note: warning messages are only generated if the deprecated
    # plugin configuration is used on initial load.
    captured = list(pile.drain(name="testlog"))
    assert len(captured) == 1
    warning = captured[0]
    assert warning.levelno == logging.WARNING
    assert warning.message == "'topic-check' section not found in context configuration"
@pytest.mark.asyncio
async def test_taboo_disabled(logdog):
    """TopicTabooPlugin authorises the request when topic checking is disabled."""
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {"topic-check": {"enabled": False}}

        client = Session()
        client.username = "anybody"

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering(session=client, topic="not/prohibited")
        assert verdict is True

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
@pytest.mark.parametrize(
    "test_config",
    [
        {"topic-check": {"enabled": True}},
        TopicTabooPlugin.Config(),
    ],
)
@pytest.mark.asyncio
async def test_taboo_not_taboo_topic(logdog, test_config):
    """TopicTabooPlugin authorises a topic that is not taboo.

    Runs once with the dict-style configuration and once with a ``Config`` object.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = test_config

        client = Session()
        client.username = "anybody"

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering(session=client, topic="not/prohibited")
        assert verdict is True

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
@pytest.mark.parametrize(
    "test_config",
    [
        {"topic-check": {"enabled": True}},
        TopicTabooPlugin.Config(),
    ],
)
@pytest.mark.asyncio
async def test_taboo_anon_taboo_topic(logdog, test_config):
    """TopicTabooPlugin rejects a taboo topic for an anonymous session (empty username).

    Runs once with the dict-style configuration and once with a ``Config`` object.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = test_config

        client = Session()
        client.username = ""

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering(session=client, topic="prohibited")
        assert verdict is False

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
@pytest.mark.parametrize(
    "test_config",
    [
        {"topic-check": {"enabled": True}},
        TopicTabooPlugin.Config(),
    ],
)
@pytest.mark.asyncio
async def test_taboo_notadmin_taboo_topic(logdog, test_config):
    """TopicTabooPlugin rejects a taboo topic when the user is not "admin".

    Runs once with the dict-style configuration and once with a ``Config`` object.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = test_config

        client = Session()
        client.username = "notadmin"

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering(session=client, topic="prohibited")
        assert verdict is False

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
@pytest.mark.parametrize(
    "test_config",
    [
        {"topic-check": {"enabled": True}},
        TopicTabooPlugin.Config(),
    ],
)
@pytest.mark.asyncio
async def test_taboo_admin_taboo_topic(logdog, test_config):
    """TopicTabooPlugin authorises a taboo topic when the user is "admin".

    Runs once with the dict-style configuration and once with a ``Config`` object.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = test_config

        client = Session()
        client.username = "admin"

        taboo_plugin = TopicTabooPlugin(ctx)
        verdict = await taboo_plugin.topic_filtering(session=client, topic="prohibited")
        assert verdict is True

    # Nothing may have been logged on the "testlog" logger.
    captured = list(pile.drain(name="testlog"))
    assert not captured
# TopicAccessControlListPlugin tests
def test_topic_ac_not_match():
    """topic_ac reports no match when the two topics differ in their last level."""
    matched = TopicAccessControlListPlugin.topic_ac("a/topic/to/match", "a/topic/to/notmatch")
    assert matched is False
def test_topic_ac_not_match_longer_acl():
    """topic_ac reports no match when the ACL topic has more levels than the requested one."""
    matched = TopicAccessControlListPlugin.topic_ac("topic", "topic/is/longer")
    assert matched is False
def test_topic_ac_not_match_longer_rq():
    """topic_ac reports no match when the requested topic has more levels than the ACL one."""
    matched = TopicAccessControlListPlugin.topic_ac("topic/is/longer", "topic")
    assert matched is False
def test_topic_ac_match_exact():
    """topic_ac reports a match when both topics are identical."""
    matched = TopicAccessControlListPlugin.topic_ac("exact/topic", "exact/topic")
    assert matched is True
def test_topic_ac_match_plus():
    """topic_ac accepts a requested topic against an ACL entry containing the '+' wildcard."""
    requested = "a/topic/anything/value"
    acl_entry = "a/topic/+/value"
    matched = TopicAccessControlListPlugin.topic_ac(requested, acl_entry)
    assert matched is True
def test_topic_ac_match_hash():
    """topic_ac accepts a requested topic against an ACL entry ending in the '#' wildcard."""
    requested = "topic/prefix/and/suffix"
    acl_entry = "topic/prefix/#"
    matched = TopicAccessControlListPlugin.topic_ac(requested, acl_entry)
    assert matched is True
@pytest.mark.asyncio
async def test_taclp_empty_config(logdog):
    """TopicAccessControlListPlugin denies filtering and logs one warning when 'topic-check' is absent."""
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {}

        acl_plugin = TopicAccessControlListPlugin(ctx)
        verdict = await acl_plugin.topic_filtering()
        assert verdict is False

    # Original note: warning messages are only generated if the deprecated
    # plugin configuration is used on initial load.
    captured = list(pile.drain(name="testlog"))
    assert len(captured) == 1
    warning = captured[0]
    assert warning.levelno == logging.WARNING
    assert warning.message == "'topic-check' section not found in context configuration"
@pytest.mark.asyncio
async def test_taclp_true_disabled(logdog):
    """TopicAccessControlListPlugin authorises a publish when topic checking is disabled."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = {"topic-check": {"enabled": False}}

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.PUBLISH, "session": client, "topic": "a/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is True
@pytest.mark.parametrize(
    "test_config",
    [
        {"topic-check": {"enabled": True}},
        TopicAccessControlListPlugin.Config(),
    ],
)
@pytest.mark.asyncio
async def test_taclp_true_no_pub_acl(logdog, test_config):
    """TopicAccessControlListPlugin authorises a publish when no publish-acl is configured.

    The original docstring explains this is kept for backward compatibility
    with existing installations.
    """
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.PUBLISH, "session": client, "topic": "a/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is True
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            acl={"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_false_sub_no_topic(logdog, test_config):
    """TopicAccessControlListPlugin rejects a subscribe if there is no topic (empty string)."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.SUBSCRIBE, "session": client, "topic": ""}
    assert (await acl_plugin.topic_filtering(**query)) is False
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            acl={"anotheruser": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_false_sub_unknown_user(logdog, test_config):
    """TopicAccessControlListPlugin rejects a subscribe if the user is not listed in the ACL."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.SUBSCRIBE, "session": client, "topic": "allowed/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is False
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            acl={"user": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_false_sub_no_permission(logdog, test_config):
    """TopicAccessControlListPlugin rejects a subscribe to a topic the user's "acl" does not list."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.SUBSCRIBE, "session": client, "topic": "forbidden/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is False
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            acl={"user": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_true_sub_permission(logdog, test_config):
    """TopicAccessControlListPlugin authorises a subscribe to a topic listed in the user's "acl"."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.SUBSCRIBE, "session": client, "topic": "allowed/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is True
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            publish_acl={"user": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_true_pub_permission(logdog, test_config):
    """TopicAccessControlListPlugin authorises a publish to a topic listed in the user's "publish-acl"."""
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = "user"

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.PUBLISH, "session": client, "topic": "allowed/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is True
@pytest.mark.parametrize(
    "test_config",
    [
        {
            "topic-check": {
                "enabled": True,
                "acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]},
            },
        },
        TopicAccessControlListPlugin.Config(
            acl={"anonymous": ["allowed/topic", "another/allowed/topic/#"]},
        ),
    ],
)
@pytest.mark.asyncio
async def test_taclp_true_anon_sub_permission(logdog, test_config):
    """TopicAccessControlListPlugin handles anonymous users.

    The session's username is None and the ACL only has an "anonymous" entry;
    the test expects the subscribe to be authorised.
    """
    ctx = BaseContext()
    ctx.logger = logging.getLogger("testlog")
    ctx.config = test_config

    client = Session()
    client.username = None

    acl_plugin = TopicAccessControlListPlugin(ctx)
    query = {"action": Action.SUBSCRIBE, "session": client, "topic": "allowed/topic"}
    assert (await acl_plugin.topic_filtering(**query)) is True