From 417bb0114861e9b885a592068fd3ced7ed8f873a Mon Sep 17 00:00:00 2001 From: Nico Date: Tue, 15 Sep 2015 23:17:55 +0200 Subject: [PATCH] Receive messages FIX --- hbmqtt/mqtt/protocol/handler.py | 10 +-- setup.py | 1 + tests/mqtt/protocol/test_handler.py | 102 +++++++++++++++++++++------- 3 files changed, 86 insertions(+), 27 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 016321e..7d59bea 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -212,6 +212,7 @@ class ProtocolHandler: del self.session.inflight_out[app_message.packet_id] elif isinstance(app_message, IncomingApplicationMessage): # Initiate delivery + self.logger.debug("Add message to delivery") yield from self.session.delivered_message_queue.put(app_message) # Send PUBACK puback = PubackPacket.build(app_message.packet_id) @@ -388,6 +389,7 @@ class ProtocolHandler: @asyncio.coroutine def mqtt_deliver_next_message(self): message = yield from self.session.delivered_message_queue.get() + self.logger.debug("Delivering message %r" % message) return message @asyncio.coroutine @@ -474,13 +476,13 @@ class ProtocolHandler: self.logger.warning("Received PUBCOMP for unknown pending message with Id: %s" % packet_id) @asyncio.coroutine - def handle_pubrel(self, pubrel: PubrecPacket): + def handle_pubrel(self, pubrel: PubrelPacket): packet_id = pubrel.packet_id try: - inflight_message = self.session.inflight_in[packet_id] - inflight_message.received_pubrel() + waiter = self._pubrel_waiters[packet_id] + waiter.set_result(pubrel) except KeyError as ke: - self.logger.warning("Received PUBREL for unknown pending subscription with Id: %s" % packet_id) + self.logger.warning("Received PUBREL for unknown pending message with Id: %s" % packet_id) @asyncio.coroutine def handle_publish(self, publish_packet: PublishPacket): diff --git a/setup.py b/setup.py index 0e7e868..a5061a2 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ setup( 'hbmqtt.test.plugins': [ 'test_plugin = tests.plugins.test_manager:TestPlugin', 'event_plugin = tests.plugins.test_manager:EventTestPlugin', + 'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin', ], 'hbmqtt.broker.plugins': [ # 'event_logger_plugin = hbmqtt.plugins.logging:EventLoggerPlugin', diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index 91cb0de..6d30a60 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -54,7 +54,7 @@ class ProtocolHandlerTest(unittest.TestCase): yield from self.start_handler(handler, s) yield from self.stop_handler(handler, s) future.set_result(True) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) future = asyncio.Future(loop=self.loop) @@ -74,7 +74,7 @@ class ProtocolHandlerTest(unittest.TestCase): self.assertEquals(packet.topic_name, '/topic') self.assertEquals(packet.qos, QOS_0) self.assertIsNone(packet.packet_id) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) @asyncio.coroutine @@ -94,7 +94,7 @@ class ProtocolHandlerTest(unittest.TestCase): self.assertIsNone(message.pubcomp_packet) yield from self.stop_handler(handler, s) future.set_result(True) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) future = asyncio.Future(loop=self.loop) @@ -116,7 +116,7 @@ class ProtocolHandlerTest(unittest.TestCase): self.assertIsNotNone(packet.packet_id) self.assertIn(packet.packet_id, self.session.inflight_out) self.assertIn(packet.packet_id, self.handler._puback_waiters) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) puback = PubackPacket.build(packet.packet_id) yield from puback.to_stream(writer) @@ -138,7 +138,7 @@ class ProtocolHandlerTest(unittest.TestCase): yield from self.stop_handler(self.handler, self.session) if not future.done(): future.set_result(True) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) self.handler = None self.session = Session() @@ -171,7 +171,7 @@ class ProtocolHandlerTest(unittest.TestCase): self.assertIn(packet.packet_id, self.handler._pubcomp_waiters) pubcomp = PubcompPacket.build(packet.packet_id) yield from pubcomp.to_stream(writer) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) @asyncio.coroutine @@ -191,7 +191,7 @@ class ProtocolHandlerTest(unittest.TestCase): yield from self.stop_handler(self.handler, self.session) if not future.done(): future.set_result(True) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) self.handler = None self.session = Session() @@ -215,19 +215,20 @@ class ProtocolHandlerTest(unittest.TestCase): @asyncio.coroutine def test_coro(): try: - s = Session() reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) - s.reader, s.writer = adapt(reader, writer) - handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop) - yield from self.start_handler(handler, s) - message = yield from handler.mqtt_deliver_next_message() + self.session.reader, self.session.writer = adapt(reader, writer) + self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop) + yield from self.start_handler(self.handler, self.session) + message = yield from self.handler.mqtt_deliver_next_message() self.assertIsInstance(message, IncomingApplicationMessage) self.assertIsNotNone(message.publish_packet) - yield from self.stop_handler(handler, s) + yield from self.stop_handler(self.handler, self.session) future.set_result(True) - except AssertionError as ae: + except Exception as ae: future.set_exception(ae) + self.handler = None + self.session = Session() future = asyncio.Future(loop=self.loop) coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) server = self.loop.run_until_complete(coro) @@ -241,31 +242,86 @@ class ProtocolHandlerTest(unittest.TestCase): @asyncio.coroutine def server_mock(reader, writer): try: + yield from self.event.wait() + #self.event.clear() packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_1, False) yield from packet.to_stream(writer) puback = yield from PubackPacket.from_stream(reader) self.assertIsNotNone(puback) self.assertEqual(packet.packet_id, puback.packet_id) - writer.close() - except AssertionError as ae: + self.assertEquals(self.session.delivered_message_queue.qsize(), 1) + #yield from self.event.wait() + #writer.close() + except Exception as ae: future.set_exception(ae) @asyncio.coroutine def test_coro(): try: - s = Session() reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) - s.reader, s.writer = adapt(reader, writer) - handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop) - yield from self.start_handler(handler, s) - message = yield from handler.mqtt_deliver_next_message() + self.session.reader, self.session.writer = adapt(reader, writer) + self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop) + yield from self.start_handler(self.handler, self.session) + self.event.set() + message = yield from self.handler.mqtt_deliver_next_message() self.assertIsInstance(message, IncomingApplicationMessage) self.assertIsNotNone(message.publish_packet) - yield from self.stop_handler(handler, s) + self.assertIsNotNone(message.puback_packet) + yield from self.stop_handler(self.handler, self.session) future.set_result(True) - except AssertionError as ae: + #self.event.set() + except Exception as ae: future.set_exception(ae) + self.handler = None + self.session = Session() + future = asyncio.Future(loop=self.loop) + self.event = asyncio.Event(loop=self.loop) + coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + if future.exception(): + raise future.exception() + + def test_receive_qos2(self): + @asyncio.coroutine + def server_mock(reader, writer): + try: + packet = PublishPacket.build('/topic', b'test_data', 2, False, QOS_2, False) + yield from packet.to_stream(writer) + pubrec = yield from PubrecPacket.from_stream(reader) + self.assertIsNotNone(pubrec) + self.assertEqual(packet.packet_id, pubrec.packet_id) + self.assertIn(packet.packet_id, self.handler._pubrel_waiters) + pubrel = PubrelPacket.build(packet.packet_id) + yield from pubrel.to_stream(writer) + self.assertNotIn(packet.packet_id, self.handler._pubrel_waiters) + pubcomp = yield from PubcompPacket.from_stream(reader) + self.assertIsNotNone(pubcomp) + self.assertEqual(packet.packet_id, pubcomp.packet_id) + writer.close() + except Exception as ae: + future.set_exception(ae) + + @asyncio.coroutine + def test_coro(): + try: + reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) + self.session.reader, self.session.writer = adapt(reader, writer) + self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop) + yield from self.start_handler(self.handler, self.session) + message = yield from self.handler.mqtt_deliver_next_message() + self.assertIsInstance(message, IncomingApplicationMessage) + self.assertIsNotNone(message.publish_packet) + yield from self.stop_handler(self.handler, self.session) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + self.handler = None + self.session = Session() future = asyncio.Future(loop=self.loop) coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) server = self.loop.run_until_complete(coro)