Merge branch 'rc' into plugin_call_optimization

pull/212/head
Andrew Mirsky 2025-06-14 09:51:12 -04:00
commit b72c7d4f28
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
12 zmienionych plików z 157 dodań i 117 usunięć

Wyświetl plik

@ -7,7 +7,9 @@
jobs: jobs:
pre_install: pre_install:
- pip install --upgrade pip - pip install --upgrade pip
- pip install --group docs - pip install uv
- uv pip install --group dev --group docs
- uv run pytest
mkdocs: mkdocs:
configuration: mkdocs.rtd.yml configuration: mkdocs.rtd.yml

Wyświetl plik

@ -17,9 +17,7 @@
- Communication over TCP and/or websocket, including support for SSL/TLS - Communication over TCP and/or websocket, including support for SSL/TLS
- Support QoS 0, QoS 1 and QoS 2 messages flow - Support QoS 0, QoS 1 and QoS 2 messages flow
- Client auto-reconnection on network lost - Client auto-reconnection on network lost
- Functionality expansion; plugins included: - Functionality expansion; plugins included: authentication and `$SYS` topic publishing
- Authentication through password file
- Basic `$SYS` topics
## Installation ## Installation

Wyświetl plik

@ -41,18 +41,24 @@ _defaults = read_yaml_config(Path(__file__).parent / "scripts/default_broker.yam
DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} DEFAULT_PORTS = {"tcp": 1883, "ws": 8883}
AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80 AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80
EVENT_BROKER_PRE_START = "broker_pre_start"
EVENT_BROKER_POST_START = "broker_post_start" class EventBroker(Enum):
EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown" """Events issued by the broker."""
EVENT_BROKER_POST_SHUTDOWN = "broker_post_shutdown"
EVENT_BROKER_CLIENT_CONNECTED = "broker_client_connected" PRE_START = "broker_pre_start"
EVENT_BROKER_CLIENT_DISCONNECTED = "broker_client_disconnected" POST_START = "broker_post_start"
EVENT_BROKER_CLIENT_SUBSCRIBED = "broker_client_subscribed" PRE_SHUTDOWN = "broker_pre_shutdown"
EVENT_BROKER_CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed" POST_SHUTDOWN = "broker_post_shutdown"
EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" CLIENT_CONNECTED = "broker_client_connected"
CLIENT_DISCONNECTED = "broker_client_disconnected"
CLIENT_SUBSCRIBED = "broker_client_subscribed"
CLIENT_UNSUBSCRIBED = "broker_client_unsubscribed"
MESSAGE_RECEIVED = "broker_message_received"
class Action(Enum): class Action(Enum):
"""Actions issued by the broker."""
SUBSCRIBE = "subscribe" SUBSCRIBE = "subscribe"
PUBLISH = "publish" PUBLISH = "publish"
@ -142,7 +148,7 @@ class Broker:
Args: Args:
config: dictionary of configuration options (see [broker configuration](broker_config.md)). config: dictionary of configuration options (see [broker configuration](broker_config.md)).
loop: asyncio loop. defaults to `asyncio.get_event_loop()`. loop: asyncio loop. defaults to `asyncio.new_event_loop()`.
plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`. plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`.
""" """
@ -170,7 +176,7 @@ class Broker:
self.config.update(config) self.config.update(config)
self._build_listeners_config(self.config) self._build_listeners_config(self.config)
self._loop = loop or asyncio.get_event_loop() self._loop = loop or asyncio.new_event_loop()
self._servers: dict[str, Server] = {} self._servers: dict[str, Server] = {}
self._init_states() self._init_states()
self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {} self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {}
@ -242,11 +248,11 @@ class Broker:
msg = f"Broker instance can't be started: {exc}" msg = f"Broker instance can't be started: {exc}"
raise BrokerError(msg) from exc raise BrokerError(msg) from exc
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_START) await self.plugins_manager.fire_event(EventBroker.PRE_START.value)
try: try:
await self._start_listeners() await self._start_listeners()
self.transitions.starting_success() self.transitions.starting_success()
await self.plugins_manager.fire_event(EVENT_BROKER_POST_START) await self.plugins_manager.fire_event(EventBroker.POST_START.value)
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop()) self._broadcast_task = asyncio.ensure_future(self._broadcast_loop())
self.logger.debug("Broker started") self.logger.debug("Broker started")
except Exception as e: except Exception as e:
@ -327,7 +333,7 @@ class Broker:
"""Stop broker instance.""" """Stop broker instance."""
self.logger.info("Shutting down broker...") self.logger.info("Shutting down broker...")
# Fire broker_shutdown event to plugins # Fire broker_shutdown event to plugins
await self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN) await self.plugins_manager.fire_event(EventBroker.PRE_SHUTDOWN.value)
# Cleanup all sessions # Cleanup all sessions
for client_id in list(self._sessions.keys()): for client_id in list(self._sessions.keys()):
@ -351,7 +357,7 @@ class Broker:
self._broadcast_queue.get_nowait() self._broadcast_queue.get_nowait()
self.logger.info("Broker closed") self.logger.info("Broker closed")
await self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN) await self.plugins_manager.fire_event(EventBroker.POST_SHUTDOWN.value)
self.transitions.stopping_success() self.transitions.stopping_success()
async def _cleanup_session(self, client_id: str) -> None: async def _cleanup_session(self, client_id: str) -> None:
@ -494,7 +500,7 @@ class Broker:
self._sessions[client_session.client_id] = (client_session, handler) self._sessions[client_session.client_id] = (client_session, handler)
await handler.mqtt_connack_authorize(authenticated) await handler.mqtt_connack_authorize(authenticated)
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id) await self.plugins_manager.fire_event(EventBroker.CLIENT_CONNECTED.value, client_id=client_session.client_id)
self.logger.debug(f"{client_session.client_id} Start messages handling") self.logger.debug(f"{client_session.client_id} Start messages handling")
await handler.start() await handler.start()
@ -582,7 +588,7 @@ class Broker:
self.logger.debug(f"{client_session.client_id} Disconnecting session") self.logger.debug(f"{client_session.client_id} Disconnecting session")
await self._stop_handler(handler) await self._stop_handler(handler)
client_session.transitions.disconnect() client_session.transitions.disconnect()
await self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id) await self.plugins_manager.fire_event(EventBroker.CLIENT_DISCONNECTED.value, client_id=client_session.client_id)
return False return False
return True return True
@ -600,7 +606,7 @@ class Broker:
for index, subscription in enumerate(subscriptions.topics): for index, subscription in enumerate(subscriptions.topics):
if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED: if return_codes[index] != AMQTT_MAGIC_VALUE_RET_SUBSCRIBED:
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
topic=subscription[0], topic=subscription[0],
qos=subscription[1], qos=subscription[1],
@ -619,7 +625,7 @@ class Broker:
for topic in unsubscription.topics: for topic in unsubscription.topics:
self._del_subscription(topic, client_session) self._del_subscription(topic, client_session)
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED, EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
topic=topic, topic=topic,
) )
@ -654,7 +660,7 @@ class Broker:
self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.") self.logger.info(f"{client_session.client_id} forbidden TOPIC {app_message.topic} sent in PUBLISH message.")
else: else:
await self.plugins_manager.fire_event( await self.plugins_manager.fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=client_session.client_id, client_id=client_session.client_id,
message=app_message, message=app_message,
) )

Wyświetl plik

@ -1,7 +1,7 @@
import asyncio import asyncio
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from amqtt.errors import AMQTTError from amqtt.errors import AMQTTError, NoDataError
from amqtt.mqtt.connack import ConnackPacket from amqtt.mqtt.connack import ConnackPacket
from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader
from amqtt.mqtt.disconnect import DisconnectPacket from amqtt.mqtt.disconnect import DisconnectPacket
@ -89,8 +89,10 @@ class ClientProtocolHandler(ProtocolHandler["ClientContext"]):
if self.reader is None: if self.reader is None:
msg = "Reader is not initialized." msg = "Reader is not initialized."
raise AMQTTError(msg) raise AMQTTError(msg)
try:
connack = await ConnackPacket.from_stream(self.reader) connack = await ConnackPacket.from_stream(self.reader)
except NoDataError as e:
raise ConnectionError from e
await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack, session=self.session) await self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack, session=self.session)
return connack.return_code return connack.return_code

Wyświetl plik

@ -55,20 +55,21 @@ def broker_main(
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
broker = Broker(config) broker = Broker(config)
except (BrokerError, ParserError) as exc: except (BrokerError, ParserError) as exc:
typer.echo(f"❌ Broker failed to start: {exc}", err=True) typer.echo(f"❌ Broker failed to start: {exc}", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
_ = loop.create_task(broker.start()) #noqa : RUF006
try: try:
loop.run_until_complete(broker.start())
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
loop.run_until_complete(broker.shutdown()) loop.run_until_complete(broker.shutdown())
except Exception as exc: except Exception as exc:
typer.echo("Connection failed", err=True) typer.echo("Broker execution halted", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
finally: finally:
loop.close() loop.close()

Wyświetl plik

@ -182,8 +182,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
logger.debug(f"Using default configuration from {default_config_path}") logger.debug(f"Using default configuration from {default_config_path}")
config = read_yaml_config(default_config_path) config = read_yaml_config(default_config_path)
loop = asyncio.get_event_loop()
if not client_id: if not client_id:
client_id = _gen_client_id() client_id = _gen_client_id()
@ -217,7 +215,7 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
) )
with contextlib.suppress(KeyboardInterrupt): with contextlib.suppress(KeyboardInterrupt):
try: try:
loop.run_until_complete( asyncio.run(
do_pub( do_pub(
client=client, client=client,
message_input=message_input, message_input=message_input,
@ -234,8 +232,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
typer.echo("❌ Connection failed", err=True) typer.echo("❌ Connection failed", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
typer.run(main) typer.run(main)

Wyświetl plik

@ -147,8 +147,6 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
logger.debug(f"Using default configuration from {default_config_path}") logger.debug(f"Using default configuration from {default_config_path}")
config = read_yaml_config(default_config_path) config = read_yaml_config(default_config_path)
loop = asyncio.get_event_loop()
if not client_id: if not client_id:
client_id = _gen_client_id() client_id = _gen_client_id()
@ -175,7 +173,7 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
) )
with contextlib.suppress(KeyboardInterrupt): with contextlib.suppress(KeyboardInterrupt):
try: try:
loop.run_until_complete(do_sub(client, asyncio.run(do_sub(client,
url=url, url=url,
topics=topics, topics=topics,
ca_info=ca_info, ca_info=ca_info,
@ -184,10 +182,10 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
max_count=max_count, max_count=max_count,
clean_session=clean_session, clean_session=clean_session,
)) ))
except (ClientError, ConnectError) as exc: except (ClientError, ConnectError) as exc:
typer.echo("❌ Connection failed", err=True) typer.echo("❌ Connection failed", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop.close()
if __name__ == "__main__": if __name__ == "__main__":

Wyświetl plik

@ -1,11 +1,14 @@
import logging import logging
from dataclasses import dataclass from dataclasses import dataclass
from amqtt.broker import Action from amqtt.broker import Action
from amqtt.plugins.authentication import BaseAuthPlugin
from amqtt.plugins.base import BasePlugin from amqtt.plugins.base import BasePlugin
from amqtt.plugins.manager import BaseContext from amqtt.plugins.manager import BaseContext
from amqtt.plugins.topic_checking import BaseTopicPlugin from amqtt.plugins.topic_checking import BaseTopicPlugin
from amqtt.plugins.authentication import BaseAuthPlugin
from amqtt.session import Session from amqtt.session import Session
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,10 +31,14 @@ class TestConfigPlugin(BasePlugin):
option2: str option2: str
class TestAuthPlugin(BaseAuthPlugin): class AuthPlugin(BaseAuthPlugin):
async def authenticate(self, *, session: Session) -> bool | None:
return True
class NoAuthPlugin(BaseAuthPlugin):
def __init__(self, context: BaseContext):
super().__init__(context)
async def authenticate(self, *, session: Session) -> bool | None: async def authenticate(self, *, session: Session) -> bool | None:
return False return False

Wyświetl plik

@ -13,49 +13,49 @@ from amqtt.mqtt.constants import QOS_0
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# test broker sys # test broker sys
@pytest.mark.asyncio # @pytest.mark.asyncio
async def test_broker_sys_plugin() -> None: # async def test_broker_sys_plugin() -> None:
#
class MockEntryPoints: # class MockEntryPoints:
#
def select(self, group) -> list[EntryPoint]: # def select(self, group) -> list[EntryPoint]:
match group: # match group:
case 'tests.mock_plugins': # case 'tests.mock_plugins':
return [ # return [
EntryPoint(name='BrokerSysPlugin', group='tests.mock_plugins', value='amqtt.plugins.sys.broker:BrokerSysPlugin'), # EntryPoint(name='BrokerSysPlugin', group='tests.mock_plugins', value='amqtt.plugins.sys.broker:BrokerSysPlugin'),
] # ]
case _: # case _:
return list() # return list()
#
#
with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish: # with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish:
#
config = { # config = {
"listeners": { # "listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, # "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
}, # },
'sys_interval': 1 # 'sys_interval': 1
} # }
#
broker = Broker(plugin_namespace='tests.mock_plugins', config=config) # broker = Broker(plugin_namespace='tests.mock_plugins', config=config)
await broker.start() # await broker.start()
client = MQTTClient() # client = MQTTClient()
await client.connect("mqtt://127.0.0.1:1883/") # await client.connect("mqtt://127.0.0.1:1883/")
await client.subscribe([("$SYS/broker/uptime", QOS_0),]) # await client.subscribe([("$SYS/broker/uptime", QOS_0),])
await client.publish('test/topic', b'my test message') # await client.publish('test/topic', b'my test message')
await asyncio.sleep(2) # await asyncio.sleep(2)
sys_msg_count = 0 # sys_msg_count = 0
try: # try:
while True: # while True:
message = await client.deliver_message(timeout_duration=0.5) # message = await client.deliver_message(timeout_duration=0.5)
if '$SYS' in message.topic: # if '$SYS' in message.topic:
sys_msg_count += 1 # sys_msg_count += 1
except asyncio.TimeoutError: # except asyncio.TimeoutError:
pass # pass
#
logger.warning(f">>> sys message: {message.topic} - {message.data}") # logger.warning(f">>> sys message: {message.topic} - {message.data}")
await client.disconnect() # await client.disconnect()
await broker.shutdown() # await broker.shutdown()
#
#
assert sys_msg_count > 1 # assert sys_msg_count > 1

Wyświetl plik

@ -7,18 +7,7 @@ import psutil
import pytest import pytest
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
from amqtt.broker import ( from amqtt.broker import EventBroker, Broker
EVENT_BROKER_CLIENT_CONNECTED,
EVENT_BROKER_CLIENT_DISCONNECTED,
EVENT_BROKER_CLIENT_SUBSCRIBED,
EVENT_BROKER_CLIENT_UNSUBSCRIBED,
EVENT_BROKER_MESSAGE_RECEIVED,
EVENT_BROKER_POST_SHUTDOWN,
EVENT_BROKER_POST_START,
EVENT_BROKER_PRE_SHUTDOWN,
EVENT_BROKER_PRE_START,
Broker,
)
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.errors import ConnectError from amqtt.errors import ConnectError
from amqtt.mqtt.connack import ConnackPacket from amqtt.mqtt.connack import ConnackPacket
@ -67,8 +56,8 @@ def test_split_bindaddr_port(input_str, output_addr, output_port):
async def test_start_stop(broker, mock_plugin_manager): async def test_start_stop(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event(EVENT_BROKER_PRE_START), call().fire_event(EventBroker.PRE_START.value),
call().fire_event(EVENT_BROKER_POST_START), call().fire_event(EventBroker.POST_START.value),
], ],
any_order=True, any_order=True,
) )
@ -76,8 +65,8 @@ async def test_start_stop(broker, mock_plugin_manager):
await broker.shutdown() await broker.shutdown()
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event(EVENT_BROKER_PRE_SHUTDOWN), call().fire_event(EventBroker.PRE_SHUTDOWN.value),
call().fire_event(EVENT_BROKER_POST_SHUTDOWN), call().fire_event(EventBroker.POST_SHUTDOWN.value),
], ],
any_order=True, any_order=True,
) )
@ -98,11 +87,11 @@ async def test_client_connect(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_CONNECTED, EventBroker.CLIENT_CONNECTED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
), ),
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED, EventBroker.CLIENT_DISCONNECTED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
), ),
], ],
@ -235,7 +224,7 @@ async def test_client_subscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
@ -272,7 +261,7 @@ async def test_client_subscribe_twice(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
@ -306,13 +295,13 @@ async def test_client_unsubscribe(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_SUBSCRIBED, EventBroker.CLIENT_SUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
qos=QOS_0, qos=QOS_0,
), ),
call().fire_event( call().fire_event(
EVENT_BROKER_CLIENT_UNSUBSCRIBED, EventBroker.CLIENT_UNSUBSCRIBED.value,
client_id=client.session.client_id, client_id=client.session.client_id,
topic="/topic", topic="/topic",
), ),
@ -337,7 +326,7 @@ async def test_client_publish(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id, client_id=pub_client.session.client_id,
message=ret_message, message=ret_message,
), ),
@ -509,7 +498,7 @@ async def test_client_publish_big(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls( mock_plugin_manager.assert_has_calls(
[ [
call().fire_event( call().fire_event(
EVENT_BROKER_MESSAGE_RECEIVED, EventBroker.MESSAGE_RECEIVED.value,
client_id=pub_client.session.client_id, client_id=pub_client.session.client_id,
message=ret_message, message=ret_message,
), ),

Wyświetl plik

@ -1,8 +1,11 @@
import asyncio import asyncio
import logging import logging
from importlib.metadata import EntryPoint
from unittest.mock import patch
import pytest import pytest
from amqtt.broker import Broker
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.errors import ConnectError from amqtt.errors import ConnectError
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
@ -295,3 +298,42 @@ async def test_client_publish_will_with_retain(broker_fixture, client_config):
assert message3.topic == 'test/will/topic' assert message3.topic == 'test/will/topic'
assert message3.data == b'client ABC has disconnected' assert message3.data == b'client ABC has disconnected'
await client3.disconnect() await client3.disconnect()
@pytest.mark.asyncio
async def test_client_no_auth():
class MockEntryPoints:
def select(self, group) -> list[EntryPoint]:
match group:
case 'tests.mock_plugins':
return [
EntryPoint(name='auth_plugin', group='tests.mock_plugins', value='tests.plugins.mocks:NoAuthPlugin'),
]
case _:
return list()
with patch("amqtt.plugins.manager.entry_points", side_effect=MockEntryPoints) as mocked_mqtt_publish:
config = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
},
'sys_interval': 1,
'auth': {
'plugins': ['auth_plugin', ]
}
}
client = MQTTClient(client_id="client1", config={'auto_reconnect': False})
broker = Broker(plugin_namespace='tests.mock_plugins', config=config)
await broker.start()
with pytest.raises(ConnectError):
await client.connect("mqtt://127.0.0.1:1883/")
await broker.shutdown()

Wyświetl plik

@ -6,8 +6,7 @@ from unittest.mock import MagicMock, call, patch
import pytest import pytest
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from amqtt.broker import EVENT_BROKER_CLIENT_CONNECTED, EVENT_BROKER_CLIENT_DISCONNECTED, EVENT_BROKER_PRE_START, \ from amqtt.broker import EventBroker
EVENT_BROKER_POST_START
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_1, QOS_2
@ -54,11 +53,11 @@ async def test_paho_connect(broker, mock_plugin_manager):
broker.plugins_manager.assert_has_calls( broker.plugins_manager.assert_has_calls(
[ [
call.fire_event( call.fire_event(
EVENT_BROKER_CLIENT_CONNECTED, EventBroker.CLIENT_CONNECTED.value,
client_id=client_id, client_id=client_id,
), ),
call.fire_event( call.fire_event(
EVENT_BROKER_CLIENT_DISCONNECTED, EventBroker.CLIENT_DISCONNECTED.value,
client_id=client_id, client_id=client_id,
), ),
], ],