kopia lustrzana https://github.com/Yakifo/amqtt
184 wiersze
5.4 KiB
Python
184 wiersze
5.4 KiB
Python
import asyncio
|
|
import logging
|
|
import random
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from threading import Thread
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, call, patch
|
|
|
|
import pytest
|
|
import yaml
|
|
from paho.mqtt import client as paho_client
|
|
from yaml import Loader
|
|
|
|
from amqtt.broker import Broker
|
|
from amqtt.events import BrokerEvents
|
|
from amqtt.client import MQTTClient
|
|
from amqtt.mqtt3.constants import QOS_1, QOS_2
|
|
from amqtt.session import Session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
paho_logger = logging.getLogger("paho_client")
|
|
|
|
|
|
# monkey patch MagicMock
|
|
# taken from https://stackoverflow.com/questions/51394411/python-object-magicmock-cant-be-used-in-await-expression
|
|
async def async_magic():
|
|
pass
|
|
|
|
|
|
MagicMock.__await__ = lambda _: async_magic().__await__()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_paho_connect(broker, mock_plugin_manager):
|
|
|
|
test_complete = asyncio.Event()
|
|
|
|
host = "localhost"
|
|
port = 1883
|
|
client_id = f'python-mqtt-{random.randint(0, 1000)}'
|
|
|
|
def on_connect(client, userdata, flags, rc, properties=None):
|
|
assert rc == 0, f"Connection failed with result code {rc}"
|
|
client.disconnect()
|
|
|
|
def on_disconnect(client, userdata, flags, rc, properties=None):
|
|
assert rc == 0, f"Disconnect failed with result code {rc}"
|
|
test_complete.set()
|
|
|
|
test_client = paho_client.Client(paho_client.CallbackAPIVersion.VERSION2, client_id=client_id)
|
|
test_client.enable_logger(paho_logger)
|
|
|
|
test_client.on_connect = on_connect
|
|
test_client.on_disconnect = on_disconnect
|
|
|
|
test_client.connect(host, port)
|
|
test_client.loop_start()
|
|
|
|
await asyncio.wait_for(test_complete.wait(), timeout=5)
|
|
await asyncio.sleep(0.1)
|
|
|
|
broker.plugins_manager.fire_event.assert_called()
|
|
assert broker.plugins_manager.fire_event.call_count > 2
|
|
|
|
# double indexing is ugly, but call_args_list returns a tuple of tuples
|
|
events = [c[0][0] for c in broker.plugins_manager.fire_event.call_args_list]
|
|
assert BrokerEvents.CLIENT_CONNECTED in events
|
|
assert BrokerEvents.CLIENT_DISCONNECTED in events
|
|
|
|
test_client.loop_stop()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_paho_qos1(broker, mock_plugin_manager):
|
|
|
|
sub_client = MQTTClient()
|
|
await sub_client.connect("mqtt://127.0.0.1")
|
|
ret = await sub_client.subscribe(
|
|
[("/qos1", QOS_1),],
|
|
)
|
|
|
|
host = "localhost"
|
|
port = 1883
|
|
client_id = f'python-mqtt-{random.randint(0, 1000)}'
|
|
|
|
test_client = paho_client.Client(paho_client.CallbackAPIVersion.VERSION2, client_id=client_id)
|
|
test_client.enable_logger(paho_logger)
|
|
|
|
test_client.connect(host, port)
|
|
test_client.loop_start()
|
|
await asyncio.sleep(0.1)
|
|
test_client.publish("/qos1", "test message", qos=1)
|
|
await asyncio.sleep(0.1)
|
|
test_client.loop_stop()
|
|
|
|
message = await sub_client.deliver_message()
|
|
assert message is not None
|
|
assert message.qos == 1
|
|
assert message.topic == "/qos1"
|
|
assert message.data == b"test message"
|
|
await sub_client.disconnect()
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_paho_qos2(broker, mock_plugin_manager):
|
|
sub_client = MQTTClient()
|
|
await sub_client.connect("mqtt://127.0.0.1")
|
|
ret = await sub_client.subscribe(
|
|
[("/qos2", QOS_2), ],
|
|
)
|
|
|
|
host = "localhost"
|
|
port = 1883
|
|
client_id = f'python-mqtt-{random.randint(0, 1000)}'
|
|
|
|
test_client = paho_client.Client(paho_client.CallbackAPIVersion.VERSION2, client_id=client_id)
|
|
test_client.enable_logger(paho_logger)
|
|
|
|
test_client.connect(host, port)
|
|
test_client.loop_start()
|
|
await asyncio.sleep(0.1)
|
|
test_client.publish("/qos2", "test message", qos=2)
|
|
await asyncio.sleep(0.1)
|
|
test_client.loop_stop()
|
|
|
|
message = await sub_client.deliver_message()
|
|
assert message is not None
|
|
assert message.qos == 2
|
|
assert message.topic == "/qos2"
|
|
assert message.data == b"test message"
|
|
await sub_client.disconnect()
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
def run_paho_client(flag):
|
|
client_id = 'websocket_client_1'
|
|
logging.info("creating paho client")
|
|
test_client = paho_client.Client(callback_api_version=paho_client.CallbackAPIVersion.VERSION2,
|
|
transport='websockets',
|
|
client_id=client_id)
|
|
|
|
test_client.ws_set_options('')
|
|
logging.info("client connecting...")
|
|
test_client.connect('127.0.0.1', 8080)
|
|
logging.info("starting loop")
|
|
test_client.loop_start()
|
|
logging.info("client connected")
|
|
time.sleep(1)
|
|
logging.info("sending messages")
|
|
test_client.publish("/qos2", "test message", qos=2)
|
|
test_client.publish("/qos2", "test message", qos=2)
|
|
test_client.publish("/qos2", "test message", qos=2)
|
|
test_client.publish("/qos2", "test message", qos=2)
|
|
time.sleep(1)
|
|
test_client.loop_stop()
|
|
test_client.disconnect()
|
|
flag.set()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_paho_ws():
|
|
path = Path('docs_test/test.amqtt.local.yaml')
|
|
with path.open() as f:
|
|
cfg: dict[str, Any] = yaml.load(f, Loader=Loader)
|
|
logger.warning(cfg)
|
|
broker = Broker(config=cfg)
|
|
await broker.start()
|
|
|
|
# python websockets and paho mqtt don't play well with each other in the same thread
|
|
flag = threading.Event()
|
|
thread = Thread(target=run_paho_client, args=(flag,))
|
|
thread.start()
|
|
|
|
await asyncio.sleep(5)
|
|
thread.join(1)
|
|
|
|
assert flag.is_set(), "paho thread didn't execute completely"
|
|
|
|
logging.info("client disconnected")
|
|
await broker.shutdown()
|