kopia lustrzana https://github.com/Yakifo/amqtt
document the various data received for each event type, update methods instead of using just the generic *args and **kwargs
rodzic
9734c73126
commit
4984927ea5
|
@ -1,5 +1,7 @@
|
||||||
"""INIT."""
|
"""INIT."""
|
||||||
|
|
||||||
|
__all__ = ["MQTTPacket"]
|
||||||
|
|
||||||
from typing import Any, TypeAlias
|
from typing import Any, TypeAlias
|
||||||
|
|
||||||
from amqtt.errors import AMQTTError
|
from amqtt.errors import AMQTTError
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
from collections.abc import Callable, Coroutine
|
from collections.abc import Callable, Coroutine
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import Any, TypeAlias
|
||||||
|
|
||||||
|
from amqtt.mqtt import MQTTPacket
|
||||||
|
from amqtt.mqtt.packet import MQTTFixedHeader, MQTTPayload, MQTTVariableHeader
|
||||||
from amqtt.plugins.base import BasePlugin
|
from amqtt.plugins.base import BasePlugin
|
||||||
from amqtt.plugins.manager import BaseContext
|
from amqtt.plugins.manager import BaseContext
|
||||||
|
from amqtt.session import Session
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]
|
||||||
from amqtt.session import Session
|
|
||||||
|
|
||||||
|
|
||||||
class EventLoggerPlugin(BasePlugin[BaseContext]):
|
class EventLoggerPlugin(BasePlugin[BaseContext]):
|
||||||
"""A plugin to log events dynamically based on method names."""
|
"""A plugin to log events dynamically based on method names."""
|
||||||
|
@ -29,20 +30,16 @@ class EventLoggerPlugin(BasePlugin[BaseContext]):
|
||||||
class PacketLoggerPlugin(BasePlugin[BaseContext]):
|
class PacketLoggerPlugin(BasePlugin[BaseContext]):
|
||||||
"""A plugin to log MQTT packets sent and received."""
|
"""A plugin to log MQTT packets sent and received."""
|
||||||
|
|
||||||
async def on_mqtt_packet_received(self, *args: Any, **kwargs: Any) -> None:
|
async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
|
||||||
"""Log an MQTT packet when it is received."""
|
"""Log an MQTT packet when it is received."""
|
||||||
packet = kwargs.get("packet")
|
|
||||||
session: Session | None = kwargs.get("session")
|
|
||||||
if self.context.logger.isEnabledFor(logging.DEBUG):
|
if self.context.logger.isEnabledFor(logging.DEBUG):
|
||||||
if session is not None:
|
if session is not None:
|
||||||
self.context.logger.debug(f"{session.client_id} <-in-- {packet!r}")
|
self.context.logger.debug(f"{session.client_id} <-in-- {packet!r}")
|
||||||
else:
|
else:
|
||||||
self.context.logger.debug(f"<-in-- {packet!r}")
|
self.context.logger.debug(f"<-in-- {packet!r}")
|
||||||
|
|
||||||
async def on_mqtt_packet_sent(self, *args: Any, **kwargs: Any) -> None:
|
async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
|
||||||
"""Log an MQTT packet when it is sent."""
|
"""Log an MQTT packet when it is sent."""
|
||||||
packet = kwargs.get("packet")
|
|
||||||
session: Session | None = kwargs.get("session")
|
|
||||||
if self.context.logger.isEnabledFor(logging.DEBUG):
|
if self.context.logger.isEnabledFor(logging.DEBUG):
|
||||||
if session is not None:
|
if session is not None:
|
||||||
self.context.logger.debug(f"{session.client_id} -out-> {packet!r}")
|
self.context.logger.debug(f"{session.client_id} -out-> {packet!r}")
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections import deque # pylint: disable=C0412
|
from collections import deque # pylint: disable=C0412
|
||||||
from typing import SupportsIndex, SupportsInt # pylint: disable=C0412
|
from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
|
||||||
|
|
||||||
from amqtt.plugins.base import BasePlugin
|
from amqtt.plugins.base import BasePlugin
|
||||||
|
from amqtt.session import Session
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from collections.abc import Buffer
|
from collections.abc import Buffer
|
||||||
|
@ -26,8 +27,7 @@ except ImportError:
|
||||||
import amqtt
|
import amqtt
|
||||||
from amqtt.broker import BrokerContext
|
from amqtt.broker import BrokerContext
|
||||||
from amqtt.codecs_amqtt import int_to_bytes_str
|
from amqtt.codecs_amqtt import int_to_bytes_str
|
||||||
from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, PacketIdVariableHeader
|
from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader
|
||||||
from amqtt.mqtt.subscribe import SubscribePayload
|
|
||||||
|
|
||||||
DOLLAR_SYS_ROOT = "$SYS/broker/"
|
DOLLAR_SYS_ROOT = "$SYS/broker/"
|
||||||
STAT_BYTES_SENT = "bytes_sent"
|
STAT_BYTES_SENT = "bytes_sent"
|
||||||
|
@ -42,6 +42,9 @@ STAT_CLIENTS_CONNECTED = "clients_connected"
|
||||||
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
|
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
|
||||||
|
|
||||||
|
|
||||||
|
PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]
|
||||||
|
|
||||||
|
|
||||||
class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
def __init__(self, context: BrokerContext) -> None:
|
def __init__(self, context: BrokerContext) -> None:
|
||||||
super().__init__(context)
|
super().__init__(context)
|
||||||
|
@ -75,11 +78,11 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
loop=self.context.loop,
|
loop=self.context.loop,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def on_broker_pre_start(self, *args: None, **kwargs: None) -> None:
|
async def on_broker_pre_start(self) -> None:
|
||||||
"""Clear statistics before broker start."""
|
"""Clear statistics before broker start."""
|
||||||
self._clear_stats()
|
self._clear_stats()
|
||||||
|
|
||||||
async def on_broker_post_start(self, *args: None, **kwargs: None) -> None:
|
async def on_broker_post_start(self) -> None:
|
||||||
"""Initialize statistics and start $SYS broadcasting."""
|
"""Initialize statistics and start $SYS broadcasting."""
|
||||||
self._stats[STAT_START_TIME] = int(datetime.now(tz=UTC).timestamp())
|
self._stats[STAT_START_TIME] = int(datetime.now(tz=UTC).timestamp())
|
||||||
version = f"aMQTT version {amqtt.__version__}"
|
version = f"aMQTT version {amqtt.__version__}"
|
||||||
|
@ -104,7 +107,7 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
pass
|
pass
|
||||||
# 'sys_interval' config parameter not found
|
# 'sys_interval' config parameter not found
|
||||||
|
|
||||||
async def on_broker_pre_shutdown(self, *args: None, **kwargs: None) -> None:
|
async def on_broker_pre_shutdown(self) -> None:
|
||||||
"""Stop $SYS topics broadcasting."""
|
"""Stop $SYS topics broadcasting."""
|
||||||
if self._sys_handle:
|
if self._sys_handle:
|
||||||
self._sys_handle.cancel()
|
self._sys_handle.cancel()
|
||||||
|
@ -170,13 +173,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
async def on_mqtt_packet_received(
|
async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
|
||||||
self,
|
|
||||||
*args: None,
|
|
||||||
**kwargs: MQTTPacket[PacketIdVariableHeader, SubscribePayload, MQTTFixedHeader],
|
|
||||||
) -> None:
|
|
||||||
"""Handle incoming MQTT packets."""
|
"""Handle incoming MQTT packets."""
|
||||||
packet = kwargs.get("packet")
|
|
||||||
if packet:
|
if packet:
|
||||||
packet_size = packet.bytes_length
|
packet_size = packet.bytes_length
|
||||||
self._stats[STAT_BYTES_RECEIVED] += packet_size
|
self._stats[STAT_BYTES_RECEIVED] += packet_size
|
||||||
|
@ -184,13 +182,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
if packet.fixed_header.packet_type == PUBLISH:
|
if packet.fixed_header.packet_type == PUBLISH:
|
||||||
self._stats[STAT_PUBLISH_RECEIVED] += 1
|
self._stats[STAT_PUBLISH_RECEIVED] += 1
|
||||||
|
|
||||||
async def on_mqtt_packet_sent(
|
async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
|
||||||
self,
|
|
||||||
*args: None,
|
|
||||||
**kwargs: MQTTPacket[PacketIdVariableHeader, SubscribePayload, MQTTFixedHeader],
|
|
||||||
) -> None:
|
|
||||||
"""Handle sent MQTT packets."""
|
"""Handle sent MQTT packets."""
|
||||||
packet = kwargs.get("packet")
|
|
||||||
if packet:
|
if packet:
|
||||||
packet_size = packet.bytes_length
|
packet_size = packet.bytes_length
|
||||||
self._stats[STAT_BYTES_SENT] += packet_size
|
self._stats[STAT_BYTES_SENT] += packet_size
|
||||||
|
@ -198,7 +191,7 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
if packet.fixed_header.packet_type == PUBLISH:
|
if packet.fixed_header.packet_type == PUBLISH:
|
||||||
self._stats[STAT_PUBLISH_SENT] += 1
|
self._stats[STAT_PUBLISH_SENT] += 1
|
||||||
|
|
||||||
async def on_broker_client_connected(self, *args: None, **kwargs: None) -> None:
|
async def on_broker_client_connected(self, client_id: str) -> None:
|
||||||
"""Handle broker client connection."""
|
"""Handle broker client connection."""
|
||||||
self._stats[STAT_CLIENTS_CONNECTED] += 1
|
self._stats[STAT_CLIENTS_CONNECTED] += 1
|
||||||
self._stats[STAT_CLIENTS_MAXIMUM] = max(
|
self._stats[STAT_CLIENTS_MAXIMUM] = max(
|
||||||
|
@ -206,7 +199,7 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||||
self._stats[STAT_CLIENTS_CONNECTED],
|
self._stats[STAT_CLIENTS_CONNECTED],
|
||||||
)
|
)
|
||||||
|
|
||||||
async def on_broker_client_disconnected(self, *args: None, **kwargs: None) -> None:
|
async def on_broker_client_disconnected(self, client_id: str) -> None:
|
||||||
"""Handle broker client disconnection."""
|
"""Handle broker client disconnection."""
|
||||||
self._stats[STAT_CLIENTS_CONNECTED] -= 1
|
self._stats[STAT_CLIENTS_CONNECTED] -= 1
|
||||||
self._stats[STAT_CLIENTS_DISCONNECTED] += 1
|
self._stats[STAT_CLIENTS_DISCONNECTED] += 1
|
||||||
|
|
|
@ -28,17 +28,21 @@ its own variables to configure its behavior.
|
||||||
Plugins that are defined in the`project.entry-points` are notified of events if the subclass
|
Plugins that are defined in the`project.entry-points` are notified of events if the subclass
|
||||||
implements one or more of these methods:
|
implements one or more of these methods:
|
||||||
|
|
||||||
- `on_mqtt_packet_sent`
|
- `async def on_mqtt_packet_sent(self, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None`
|
||||||
- `on_mqtt_packet_received`
|
- `async def on_mqtt_packet_received(self, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None`
|
||||||
- `on_broker_pre_start`
|
|
||||||
- `on_broker_post_start`
|
- `async def on_broker_pre_start() -> None`
|
||||||
- `on_broker_pre_shutdown`
|
- `async def on_broker_post_start() -> None`
|
||||||
- `on_broker_post_shutdown`
|
- `async def on_broker_pre_shutdown() -> None`
|
||||||
- `on_broker_client_connected`
|
- `async def on_broker_post_shutdown() -> None`
|
||||||
- `on_broker_client_disconnected`
|
|
||||||
- `on_broker_client_subscribed`
|
- `async def on_broker_client_connected(self, client_id:str) -> None`
|
||||||
- `on_broker_client_unsubscribed`
|
- `async def on_broker_client_disconnected(self, client_id:str) -> None`
|
||||||
- `on_broker_message_received`
|
|
||||||
|
- `async def on_broker_client_subscribed(self, client_id: str, topic: str, qos: int) -> None`
|
||||||
|
- `async def on_broker_client_unsubscribed(self, client_id: str, topic: str) -> None`
|
||||||
|
|
||||||
|
- `async def on_broker_message_received(self, client_id: str, message: ApplicationMessage) -> None`
|
||||||
|
|
||||||
|
|
||||||
## Authentication Plugins
|
## Authentication Plugins
|
||||||
|
|
Ładowanie…
Reference in New Issue