amqtt/tests/mqtt/protocol/test_handler.py

458 wiersze
21 KiB
Python
Czysty Zwykły widok Historia

2015-09-12 12:47:36 +00:00
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import unittest
import asyncio
2015-09-13 20:36:22 +00:00
import logging
2015-09-12 12:47:36 +00:00
from hbmqtt.plugins.manager import PluginManager
2015-09-14 20:08:14 +00:00
from hbmqtt.session import Session, OutgoingApplicationMessage, IncomingApplicationMessage
2015-09-12 12:47:36 +00:00
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
2015-09-13 20:36:22 +00:00
from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter
from hbmqtt.mqtt.constants import *
from hbmqtt.mqtt.publish import PublishPacket
2015-09-14 20:31:16 +00:00
from hbmqtt.mqtt.puback import PubackPacket
2015-09-14 20:56:22 +00:00
from hbmqtt.mqtt.pubrec import PubrecPacket
from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.pubcomp import PubcompPacket
2015-09-13 20:36:22 +00:00
# Log record layout used for the root logger configured just below.
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
# Configure root logging at DEBUG level when this test module is imported.
logging.basicConfig(level=logging.DEBUG, format=formatter)
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def adapt(reader, writer):
    """Wrap a reader/writer pair in the hbmqtt stream adapters.

    :param reader: object handed to ``StreamReaderAdapter``
    :param writer: object handed to ``StreamWriterAdapter``
    :return: ``(adapted_reader, adapted_writer)`` tuple
    """
    adapted_reader = StreamReaderAdapter(reader)
    adapted_writer = StreamWriterAdapter(writer)
    return adapted_reader, adapted_writer
2015-09-12 12:47:36 +00:00
class ProtocolHandlerTest(unittest.TestCase):
    """Drive ``ProtocolHandler`` against a mock peer over a local TCP socket.

    Every scenario starts an asyncio server on ``HOST:PORT`` whose connection
    callback (``server_mock``) plays the remote end, then runs a client
    coroutine (``test_coro``) that connects and drives a ``ProtocolHandler``.
    Both coroutines report their outcome into a shared future; the first
    recorded outcome wins and a recorded exception is re-raised in the test.
    """

    # Address the mock server binds to and the client connects to.
    HOST = '127.0.0.1'
    PORT = 8888

    def setUp(self):
        """Create a fresh event loop and a plugin manager bound to it."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.plugin_manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop)

    def tearDown(self):
        """Close the per-test event loop."""
        self.loop.close()

    # ------------------------------------------------------------------
    # Shared scaffolding
    # ------------------------------------------------------------------

    @staticmethod
    def _record_success(future):
        """Mark ``future`` successful unless an outcome was already recorded."""
        if not future.done():
            future.set_result(True)

    @staticmethod
    def _record_failure(future, exc):
        """Store ``exc`` in ``future`` unless an outcome was already recorded.

        Guarding on ``done()`` avoids ``InvalidStateError`` when both the mock
        server and the client coroutine report, which would otherwise hide the
        first (real) failure.
        """
        if not future.done():
            future.set_exception(exc)

    def _run_scenario(self, server_mock, test_coro, future):
        """Start the mock server, run the client coroutine, re-raise failures.

        :param server_mock: ``(reader, writer)`` coroutine used as the server's
            client-connected callback
        :param test_coro: zero-argument coroutine function driving the client
        :param future: future both coroutines report their outcome into
        """
        server = self.loop.run_until_complete(
            asyncio.start_server(server_mock, self.HOST, self.PORT, loop=self.loop))
        try:
            self.loop.run_until_complete(test_coro())
        finally:
            # Always release the listening socket, even if the client raised.
            server.close()
            self.loop.run_until_complete(server.wait_closed())
        if future.exception():
            raise future.exception()

    @asyncio.coroutine
    def _connect_handler(self, session):
        """Connect to the mock server and return a handler attached to the stream.

        :param session: session object passed to ``ProtocolHandler``
        :return: a ``ProtocolHandler`` with adapted reader/writer attached
        """
        reader, writer = yield from asyncio.open_connection(self.HOST, self.PORT, loop=self.loop)
        reader_adapted, writer_adapted = adapt(reader, writer)
        handler = ProtocolHandler(session, self.plugin_manager, loop=self.loop)
        handler.attach_stream(reader_adapted, writer_adapted)
        return handler

    def _make_puback_server(self, future):
        """Build a mock peer that checks a QoS 1 PUBLISH and answers with PUBACK.

        The mock reads ``self.session`` and ``self.handler``, so both must be
        set by the time a PUBLISH packet arrives.
        """
        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = yield from PublishPacket.from_stream(reader)
                self.assertEqual(packet.topic_name, '/topic')
                self.assertEqual(packet.qos, QOS_1)
                self.assertIsNotNone(packet.packet_id)
                self.assertIn(packet.packet_id, self.session.inflight_out)
                self.assertIn(packet.packet_id, self.handler._puback_waiters)
                puback = PubackPacket.build(packet.packet_id)
                yield from puback.to_stream(writer)
            except Exception as ae:
                self._record_failure(future, ae)
        return server_mock

    def _make_pubrec_pubcomp_server(self, future):
        """Build a mock peer that plays the receiver side of a QoS 2 publish.

        It checks the PUBLISH, answers PUBREC, waits for PUBREL and answers
        PUBCOMP. Reads ``self.session`` and ``self.handler``.
        """
        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = yield from PublishPacket.from_stream(reader)
                self.assertEqual(packet.topic_name, '/topic')
                self.assertEqual(packet.qos, QOS_2)
                self.assertIsNotNone(packet.packet_id)
                self.assertIn(packet.packet_id, self.session.inflight_out)
                self.assertIn(packet.packet_id, self.handler._pubrec_waiters)
                pubrec = PubrecPacket.build(packet.packet_id)
                yield from pubrec.to_stream(writer)
                # The PUBREL content is not inspected; it only has to arrive.
                yield from PubrelPacket.from_stream(reader)
                self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
                pubcomp = PubcompPacket.build(packet.packet_id)
                yield from pubcomp.to_stream(writer)
            except Exception as ae:
                self._record_failure(future, ae)
        return server_mock

    def _make_retry_client(self, future):
        """Build the client coroutine shared by the retry scenarios.

        It starts the handler directly (``start_handler`` would fail its
        "no message" precondition because the session is pre-seeded) and then
        stops it through ``stop_handler``, which checks nothing is left pending.
        """
        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.handler.start()
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)
        return test_coro

    # ------------------------------------------------------------------
    # Handler lifecycle
    # ------------------------------------------------------------------

    def test_init_handler(self):
        """A new handler keeps its session and loop and has no pending waiters."""
        s = Session()
        handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
        self.assertIs(handler.session, s)
        self.assertIs(handler._loop, self.loop)
        self.check_empty_waiters(handler)

    def test_start_stop(self):
        """Start then stop a handler against a peer that does nothing."""
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def server_mock(reader, writer):
            pass

        @asyncio.coroutine
        def test_coro():
            try:
                s = Session()
                handler = yield from self._connect_handler(s)
                yield from self.start_handler(handler, s)
                yield from self.stop_handler(handler, s)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(server_mock, test_coro, future)

    # ------------------------------------------------------------------
    # Outgoing publish
    # ------------------------------------------------------------------

    def test_publish_qos0(self):
        """Publish at QoS 0: peer sees no packet id, message has only a PUBLISH."""
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = yield from PublishPacket.from_stream(reader)
                self.assertEqual(packet.topic_name, '/topic')
                self.assertEqual(packet.qos, QOS_0)
                self.assertIsNone(packet.packet_id)
            except Exception as ae:
                self._record_failure(future, ae)

        @asyncio.coroutine
        def test_coro():
            try:
                s = Session()
                handler = yield from self._connect_handler(s)
                yield from self.start_handler(handler, s)
                message = yield from handler.mqtt_publish('/topic', b'test_data', QOS_0, False)
                self.assertIsInstance(message, OutgoingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNone(message.puback_packet)
                self.assertIsNone(message.pubrec_packet)
                self.assertIsNone(message.pubrel_packet)
                self.assertIsNone(message.pubcomp_packet)
                yield from self.stop_handler(handler, s)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(server_mock, test_coro, future)

    def test_publish_qos1(self):
        """Publish at QoS 1: peer answers PUBACK, message carries PUBLISH + PUBACK."""
        self.handler = None
        self.session = Session()
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.start_handler(self.handler, self.session)
                message = yield from self.handler.mqtt_publish('/topic', b'test_data', QOS_1, False)
                self.assertIsInstance(message, OutgoingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNotNone(message.puback_packet)
                self.assertIsNone(message.pubrec_packet)
                self.assertIsNone(message.pubrel_packet)
                self.assertIsNone(message.pubcomp_packet)
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(self._make_puback_server(future), test_coro, future)

    def test_publish_qos2(self):
        """Publish at QoS 2: message carries PUBLISH, PUBREC, PUBREL and PUBCOMP."""
        self.handler = None
        self.session = Session()
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.start_handler(self.handler, self.session)
                message = yield from self.handler.mqtt_publish('/topic', b'test_data', QOS_2, False)
                self.assertIsInstance(message, OutgoingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNone(message.puback_packet)
                self.assertIsNotNone(message.pubrec_packet)
                self.assertIsNotNone(message.pubrel_packet)
                self.assertIsNotNone(message.pubcomp_packet)
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(self._make_pubrec_pubcomp_server(future), test_coro, future)

    # ------------------------------------------------------------------
    # Incoming publish
    # ------------------------------------------------------------------

    def test_receive_qos0(self):
        """Receive a QoS 0 PUBLISH: delivered message has only a PUBLISH packet."""
        self.handler = None
        self.session = Session()
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_0, False)
                yield from packet.to_stream(writer)
            except Exception as ae:
                self._record_failure(future, ae)

        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.start_handler(self.handler, self.session)
                message = yield from self.handler.mqtt_deliver_next_message()
                self.assertIsInstance(message, IncomingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNone(message.puback_packet)
                self.assertIsNone(message.pubrec_packet)
                self.assertIsNone(message.pubrel_packet)
                self.assertIsNone(message.pubcomp_packet)
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(server_mock, test_coro, future)

    def test_receive_qos1(self):
        """Receive a QoS 1 PUBLISH: peer gets a PUBACK with the same packet id."""
        self.handler = None
        self.session = Session()
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_1, False)
                yield from packet.to_stream(writer)
                puback = yield from PubackPacket.from_stream(reader)
                self.assertIsNotNone(puback)
                self.assertEqual(packet.packet_id, puback.packet_id)
            except Exception as ae:
                self._record_failure(future, ae)

        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.start_handler(self.handler, self.session)
                message = yield from self.handler.mqtt_deliver_next_message()
                self.assertIsInstance(message, IncomingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNotNone(message.puback_packet)
                self.assertIsNone(message.pubrec_packet)
                self.assertIsNone(message.pubrel_packet)
                self.assertIsNone(message.pubcomp_packet)
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(server_mock, test_coro, future)

    @unittest.skip("QoS 2 receive scenario was disabled without a recorded reason")
    def test_receive_qos2(self):
        """Receive a QoS 2 PUBLISH and complete the PUBREC/PUBREL/PUBCOMP exchange.

        NOTE(review): the original attached the stream by assigning
        ``session.reader``/``session.writer``; this now uses ``attach_stream``
        like every other scenario in this class. Confirm before un-skipping.
        """
        self.handler = None
        self.session = Session()
        future = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def server_mock(reader, writer):
            try:
                packet = PublishPacket.build('/topic', b'test_data', 2, False, QOS_2, False)
                yield from packet.to_stream(writer)
                pubrec = yield from PubrecPacket.from_stream(reader)
                self.assertIsNotNone(pubrec)
                self.assertEqual(packet.packet_id, pubrec.packet_id)
                self.assertIn(packet.packet_id, self.handler._pubrel_waiters)
                pubrel = PubrelPacket.build(packet.packet_id)
                yield from pubrel.to_stream(writer)
                pubcomp = yield from PubcompPacket.from_stream(reader)
                self.assertIsNotNone(pubcomp)
                self.assertEqual(packet.packet_id, pubcomp.packet_id)
            except Exception as ae:
                self._record_failure(future, ae)

        @asyncio.coroutine
        def test_coro():
            try:
                self.handler = yield from self._connect_handler(self.session)
                yield from self.start_handler(self.handler, self.session)
                message = yield from self.handler.mqtt_deliver_next_message()
                self.assertIsInstance(message, IncomingApplicationMessage)
                self.assertIsNotNone(message.publish_packet)
                self.assertIsNone(message.puback_packet)
                self.assertIsNotNone(message.pubrec_packet)
                self.assertIsNotNone(message.pubrel_packet)
                self.assertIsNotNone(message.pubcomp_packet)
                yield from self.stop_handler(self.handler, self.session)
                self._record_success(future)
            except Exception as ae:
                self._record_failure(future, ae)

        self._run_scenario(server_mock, test_coro, future)

    # ------------------------------------------------------------------
    # Handler start/stop helpers and invariants
    # ------------------------------------------------------------------

    @asyncio.coroutine
    def start_handler(self, handler, session):
        """Check nothing is pending, start ``handler`` and check its reader is ready."""
        self.check_empty_waiters(handler)
        self.check_no_message(session)
        yield from handler.start()
        self.assertTrue(handler._reader_ready)

    @asyncio.coroutine
    def stop_handler(self, handler, session):
        """Stop ``handler``, check its reader stopped and nothing is left pending."""
        yield from handler.stop()
        self.assertTrue(handler._reader_stopped)
        self.check_empty_waiters(handler)
        self.check_no_message(session)

    def check_empty_waiters(self, handler):
        """Assert all four acknowledgement waiter collections of ``handler`` are empty."""
        self.assertFalse(handler._puback_waiters)
        self.assertFalse(handler._pubrec_waiters)
        self.assertFalse(handler._pubrel_waiters)
        self.assertFalse(handler._pubcomp_waiters)

    def check_no_message(self, session):
        """Assert ``session`` has no in-flight message in either direction."""
        self.assertFalse(session.inflight_out)
        self.assertFalse(session.inflight_in)
        # NOTE: a check that session.delivered_message_queue is empty was
        # commented out in the original and is intentionally not performed.

    # ------------------------------------------------------------------
    # Retry of messages already in flight when the handler starts
    # ------------------------------------------------------------------

    def test_publish_qos1_retry(self):
        """Start a handler whose session already holds an in-flight QoS 1 message.

        The mock peer expects to receive that PUBLISH and answers PUBACK;
        ``stop_handler`` then checks nothing is left in flight.
        """
        self.handler = None
        self.session = Session()
        message = OutgoingApplicationMessage(1, '/topic', QOS_1, b'test_data', False)
        message.publish_packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_1, False)
        self.session.inflight_out[1] = message
        future = asyncio.Future(loop=self.loop)

        self._run_scenario(
            self._make_puback_server(future), self._make_retry_client(future), future)

    def test_publish_qos2_retry(self):
        """Start a handler whose session already holds an in-flight QoS 2 message.

        The mock peer expects that PUBLISH and completes the
        PUBREC/PUBREL/PUBCOMP exchange; ``stop_handler`` then checks nothing is
        left in flight.
        """
        self.handler = None
        self.session = Session()
        message = OutgoingApplicationMessage(1, '/topic', QOS_2, b'test_data', False)
        message.publish_packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_2, False)
        self.session.inflight_out[1] = message
        future = asyncio.Future(loop=self.loop)

        self._run_scenario(
            self._make_pubrec_pubcomp_server(future), self._make_retry_client(future), future)