kopia lustrzana https://github.com/Yakifo/amqtt
refactor: manual merge some changes from last commits
rodzic
769d108b1f
commit
0019f5d024
|
@ -181,6 +181,8 @@ class Broker:
|
||||||
self._broadcast_task: asyncio.Task[Any] | None = None
|
self._broadcast_task: asyncio.Task[Any] | None = None
|
||||||
self._broadcast_shutdown_waiter: asyncio.Future[Any] = futures.Future()
|
self._broadcast_shutdown_waiter: asyncio.Future[Any] = futures.Future()
|
||||||
|
|
||||||
|
self._tasks_queue: deque[asyncio.Task[OutgoingApplicationMessage]] = deque()
|
||||||
|
|
||||||
# Init plugins manager
|
# Init plugins manager
|
||||||
context = BrokerContext(self)
|
context = BrokerContext(self)
|
||||||
context.config = self.config
|
context.config = self.config
|
||||||
|
@ -747,7 +749,7 @@ class Broker:
|
||||||
return bool(match_pattern.fullmatch(topic))
|
return bool(match_pattern.fullmatch(topic))
|
||||||
|
|
||||||
async def _broadcast_loop(self) -> None:
|
async def _broadcast_loop(self) -> None:
|
||||||
running_tasks: deque[asyncio.Task[OutgoingApplicationMessage]] = deque()
|
running_tasks: deque[asyncio.Task[OutgoingApplicationMessage]] = self._tasks_queue
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
while running_tasks and running_tasks[0].done():
|
while running_tasks and running_tasks[0].done():
|
||||||
|
|
|
@ -218,6 +218,8 @@ class BrokerProtocolHandler(ProtocolHandler):
|
||||||
incoming_session.will_message = connect.will_message
|
incoming_session.will_message = connect.will_message
|
||||||
incoming_session.username = connect.username
|
incoming_session.username = connect.username
|
||||||
incoming_session.password = connect.password
|
incoming_session.password = connect.password
|
||||||
|
incoming_session.remote_address = remote_address
|
||||||
|
incoming_session.remote_port = remote_port
|
||||||
incoming_session.keep_alive = max(connect.keep_alive, 0)
|
incoming_session.keep_alive = max(connect.keep_alive, 0)
|
||||||
|
|
||||||
handler = cls(plugins_manager, loop=loop)
|
handler = cls(plugins_manager, loop=loop)
|
||||||
|
|
|
@ -2,14 +2,14 @@ from pathlib import Path
|
||||||
|
|
||||||
from passlib.apps import custom_app_context as pwd_context
|
from passlib.apps import custom_app_context as pwd_context
|
||||||
|
|
||||||
from amqtt.plugins.manager import BaseContext
|
from amqtt.broker import BrokerContext
|
||||||
from amqtt.session import Session
|
from amqtt.session import Session
|
||||||
|
|
||||||
|
|
||||||
class BaseAuthPlugin:
|
class BaseAuthPlugin:
|
||||||
"""Base class for authentication plugins."""
|
"""Base class for authentication plugins."""
|
||||||
|
|
||||||
def __init__(self, context: BaseContext) -> None:
|
def __init__(self, context: BrokerContext) -> None:
|
||||||
self.context = context
|
self.context = context
|
||||||
self.auth_config = self.context.config.get("auth") if self.context.config else None
|
self.auth_config = self.context.config.get("auth") if self.context.config else None
|
||||||
if not self.auth_config:
|
if not self.auth_config:
|
||||||
|
@ -31,7 +31,7 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
|
||||||
authenticated = await super().authenticate(*args, **kwargs)
|
authenticated = await super().authenticate(*args, **kwargs)
|
||||||
if authenticated:
|
if authenticated:
|
||||||
# Default to allowing anonymous
|
# Default to allowing anonymous
|
||||||
allow_anonymous = self.auth_config.get("allow-anonymous", True) if self.auth_config is not None else True
|
allow_anonymous = self.auth_config.get("allow-anonymous", True) if isinstance(self.auth_config, dict) else True
|
||||||
if allow_anonymous:
|
if allow_anonymous:
|
||||||
self.context.logger.debug("Authentication success: config allows anonymous")
|
self.context.logger.debug("Authentication success: config allows anonymous")
|
||||||
return True
|
return True
|
||||||
|
@ -47,14 +47,14 @@ class AnonymousAuthPlugin(BaseAuthPlugin):
|
||||||
class FileAuthPlugin(BaseAuthPlugin):
|
class FileAuthPlugin(BaseAuthPlugin):
|
||||||
"""Authentication plugin based on a file-stored user database."""
|
"""Authentication plugin based on a file-stored user database."""
|
||||||
|
|
||||||
def __init__(self, context: BaseContext) -> None:
|
def __init__(self, context: BrokerContext) -> None:
|
||||||
super().__init__(context)
|
super().__init__(context)
|
||||||
self._users: dict[str, str] = {}
|
self._users: dict[str, str] = {}
|
||||||
self._read_password_file()
|
self._read_password_file()
|
||||||
|
|
||||||
def _read_password_file(self) -> None:
|
def _read_password_file(self) -> None:
|
||||||
"""Read the password file and populates the user dictionary."""
|
"""Read the password file and populates the user dictionary."""
|
||||||
password_file = self.auth_config.get("password-file") if self.auth_config is not None else None
|
password_file = self.auth_config.get("password-file") if isinstance(self.auth_config, dict) else None
|
||||||
if not password_file:
|
if not password_file:
|
||||||
self.context.logger.warning("Configuration parameter 'password-file' not found")
|
self.context.logger.warning("Configuration parameter 'password-file' not found")
|
||||||
return
|
return
|
||||||
|
|
Ładowanie…
Reference in New Issue