# Mirror of https://github.com/Yakifo/amqtt (127 lines, 3.7 KiB, Python)
import asyncio
|
|
import logging
|
|
import random
|
|
from unittest.mock import MagicMock, call, patch
|
|
|
|
import pytest
|
|
from paho.mqtt import client as mqtt_client
|
|
|
|
from amqtt.events import BrokerEvents
|
|
from amqtt.client import MQTTClient
|
|
from amqtt.mqtt.constants import QOS_1, QOS_2
|
|
from amqtt.session import Session
|
|
|
|
# Module-level logger for this test module.
logger = logging.getLogger(__name__)
# Dedicated logger handed to the paho clients via ``enable_logger`` so their
# output can be filtered separately under the "paho_client" name.
paho_logger = logging.getLogger("paho_client")
|
|
|
|
|
|
# Monkey patch MagicMock so that mock objects can be used in ``await`` expressions.
# Taken from https://stackoverflow.com/questions/51394411/python-object-magicmock-cant-be-used-in-await-expression
async def async_magic():
    """Do nothing and return ``None``.

    Exists only to supply a real coroutine whose ``__await__`` the
    ``MagicMock`` patch below can delegate to.
    """
    pass


# Every ``await <MagicMock instance>`` creates a fresh no-op coroutine and
# awaits it, so the await expression completes immediately with ``None``.
MagicMock.__await__ = lambda _: async_magic().__await__()
|
|
|
|
@pytest.mark.asyncio
async def test_paho_connect(broker, mock_plugin_manager):
    """Connect a paho client to the broker, disconnect it, and check the broker events.

    The paho client connects to ``localhost:1883``, disconnects as soon as the
    connection is acknowledged, and the test then verifies that the broker's
    ``plugins_manager.fire_event`` was called with both ``CLIENT_CONNECTED``
    and ``CLIENT_DISCONNECTED``.

    NOTE(review): ``mock_plugin_manager`` is presumably the fixture that makes
    ``fire_event`` a mock; it is defined outside this file, so confirm there.
    """
    loop = asyncio.get_running_loop()
    test_complete = asyncio.Event()

    host = "localhost"
    port = 1883
    client_id = f'python-mqtt-{random.randint(0, 1000)}'

    def on_connect(client, userdata, flags, rc, properties=None):
        assert rc == 0, f"Connection failed with result code {rc}"
        client.disconnect()

    def on_disconnect(client, userdata, flags, rc, properties=None):
        assert rc == 0, f"Disconnect failed with result code {rc}"
        # This callback is invoked from the thread started by ``loop_start()``.
        # ``asyncio.Event`` is not thread-safe, so hand the ``set()`` call over
        # to the event loop instead of calling it directly from this thread.
        loop.call_soon_threadsafe(test_complete.set)

    test_client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id=client_id)
    test_client.enable_logger(paho_logger)

    test_client.on_connect = on_connect
    test_client.on_disconnect = on_disconnect

    test_client.connect(host, port)
    test_client.loop_start()

    try:
        await asyncio.wait_for(test_complete.wait(), timeout=5)
        # Short pause before inspecting the recorded ``fire_event`` calls.
        await asyncio.sleep(0.1)

        broker.plugins_manager.fire_event.assert_called()
        assert broker.plugins_manager.fire_event.call_count > 2

        # Each entry of call_args_list is an (args, kwargs) pair, so c[0][0]
        # is the first positional argument of the call: the event name.
        events = [c[0][0] for c in broker.plugins_manager.fire_event.call_args_list]
        assert BrokerEvents.CLIENT_CONNECTED in events
        assert BrokerEvents.CLIENT_DISCONNECTED in events
    finally:
        # Always stop paho's background thread, even when the wait times out
        # or an assertion fails, so it does not leak into later tests.
        test_client.loop_stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_paho_qos1(broker, mock_plugin_manager):
    """Publish a QoS 1 message with paho and receive it with an amqtt subscriber.

    An amqtt ``MQTTClient`` subscribes to ``/qos1`` at QoS 1, a paho client
    publishes ``"test message"`` to that topic at QoS 1, and the test checks
    the QoS, topic and payload of the message delivered to the subscriber.
    """
    sub_client = MQTTClient()
    await sub_client.connect("mqtt://127.0.0.1")

    try:
        await sub_client.subscribe(
            [("/qos1", QOS_1)],
        )

        host = "localhost"
        port = 1883
        client_id = f'python-mqtt-{random.randint(0, 1000)}'

        test_client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id=client_id)
        test_client.enable_logger(paho_logger)

        test_client.connect(host, port)
        test_client.loop_start()
        try:
            # Pause before publishing and again before stopping the paho thread.
            await asyncio.sleep(0.1)
            test_client.publish("/qos1", "test message", qos=1)
            await asyncio.sleep(0.1)
        finally:
            # Stop paho's background thread even if publishing raised.
            test_client.loop_stop()

        # Bound the wait so a lost message fails the test instead of hanging it.
        message = await asyncio.wait_for(sub_client.deliver_message(), timeout=5)
        assert message is not None
        assert message.qos == 1
        assert message.topic == "/qos1"
        assert message.data == b"test message"
    finally:
        # Disconnect the subscriber on success and on failure alike.
        await sub_client.disconnect()

    await asyncio.sleep(0.1)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_paho_qos2(broker, mock_plugin_manager):
    """Publish a QoS 2 message with paho and receive it with an amqtt subscriber.

    An amqtt ``MQTTClient`` subscribes to ``/qos2`` at QoS 2, a paho client
    publishes ``"test message"`` to that topic at QoS 2, and the test checks
    the QoS, topic and payload of the message delivered to the subscriber.
    """
    sub_client = MQTTClient()
    await sub_client.connect("mqtt://127.0.0.1")

    try:
        await sub_client.subscribe(
            [("/qos2", QOS_2)],
        )

        host = "localhost"
        port = 1883
        client_id = f'python-mqtt-{random.randint(0, 1000)}'

        test_client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id=client_id)
        test_client.enable_logger(paho_logger)

        test_client.connect(host, port)
        test_client.loop_start()
        try:
            # Pause before publishing and again before stopping the paho thread.
            await asyncio.sleep(0.1)
            test_client.publish("/qos2", "test message", qos=2)
            await asyncio.sleep(0.1)
        finally:
            # Stop paho's background thread even if publishing raised.
            test_client.loop_stop()

        # Bound the wait so a lost message fails the test instead of hanging it.
        message = await asyncio.wait_for(sub_client.deliver_message(), timeout=5)
        assert message is not None
        assert message.qos == 2
        assert message.topic == "/qos2"
        assert message.data == b"test message"
    finally:
        # Disconnect the subscriber on success and on failure alike.
        await sub_client.disconnect()

    await asyncio.sleep(0.1)
|