From 0e11d545d867524e9302643d6afa1be72fb49130 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 22:35:38 +0200 Subject: [PATCH] Fix #23 + add test case --- hbmqtt/mqtt/protocol/handler.py | 32 ++++++++++--------- tests/test_broker.py | 55 +++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 14 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 5dbb955..5004592 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -333,22 +333,26 @@ class ProtocolHandler: # Wait PUBREL if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done(): # PUBREL waiter already exists for this packet ID - message = "Can't add PUBREL waiter, a waiter already exists for message Id '%s'" \ + message = "A waiter already exists for message Id '%s', canceling it" \ % app_message.packet_id self.logger.warning(message) - raise HBMQTTException(message) - waiter = asyncio.Future(loop=self._loop) - self._pubrel_waiters[app_message.packet_id] = waiter - yield from waiter - del self._pubrel_waiters[app_message.packet_id] - app_message.pubrel_packet = waiter.result() - # Initiate delivery and discard message - yield from self.session.delivered_message_queue.put(app_message) - del self.session.inflight_in[app_message.packet_id] - # Send pubcomp - pubcomp_packet = PubcompPacket.build(app_message.packet_id) - yield from self._send_packet(pubcomp_packet) - app_message.pubcomp_packet = pubcomp_packet + self._pubrel_waiters[app_message.packet_id].cancel() + try: + waiter = asyncio.Future(loop=self._loop) + self._pubrel_waiters[app_message.packet_id] = waiter + yield from waiter + del self._pubrel_waiters[app_message.packet_id] + app_message.pubrel_packet = waiter.result() + # Initiate delivery and discard message + yield from self.session.delivered_message_queue.put(app_message) + del self.session.inflight_in[app_message.packet_id] + # Send pubcomp + pubcomp_packet = PubcompPacket.build(app_message.packet_id) + yield from self._send_packet(pubcomp_packet) + app_message.pubcomp_packet = pubcomp_packet + except asyncio.CancelledError: + self.logger.debug("Message flow cancelled") + @asyncio.coroutine def _reader_loop(self): diff --git a/tests/test_broker.py b/tests/test_broker.py index 5341398..9f1175f 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -6,6 +6,9 @@ from unittest.mock import patch, call, MagicMock from hbmqtt.broker import * from hbmqtt.mqtt.constants import * from hbmqtt.client import MQTTClient, ConnectException +from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \ + PubrelPacket, PubcompPacket, DisconnectPacket +from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) @@ -287,6 +290,58 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + #@patch('hbmqtt.broker.PluginManager') + def test_client_publish_dup(self): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + conn_reader, conn_writer = \ + yield from asyncio.open_connection('localhost', 1883, loop=self.loop) + reader = StreamReaderAdapter(conn_reader) + writer = StreamWriterAdapter(conn_writer) + + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = 10 + vh.clean_session_flag = False + vh.will_retain_flag = False + payload.client_id = 'test_id' + connect = ConnectPacket(vh=vh, payload=payload) + yield from connect.to_stream(writer) + yield from ConnackPacket.from_stream(reader) + + publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False) + yield from publish_1.to_stream(writer) + ensure_future(PubrecPacket.from_stream(reader), loop=self.loop) + + yield from asyncio.sleep(2) + + publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False) + yield from publish_dup.to_stream(writer) + pubrec2 = yield from PubrecPacket.from_stream(reader) + pubrel = PubrelPacket.build(1) + yield from pubrel.to_stream(writer) + pubcomp = yield from PubcompPacket.from_stream(reader) + + disconnect = DisconnectPacket() + yield from disconnect.to_stream(writer) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_invalid_topic(self, MockPluginManager): @asyncio.coroutine