migrate test_client to pytest

pull/19/head
Florian Ludwig 2021-03-06 18:59:03 +01:00
rodzic 7276075ed8
commit 7c56c9caba
1 zmienionych plików z 144 dodań i 227 usunięć

Wyświetl plik

@ -1,13 +1,15 @@
# Copyright (c) 2015 Nicolas JOUANIN # Copyright (c) 2015 Nicolas JOUANIN
# #
# See the file license.txt for copying permission. # See the file license.txt for copying permission.
import unittest
import asyncio import asyncio
import os import os
import logging import logging
import urllib.request import urllib.request
import tempfile import tempfile
import shutil import shutil
import pytest
from hbmqtt.client import MQTTClient, ConnectException from hbmqtt.client import MQTTClient, ConnectException
from hbmqtt.broker import Broker from hbmqtt.broker import Broker
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
@ -40,256 +42,171 @@ broker_config = {
} }
} }
ca_file: str = ""
temp_dir: str = ""
class MQTTClientTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.temp_dir = tempfile.mkdtemp(prefix='hbmqtt-test-') def setup_module():
url = "http://test.mosquitto.org/ssl/mosquitto.org.crt" global ca_file, temp_dir
self.ca_file = os.path.join(self.temp_dir, 'mosquitto.org.crt')
urllib.request.urlretrieve(url, self.ca_file)
log.info("stored mosquitto cert at %s" % self.ca_file)
def tearDown(self): temp_dir = tempfile.mkdtemp(prefix='hbmqtt-test-')
self.loop.close() url = "http://test.mosquitto.org/ssl/mosquitto.org.crt"
shutil.rmtree(self.temp_dir) ca_file = os.path.join(temp_dir, 'mosquitto.org.crt')
urllib.request.urlretrieve(url, ca_file)
log.info("stored mosquitto cert at %s" % ca_file)
def test_connect_tcp(self):
async def test_coro():
try:
client = MQTTClient()
await client.connect('mqtt://test.mosquitto.org/')
self.assertIsNotNone(client.session)
await client.disconnect()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) def teardown_module():
self.loop.run_until_complete(test_coro()) shutil.rmtree(temp_dir)
if future.exception():
raise future.exception()
def test_connect_tcp_secure(self):
async def test_coro():
try:
client = MQTTClient(config={'check_hostname': False})
await client.connect('mqtts://test.mosquitto.org/', cafile=self.ca_file)
self.assertIsNotNone(client.session)
await client.disconnect()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_connect_tcp():
if future.exception(): client = MQTTClient()
raise future.exception() await client.connect('mqtt://test.mosquitto.org/')
assert client.session is not None
await client.disconnect()
def test_connect_tcp_failure(self):
async def test_coro():
try:
config = {'auto_reconnect': False}
client = MQTTClient(config=config)
await client.connect('mqtt://127.0.0.1/')
except ConnectException as e:
future.set_result(True)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_connect_tcp_secure():
if future.exception(): client = MQTTClient(config={'check_hostname': False})
raise future.exception() await client.connect('mqtts://test.mosquitto.org/', cafile=ca_file)
assert client.session is not None
await client.disconnect()
def test_connect_ws(self):
async def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('ws://127.0.0.1:8080/')
self.assertIsNotNone(client.session)
await client.disconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_connect_tcp_failure():
if future.exception(): config = {'auto_reconnect': False}
raise future.exception() client = MQTTClient(config=config)
with pytest.raises(ConnectException):
await client.connect('mqtt://127.0.0.1/')
def test_reconnect_ws_retain_username_password(self):
async def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('ws://fred:password@127.0.0.1:8080/')
self.assertIsNotNone(client.session)
await client.disconnect()
await client.reconnect()
self.assertIsNotNone(client.session.username) @pytest.mark.asyncio
self.assertIsNotNone(client.session.password) async def test_connect_ws():
await broker.shutdown() broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
future.set_result(True) await broker.start()
except Exception as ae: client = MQTTClient()
future.set_exception(ae) await client.connect('ws://127.0.0.1:8080/')
assert client.session is not None
await client.disconnect()
await broker.shutdown()
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
def test_connect_ws_secure(self): @pytest.mark.asyncio
async def test_coro(): async def test_reconnect_ws_retain_username_password():
try: broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins") await broker.start()
await broker.start() client = MQTTClient()
client = MQTTClient() await client.connect('ws://fred:password@127.0.0.1:8080/')
await client.connect('ws://127.0.0.1:8081/', cafile=self.ca_file) assert client.session is not None
self.assertIsNotNone(client.session) await client.disconnect()
await client.disconnect() await client.reconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) assert client.session.username is not None
self.loop.run_until_complete(test_coro()) assert client.session.password is not None
if future.exception(): await broker.shutdown()
raise future.exception()
def test_ping(self):
async def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
self.assertIsNotNone(client.session)
await client.ping()
await client.disconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_connect_ws_secure():
if future.exception(): broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
raise future.exception() await broker.start()
client = MQTTClient()
await client.connect('ws://127.0.0.1:8081/', cafile=ca_file)
assert client.session is not None
await client.disconnect()
await broker.shutdown()
def test_subscribe(self):
async def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
self.assertIsNotNone(client.session)
ret = await client.subscribe([
('$SYS/broker/uptime', QOS_0),
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/uptime', QOS_2),
])
self.assertEqual(ret[0], QOS_0)
self.assertEqual(ret[1], QOS_1)
self.assertEqual(ret[2], QOS_2)
await client.disconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_ping():
if future.exception(): broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
raise future.exception() await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
assert client.session is not None
await client.ping()
await client.disconnect()
await broker.shutdown()
def test_unsubscribe(self):
async def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
self.assertIsNotNone(client.session)
ret = await client.subscribe([
('$SYS/broker/uptime', QOS_0),
])
self.assertEqual(ret[0], QOS_0)
await client.unsubscribe(['$SYS/broker/uptime'])
await client.disconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop) @pytest.mark.asyncio
self.loop.run_until_complete(test_coro()) async def test_subscribe():
if future.exception(): broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
raise future.exception() await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
assert client.session is not None
ret = await client.subscribe([
('$SYS/broker/uptime', QOS_0),
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/uptime', QOS_2),
])
assert ret[0] == QOS_0
assert ret[1] == QOS_1
assert ret[2] == QOS_2
await client.disconnect()
await broker.shutdown()
def test_deliver(self):
data = b'data'
async def test_coro(): @pytest.mark.asyncio
try: async def test_unsubscribe():
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins") broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start() await broker.start()
client = MQTTClient() client = MQTTClient()
await client.connect('mqtt://127.0.0.1/') await client.connect('mqtt://127.0.0.1/')
self.assertIsNotNone(client.session) assert client.session is not None
ret = await client.subscribe([ ret = await client.subscribe([
('test_topic', QOS_0), ('$SYS/broker/uptime', QOS_0),
]) ])
self.assertEqual(ret[0], QOS_0) assert ret[0] == QOS_0
client_pub = MQTTClient() await client.unsubscribe(['$SYS/broker/uptime'])
await client_pub.connect('mqtt://127.0.0.1/') await client.disconnect()
await client_pub.publish('test_topic', data, QOS_0) await broker.shutdown()
await client_pub.disconnect()
message = await client.deliver_message()
self.assertIsNotNone(message)
self.assertIsNotNone(message.publish_packet)
self.assertEqual(message.data, data)
await client.unsubscribe(['$SYS/broker/uptime'])
await client.disconnect()
await broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
def test_deliver_timeout(self): @pytest.mark.asyncio
async def test_coro(): async def test_deliver():
try: data = b'data'
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins") broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start() await broker.start()
client = MQTTClient() client = MQTTClient()
await client.connect('mqtt://127.0.0.1/') await client.connect('mqtt://127.0.0.1/')
self.assertIsNotNone(client.session) assert client.session is not None
ret = await client.subscribe([ ret = await client.subscribe([
('test_topic', QOS_0), ('test_topic', QOS_0),
]) ])
self.assertEqual(ret[0], QOS_0) assert ret[0] == QOS_0
with self.assertRaises(asyncio.TimeoutError): client_pub = MQTTClient()
await client.deliver_message(timeout=2) await client_pub.connect('mqtt://127.0.0.1/')
await client.unsubscribe(['$SYS/broker/uptime']) await client_pub.publish('test_topic', data, QOS_0)
await client.disconnect() await client_pub.disconnect()
await broker.shutdown() message = await client.deliver_message()
future.set_result(True) assert message is not None
except Exception as ae: assert message.publish_packet is not None
future.set_exception(ae) assert message.data == data
await client.unsubscribe(['$SYS/broker/uptime'])
await client.disconnect()
await broker.shutdown()
@pytest.mark.asyncio
async def test_deliver_timeout():
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
await broker.start()
client = MQTTClient()
await client.connect('mqtt://127.0.0.1/')
assert client.session is not None
ret = await client.subscribe([
('test_topic', QOS_0),
])
assert ret[0] == QOS_0
with pytest.raises(asyncio.TimeoutError):
await client.deliver_message(timeout=2)
await client.unsubscribe(['$SYS/broker/uptime'])
await client.disconnect()
await broker.shutdown()
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()