From 0019f5d0248cb5859d2a2257a701e829ba511aee Mon Sep 17 00:00:00 2001 From: MVladislav Date: Tue, 1 Apr 2025 20:18:38 +0200 Subject: [PATCH] refactor: manual merge some changes from last commits --- amqtt/broker.py | 4 +++- amqtt/mqtt/protocol/broker_handler.py | 2 ++ amqtt/plugins/authentication.py | 10 +++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index 01f9c9a..1b15eba 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -181,6 +181,8 @@ class Broker: self._broadcast_task: asyncio.Task[Any] | None = None self._broadcast_shutdown_waiter: asyncio.Future[Any] = futures.Future() + self._tasks_queue: deque[asyncio.Task[OutgoingApplicationMessage]] = deque() + # Init plugins manager context = BrokerContext(self) context.config = self.config @@ -747,7 +749,7 @@ class Broker: return bool(match_pattern.fullmatch(topic)) async def _broadcast_loop(self) -> None: - running_tasks: deque[asyncio.Task[OutgoingApplicationMessage]] = deque() + running_tasks: deque[asyncio.Task[OutgoingApplicationMessage]] = self._tasks_queue try: while True: while running_tasks and running_tasks[0].done(): diff --git a/amqtt/mqtt/protocol/broker_handler.py b/amqtt/mqtt/protocol/broker_handler.py index 59e89fa..b7a1731 100644 --- a/amqtt/mqtt/protocol/broker_handler.py +++ b/amqtt/mqtt/protocol/broker_handler.py @@ -218,6 +218,8 @@ class BrokerProtocolHandler(ProtocolHandler): incoming_session.will_message = connect.will_message incoming_session.username = connect.username incoming_session.password = connect.password + incoming_session.remote_address = remote_address + incoming_session.remote_port = remote_port incoming_session.keep_alive = max(connect.keep_alive, 0) handler = cls(plugins_manager, loop=loop) diff --git a/amqtt/plugins/authentication.py b/amqtt/plugins/authentication.py index c89194e..4e545be 100644 --- a/amqtt/plugins/authentication.py +++ b/amqtt/plugins/authentication.py @@ -2,14 +2,14 @@ from pathlib import Path from passlib.apps import custom_app_context as pwd_context -from amqtt.plugins.manager import BaseContext +from amqtt.broker import BrokerContext from amqtt.session import Session class BaseAuthPlugin: """Base class for authentication plugins.""" - def __init__(self, context: BaseContext) -> None: + def __init__(self, context: BrokerContext) -> None: self.context = context self.auth_config = self.context.config.get("auth") if self.context.config else None if not self.auth_config: @@ -31,7 +31,7 @@ class AnonymousAuthPlugin(BaseAuthPlugin): authenticated = await super().authenticate(*args, **kwargs) if authenticated: # Default to allowing anonymous - allow_anonymous = self.auth_config.get("allow-anonymous", True) if self.auth_config is not None else True + allow_anonymous = self.auth_config.get("allow-anonymous", True) if isinstance(self.auth_config, dict) else True if allow_anonymous: self.context.logger.debug("Authentication success: config allows anonymous") return True @@ -47,14 +47,14 @@ class AnonymousAuthPlugin(BaseAuthPlugin): class FileAuthPlugin(BaseAuthPlugin): """Authentication plugin based on a file-stored user database.""" - def __init__(self, context: BaseContext) -> None: + def __init__(self, context: BrokerContext) -> None: super().__init__(context) self._users: dict[str, str] = {} self._read_password_file() def _read_password_file(self) -> None: """Read the password file and populates the user dictionary.""" - password_file = self.auth_config.get("password-file") if self.auth_config is not None else None + password_file = self.auth_config.get("password-file") if isinstance(self.auth_config, dict) else None if not password_file: self.context.logger.warning("Configuration parameter 'password-file' not found") return