Fix #40 and add test cases

pull/8/head
Nicolas 2016-05-31 21:40:10 +02:00
rodzic d390d12da5
commit b7401443c8
2 zmienionych plików z 103 dodań i 9 usunięć

Wyświetl plik

@ -325,18 +325,19 @@ class MQTTClient:
:param timeout: maximum number of seconds to wait before returning. If timeout is not specified or None, there is no limit to the wait time until next message arrives.
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
"""
deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
self.client_tasks.append(deliver_task)
self.logger.debug("Waiting message delivery")
done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
if pending:
if deliver_task in done:
self.client_tasks.pop()
return deliver_task.result()
else:
#timeout occured before message received
deliver_task.cancel()
if deliver_task.exception():
raise deliver_task.exception()
self.client_tasks.pop()
return deliver_task.result()
raise asyncio.TimeoutError
@asyncio.coroutine
def _connect_coro(self):

Wyświetl plik

@ -6,12 +6,32 @@ import asyncio
import os
import logging
from hbmqtt.client import MQTTClient, ConnectException
from hbmqtt.broker import Broker
from hbmqtt.mqtt.constants import *
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
log = logging.getLogger(__name__)
broker_config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': 'localhost:1883',
'max_connections': 10
},
'ws': {
'type': 'ws',
'bind': 'localhost:8080',
'max_connections': 10
},
},
'sys_interval': 0,
'auth': {
'allow-anonymous': True,
}
}
class MQTTClientTest(unittest.TestCase):
def setUp(self):
@ -75,10 +95,13 @@ class MQTTClientTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
yield from client.connect('ws://test.mosquitto.org:8080/')
yield from client.connect('ws://localhost:8080/')
self.assertIsNotNone(client.session)
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
@ -110,11 +133,14 @@ class MQTTClientTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
ret = yield from client.connect('mqtt://test.mosquitto.org/')
ret = yield from client.connect('mqtt://localhost/')
self.assertIsNotNone(client.session)
yield from client.ping()
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
@ -128,8 +154,10 @@ class MQTTClientTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
yield from client.connect('mqtt://test.mosquitto.org/')
yield from client.connect('mqtt://localhost/')
self.assertIsNotNone(client.session)
ret = yield from client.subscribe([
('$SYS/broker/uptime', QOS_0),
@ -140,6 +168,7 @@ class MQTTClientTest(unittest.TestCase):
self.assertEquals(ret[1], QOS_1)
self.assertEquals(ret[2], QOS_2)
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
@ -153,8 +182,10 @@ class MQTTClientTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
yield from client.connect('mqtt://test.mosquitto.org/')
yield from client.connect('mqtt://localhost/')
self.assertIsNotNone(client.session)
ret = yield from client.subscribe([
('$SYS/broker/uptime', QOS_0),
@ -162,6 +193,68 @@ class MQTTClientTest(unittest.TestCase):
self.assertEquals(ret[0], QOS_0)
yield from client.unsubscribe(['$SYS/broker/uptime'])
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
def test_deliver(self):
data = b'data'
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
yield from client.connect('mqtt://localhost/')
self.assertIsNotNone(client.session)
ret = yield from client.subscribe([
('test_topic', QOS_0),
])
self.assertEquals(ret[0], QOS_0)
client_pub = MQTTClient()
yield from client_pub.connect('mqtt://localhost/')
yield from client_pub.publish('test_topic', data, QOS_0)
yield from client_pub.disconnect()
message = yield from client.deliver_message()
self.assertIsNotNone(message)
self.assertIsNotNone(message.publish_packet)
self.assertEquals(message.data, data)
yield from client.unsubscribe(['$SYS/broker/uptime'])
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
def test_deliver_timeout(self):
@asyncio.coroutine
def test_coro():
try:
broker = Broker(broker_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
client = MQTTClient()
yield from client.connect('mqtt://localhost/')
self.assertIsNotNone(client.session)
ret = yield from client.subscribe([
('test_topic', QOS_0),
])
self.assertEquals(ret[0], QOS_0)
with self.assertRaises(asyncio.TimeoutError):
message = yield from client.deliver_message(timeout=2)
yield from client.unsubscribe(['$SYS/broker/uptime'])
yield from client.disconnect()
yield from broker.shutdown()
future.set_result(True)
except Exception as ae:
future.set_exception(ae)