diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 8530759..fe28592 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -75,6 +75,20 @@ class ProtocolHandler: self.logger.debug("%s Handler tasks started" % self.session.client_id) yield from self.retry_deliveries() + @asyncio.coroutine + def stop(self): + # Stop incoming messages flow waiter + for packet_id in self.session.inflight_in: + self.session.inflight_in[packet_id].cancel() + self._reader_task.cancel() + if self._keepalive_task: + self._keepalive_task.cancel() + self.logger.debug("waiting for tasks to be stopped") + yield from asyncio.wait( + [self._reader_stopped.wait()], loop=self._loop) + self.logger.debug("closing writer") + yield from self.writer.close() + @asyncio.coroutine def retry_deliveries(self): """ @@ -278,20 +292,6 @@ class ProtocolHandler: pubcomp_packet = PubcompPacket.build(app_message.packet_id) yield from self._send_packet(pubcomp_packet) - @asyncio.coroutine - def stop(self): - # Stop incoming messages flow waiter - for packet_id in self.session.inflight_in: - self.session.inflight_in[packet_id].cancel() - self._reader_task.cancel() - if self._keepalive_task: - self._keepalive_task.cancel() - self.logger.debug("waiting for tasks to be stopped") - yield from asyncio.wait( - [self._reader_stopped.wait()], loop=self._loop) - self.logger.debug("closing writer") - yield from self.writer.close() - @asyncio.coroutine def _reader_loop(self): self.logger.debug("%s Starting reader coro" % self.session.client_id)