diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index 42b044c..0e4e775 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -11,6 +11,9 @@ from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter from hbmqtt.mqtt.constants import * from hbmqtt.mqtt.publish import PublishPacket from hbmqtt.mqtt.puback import PubackPacket +from hbmqtt.mqtt.pubrec import PubrecPacket +from hbmqtt.mqtt.pubrel import PubrelPacket +from hbmqtt.mqtt.pubcomp import PubcompPacket formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) @@ -149,6 +152,59 @@ class ProtocolHandlerTest(unittest.TestCase): if future.exception(): raise future.exception() + def test_publish_qos2(self): + @asyncio.coroutine + def server_mock(reader, writer): + try: + packet = yield from PublishPacket.from_stream(reader) + self.assertEquals(packet.topic_name, '/topic') + self.assertEquals(packet.qos, QOS_2) + self.assertIsNotNone(packet.packet_id) + self.assertIn(packet.packet_id, self.session.inflight_out) + + self.assertIn(packet.packet_id, self.handler._pubrec_waiters) + pubrec = PubrecPacket.build(packet.packet_id) + yield from pubrec.to_stream(writer) + + pubrel = yield from PubrelPacket.from_stream(reader) + + self.assertIn(packet.packet_id, self.handler._pubcomp_waiters) + pubcomp = PubcompPacket.build(packet.packet_id) + yield from pubcomp.to_stream(writer) + except AssertionError as ae: + future.set_exception(ae) + + @asyncio.coroutine + def test_coro(): + try: + reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) + self.session.reader, self.session.writer = adapt(reader, writer) + self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop) + yield from self.start_handler(self.handler, self.session) + message = yield from self.handler.mqtt_publish('/topic', b'test_data', QOS_2, False) + self.assertIsInstance(message, OutgoingApplicationMessage) + self.assertIsNotNone(message.publish_packet) + self.assertIsNone(message.puback_packet) + self.assertIsNotNone(message.pubrec_packet) + self.assertIsNotNone(message.pubrel_packet) + self.assertIsNotNone(message.pubcomp_packet) + yield from self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except AssertionError as ae: + future.set_exception(ae) + self.handler = None + self.session = Session() + future = asyncio.Future(loop=self.loop) + + coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + if future.exception(): + raise future.exception() + @asyncio.coroutine def start_handler(self, handler, session): yield from handler.start()