kopia lustrzana https://github.com/Yakifo/amqtt
creating MQTTEvents enumeration instead of individual string names. consolidating into a single file to eliminate import errors
rodzic
b0cf416c36
commit
99a9594e9f
|
@ -1,26 +1,3 @@
|
||||||
"""INIT."""
|
"""INIT."""
|
||||||
|
|
||||||
__version__ = "0.11.0"
|
__version__ = "0.11.0"
|
||||||
|
|
||||||
from enum import StrEnum
|
|
||||||
|
|
||||||
|
|
||||||
class Events(StrEnum):
|
|
||||||
"""Class for all events."""
|
|
||||||
|
|
||||||
class ClientEvents(Events):
|
|
||||||
"""Events issued by the client."""
|
|
||||||
|
|
||||||
|
|
||||||
class BrokerEvents(Events):
|
|
||||||
"""Events issued by the broker."""
|
|
||||||
|
|
||||||
PRE_START = "broker_pre_start"
|
|
||||||
POST_START = "broker_post_start"
|
|
||||||
PRE_SHUTDOWN = "broker_pre_shutdown"
|
|
||||||
POST_SHUTDOWN = "broker_post_shutdown"
|
|
||||||
CLIENT_CONNECTED = "broker_client_connected"
|
|
||||||
CLIENT_DISCONNECTED = "broker_client_disconnected"
|
|
||||||
CLIENT_SUBSCRIBED = "broker_client_subscribed"
|
|
||||||
CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
|
|
||||||
MESSAGE_RECEIVED = "broker_message_received"
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
||||||
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
|
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
|
||||||
from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
|
from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
|
||||||
|
|
||||||
from . import BrokerEvents
|
from .events import BrokerEvents
|
||||||
from .mqtt.disconnect import DisconnectPacket
|
from .mqtt.disconnect import DisconnectPacket
|
||||||
from .plugins.manager import BaseContext, PluginManager
|
from .plugins.manager import BaseContext, PluginManager
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
from enum import StrEnum
|
||||||
|
|
||||||
|
|
||||||
|
class Events(StrEnum):
|
||||||
|
"""Class for all events."""
|
||||||
|
|
||||||
|
|
||||||
|
class ClientEvents(Events):
|
||||||
|
"""Events issued by the client."""
|
||||||
|
|
||||||
|
|
||||||
|
class MQTTEvents(Events):
|
||||||
|
PACKET_SENT = "mqtt_packet_sent"
|
||||||
|
PACKET_RECEIVED = "mqtt_packet_received"
|
||||||
|
|
||||||
|
|
||||||
|
class BrokerEvents(Events):
|
||||||
|
"""Events issued by the broker."""
|
||||||
|
|
||||||
|
PRE_START = "broker_pre_start"
|
||||||
|
POST_START = "broker_post_start"
|
||||||
|
PRE_SHUTDOWN = "broker_pre_shutdown"
|
||||||
|
POST_SHUTDOWN = "broker_post_shutdown"
|
||||||
|
CLIENT_CONNECTED = "broker_client_connected"
|
||||||
|
CLIENT_DISCONNECTED = "broker_client_disconnected"
|
||||||
|
CLIENT_SUBSCRIBED = "broker_client_subscribed"
|
||||||
|
CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
|
||||||
|
MESSAGE_RECEIVED = "broker_message_received"
|
|
@ -4,6 +4,7 @@ from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
||||||
from amqtt.errors import MQTTError
|
from amqtt.errors import MQTTError
|
||||||
|
from amqtt.events import MQTTEvents
|
||||||
from amqtt.mqtt.connack import (
|
from amqtt.mqtt.connack import (
|
||||||
BAD_USERNAME_PASSWORD,
|
BAD_USERNAME_PASSWORD,
|
||||||
CONNECTION_ACCEPTED,
|
CONNECTION_ACCEPTED,
|
||||||
|
@ -25,8 +26,6 @@ from amqtt.plugins.manager import PluginManager
|
||||||
from amqtt.session import Session
|
from amqtt.session import Session
|
||||||
from amqtt.utils import format_client_message
|
from amqtt.utils import format_client_message
|
||||||
|
|
||||||
from .handler import EVENT_MQTT_PACKET_RECEIVED, EVENT_MQTT_PACKET_SENT
|
|
||||||
|
|
||||||
_MQTT_PROTOCOL_LEVEL_SUPPORTED = 4
|
_MQTT_PROTOCOL_LEVEL_SUPPORTED = 4
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -164,7 +163,7 @@ class BrokerProtocolHandler(ProtocolHandler["BrokerContext"]):
|
||||||
) -> tuple["BrokerProtocolHandler", Session]:
|
) -> tuple["BrokerProtocolHandler", Session]:
|
||||||
"""Initialize from a CONNECT packet and validates the connection."""
|
"""Initialize from a CONNECT packet and validates the connection."""
|
||||||
connect = await ConnectPacket.from_stream(reader)
|
connect = await ConnectPacket.from_stream(reader)
|
||||||
await plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connect)
|
await plugins_manager.fire_event(MQTTEvents.PACKET_RECEIVED, packet=connect)
|
||||||
|
|
||||||
if connect.variable_header is None:
|
if connect.variable_header is None:
|
||||||
msg = "CONNECT packet: variable header not initialized."
|
msg = "CONNECT packet: variable header not initialized."
|
||||||
|
@ -219,7 +218,7 @@ class BrokerProtocolHandler(ProtocolHandler["BrokerContext"]):
|
||||||
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
||||||
|
|
||||||
if connack is not None:
|
if connack is not None:
|
||||||
await plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack)
|
await plugins_manager.fire_event(MQTTEvents.PACKET_SENT, packet=connack)
|
||||||
await connack.to_stream(writer)
|
await connack.to_stream(writer)
|
||||||
await writer.close()
|
await writer.close()
|
||||||
raise MQTTError(error_msg) from None
|
raise MQTTError(error_msg) from None
|
||||||
|
|
|
@ -2,12 +2,13 @@ import asyncio
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from amqtt.errors import AMQTTError, NoDataError
|
from amqtt.errors import AMQTTError, NoDataError
|
||||||
|
from amqtt.events import MQTTEvents
|
||||||
from amqtt.mqtt.connack import ConnackPacket
|
from amqtt.mqtt.connack import ConnackPacket
|
||||||
from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader
|
from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader
|
||||||
from amqtt.mqtt.disconnect import DisconnectPacket
|
from amqtt.mqtt.disconnect import DisconnectPacket
|
||||||
from amqtt.mqtt.pingreq import PingReqPacket
|
from amqtt.mqtt.pingreq import PingReqPacket
|
||||||
from amqtt.mqtt.pingresp import PingRespPacket
|
from amqtt.mqtt.pingresp import PingRespPacket
|
||||||
from amqtt.mqtt.protocol.handler import EVENT_MQTT_PACKET_RECEIVED, ProtocolHandler
|
from amqtt.mqtt.protocol.handler import ProtocolHandler
|
||||||
from amqtt.mqtt.suback import SubackPacket
|
from amqtt.mqtt.suback import SubackPacket
|
||||||
from amqtt.mqtt.subscribe import SubscribePacket
|
from amqtt.mqtt.subscribe import SubscribePacket
|
||||||
from amqtt.mqtt.unsuback import UnsubackPacket
|
from amqtt.mqtt.unsuback import UnsubackPacket
|
||||||
|
@ -93,7 +94,7 @@ class ClientProtocolHandler(ProtocolHandler["ClientContext"]):
|
||||||
connack = await ConnackPacket.from_stream(self.reader)
|
connack = await ConnackPacket.from_stream(self.reader)
|
||||||
except NoDataError as e:
|
except NoDataError as e:
|
||||||
raise ConnectionError from e
|
raise ConnectionError from e
|
||||||
await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack, session=self.session)
|
await self.plugins_manager.fire_event(MQTTEvents.PACKET_RECEIVED, packet=connack, session=self.session)
|
||||||
return connack.return_code
|
return connack.return_code
|
||||||
|
|
||||||
def handle_write_timeout(self) -> None:
|
def handle_write_timeout(self) -> None:
|
||||||
|
|
|
@ -21,6 +21,7 @@ from typing import Generic, TypeVar, cast
|
||||||
|
|
||||||
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
||||||
from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError
|
from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError
|
||||||
|
from amqtt.events import MQTTEvents
|
||||||
from amqtt.mqtt import packet_class
|
from amqtt.mqtt import packet_class
|
||||||
from amqtt.mqtt.connack import ConnackPacket
|
from amqtt.mqtt.connack import ConnackPacket
|
||||||
from amqtt.mqtt.connect import ConnectPacket
|
from amqtt.mqtt.connect import ConnectPacket
|
||||||
|
@ -59,9 +60,6 @@ from amqtt.mqtt.unsubscribe import UnsubscribePacket
|
||||||
from amqtt.plugins.manager import BaseContext, PluginManager
|
from amqtt.plugins.manager import BaseContext, PluginManager
|
||||||
from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session
|
from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session
|
||||||
|
|
||||||
EVENT_MQTT_PACKET_SENT = "mqtt_packet_sent"
|
|
||||||
EVENT_MQTT_PACKET_RECEIVED = "mqtt_packet_received"
|
|
||||||
|
|
||||||
C = TypeVar("C", bound=BaseContext)
|
C = TypeVar("C", bound=BaseContext)
|
||||||
|
|
||||||
class ProtocolHandler(Generic[C]):
|
class ProtocolHandler(Generic[C]):
|
||||||
|
@ -473,7 +471,7 @@ class ProtocolHandler(Generic[C]):
|
||||||
|
|
||||||
cls = packet_class(fixed_header)
|
cls = packet_class(fixed_header)
|
||||||
packet = await cls.from_stream(self.reader, fixed_header=fixed_header)
|
packet = await cls.from_stream(self.reader, fixed_header=fixed_header)
|
||||||
await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session)
|
await self.plugins_manager.fire_event(MQTTEvents.PACKET_RECEIVED, packet=packet, session=self.session)
|
||||||
if packet.fixed_header is None or packet.fixed_header.packet_type not in (
|
if packet.fixed_header is None or packet.fixed_header.packet_type not in (
|
||||||
CONNACK,
|
CONNACK,
|
||||||
SUBSCRIBE,
|
SUBSCRIBE,
|
||||||
|
@ -570,7 +568,7 @@ class ProtocolHandler(Generic[C]):
|
||||||
self._keepalive_task.cancel()
|
self._keepalive_task.cancel()
|
||||||
if self.keepalive_timeout is not None:
|
if self.keepalive_timeout is not None:
|
||||||
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
||||||
await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session)
|
await self.plugins_manager.fire_event(MQTTEvents.PACKET_SENT, packet=packet, session=self.session)
|
||||||
except (ConnectionResetError, BrokenPipeError):
|
except (ConnectionResetError, BrokenPipeError):
|
||||||
await self.handle_connection_closed()
|
await self.handle_connection_closed()
|
||||||
except asyncio.CancelledError as e:
|
except asyncio.CancelledError as e:
|
||||||
|
|
|
@ -8,8 +8,8 @@ from importlib.metadata import EntryPoint, EntryPoints, entry_points
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Optional, TypeVar
|
from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Optional, TypeVar
|
||||||
|
|
||||||
from amqtt import Events
|
|
||||||
from amqtt.errors import PluginImportError, PluginInitError
|
from amqtt.errors import PluginImportError, PluginInitError
|
||||||
|
from amqtt.events import Events
|
||||||
from amqtt.session import Session
|
from amqtt.session import Session
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
|
@ -6,7 +6,7 @@ from unittest.mock import MagicMock, call, patch
|
||||||
import psutil
|
import psutil
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from amqtt import BrokerEvents
|
from amqtt.events import BrokerEvents
|
||||||
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
|
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
|
||||||
from amqtt.broker import Broker
|
from amqtt.broker import Broker
|
||||||
from amqtt.client import MQTTClient
|
from amqtt.client import MQTTClient
|
||||||
|
|
|
@ -6,7 +6,7 @@ from unittest.mock import MagicMock, call, patch
|
||||||
import pytest
|
import pytest
|
||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
|
|
||||||
from amqtt.broker import BrokerEvents
|
from amqtt.events import BrokerEvents
|
||||||
from amqtt.client import MQTTClient
|
from amqtt.client import MQTTClient
|
||||||
from amqtt.mqtt.constants import QOS_1, QOS_2
|
from amqtt.mqtt.constants import QOS_1, QOS_2
|
||||||
|
|
||||||
|
|
Ładowanie…
Reference in New Issue