amqtt/tests/test_broker.py

1002 wiersze
30 KiB
Python

import asyncio
import logging
import logging.config
import secrets
import socket
import string
from unittest.mock import MagicMock, call, patch
import psutil
import pytest
from amqtt.events import BrokerEvents
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
from amqtt.broker import Broker
from amqtt.client import MQTTClient
from amqtt.errors import ConnectError
from amqtt.mqtt.connack import ConnackPacket
from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
from amqtt.mqtt.disconnect import DisconnectPacket
from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
from amqtt.mqtt.pubcomp import PubcompPacket
from amqtt.mqtt.publish import PublishPacket
from amqtt.mqtt.pubrec import PubrecPacket
from amqtt.mqtt.pubrel import PubrelPacket
from amqtt.session import OutgoingApplicationMessage
# formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
# logging.basicConfig(level=logging.DEBUG, format=formatter)
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '[%(asctime)s] %(levelname)s %(name)s: %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'default',
'stream': 'ext://sys.stdout',
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG',
},
'loggers': {
'transitions': {
'handlers': ['console'],
'level': 'WARNING',
'propagate': False,
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
log = logging.getLogger(__name__)
# monkey patch MagicMock
# taken from https://stackoverflow.com/questions/51394411/python-object-magicmock-cant-be-used-in-await-expression
async def async_magic():
pass
MagicMock.__await__ = lambda _: async_magic().__await__()
@pytest.mark.parametrize(
"input_str, output_addr, output_port",
[
("1234", None, 1234),
(":1234", None, 1234),
("0.0.0.0:1234", "0.0.0.0", 1234),
("[::]:1234", "[::]", 1234),
("0.0.0.0", "0.0.0.0", 5678),
("[::]", "[::]", 5678),
("localhost", "localhost", 5678),
("localhost:1234", "localhost", 1234),
],
)
def test_split_bindaddr_port(input_str, output_addr, output_port):
assert Broker._split_bindaddr_port(input_str, 5678) == (output_addr, output_port)
@pytest.mark.asyncio
async def test_start_stop(broker, mock_plugin_manager):
mock_plugin_manager.assert_has_calls(
[
call().fire_event(BrokerEvents.PRE_START),
call().fire_event(BrokerEvents.POST_START),
],
any_order=True,
)
mock_plugin_manager.reset_mock()
await broker.shutdown()
mock_plugin_manager.assert_has_calls(
[
call().fire_event(BrokerEvents.PRE_SHUTDOWN),
call().fire_event(BrokerEvents.POST_SHUTDOWN),
],
any_order=True,
)
assert broker.transitions.is_stopped()
@pytest.mark.asyncio
async def test_client_connect(broker, mock_plugin_manager):
client = MQTTClient(config={'auto_reconnect':False})
ret = await client.connect("mqtt://127.0.0.1/")
assert ret == 0
assert client.session is not None
assert client.session.client_id in broker._sessions
await client.disconnect()
await asyncio.sleep(0.01)
broker.plugins_manager.fire_event.assert_called()
assert broker.plugins_manager.fire_event.call_count > 2
# double indexing is ugly, but call_args_list returns a tuple of tuples
events = [c[0][0] for c in broker.plugins_manager.fire_event.call_args_list]
assert BrokerEvents.CLIENT_CONNECTED in events
assert BrokerEvents.CLIENT_DISCONNECTED in events
@pytest.mark.asyncio
async def test_connect_tcp(broker):
process = psutil.Process()
connections_number = 10
# mqtt 3.1 requires a connect packet, otherwise the socket connection is rejected
sockets = []
for i in range(connections_number):
static_connect_packet = b'\x10\x1b\x00\x04MQTT\x04\x02\x00<\x00\x0ftest-client-12' + f"{i}".encode()
s = socket.create_connection(("127.0.0.1", 1883))
s.send(static_connect_packet)
sockets.append(s)
# Wait for a brief moment to ensure connections are established
await asyncio.sleep(0.1)
# # Get the current number of TCP connections
connections = process.net_connections()
# max number of connections on the TCP listener is 10
assert broker._servers["default"].conn_count == connections_number
# Ensure connections are only on the TCP listener (port 1883)
tcp_connections = [conn for conn in connections if conn.laddr.port == 1883]
assert len(tcp_connections) == connections_number + 1 # Including the Broker's listening socket
await asyncio.sleep(0.1)
for conn in connections:
assert conn.status in ("ESTABLISHED", "LISTEN")
await asyncio.sleep(0.1)
# close all connections
for s in sockets:
s.close()
# Wait a moment for connections to be closed
await asyncio.sleep(0.1)
# Recheck connections after closing
connections = process.net_connections()
tcp_connections = [conn for conn in connections if conn.laddr.port == 1883]
for conn in tcp_connections:
assert conn.status in ("CLOSE_WAIT", "LISTEN")
# Ensure no active connections for the default listener
assert broker._servers["default"].conn_count == 0
# Add one more connection to the TCP listener
s = socket.create_connection(("127.0.0.1", 1883))
s.send(static_connect_packet)
open_connections = []
open_connections = [conn for conn in process.net_connections() if conn.status == "ESTABLISHED"]
# Ensure that only one TCP connection is active now
assert len(open_connections) == 1
await asyncio.sleep(0.1)
assert broker._servers["default"].conn_count == 1
@pytest.mark.asyncio
async def test_client_connect_will_flag(broker):
conn_reader, conn_writer = await asyncio.open_connection("127.0.0.1", 1883)
reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer)
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = 10
vh.clean_session_flag = False
vh.will_retain_flag = False
vh.will_flag = True
vh.will_qos = QOS_0
payload.client_id = "test_id"
payload.will_message = b"test"
payload.will_topic = "/topic"
connect = ConnectPacket(variable_header=vh, payload=payload)
await connect.to_stream(writer)
await ConnackPacket.from_stream(reader)
await asyncio.sleep(0.1)
disconnect = DisconnectPacket()
await disconnect.to_stream(writer)
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_connect_clean_session_false(broker):
client = MQTTClient(client_id="", config={"auto_reconnect": False})
return_code = None
try:
await client.connect("mqtt://127.0.0.1/", cleansession=False)
except ConnectError as ce:
return_code = ce.return_code
assert return_code == 0x02
assert client.session is not None
assert client.session.client_id not in broker._sessions
await client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_subscribe(broker, mock_plugin_manager):
client = MQTTClient()
ret = await client.connect("mqtt://127.0.0.1/")
assert ret == 0
await client.subscribe([("/topic", QOS_0)])
# Test if the client test client subscription is registered
assert "/topic" in broker._subscriptions
subs = broker._subscriptions["/topic"]
assert len(subs) == 1
(s, qos) = subs[0]
assert s == client.session
assert qos == QOS_0
await client.disconnect()
await asyncio.sleep(0.1)
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
BrokerEvents.CLIENT_SUBSCRIBED,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
),
],
any_order=True,
)
@pytest.mark.asyncio
async def test_client_subscribe_twice(broker, mock_plugin_manager):
client = MQTTClient()
ret = await client.connect("mqtt://127.0.0.1/")
assert ret == 0
await client.subscribe([("/topic", QOS_0)])
# Test if the client test client subscription is registered
assert "/topic" in broker._subscriptions
subs = broker._subscriptions["/topic"]
assert len(subs) == 1
(s, qos) = subs[0]
assert s == client.session
assert qos == QOS_0
await client.subscribe([("/topic", QOS_0)])
assert len(subs) == 1
(s, qos) = subs[0]
assert s == client.session
assert qos == QOS_0
await client.disconnect()
await asyncio.sleep(0.1)
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
BrokerEvents.CLIENT_SUBSCRIBED,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
),
],
any_order=True,
)
@pytest.mark.asyncio
async def test_client_unsubscribe(broker, mock_plugin_manager):
client = MQTTClient()
ret = await client.connect("mqtt://127.0.0.1/")
assert ret == 0
await client.subscribe([("/topic", QOS_0)])
# Test if the client test client subscription is registered
assert "/topic" in broker._subscriptions
subs = broker._subscriptions["/topic"]
assert len(subs) == 1
(s, qos) = subs[0]
assert s == client.session
assert qos == QOS_0
await client.unsubscribe(["/topic"])
await asyncio.sleep(0.1)
assert broker._subscriptions["/topic"] == []
await client.disconnect()
await asyncio.sleep(0.1)
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
BrokerEvents.CLIENT_SUBSCRIBED,
client_id=client.session.client_id,
topic="/topic",
qos=QOS_0,
),
call().fire_event(
BrokerEvents.CLIENT_UNSUBSCRIBED,
client_id=client.session.client_id,
topic="/topic",
),
],
any_order=True,
)
@pytest.mark.asyncio
async def test_client_publish(broker, mock_plugin_manager):
pub_client = MQTTClient()
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
ret_message = await pub_client.publish("/topic", b"data", QOS_0)
await pub_client.disconnect()
assert broker._retained_messages == {}
await asyncio.sleep(0.1)
assert pub_client.session is not None
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
BrokerEvents.MESSAGE_RECEIVED,
client_id=pub_client.session.client_id,
message=ret_message,
),
],
any_order=True,
)
@pytest.mark.asyncio
async def test_client_publish_acl_permitted(acl_broker):
sub_client = MQTTClient()
ret_conn = await sub_client.connect("mqtt://user2:user2password@127.0.0.1:1884/")
assert ret_conn == 0
ret_sub = await sub_client.subscribe([("public/subtopic/test", QOS_0)])
assert ret_sub == [QOS_0]
pub_client = MQTTClient()
ret_conn = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/")
assert ret_conn == 0
await pub_client.publish("public/subtopic/test", b"data", QOS_0)
message = await sub_client.deliver_message(timeout_duration=1)
await pub_client.disconnect()
await sub_client.disconnect()
assert message is not None
assert message.topic == "public/subtopic/test"
assert message.data == b"data"
assert message.qos == QOS_0
@pytest.mark.asyncio
async def test_client_publish_acl_forbidden(acl_broker):
sub_client = MQTTClient()
ret_conn = await sub_client.connect("mqtt://user2:user2password@127.0.0.1:1884/")
assert ret_conn == 0
ret_sub = await sub_client.subscribe([("public/forbidden/test", QOS_0)])
assert ret_sub == [QOS_0]
pub_client = MQTTClient()
ret_conn = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/")
assert ret_conn == 0
await pub_client.publish("public/forbidden/test", b"data", QOS_0)
try:
await sub_client.deliver_message(timeout_duration=1)
msg = "Should not have worked"
raise AssertionError(msg)
except Exception:
pass
await pub_client.disconnect()
await sub_client.disconnect()
@pytest.mark.asyncio
async def test_client_publish_acl_permitted_sub_forbidden(acl_broker):
sub_client1 = MQTTClient()
ret_conn = await sub_client1.connect("mqtt://user2:user2password@127.0.0.1:1884/")
assert ret_conn == 0
sub_client2 = MQTTClient()
ret_conn = await sub_client2.connect("mqtt://user3:user3password@127.0.0.1:1884/")
assert ret_conn == 0
ret_sub = await sub_client1.subscribe([("public/subtopic/test", QOS_0)])
assert ret_sub == [QOS_0]
ret_sub = await sub_client2.subscribe([("public/subtopic/test", QOS_0)])
assert ret_sub == [128]
pub_client = MQTTClient()
ret_conn = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/")
assert ret_conn == 0
await pub_client.publish("public/subtopic/test", b"data", QOS_0)
message = await sub_client1.deliver_message(timeout_duration=1)
try:
await sub_client2.deliver_message(timeout_duration=1)
msg = "Should not have worked"
raise AssertionError(msg)
except Exception:
pass
await pub_client.disconnect()
await sub_client1.disconnect()
await sub_client2.disconnect()
assert message is not None
assert message.topic == "public/subtopic/test"
assert message.data == b"data"
assert message.qos == QOS_0
@pytest.mark.asyncio
async def test_client_publish_dup(broker):
conn_reader, conn_writer = await asyncio.open_connection("127.0.0.1", 1883)
reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer)
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = 10
vh.clean_session_flag = False
vh.will_retain_flag = False
payload.client_id = "test_id"
connect = ConnectPacket(variable_header=vh, payload=payload)
await connect.to_stream(writer)
await ConnackPacket.from_stream(reader)
publish_1 = PublishPacket.build("/test", b"data", 1, False, QOS_2, False)
await publish_1.to_stream(writer)
# Store the future of PubrecPacket.from_stream() in a variable
pubrec_task_1 = asyncio.ensure_future(PubrecPacket.from_stream(reader))
await asyncio.sleep(2)
publish_dup = PublishPacket.build("/test", b"data", 1, True, QOS_2, False)
await publish_dup.to_stream(writer)
await PubrecPacket.from_stream(reader)
pubrel = PubrelPacket.build(1)
await pubrel.to_stream(writer)
await PubcompPacket.from_stream(reader)
# Ensure we wait for the Pubrec packets to be processed
await pubrec_task_1
disconnect = DisconnectPacket()
await disconnect.to_stream(writer)
@pytest.mark.asyncio
async def test_client_publishing_invalid_topic(broker):
assert broker.transitions.is_started()
pub_client = MQTTClient(config={'auto_reconnect': False})
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
await pub_client.subscribe([
("my/+/topic", QOS_0)
])
await asyncio.sleep(0.5)
# need to build & send packet directly to bypass client's check of invalid topic name
# see test_client.py::test_publish_to_incorrect_wildcard for client checks
packet = PublishPacket.build(topic_name='my/topic', message=b'messages',
packet_id=None, dup_flag=False, qos=QOS_0, retain=False)
packet.topic_name = "my/+/topic"
await pub_client._handler._send_packet(packet)
await asyncio.sleep(0.5)
with pytest.raises(asyncio.TimeoutError):
msg = await pub_client.deliver_message(timeout_duration=1)
assert msg is None
await asyncio.sleep(0.1)
await pub_client.disconnect()
@pytest.mark.asyncio
async def test_client_publish_asterisk(broker):
"""'*' is a valid, non-wildcard character for MQTT."""
assert broker.transitions.is_started()
pub_client = MQTTClient(config={'auto_reconnect': False})
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
await pub_client.subscribe([
("my*/topic", QOS_0),
("my/+/topic", QOS_0)
])
await asyncio.sleep(0.1)
await pub_client.publish('my*/topic', b'my valid message', QOS_0, retain=False)
await asyncio.sleep(0.1)
msg = await pub_client.deliver_message(timeout_duration=1)
assert msg is not None
assert msg.topic == "my*/topic"
assert msg.data == b'my valid message'
await asyncio.sleep(0.1)
msg = await pub_client.publish('my/****/topic', b'my valid message', QOS_0, retain=False)
assert msg is not None
assert msg.topic == "my/****/topic"
assert msg.data == b'my valid message'
await pub_client.disconnect()
@pytest.mark.asyncio
async def test_client_publish_big(broker, mock_plugin_manager):
pub_client = MQTTClient()
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
ret_message = await pub_client.publish(
"/topic",
bytearray(b"\x99" * 256 * 1024),
QOS_2,
)
await pub_client.disconnect()
assert broker._retained_messages == {}
await asyncio.sleep(0.1)
assert pub_client.session is not None
mock_plugin_manager.assert_has_calls(
[
call().fire_event(
BrokerEvents.MESSAGE_RECEIVED,
client_id=pub_client.session.client_id,
message=ret_message,
),
],
any_order=True,
)
@pytest.mark.asyncio
async def test_client_publish_retain(broker):
pub_client = MQTTClient()
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
await pub_client.publish("/topic", b"data", QOS_0, retain=True)
await pub_client.disconnect()
await asyncio.sleep(0.1)
assert "/topic" in broker._retained_messages
retained_message = broker._retained_messages["/topic"]
assert retained_message.source_session == pub_client.session
assert retained_message.topic == "/topic"
assert retained_message.data == b"data"
assert retained_message.qos == QOS_0
@pytest.mark.asyncio
async def test_client_publish_retain_delete(broker):
pub_client = MQTTClient()
ret = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
await pub_client.publish("/topic", b"", QOS_0, retain=True)
await pub_client.disconnect()
await asyncio.sleep(0.1)
assert "/topic" not in broker._retained_messages
@pytest.mark.asyncio
async def test_client_subscribe_publish(broker):
sub_client = MQTTClient()
await sub_client.connect("mqtt://127.0.0.1")
ret = await sub_client.subscribe(
[("/qos0", QOS_0), ("/qos1", QOS_1), ("/qos2", QOS_2)],
)
assert ret == [QOS_0, QOS_1, QOS_2]
await _client_publish("/qos0", b"data", QOS_0)
await _client_publish("/qos1", b"data", QOS_1)
await _client_publish("/qos2", b"data", QOS_2)
await asyncio.sleep(0.1)
for qos in [QOS_0, QOS_1, QOS_2]:
message = await sub_client.deliver_message()
assert message is not None
assert message.topic == f"/qos{qos}"
assert message.data == b"data"
assert message.qos == qos
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_subscribe_invalid(broker):
sub_client = MQTTClient()
await sub_client.connect("mqtt://127.0.0.1")
ret = await sub_client.subscribe(
[
("+", QOS_0),
("+/tennis/#", QOS_0),
("sport+", QOS_0),
("sport/+/player1", QOS_0),
],
)
assert ret == [QOS_0, QOS_0, 0x80, QOS_0]
await asyncio.sleep(0.1)
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_subscribe_publish_dollar_topic_1(broker):
assert broker.transitions.is_started()
sub_client = MQTTClient()
await sub_client.connect("mqtt://127.0.0.1")
ret = await sub_client.subscribe([("#", QOS_0)])
assert ret == [QOS_0]
await _client_publish("/topic", b"data", QOS_0)
message = await sub_client.deliver_message()
assert message is not None
await _client_publish("$topic", b"data", QOS_0)
await asyncio.sleep(0.1)
message = None
try:
message = await sub_client.deliver_message(timeout_duration=2)
except Exception:
pass
except RuntimeError as e:
# The loop is closed with pending tasks. Needs fine tuning.
log.warning(e)
assert message is None
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_subscribe_publish_dollar_topic_2(broker):
sub_client = MQTTClient()
await sub_client.connect("mqtt://127.0.0.1")
ret = await sub_client.subscribe([("+/monitor/Clients", QOS_0)])
assert ret == [QOS_0]
await _client_publish("test/monitor/Clients", b"data", QOS_0)
message = await sub_client.deliver_message()
assert message is not None
await _client_publish("$SYS/monitor/Clients", b"data", QOS_0)
await asyncio.sleep(0.1)
message = None
try:
message = await sub_client.deliver_message(timeout_duration=2)
except Exception:
pass
except RuntimeError as e:
# The loop is closed with pending tasks. Needs fine tuning.
log.warning(e)
assert message is None
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_publish_clean_session_subscribe(broker):
sub_client = MQTTClient(client_id='test_client', config={'auto_reconnect': False})
await sub_client.connect("mqtt://127.0.0.1", cleansession=False)
ret = await sub_client.subscribe(
[("/qos0", QOS_0), ("/qos1", QOS_1), ("/qos2", QOS_2)],
)
assert ret == [QOS_0, QOS_1, QOS_2]
await sub_client.disconnect()
await asyncio.sleep(0.5)
await _client_publish("/qos0", b"data0", QOS_0) # should not be retained
await _client_publish("/qos1", b"data1", QOS_1)
await _client_publish("/qos2", b"data2", QOS_2)
await asyncio.sleep(2)
await sub_client.reconnect(cleansession=False)
for qos in [QOS_1, QOS_2]:
log.debug(f"TEST QOS: {qos}")
message = await sub_client.deliver_message(timeout_duration=2)
log.debug(f"Message: {message.publish_packet if message else None!r}")
assert message is not None
assert message.topic == f"/qos{qos}"
assert message.data == f"data{qos}".encode("utf-8")
assert message.qos == qos
try:
while True:
message = await sub_client.deliver_message(timeout_duration=1)
assert message is not None, "no other messages should have been retained"
except asyncio.TimeoutError:
pass
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_publish_retain_with_new_subscribe(broker):
await asyncio.sleep(2)
sub_client1 = MQTTClient(client_id='test_client1')
await sub_client1.connect("mqtt://127.0.0.1")
await sub_client1.disconnect()
await asyncio.sleep(0.5)
await _client_publish("/qos0", b"data0", QOS_0, retain=True)
await asyncio.sleep(0.5)
sub_client2 = MQTTClient(client_id='test_client2')
await sub_client2.connect("mqtt://127.0.0.1")
# should receive the retained message on subscription
ret = await sub_client2.subscribe(
[("/qos0", QOS_0)],
)
assert ret == [QOS_0]
message = await sub_client2.deliver_message(timeout_duration=1)
assert message is not None
assert message.topic == "/qos0"
assert message.data == b"data0"
assert message.qos == QOS_0
await sub_client2.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_publish_retain_latest_with_new_subscribe(broker):
await asyncio.sleep(2)
sub_client1 = MQTTClient(client_id='test_client1')
await sub_client1.connect("mqtt://127.0.0.1")
await sub_client1.disconnect()
await asyncio.sleep(0.5)
await _client_publish("/qos0", b"data a", QOS_0, retain=True)
await asyncio.sleep(0.5)
sub_client2 = MQTTClient(client_id='test_client2')
await sub_client2.connect("mqtt://127.0.0.1")
await _client_publish("/qos0", b"data b", QOS_0, retain=True)
# should receive the retained message on subscription
ret = await sub_client2.subscribe(
[("/qos0", QOS_0)],
)
assert ret == [QOS_0]
message = await sub_client2.deliver_message(timeout_duration=1)
assert message is not None
assert message.topic == "/qos0"
assert message.data == b"data b"
assert message.qos == QOS_0
await sub_client2.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_client_publish_retain_subscribe_on_reconnect(broker):
await asyncio.sleep(2)
sub_client = MQTTClient(client_id='test_client')
await sub_client.connect("mqtt://127.0.0.1", cleansession=False)
ret = await sub_client.subscribe(
[("/qos0", QOS_0)],
)
assert ret == [QOS_0]
await sub_client.disconnect()
await asyncio.sleep(0.5)
await _client_publish("/qos0", b"data0", QOS_0, retain=True)
await asyncio.sleep(0.5)
await sub_client.reconnect(cleansession=False)
message = await sub_client.deliver_message(timeout_duration=1)
assert message is not None
assert message.topic == "/qos0"
assert message.data == b"data0"
assert message.qos == QOS_0
await sub_client.disconnect()
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def _client_publish(topic, data, qos, retain=False) -> int | OutgoingApplicationMessage:
gen_id = "pub_"
valid_chars = string.ascii_letters + string.digits
gen_id += "".join(secrets.choice(valid_chars) for _ in range(16))
pub_client = MQTTClient(client_id=gen_id)
ret: int | OutgoingApplicationMessage = await pub_client.connect("mqtt://127.0.0.1/")
assert ret == 0
ret = await pub_client.publish(topic, data, qos, retain)
await pub_client.disconnect()
return ret
def test_matches_multi_level_wildcard(broker):
test_filter = "sport/tennis/player1/#"
for bad_topic in [
"sport/tennis",
"sport/tennis/",
]:
assert not broker._matches(bad_topic, test_filter)
for good_topic in [
"sport/tennis/player1",
"sport/tennis/player1/",
"sport/tennis/player1/ranking",
"sport/tennis/player1/score/wimbledon",
]:
assert broker._matches(good_topic, test_filter)
def test_matches_single_level_wildcard(broker):
test_filter = "sport/tennis/+"
for bad_topic in [
"sport/tennis",
"sport/tennis/player1/",
"sport/tennis/player1/ranking",
]:
assert not broker._matches(bad_topic, test_filter)
for good_topic in [
"sport/tennis/",
"sport/tennis/player1",
"sport/tennis/player2",
]:
assert broker._matches(good_topic, test_filter)
@pytest.mark.asyncio
async def test_broker_broadcast_cancellation(broker):
topic = "test"
data = b"data"
qos = QOS_0
sub_client = MQTTClient()
await sub_client.connect("mqtt://127.0.0.1")
await sub_client.subscribe([(topic, qos)])
with patch.object(BrokerProtocolHandler, "mqtt_publish", side_effect=asyncio.CancelledError) as mocked_mqtt_publish:
await _client_publish(topic, data, qos)
# Second publish triggers the awaiting of first `mqtt_publish` task
await _client_publish(topic, data, qos)
await asyncio.sleep(0.01)
mocked_mqtt_publish.assert_awaited()
# Ensure broadcast loop is still functional and can deliver the message
await _client_publish(topic, data, qos)
message = await asyncio.wait_for(sub_client.deliver_message(), timeout=1)
assert message
@pytest.mark.asyncio
async def test_broker_socket_open_close(broker):
# check that https://github.com/Yakifo/amqtt/issues/86 is fixed
# mqtt 3.1 requires a connect packet, otherwise the socket connection is rejected
static_connect_packet = b'\x10\x1b\x00\x04MQTT\x04\x02\x00<\x00\x0ftest-client-123'
s = socket.create_connection(("127.0.0.1", 1883))
s.send(static_connect_packet)
await asyncio.sleep(0.1)
s.close()
legacy_config_empty_auth_plugin_list = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
},
'sys_interval': 0,
'auth':{
'plugins':[] # explicitly declare no auth plugins
}
}
class_path_config_no_auth = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
},
'plugins':{
'tests.plugins.test_plugins.AllEventsPlugin': {}
}
}
@pytest.mark.parametrize("test_config", [
legacy_config_empty_auth_plugin_list,
class_path_config_no_auth,
])
@pytest.mark.asyncio
async def test_broker_without_auth_plugin(test_config):
broker = Broker(config=test_config)
await broker.start()
await asyncio.sleep(2)
# make sure all expected events get triggered
with pytest.raises(ConnectError):
mqtt_client = MQTTClient(config={'auto_reconnect': False})
await mqtt_client.connect()
await broker.shutdown()
legacy_config_with_absent_auth_plugin_filter = {
"listeners": {
"default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10},
},
'sys_interval': 0,
'auth':{
'allow-anonymous': True
}
}
@pytest.mark.asyncio
async def test_broker_with_absent_auth_plugin_filter():
# maintain legacy behavior that if a config is missing the 'auth' > 'plugins' filter, all plugins are active
broker = Broker(config=legacy_config_with_absent_auth_plugin_filter)
await broker.start()
await asyncio.sleep(2)
mqtt_client = MQTTClient(config={'auto_reconnect': False})
await mqtt_client.connect()
await broker.shutdown()