kopia lustrzana https://github.com/Yakifo/amqtt
move stop() method
rodzic
52c3b83704
commit
cf9e78e4bb
|
@ -75,6 +75,20 @@ class ProtocolHandler:
|
|||
self.logger.debug("%s Handler tasks started" % self.session.client_id)
|
||||
yield from self.retry_deliveries()
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
# Stop incoming messages flow waiter
|
||||
for packet_id in self.session.inflight_in:
|
||||
self.session.inflight_in[packet_id].cancel()
|
||||
self._reader_task.cancel()
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
self.logger.debug("waiting for tasks to be stopped")
|
||||
yield from asyncio.wait(
|
||||
[self._reader_stopped.wait()], loop=self._loop)
|
||||
self.logger.debug("closing writer")
|
||||
yield from self.writer.close()
|
||||
|
||||
@asyncio.coroutine
|
||||
def retry_deliveries(self):
|
||||
"""
|
||||
|
@ -278,20 +292,6 @@ class ProtocolHandler:
|
|||
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
||||
yield from self._send_packet(pubcomp_packet)
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
# Stop incoming messages flow waiter
|
||||
for packet_id in self.session.inflight_in:
|
||||
self.session.inflight_in[packet_id].cancel()
|
||||
self._reader_task.cancel()
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
self.logger.debug("waiting for tasks to be stopped")
|
||||
yield from asyncio.wait(
|
||||
[self._reader_stopped.wait()], loop=self._loop)
|
||||
self.logger.debug("closing writer")
|
||||
yield from self.writer.close()
|
||||
|
||||
@asyncio.coroutine
|
||||
def _reader_loop(self):
|
||||
self.logger.debug("%s Starting reader coro" % self.session.client_id)
|
||||
|
|
Ładowanie…
Reference in New Issue