diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 8cebfa3..6a4b892 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -330,7 +330,10 @@ class MQTTClient: deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop) self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") - yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + if pending: + #timeout occured before message received + deliver_task.cancel() if deliver_task.exception(): raise deliver_task.exception() self.client_tasks.pop() diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index c57c082..5dbb955 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -454,7 +454,10 @@ class ProtocolHandler: def mqtt_deliver_next_message(self): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize()) - message = yield from self.session.delivered_message_queue.get() + try: + message = yield from self.session.delivered_message_queue.get() + except asyncio.CancelledError: + message = None if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Delivering message %s" % message) return message