kopia lustrzana https://github.com/Yakifo/amqtt
Improve handler stopping (remove wait delay when disconnecting)
rodzic
87e144b4de
commit
15468b7849
|
@ -53,17 +53,16 @@ class ProtocolHandler:
|
||||||
self._writer_task = None
|
self._writer_task = None
|
||||||
self._reader_ready = asyncio.Event(loop=self._loop)
|
self._reader_ready = asyncio.Event(loop=self._loop)
|
||||||
self._writer_ready = asyncio.Event(loop=self._loop)
|
self._writer_ready = asyncio.Event(loop=self._loop)
|
||||||
|
self._reader_stopped = asyncio.Event(loop=self._loop)
|
||||||
self._running = False
|
self._writer_stopped = asyncio.Event(loop=self._loop)
|
||||||
|
|
||||||
self.outgoing_queue = asyncio.Queue(loop=self._loop)
|
self.outgoing_queue = asyncio.Queue(loop=self._loop)
|
||||||
self._pubrel_waiters = dict()
|
self._pubrel_waiters = dict()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
self._running = True
|
self._reader_task = asyncio.Task(self._reader_loop(), loop=self._loop)
|
||||||
self._reader_task = asyncio.Task(self._reader_coro(), loop=self._loop)
|
self._writer_task = asyncio.Task(self._writer_loop(), loop=self._loop)
|
||||||
self._writer_task = asyncio.Task(self._writer_coro(), loop=self._loop)
|
|
||||||
yield from asyncio.wait(
|
yield from asyncio.wait(
|
||||||
[self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop)
|
[self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop)
|
||||||
self.logger.debug("%s Handler tasks started" % self.session.client_id)
|
self.logger.debug("%s Handler tasks started" % self.session.client_id)
|
||||||
|
@ -124,21 +123,23 @@ class ProtocolHandler:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._running = False
|
|
||||||
yield from self.outgoing_queue.put("STOP")
|
|
||||||
self.reader.feed_eof()
|
|
||||||
yield from asyncio.wait([self._writer_task, self._reader_task], loop=self._loop)
|
|
||||||
yield from self.writer.close()
|
|
||||||
# Stop incoming messages flow waiter
|
# Stop incoming messages flow waiter
|
||||||
for packet_id in self.session.incoming_msg:
|
for packet_id in self.session.incoming_msg:
|
||||||
self.session.incoming_msg[packet_id].cancel()
|
self.session.incoming_msg[packet_id].cancel()
|
||||||
for packet_id in self.session.outgoing_msg:
|
for packet_id in self.session.outgoing_msg:
|
||||||
self.session.outgoing_msg[packet_id].cancel()
|
self.session.outgoing_msg[packet_id].cancel()
|
||||||
|
self._reader_task.cancel()
|
||||||
|
self._writer_task.cancel()
|
||||||
|
self.logger.debug("waiting for loops to be stopped")
|
||||||
|
yield from asyncio.wait(
|
||||||
|
[self._reader_stopped.wait(), self._writer_stopped.wait()], loop=self._loop)
|
||||||
|
self.logger.debug("closing writer")
|
||||||
|
yield from self.writer.close()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _reader_coro(self):
|
def _reader_loop(self):
|
||||||
self.logger.debug("%s Starting reader coro" % self.session.client_id)
|
self.logger.debug("%s Starting reader coro" % self.session.client_id)
|
||||||
while self._running:
|
while True:
|
||||||
try:
|
try:
|
||||||
self._reader_ready.set()
|
self._reader_ready.set()
|
||||||
keepalive_timeout = self.session.keep_alive
|
keepalive_timeout = self.session.keep_alive
|
||||||
|
@ -192,8 +193,10 @@ class ProtocolHandler:
|
||||||
(self.session.client_id, packet.fixed_header.packet_type))
|
(self.session.client_id, packet.fixed_header.packet_type))
|
||||||
else:
|
else:
|
||||||
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
|
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
|
||||||
yield from self.handle_connection_closed()
|
|
||||||
break
|
break
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.logger.debug("Task cancelled, reader loop ending")
|
||||||
|
break
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
|
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
|
||||||
self.handle_read_timeout()
|
self.handle_read_timeout()
|
||||||
|
@ -202,28 +205,29 @@ class ProtocolHandler:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("%s Unhandled exception in reader coro: %s" % (self.session.client_id, e))
|
self.logger.warn("%s Unhandled exception in reader coro: %s" % (self.session.client_id, e))
|
||||||
break
|
break
|
||||||
|
yield from self.handle_connection_closed()
|
||||||
|
self._reader_stopped.set()
|
||||||
self.logger.debug("%s Reader coro stopped" % self.session.client_id)
|
self.logger.debug("%s Reader coro stopped" % self.session.client_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _writer_coro(self):
|
def _writer_loop(self):
|
||||||
self.logger.debug("%s Starting writer coro" % self.session.client_id)
|
self.logger.debug("%s Starting writer coro" % self.session.client_id)
|
||||||
while self._running:
|
while True:
|
||||||
try:
|
try:
|
||||||
self._writer_ready.set()
|
self._writer_ready.set()
|
||||||
keepalive_timeout = self.session.keep_alive
|
keepalive_timeout = self.session.keep_alive
|
||||||
if keepalive_timeout <= 0:
|
if keepalive_timeout <= 0:
|
||||||
keepalive_timeout = None
|
keepalive_timeout = None
|
||||||
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), keepalive_timeout, loop=self._loop)
|
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), keepalive_timeout, loop=self._loop)
|
||||||
if not isinstance(packet, MQTTPacket):
|
|
||||||
self.logger.debug("%s Writer interruption" % self.session.client_id)
|
|
||||||
break
|
|
||||||
yield from packet.to_stream(self.writer)
|
yield from packet.to_stream(self.writer)
|
||||||
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session)
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session)
|
||||||
self._loop.call_soon(self.on_packet_sent.send, packet)
|
self._loop.call_soon(self.on_packet_sent.send, packet)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.logger.debug("Task cancelled, writer loop ending")
|
||||||
|
break
|
||||||
except asyncio.TimeoutError as ce:
|
except asyncio.TimeoutError as ce:
|
||||||
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
||||||
if self._running:
|
self.handle_write_timeout()
|
||||||
self.handle_write_timeout()
|
|
||||||
except ConnectionResetError as cre:
|
except ConnectionResetError as cre:
|
||||||
yield from self.handle_connection_closed()
|
yield from self.handle_connection_closed()
|
||||||
break
|
break
|
||||||
|
@ -245,6 +249,7 @@ class ProtocolHandler:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("%s Unhandled exception in writer coro: %s" % (self.session.client_id, e))
|
self.logger.warn("%s Unhandled exception in writer coro: %s" % (self.session.client_id, e))
|
||||||
|
self._writer_stopped.set()
|
||||||
self.logger.debug("%s Writer coro stopped" % self.session.client_id)
|
self.logger.debug("%s Writer coro stopped" % self.session.client_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
|
Ładowanie…
Reference in New Issue