diff --git a/hbmqtt/mqtt/protocol/client_handler.py b/hbmqtt/mqtt/protocol/client_handler.py index 0dc887d..e13e49f 100644 --- a/hbmqtt/mqtt/protocol/client_handler.py +++ b/hbmqtt/mqtt/protocol/client_handler.py @@ -25,10 +25,14 @@ class ClientProtocolHandler(ProtocolHandler): self._pingresp_queue = asyncio.Queue() self._subscriptions_waiter = dict() self._unsubscriptions_waiter = dict() + self._disconnect_waiter = None + self._pingresp_waiter = None @asyncio.coroutine def start(self): yield from super().start() + if self._disconnect_waiter is None: + self._disconnect_waiter = futures.Future(loop=self._loop) @asyncio.coroutine def stop(self): @@ -38,6 +42,8 @@ class ClientProtocolHandler(ProtocolHandler): self._ping_task.cancel() except Exception: pass + if self._pingresp_waiter: + self._pingresp_waiter.cancel() def handle_write_timeout(self): self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping()) @@ -147,8 +153,19 @@ class ClientProtocolHandler(ProtocolHandler): yield from self.outgoing_queue.put(ping_packet) self._pingresp_waiter = futures.Future(loop=self._loop) resp = yield from self._pingresp_queue.get() + self._pingresp_waiter = None return resp @asyncio.coroutine def handle_pingresp(self, pingresp: PingRespPacket): yield from self._pingresp_queue.put(pingresp) + + @asyncio.coroutine + def handle_connection_closed(self): + self.logger.debug("Broker closed connection") + if not self._disconnect_waiter.done(): + self._disconnect_waiter.set_result(None) + + @asyncio.coroutine + def wait_disconnect(self): + yield from self._disconnect_waiter diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 0b14d77..44e64a9 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -229,9 +229,11 @@ class ProtocolHandler: self.logger.debug("Output queue get timeout") if self._running: self.handle_write_timeout() + except ConnectionResetError as cre: + yield from self.handle_connection_closed() + break except Exception as e: self.logger.warn("Unhandled exception in writer coro: %s" % e) - yield from self.handle_connection_closed() break self.logger.debug("Writer coro stopping") # Flush queue before stopping