2015-09-27 19:17:42 +00:00
|
|
|
import asyncio
|
|
|
|
import logging
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
from amqtt.client import MQTTClient
|
2025-01-12 19:52:50 +00:00
|
|
|
from amqtt.errors import ConnectError
|
2021-03-27 12:16:42 +00:00
|
|
|
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
2015-09-27 19:17:42 +00:00
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
# Log record layout used by the tests: timestamp, logger name, source
# location (file:line), level and message.
formatter = (
    "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} "
    "%(levelname)s - %(message)s"
)

# Only records at ERROR level or above are emitted by the root logger.
logging.basicConfig(format=formatter, level=logging.ERROR)

# Module-level logger named after this module.
log = logging.getLogger(__name__)
|
|
|
|
|
2021-03-06 17:59:03 +00:00
|
|
|
|
2024-12-19 19:34:09 +00:00
|
|
|
# @pytest.mark.asyncio
|
|
|
|
# async def test_connect_tcp():
|
|
|
|
# client = MQTTClient()
|
2024-12-21 10:52:26 +00:00
|
|
|
# await client.connect("mqtt://test.mosquitto.org:1883/")
|
2024-12-19 19:34:09 +00:00
|
|
|
# assert client.session is not None
|
|
|
|
# await client.disconnect()
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
|
2024-12-19 19:34:09 +00:00
|
|
|
# @pytest.mark.asyncio
|
2024-12-21 10:52:26 +00:00
|
|
|
# async def test_connect_tcp_secure(ca_file_fixture):
|
2024-12-19 19:34:09 +00:00
|
|
|
# client = MQTTClient(config={"check_hostname": False})
|
2025-04-04 17:18:17 +00:00
|
|
|
# await client.connect("mqtts://test.mosquitto.org:8883/", cafile=ca_file_fixture)
|
2024-12-19 19:34:09 +00:00
|
|
|
# assert client.session is not None
|
|
|
|
# await client.disconnect()
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connect_tcp_failure():
    """Connecting with auto-reconnect disabled must raise ``ConnectError``.

    No broker fixture is requested by this test.
    """
    mqtt_client = MQTTClient(config={"auto_reconnect": False})
    with pytest.raises(ConnectError):
        await mqtt_client.connect("mqtt://127.0.0.1/")
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connect_ws(broker_fixture):
    """Connect over a websocket URI, check a session exists, then disconnect."""
    ws_client = MQTTClient()
    await ws_client.connect("ws://127.0.0.1:8080/")

    # A session object must be present once connect() has returned.
    assert ws_client.session is not None

    await ws_client.disconnect()
|
|
|
|
|
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
@pytest.mark.asyncio
async def test_reconnect_ws_retain_username_password(broker_fixture):
    """Check that URI credentials are still on the session after a reconnect.

    The client connects over websocket with a username and password embedded
    in the URI, disconnects, then calls ``reconnect()`` and asserts that the
    session still carries both a username and a password.
    """
    client = MQTTClient()
    await client.connect("ws://fred:password@127.0.0.1:8080/")
    assert client.session is not None
    await client.disconnect()

    await client.reconnect()
    try:
        assert client.session.username is not None
        assert client.session.password is not None
    finally:
        # Close the connection opened by reconnect() so it is not left
        # dangling when the test returns (or fails).
        await client.disconnect()
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
@pytest.mark.asyncio
async def test_connect_ws_secure(ca_file_fixture, broker_fixture):
    """Connect to port 8081 passing a CA file, check the session, disconnect."""
    secure_client = MQTTClient()

    # NOTE(review): the URI scheme is "ws://" even though a CA file is
    # supplied - confirm whether "wss://" is intended here.
    await secure_client.connect("ws://127.0.0.1:8081/", cafile=ca_file_fixture)

    assert secure_client.session is not None

    await secure_client.disconnect()
|
2021-03-06 17:59:03 +00:00
|
|
|
|
|
|
|
|
2022-03-03 18:50:59 +00:00
|
|
|
@pytest.mark.asyncio
async def test_connect_username_without_password(broker_fixture):
    """Connect with a URI that carries a username but no password."""
    mqtt_client = MQTTClient()

    # The user-info part of the URI is just "alice" - no ":password".
    await mqtt_client.connect("mqtt://alice@127.0.0.1/")

    assert mqtt_client.session is not None

    await mqtt_client.disconnect()
|
|
|
|
|
|
|
|
|
2021-03-06 17:59:03 +00:00
|
|
|
@pytest.mark.asyncio
async def test_ping(broker_fixture):
    """Connect, issue a single ``ping()`` and disconnect."""
    mqtt_client = MQTTClient()
    await mqtt_client.connect("mqtt://127.0.0.1/")

    assert mqtt_client.session is not None

    # The test passes as long as ping() completes without raising.
    await mqtt_client.ping()

    await mqtt_client.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_subscribe(broker_fixture):
    """Subscribe to one topic at QoS 0, 1 and 2 and check the returned codes.

    The value returned by ``subscribe()`` is indexed in request order and each
    entry is compared against the QoS that was requested at that position.
    """
    mqtt_client = MQTTClient()
    await mqtt_client.connect("mqtt://127.0.0.1/")
    assert mqtt_client.session is not None

    subscriptions = [
        ("$SYS/broker/uptime", QOS_0),
        ("$SYS/broker/uptime", QOS_1),
        ("$SYS/broker/uptime", QOS_2),
    ]
    granted = await mqtt_client.subscribe(subscriptions)

    # One return code per requested subscription, in the same order.
    for position, expected_qos in enumerate((QOS_0, QOS_1, QOS_2)):
        assert granted[position] == expected_qos

    await mqtt_client.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unsubscribe(broker_fixture):
    """Subscribe to a topic at QoS 0, then unsubscribe from it again."""
    topic = "$SYS/broker/uptime"

    mqtt_client = MQTTClient()
    await mqtt_client.connect("mqtt://127.0.0.1/")
    assert mqtt_client.session is not None

    granted = await mqtt_client.subscribe([(topic, QOS_0)])
    assert granted[0] == QOS_0

    # The test passes as long as unsubscribe() completes without raising.
    await mqtt_client.unsubscribe([topic])

    await mqtt_client.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deliver(broker_fixture):
    """Publish from one client and check a subscribed client receives it.

    A subscriber client subscribes to ``test_topic`` at QoS 0, a second client
    publishes a payload on that topic and disconnects, and the subscriber's
    ``deliver_message()`` result is checked against the published payload.
    """
    topic = "test_topic"
    data = b"data"

    client = MQTTClient()
    await client.connect("mqtt://127.0.0.1/")
    assert client.session is not None
    ret = await client.subscribe(
        [
            (topic, QOS_0),
        ],
    )
    assert ret[0] == QOS_0

    # A separate client acts as publisher and is closed right after sending.
    client_pub = MQTTClient()
    await client_pub.connect("mqtt://127.0.0.1/")
    await client_pub.publish(topic, data, QOS_0)
    await client_pub.disconnect()

    message = await client.deliver_message()
    assert message is not None
    assert message.publish_packet is not None
    assert message.data == data

    # Unsubscribe from the topic this test actually subscribed to; the
    # previous code named "$SYS/broker/uptime", which was never subscribed.
    await client.unsubscribe([topic])
    await client.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deliver_timeout(broker_fixture):
    """``deliver_message()`` must time out when nothing is published.

    The client subscribes to ``test_topic`` at QoS 0 and, since no publisher
    is started by this test, waiting for a message with a two-second timeout
    is expected to raise ``asyncio.TimeoutError``.
    """
    topic = "test_topic"

    client = MQTTClient()
    await client.connect("mqtt://127.0.0.1/")
    assert client.session is not None
    ret = await client.subscribe(
        [
            (topic, QOS_0),
        ],
    )
    assert ret[0] == QOS_0

    with pytest.raises(asyncio.TimeoutError):
        await client.deliver_message(timeout_duration=2)

    # Unsubscribe from the topic this test actually subscribed to; the
    # previous code named "$SYS/broker/uptime", which was never subscribed.
    await client.unsubscribe([topic])
    await client.disconnect()
|
2022-12-04 23:08:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_publish_qos1(broker_fixture):
    """Tests that timeouts on published messages will clean up in-flight messages."""
    payload = b"data"

    client_pub = MQTTClient()
    await client_pub.connect("mqtt://127.0.0.1/")

    assert client_pub.session is not None
    assert client_pub._handler is not None

    # Nothing is in flight before the publish task is started.
    assert client_pub.session.inflight_out_count == 0
    publish_task = asyncio.create_task(
        client_pub.publish("test_topic", payload, QOS_1)
    )
    assert len(client_pub._handler._puback_waiters) == 0

    # Yield to the event loop until a PUBACK waiter shows up, or the publish
    # task has already finished.
    while not publish_task.done() and len(client_pub._handler._puback_waiters) == 0:
        await asyncio.sleep(0)

    assert len(client_pub._handler._puback_waiters) == 1
    assert client_pub.session.inflight_out_count == 1

    # Cancel the pending publish and wait for the task to settle.
    publish_task.cancel()
    await asyncio.wait([publish_task])

    # Both the waiter and the in-flight entry must be gone after cancelling.
    assert len(client_pub._handler._puback_waiters) == 0
    assert client_pub.session.inflight_out_count == 0

    await asyncio.sleep(0.1)
    await client_pub.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_publish_qos2_pubrec(broker_fixture):
    """Tests that timeouts on published messages will clean up in-flight messages.

    A QoS 2 publish is started as a task, the test waits until a PUBREC waiter
    is registered on the handler, cancels the task and then checks that both
    the waiter and the session's in-flight entry have been removed.
    """
    data = b"data"

    client_pub = MQTTClient()
    await client_pub.connect("mqtt://127.0.0.1/")

    assert client_pub.session is not None
    assert client_pub._handler is not None

    assert client_pub.session.inflight_out_count == 0
    fut = asyncio.create_task(client_pub.publish("test_topic", data, QOS_2))
    assert len(client_pub._handler._pubrec_waiters) == 0

    # Yield until a PUBREC waiter appears, but stop as soon as the task is
    # done. The previous condition (`... == 0 or fut.done() or
    # fut.cancelled()`) kept looping forever once the task had finished,
    # hanging the test instead of letting the assertions below report it.
    while len(client_pub._handler._pubrec_waiters) == 0 and not fut.done():
        await asyncio.sleep(0)

    assert len(client_pub._handler._pubrec_waiters) == 1
    assert client_pub.session.inflight_out_count == 1

    fut.cancel()
    # NOTE(review): this one-second pause is kept from the original; the
    # sibling cancel tests do not have it - confirm whether it is still needed.
    await asyncio.sleep(1)
    await asyncio.wait([fut])

    # Cancelling must remove both the waiter and the in-flight entry.
    assert len(client_pub._handler._pubrec_waiters) == 0
    assert client_pub.session.inflight_out_count == 0

    await asyncio.sleep(0.1)
    await client_pub.disconnect()
|
|
|
|
|
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
@pytest.mark.asyncio
async def test_cancel_publish_qos2_pubcomp(broker_fixture):
    """Tests that timeouts on published messages will clean up in-flight messages."""
    payload = b"data"

    client_pub = MQTTClient()
    await client_pub.connect("mqtt://127.0.0.1/")

    assert client_pub.session is not None
    assert client_pub._handler is not None

    # Nothing is in flight before the publish task is started.
    assert client_pub.session.inflight_out_count == 0
    publish_task = asyncio.create_task(
        client_pub.publish("test_topic", payload, QOS_2)
    )
    assert len(client_pub._handler._pubcomp_waiters) == 0

    # Yield to the event loop until a PUBCOMP waiter shows up, or the publish
    # task has already finished.
    while not publish_task.done() and len(client_pub._handler._pubcomp_waiters) == 0:
        await asyncio.sleep(0)

    assert len(client_pub._handler._pubcomp_waiters) == 1

    # Cancel the pending publish and wait for the task to settle.
    publish_task.cancel()
    await asyncio.wait([publish_task])

    # Both the waiter and the in-flight entry must be gone after cancelling.
    assert len(client_pub._handler._pubcomp_waiters) == 0
    assert client_pub.session.inflight_out_count == 0

    await asyncio.sleep(0.1)
    await client_pub.disconnect()
|
2025-06-03 14:57:13 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def client_config():
    """Return a client configuration dict that includes a retained will message."""
    # Per-topic publish settings.
    topic_settings = {
        "test": {"qos": 0},
        "some_topic": {"retain": True, "qos": 2},
    }

    # Will message definition: topic, payload, QoS and retain flag.
    will_settings = {
        "topic": "test/will/topic",
        "retain": True,
        "message": "client ABC has disconnected",
        "qos": 1,
    }

    return {
        "default_retain": False,
        "topics": topic_settings,
        "keep_alive": 10,
        "broker": {"uri": "mqtt://localhost:1884"},
        "reconnect_max_interval": 5,
        "will": will_settings,
        "ping_delay": 1,
        "default_qos": 0,
        "auto_reconnect": True,
        "reconnect_retries": 10,
    }
|
|
|
|
|
|
|
|
|
2025-06-03 15:55:03 +00:00
|
|
|
@pytest.mark.asyncio
async def test_client_publish_will_with_retain(broker_fixture, client_config):
    """Verify client handling of the will topic.

    See https://github.com/Yakifo/amqtt/issues/159

    Three clients are used: ``client1`` subscribes to the will topic up front,
    ``client2`` connects with the will-bearing ``client_config``, publishes on
    an unrelated topic and disconnects, and ``client3`` subscribes only
    afterwards. Both subscribers are expected to be delivered the will message.
    """
    broker_uri = "mqtt://localhost:1883"
    will_topic = "test/will/topic"
    will_payload = b"client ABC has disconnected"

    # Subscriber that is already listening before the will-bearing client runs.
    client1 = MQTTClient(client_id="client1")
    await client1.connect(broker_uri)
    await client1.subscribe([(will_topic, QOS_0)])

    # Client carrying the will configuration; the URI is passed explicitly
    # to connect() here.
    client2 = MQTTClient(client_id="client2", config=client_config)
    await client2.connect(broker_uri)
    await client2.publish("my/topic", b"my message")
    await client2.disconnect()

    message = await client1.deliver_message(timeout_duration=1)
    assert message.topic == will_topic
    assert message.data == will_payload
    await client1.disconnect()

    # A subscriber that arrives after client2 is gone must also be delivered
    # the will message.
    client3 = MQTTClient(client_id="client3")
    await client3.connect(broker_uri)
    await client3.subscribe([(will_topic, QOS_0)])
    message3 = await client3.deliver_message(timeout_duration=1)
    assert message3.topic == will_topic
    assert message3.data == will_payload
    await client3.disconnect()
|