From 484c445f1233c8b27add470a71495df62af5e79a Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 14 Sep 2015 22:31:16 +0200 Subject: [PATCH] Add QOS_1 publish testing --- tests/mqtt/protocol/test_handler.py | 47 +++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index 505d456..42b044c 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -10,6 +10,7 @@ from hbmqtt.mqtt.protocol.handler import ProtocolHandler from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter from hbmqtt.mqtt.constants import * from hbmqtt.mqtt.publish import PublishPacket +from hbmqtt.mqtt.puback import PubackPacket formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) @@ -102,6 +103,52 @@ class ProtocolHandlerTest(unittest.TestCase): if future.exception(): raise future.exception() + def test_publish_qos1(self): + @asyncio.coroutine + def server_mock(reader, writer): + packet = yield from PublishPacket.from_stream(reader) + try: + self.assertEquals(packet.topic_name, '/topic') + self.assertEquals(packet.qos, QOS_1) + self.assertIsNotNone(packet.packet_id) + self.assertIn(packet.packet_id, self.session.inflight_out) + self.assertIn(packet.packet_id, self.handler._puback_waiters) + except AssertionError as ae: + future.set_exception(ae) + puback = PubackPacket.build(packet.packet_id) + yield from puback.to_stream(writer) + + @asyncio.coroutine + def test_coro(): + try: + reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) + self.session.reader, self.session.writer = adapt(reader, writer) + self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop) + yield from self.start_handler(self.handler, self.session) + message = yield from self.handler.mqtt_publish('/topic', b'test_data', QOS_1, False) + self.assertIsInstance(message, OutgoingApplicationMessage) + self.assertIsNotNone(message.publish_packet) + self.assertIsNotNone(message.puback_packet) + self.assertIsNone(message.pubrec_packet) + self.assertIsNone(message.pubrel_packet) + self.assertIsNone(message.pubcomp_packet) + yield from self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except AssertionError as ae: + future.set_exception(ae) + self.handler = None + self.session = Session() + future = asyncio.Future(loop=self.loop) + + coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + if future.exception(): + raise future.exception() + @asyncio.coroutine def start_handler(self, handler, session): yield from handler.start()