diff --git a/amqtt/broker.py b/amqtt/broker.py index 1a2916b..408e4d9 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -124,7 +124,7 @@ class Broker: """MQTT 3.1.1 compliant broker implementation. Args: - config: dictionary of configuration options (see [broker configuration](broker_config.md)). + config: `BrokerConfig` or dictionary of equivalent structure options (see [broker configuration](broker_config.md)). loop: asyncio loop. defaults to `asyncio.new_event_loop()`. plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`. @@ -724,7 +724,8 @@ class Broker: """Stop a running handler and detach if from the session.""" try: await handler.stop() - except Exception: + # a failure in stopping a handler shouldn't cause the broker to fail + except asyncio.QueueEmpty: self.logger.exception("Failed to stop handler") async def _authenticate(self, session: Session, _: ListenerConfig) -> bool: @@ -874,7 +875,8 @@ class Broker: task.result() except CancelledError: self.logger.info(f"Task has been cancelled: {task}") - except Exception: + # if a task fails, don't want it to cause the broker to fail + except Exception: # pylint: disable=W0718 self.logger.exception(f"Task failed and will be skipped: {task}") run_broadcast_task = asyncio.ensure_future(self._run_broadcast(running_tasks)) diff --git a/amqtt/client.py b/amqtt/client.py index 26b565d..0264ee5 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -75,18 +75,15 @@ def mqtt_connected(func: _F) -> _F: class MQTTClient: - """MQTT client implementation. - - MQTTClient instances provides API for connecting to a broker and send/receive - messages using the MQTT protocol. + """MQTT client implementation, providing an API for connecting to a broker and send/receive messages using the MQTT protocol. Args: client_id: MQTT client ID to use when connecting to the broker. If none, it will be generated randomly by `amqtt.utils.gen_client_id` - config: dictionary of configuration options (see [client configuration](client_config.md)). + config: `ClientConfig` or dictionary of equivalent structure options (see [client configuration](client_config.md)). Raises: - PluginImportError: if importing a plugin from configuration + PluginImportError: if importing a plugin from configuration fails PluginInitError: if initialization plugin fails """ @@ -159,7 +156,8 @@ class MQTTClient: except asyncio.CancelledError as e: msg = "Future or Task was cancelled" raise ConnectError(msg) from e - except Exception as e: + # no matter the failure mode, still try to reconnect + except Exception as e: # pylint: disable=W0718 self.logger.warning(f"Connection failed: {e!r}") if not self.config.get("auto_reconnect", False): raise @@ -233,7 +231,8 @@ class MQTTClient: except asyncio.CancelledError as e: msg = "Future or Task was cancelled" raise ConnectError(msg) from e - except Exception as e: + # no matter the failure mode, still try to reconnect + except Exception as e: # pylint: disable=W0718 self.logger.warning(f"Reconnection attempt failed: {e!r}") self.logger.debug("", exc_info=True) if 0 <= reconnect_retries < nb_attempt: diff --git a/amqtt/contexts.py b/amqtt/contexts.py index 1b37153..afd5dc1 100644 --- a/amqtt/contexts.py +++ b/amqtt/contexts.py @@ -154,9 +154,9 @@ class BrokerConfig(Dictable): listeners: dict[Literal["default"] | str, ListenerConfig] = field(default_factory=default_listeners) # noqa: PYI051 """Network of listeners used by the services. a 'default' named listener is required; if another listener does not set a value, the 'default' settings are applied. See - [ListenerConfig](#amqtt.contexts.ListenerConfig) for more information.""" + [`ListenerConfig`](broker_config.md#amqtt.contexts.ListenerConfig) for more information.""" sys_interval: int | None = None - """*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](#sys-topics) + """*Deprecated field to configure the `BrokerSysPlugin`. See [`BrokerSysPlugin`](../plugins/packaged_plugins.md#sys-topics) for recommended configuration.*""" timeout_disconnect_delay: int | None = 0 """Client disconnect timeout without a keep-alive.""" @@ -164,17 +164,17 @@ class BrokerConfig(Dictable): """Seconds for an inactive session to be retained.""" auth: dict[str, Any] | None = None """*Deprecated field used to config EntryPoint-loaded plugins. See - [`AnonymousAuthPlugin`](#anonymous-auth-plugin) and - [`FileAuthPlugin`](#password-file-auth-plugin) for recommended configuration.*""" + [`AnonymousAuthPlugin`](../plugins/packaged_plugins.md#anonymous-auth-plugin) and + [`FileAuthPlugin`](../plugins/packaged_plugins.md#password-file-auth-plugin) for recommended configuration.*""" topic_check: dict[str, Any] | None = None """*Deprecated field used to config EntryPoint-loaded plugins. See - [`TopicTabooPlugin`](#taboo-topic-plugin) and - [`TopicACLPlugin`](#acl-topic-plugin) for recommended configuration method.*""" + [`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and + [`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*""" plugins: dict[str, Any] | list[str | dict[str,Any]] | None = field(default_factory=default_broker_plugins) """The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin` or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See - [Plugins](http://localhost:8000/custom_plugins/) for more information. `list[str | dict[str,Any]]` is not - recommended but available to support legacy use cases.""" + [custom plugins](../plugins/custom_plugins.md) for more information. `list[str | dict[str,Any]]` is deprecated but available + to support legacy use cases.""" def __post_init__(self) -> None: """Check config for errors and transform fields for easier use.""" @@ -327,16 +327,16 @@ class ClientConfig(Dictable): """Specify the topics and what flags should be set for messages published to them.""" broker: ConnectionConfig | None = field(default_factory=ConnectionConfig) """Configuration for connecting to the broker. See - [ConnectionConfig](#amqtt.contexts.ConnectionConfig) for more information.""" + [`ConnectionConfig`](client_config.md#amqtt.contexts.ConnectionConfig) for more information.""" plugins: dict[str, Any] | list[dict[str, Any]] | None = field(default_factory=default_client_plugins) """The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`; the value is - a dictionary of configuration options for that plugin. See [Plugins](http://localhost:8000/custom_plugins/) - for more information.""" + a dictionary of configuration options for that plugin. See [custom plugins](../plugins/custom_plugins.md) for + more information. `list[str | dict[str,Any]]` is deprecated but available to support legacy use cases.""" check_hostname: bool | None = True """If establishing a secure connection, should the hostname of the certificate be verified.""" will: WillConfig | None = None """Message, topic and flags that should be sent to if the client disconnects. See - [WillConfig](#amqtt.contexts.WillConfig)""" + [`WillConfig`](client_config.md#amqtt.contexts.WillConfig) for more information.""" def __post__init__(self) -> None: """Check config for errors and transform fields for easier use.""" diff --git a/amqtt/contrib/auth_db/plugin.py b/amqtt/contrib/auth_db/plugin.py index d4fbe6b..51b665d 100644 --- a/amqtt/contrib/auth_db/plugin.py +++ b/amqtt/contrib/auth_db/plugin.py @@ -20,7 +20,7 @@ def default_hash_scheme() -> list[str]: return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"] -class UserAuthDBPlugin(BaseAuthPlugin, BaseTopicPlugin): +class UserAuthDBPlugin(BaseAuthPlugin): def __init__(self, context: BrokerContext) -> None: super().__init__(context) diff --git a/amqtt/contrib/http.py b/amqtt/contrib/http.py index 2883401..22d03ff 100644 --- a/amqtt/contrib/http.py +++ b/amqtt/contrib/http.py @@ -15,7 +15,7 @@ from aiohttp import ClientResponse, ClientSession, FormData from amqtt.broker import BrokerContext from amqtt.contexts import Action -from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin +from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin from amqtt.session import Session logger = logging.getLogger(__name__) @@ -48,7 +48,31 @@ HTTP_4xx_MIN = 400 HTTP_4xx_MAX = 499 -class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin): +@dataclass +class HttpConfig: + """Configuration for the HTTP Auth & ACL Plugin.""" + + host: str + """hostname of the server for the auth & acl check""" + port: int + """port of the server for the auth & acl check""" + request_method: RequestMethod = RequestMethod.GET + """send the request as a GET, POST or PUT""" + params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details + """send the request with `JSON` or `FORM` data. *additional details below*""" + response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details + """expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*""" + with_tls: bool = False + """http or https""" + user_agent: str = "amqtt" + """the 'User-Agent' header sent along with the request""" + superuser_uri: str | None = None + """URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported""" + timeout: int = 5 + """duration, in seconds, to wait for the HTTP server to respond""" + + +class AuthHttpPlugin(BasePlugin[BrokerContext]): def __init__(self, context: BrokerContext) -> None: super().__init__(context) @@ -113,10 +137,23 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin): def get_url(self, uri: str) -> str: return f"{'https' if self.config.with_tls else 'http'}://{self.config.host}:{self.config.port}{uri}" + +class UserAuthHttpPlugin(AuthHttpPlugin, BaseAuthPlugin): + async def authenticate(self, *, session: Session) -> bool | None: d = {"username": session.username, "password": session.password, "client_id": session.client_id} return await self._send_request(self.get_url(self.config.user_uri), d) + @dataclass + class Config(HttpConfig): + """Configuration for the HTTP Auth Plugin.""" + + user_uri: str = "/user" + """URI of the auth check.""" + + +class TopicAuthHttpPlugin(AuthHttpPlugin, BaseTopicPlugin): + async def topic_filtering(self, *, session: Session | None = None, topic: str | None = None, @@ -136,28 +173,8 @@ class HttpAuthPlugin(BaseAuthPlugin, BaseTopicPlugin): return await self._send_request(self.get_url(self.config.topic_uri), d) @dataclass - class Config: - """Configuration for the HTTP Auth & ACL Plugin.""" + class Config(HttpConfig): + """Configuration for the HTTP Topic Plugin.""" - host: str - """hostname of the server for the auth & acl check""" - port: int - """port of the server for the auth & acl check""" - user_uri: str - """URI of the topic check (e.g. '/user')""" - topic_uri: str - """URI of the topic check (e.g. '/acl')""" - request_method: RequestMethod = RequestMethod.GET - """send the request as a GET, POST or PUT""" - params_mode: ParamsMode = ParamsMode.JSON # see docs/plugins/http.md for additional details - """send the request with `JSON` or `FORM` data. *additional details below*""" - response_mode: ResponseMode = ResponseMode.JSON # see docs/plugins/http.md for additional details - """expected response from the auth/acl server. `STATUS` (code), `JSON`, or `TEXT`. *additional details below*""" - with_tls: bool = False - """http or https""" - user_agent: str = "amqtt" - """the 'User-Agent' header sent along with the request""" - superuser_uri: str | None = None - """URI to verify if the user is a superuser (e.g. '/superuser'), `None` if superuser is not supported""" - timeout: int = 5 - """duration, in seconds, to wait for the HTTP server to respond""" + topic_uri: str = "/acl" + """URI of the topic check.""" diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index 66bc1cb..0b8fdfe 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -535,7 +535,7 @@ class ProtocolHandler(Generic[C]): self.handle_read_timeout() except NoDataError: self.logger.debug(f"{self.session.client_id} No data available") - except Exception as e: # noqa: BLE001 + except Exception as e: # noqa: BLE001, pylint: disable=W0718 self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}") break while running_tasks: diff --git a/amqtt/plugins/authentication.py b/amqtt/plugins/authentication.py index f3934d9..8a9f291 100644 --- a/amqtt/plugins/authentication.py +++ b/amqtt/plugins/authentication.py @@ -37,9 +37,10 @@ class AnonymousAuthPlugin(BaseAuthPlugin): @dataclass class Config: - """Allow empty username.""" + """Configuration for AnonymousAuthPlugin.""" allow_anonymous: bool = field(default=True) + """Allow all anonymous authentication (even with _no_ username).""" class FileAuthPlugin(BaseAuthPlugin): @@ -78,7 +79,7 @@ class FileAuthPlugin(BaseAuthPlugin): self.context.logger.warning(f"Password file '{password_file}' not found") except ValueError: self.context.logger.exception(f"Malformed password file '{password_file}'") - except Exception: + except OSError: self.context.logger.exception(f"Unexpected error reading password file '{password_file}'") async def authenticate(self, *, session: Session) -> bool | None: @@ -107,6 +108,7 @@ class FileAuthPlugin(BaseAuthPlugin): @dataclass class Config: - """Path to the properly encoded password file.""" + """Configuration for FileAuthPlugin.""" password_file: str | Path | None = None + """Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512.""" diff --git a/amqtt/scripts/pub_script.py b/amqtt/scripts/pub_script.py index 41e8f05..b6801c4 100644 --- a/amqtt/scripts/pub_script.py +++ b/amqtt/scripts/pub_script.py @@ -52,7 +52,7 @@ class MessageInput: with Path(self.file).open(encoding="utf-8") as f: for line in f: yield line.encode(encoding="utf-8") - except Exception: + except (FileNotFoundError, OSError): logger.exception(f"Failed to read file '{self.file}'") if self.lines: for line in sys.stdin: diff --git a/docs/plugins/auth_db.md b/docs/plugins/auth_db.md index ac996e3..b4620c1 100644 --- a/docs/plugins/auth_db.md +++ b/docs/plugins/auth_db.md @@ -1,7 +1,7 @@ # Relational Database for Authentication and Authorization -- `amqtt.contrib.auth_db.AuthUserDBPlugin` (authentication) verify a client's ability to connect to broker -- `amqtt.contrib.auth_db.AuthTopicDBPlugin` (authorization) determine a client's access to topics +- `amqtt.contrib.auth_db.UserAuthDBPlugin` (authentication) verify a client's ability to connect to broker +- `amqtt.contrib.auth_db.TopicAuthDBPlugin` (authorization) determine a client's access to topics Relational database access is supported using SQLAlchemy so MySQL, MariaDB, Postgres and SQLite support is available. diff --git a/docs/plugins/http.md b/docs/plugins/http.md index 382691c..9ef639c 100644 --- a/docs/plugins/http.md +++ b/docs/plugins/http.md @@ -1,9 +1,16 @@ # Authentication & Authorization via external HTTP server -`amqtt.contrib.http.HttpAuthPlugin` - If clients accessing the broker are managed by another application, it can implement API endpoints -that respond with information about client authenticated and topic-level authorization. +that respond with information about client authentication and/or topic-level authorization. + +- `amqtt.contrib.http.UserAuthHttpPlugin` (client authentication) +- `amqtt.contrib.http.TopicAuthHttpPlugin` (topic authorization) + +Configuration of these plugins is identical (except for the uri name) so that they can be used independently, if desired. + +## User Auth + +See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`. !!! info "browser-based mqtt over websockets" @@ -17,9 +24,9 @@ that respond with information about client authenticated and topic-level authori ??? info "recipe for authentication" Provide the client id and username when webpage is initially rendered or passed to the mqtt initialization from stored - cookies. If application is secure, the user's password will be hashed should be encrypted and cannot be used to - authenticate a client. Instead, the application should create an encrypted password (eg jwt) which the server - can then verify when the broker contacts the application. + cookies. If application is secure, the user's password will already be stored as a hashed value and, therefore, cannot + be used in this context to authenticate a client. Instead, the application should create its own encrypted key (eg jwt) + which the server can then verify when the broker contacts the application. ??? example "mqtt in javascript" Example initialization of mqtt in javascript: @@ -35,9 +42,18 @@ that respond with information about client authenticated and topic-level authori try { const clientMqtt = await mqtt.connect(url, options); -See the 'Request and Response Modes' section below for details on `params_mode` and `response_mode`. +::: amqtt.contrib.http.UserAuthHttpPlugin.Config + options: + show_source: false + heading_level: 4 + extra: + class_style: "simple" -::: amqtt.contrib.http.HttpAuthPlugin.Config +## Topic ACL + +See the [Request and Response Modes](#request-response-modes) section below for details on `params_mode` and `response_mode`. + +::: amqtt.contrib.http.TopicAuthHttpPlugin.Config options: show_source: false heading_level: 4 @@ -45,7 +61,8 @@ See the 'Request and Response Modes' section below for details on `params_mode` class_style: "simple" [//]: # (manually creating the heading so it doesn't show in the sidebar ToC) -