updating ldap schema to allow multiple entries for topics. update ldap user auth tests for new schema and user

refactoring ldap plugin and aligning plugin naming convention
pull/287/head
Andrew Mirsky 2025-08-06 00:35:05 -04:00
rodzic ae9ecd074b
commit 068c233a74
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
7 zmienionych plików z 86 dodań i 80 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
import logging import logging
from typing import ClassVar
import ldap import ldap
@ -7,14 +8,29 @@ from amqtt.broker import BrokerContext
from amqtt.contexts import Action from amqtt.contexts import Action
from amqtt.errors import PluginInitError from amqtt.errors import PluginInitError
from amqtt.plugins import TopicMatcher from amqtt.plugins import TopicMatcher
from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin, BasePlugin
from amqtt.session import Session from amqtt.session import Session
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LDAPAuthPlugin(BaseAuthPlugin): @dataclass
"""Plugin to authenticate a user with an LDAP directory server.""" class LdapConfig:
"""Configuration for the LDAP Plugins."""
server: str
"""uri formatted server location. e.g `ldap://localhost:389`"""
base_dn: str
"""distinguished name (dn) of the ldap server. e.g. `dc=amqtt,dc=io`"""
user_attribute: str
"""attribute in ldap entry to match the username against"""
bind_dn: str
"""distinguished name (dn) of known, preferably read-only, user. e.g. `cn=admin,dc=amqtt,dc=io`"""
bind_password: str
"""password for known, preferably read-only, user"""
class AuthLdapPlugin(BasePlugin[BrokerContext]):
def __init__(self, context: BrokerContext) -> None: def __init__(self, context: BrokerContext) -> None:
super().__init__(context) super().__init__(context)
@ -26,6 +42,8 @@ class LDAPAuthPlugin(BaseAuthPlugin):
except ldap.INVALID_CREDENTIALS as e: # pylint: disable=E1101 except ldap.INVALID_CREDENTIALS as e: # pylint: disable=E1101
raise PluginInitError(self.__class__) from e raise PluginInitError(self.__class__) from e
class UserAuthLdapPlugin(AuthLdapPlugin, BaseAuthPlugin):
"""Plugin to authenticate a user with an LDAP directory server."""
async def authenticate(self, *, session: Session) -> bool | None: async def authenticate(self, *, session: Session) -> bool | None:
@ -55,55 +73,39 @@ class LDAPAuthPlugin(BaseAuthPlugin):
return True return True
@dataclass @dataclass
class Config: class Config(LdapConfig):
"""Configuration for the LDAPAuthPlugin.""" """Configuration for the User Auth LDAP Plugin."""
server: str class TopicAuthLdapPlugin(AuthLdapPlugin, BaseTopicPlugin):
"""uri formatted server location. e.g `ldap://localhost:389`"""
base_dn: str
"""distinguished name (dn) of the ldap server. e.g. `dc=amqtt,dc=io`"""
user_attribute: str
"""attribute in ldap entry to match the username against"""
bind_dn: str
"""distinguished name (dn) of known, preferably read-only, user. e.g. `cn=admin,dc=amqtt,dc=io`"""
bind_password: str
"""password for known, preferably read-only, user"""
class LDAPTopicPlugin(BaseTopicPlugin):
"""Plugin to authenticate a user with an LDAP directory server.""" """Plugin to authenticate a user with an LDAP directory server."""
_action_attr_map = { _action_attr_map: ClassVar = {
Action.PUBLISH: 'publish_attribute', Action.PUBLISH: "publish_attribute",
Action.SUBSCRIBE: 'subscribe_attribute', Action.SUBSCRIBE: "subscribe_attribute",
Action.RECEIVE: 'receive_attribute' Action.RECEIVE: "receive_attribute"
} }
def __init__(self, context: BrokerContext) -> None: def __init__(self, context: BrokerContext) -> None:
super().__init__(context) super().__init__(context)
self.conn = ldap.initialize(self.config.server)
self.conn.protocol_version = ldap.VERSION3 # pylint: disable=E1101
try:
self.conn.simple_bind_s(self.config.bind_dn, self.config.bind_password)
except ldap.INVALID_CREDENTIALS as e: # pylint: disable=E1101
raise PluginInitError(self.__class__) from e
self.topic_matcher = TopicMatcher() self.topic_matcher = TopicMatcher()
async def topic_filtering( async def topic_filtering(
self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None
) -> bool | None: ) -> bool | None:
# search_filter = f"({self.config.user_attribute}={session.username})"
search_filter = "(uid=jdoe)" # if not provided needed criteria, can't properly evaluate topic filtering
if not session or not action or not topic:
return None
search_filter = f"({self.config.user_attribute}={session.username})"
attrs = [ attrs = [
"cn", "cn",
self.config.publish_attribute, self.config.publish_attribute,
self.config.subscribe_attribute, self.config.subscribe_attribute,
self.config.receive_attribute self.config.receive_attribute
] ]
results = self.conn.search_s(self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, attrs) results = self.conn.search_s(self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, attrs) # pylint: disable=E1101
if not results: if not results:
@ -118,29 +120,19 @@ class LDAPTopicPlugin(BaseTopicPlugin):
dn, entry = results[0] dn, entry = results[0]
ldap_attribute = getattr(self.config, self._action_attr_map[action]) ldap_attribute = getattr(self.config, self._action_attr_map[action])
allowed_topics = [t.decode("utf-8") for t in entry.get(ldap_attribute, [])] topic_filters = [t.decode("utf-8") for t in entry.get(ldap_attribute, [])]
logger.debug(f"DN: {dn} - {ldap_attribute}={allowed_topics}") logger.debug(f"DN: {dn} - {ldap_attribute}={topic_filters}")
return self.topic_matcher.are_topics_allowed(topic, allowed_topics) return self.topic_matcher.are_topics_allowed(topic, topic_filters)
# print(f"{self.config.publish_attribute} : ", entry.get(self.config.publish_attribute, []))
# print(f"{self.config.subscribe_attribute} : ", entry.get(self.config.subscribe_attribute, []))
# print(f"{self.config.receive_attribute} : ", entry.get(self.config.receive_attribute, []))
@dataclass @dataclass
class Config: class Config(LdapConfig):
"""Configuration for the LDAPAuthPlugin.""" """Configuration for the LDAPAuthPlugin."""
server: str
"""uri formatted server location. e.g `ldap://localhost:389`"""
base_dn: str
"""distinguished name (dn) of the ldap server. e.g. `dc=amqtt,dc=io`"""
user_attribute: str
"""attribute in ldap entry to match the username against"""
bind_dn: str
"""distinguished name (dn) of known, preferably read-only, user. e.g. `cn=admin,dc=amqtt,dc=io`"""
bind_password: str
"""password for known, preferably read-only, user"""
publish_attribute: str publish_attribute: str
"""LDAP attribute which contains a list of permissible publish topics."""
subscribe_attribute: str subscribe_attribute: str
receive_attribute: str """LDAP attribute which contains a list of permissible subscribe topics."""
receive_attribute: str
"""LDAP attribute which contains a list of permissible receive topics."""

Wyświetl plik

@ -35,4 +35,4 @@ class TopicMatcher:
def are_topics_allowed(self, topic: str, many_filters: list[str]) -> bool: def are_topics_allowed(self, topic: str, many_filters: list[str]) -> bool:
return any([self.is_topic_allowed(topic, a_filter) for a_filter in many_filters]) return any(self.is_topic_allowed(topic, a_filter) for a_filter in many_filters)

Wyświetl plik

@ -1,10 +1,24 @@
# Authentication with LDAP Server # Authentication with LDAP Server
`amqtt.contrib.ldap.LDAPAuthPlugin` If clients accessing the broker are managed by an LDAP server, this plugin can verify credentials
for client authentication and/or topic-level authorization.
- `amqtt.contrib.ldap.UserAuthLdapPlugin` (client authentication)
- `amqtt.contrib.ldap.TopicAuthLdapPlugin` (topic authorization)
Authenticate a user with an LDAP directory server. Authenticate a user with an LDAP directory server.
::: amqtt.contrib.ldap.LDAPAuthPlugin.Config ## User Auth
::: amqtt.contrib.ldap.UserAuthLdapPlugin.Config
options:
heading_level: 4
extra:
class_style: "simple"
## Topic Auth (ACL)
::: amqtt.contrib.ldap.TopicAuthLdapPlugin.Config
options: options:
heading_level: 4 heading_level: 4
extra: extra:

Wyświetl plik

@ -208,7 +208,7 @@ max-returns = 10
addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=html"] addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=html"]
testpaths = ["tests"] testpaths = ["tests"]
asyncio_mode = "auto" asyncio_mode = "auto"
timeout = 10 timeout = 15
asyncio_default_fixture_loop_scope = "function" asyncio_default_fixture_loop_scope = "function"
#addopts = ["--tb=short", "--capture=tee-sys"] #addopts = ["--tb=short", "--capture=tee-sys"]
#log_cli = true #log_cli = true

Wyświetl plik

@ -8,7 +8,7 @@ from amqtt.broker import BrokerContext, Broker
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.contexts import BrokerConfig, ListenerConfig, ClientConfig, Action from amqtt.contexts import BrokerConfig, ListenerConfig, ClientConfig, Action
from amqtt.contrib.auth_db.user_mgr_cli import user_app from amqtt.contrib.auth_db.user_mgr_cli import user_app
from amqtt.contrib.ldap import LDAPAuthPlugin, LDAPTopicPlugin from amqtt.contrib.ldap import UserAuthLdapPlugin, TopicAuthLdapPlugin
from amqtt.errors import ConnectError from amqtt.errors import ConnectError
from amqtt.session import Session from amqtt.session import Session
from tests.test_cli import broker from tests.test_cli import broker
@ -39,31 +39,30 @@ def ldap_service(docker_ip, docker_services):
return url return url
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_ldap(ldap_service): async def test_ldap_user_plugin(ldap_service):
ctx = BrokerContext(Broker()) ctx = BrokerContext(Broker())
ctx.config = LDAPAuthPlugin.Config( ctx.config = UserAuthLdapPlugin.Config(
# server="ldap://localhost:10389",
server=ldap_service, server=ldap_service,
base_dn="dc=amqtt,dc=io", base_dn="dc=amqtt,dc=io",
user_attribute="uid", user_attribute="uid",
bind_dn="cn=admin,dc=amqtt,dc=io", bind_dn="cn=admin,dc=amqtt,dc=io",
bind_password="adminpassword", bind_password="adminpassword",
) )
ldap_plugin = LDAPAuthPlugin(context=ctx) ldap_plugin = UserAuthLdapPlugin(context=ctx)
s = Session() s = Session()
s.username = "alpha.beta" s.username = "jdoe"
s.password = "password456" s.password = "johndoepassword"
assert await ldap_plugin.authenticate(session=s), "could not authenticate user" assert await ldap_plugin.authenticate(session=s), "could not authenticate user"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_auth_ldap(ldap_service): async def test_ldap_user(ldap_service):
cfg = BrokerConfig( cfg = BrokerConfig(
listeners={ 'default' : ListenerConfig() }, listeners={ 'default' : ListenerConfig() },
plugins={ plugins={
'amqtt.contrib.ldap.LDAPAuthPlugin': { 'amqtt.contrib.ldap.UserAuthLdapPlugin': {
'server': ldap_service, 'server': ldap_service,
'base_dn': 'dc=amqtt,dc=io', 'base_dn': 'dc=amqtt,dc=io',
'user_attribute': 'uid', 'user_attribute': 'uid',
@ -79,7 +78,7 @@ async def test_auth_ldap(ldap_service):
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
client = MQTTClient(config=ClientConfig(auto_reconnect=False)) client = MQTTClient(config=ClientConfig(auto_reconnect=False))
await client.connect('mqtt://gamma.delta:password789@127.0.0.1:1883') await client.connect('mqtt://jdoe:johndoepassword@127.0.0.1:1883')
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
await client.publish('my/topic', b'my message') await client.publish('my/topic', b'my message')
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@ -88,12 +87,12 @@ async def test_auth_ldap(ldap_service):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_auth_ldap_incorrect_creds(ldap_service): async def test_ldap_user_invalid_creds(ldap_service):
cfg = BrokerConfig( cfg = BrokerConfig(
listeners={ 'default' : ListenerConfig() }, listeners={ 'default' : ListenerConfig() },
plugins={ plugins={
'amqtt.contrib.ldap.LDAPAuthPlugin': { 'amqtt.contrib.ldap.UserAuthLdapPlugin': {
'server': ldap_service, 'server': ldap_service,
'base_dn': 'dc=amqtt,dc=io', 'base_dn': 'dc=amqtt,dc=io',
'user_attribute': 'uid', 'user_attribute': 'uid',
@ -110,17 +109,16 @@ async def test_auth_ldap_incorrect_creds(ldap_service):
client = MQTTClient(config=ClientConfig(auto_reconnect=False)) client = MQTTClient(config=ClientConfig(auto_reconnect=False))
with pytest.raises(ConnectError): with pytest.raises(ConnectError):
await client.connect('mqtt://gamma.delta:wrongpassword@127.0.0.1:1883') await client.connect('mqtt://jdoe:wrongpassword@127.0.0.1:1883')
await broker.shutdown() await broker.shutdown()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_topic_ldap_plugin(): async def test_ldap_topic_plugin(ldap_service):
ctx = BrokerContext(Broker()) ctx = BrokerContext(Broker())
ctx.config = LDAPTopicPlugin.Config( ctx.config = TopicAuthLdapPlugin.Config(
server="ldap://localhost:1389", server=ldap_service,
# server=ldap_service,
base_dn="dc=amqtt,dc=io", base_dn="dc=amqtt,dc=io",
user_attribute="uid", user_attribute="uid",
bind_dn="cn=admin,dc=amqtt,dc=io", bind_dn="cn=admin,dc=amqtt,dc=io",
@ -129,10 +127,10 @@ async def test_topic_ldap_plugin():
subscribe_attribute="subscribeACL", subscribe_attribute="subscribeACL",
receive_attribute="receiveACL" receive_attribute="receiveACL"
) )
ldap_plugin = LDAPTopicPlugin(context=ctx) ldap_plugin = TopicAuthLdapPlugin(context=ctx)
s = Session() s = Session()
s.username = "testuser" s.username = "jdoe"
s.password = "testpassword" s.password = "wrongpassword"
assert await ldap_plugin.topic_filtering(session=s, topic='my/topic/one', action=Action.PUBLISH), "access not granted" assert await ldap_plugin.topic_filtering(session=s, topic='my/topic/one', action=Action.PUBLISH), "access not granted"

Wyświetl plik

@ -1,7 +1,7 @@
attributetype ( 1.3.6.1.4.1.4203.666.1.1 NAME 'publishACL' DESC 'topics for publishing' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE ) attributetype ( 1.3.6.1.4.1.4203.666.1.1 NAME 'publishACL' DESC 'topics for publishing' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15)
attributetype ( 1.3.6.1.4.1.4203.666.1.2 NAME 'subscribeACL' DESC 'topics for subscribing' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE ) attributetype ( 1.3.6.1.4.1.4203.666.1.2 NAME 'subscribeACL' DESC 'topics for subscribing' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15)
attributetype ( 1.3.6.1.4.1.4203.666.1.3 NAME 'receiveACL' DESC 'topics for receiving' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE ) attributetype ( 1.3.6.1.4.1.4203.666.1.3 NAME 'receiveACL' DESC 'topics for receiving' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15)
objectclass ( 1.3.6.1.4.1.4203.666.2.1 NAME 'customuserinfo' DESC 'User info with custom attributes' SUP inetOrgPerson STRUCTURAL MUST ( cn $ sn ) MAY ( publishACL $ subscribeACL $ receiveACL ) ) objectclass ( 1.3.6.1.4.1.4203.666.2.1 NAME 'customuserinfo' DESC 'User info with custom attributes' SUP inetOrgPerson STRUCTURAL MUST ( cn $ sn ) MAY ( publishACL $ subscribeACL $ receiveACL ) )

Wyświetl plik

@ -10,7 +10,9 @@ cn: John Doe
sn: Doe sn: Doe
uid: jdoe uid: jdoe
mail: jdoe@amqtt.io mail: jdoe@amqtt.io
userPassword: {SSHA}w0RfYQaFGMQq7c5QnW7xvXb+iXG0P5gB # `slappasswd -s johndoepassword`
publishACL: my/topic/one userPassword: {SSHA}ANVSnjfMu85vXHNS5XW7i4EHGJ8VjMtu
publishACL: my/topic/#
publishACL: other/+/topic
subscribeACL: my/topic/two subscribeACL: my/topic/two
receiveACL: my/topic/three receiveACL: my/topic/three