kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
186e1e0b8d
commit
5aa358d17c
|
@ -25,10 +25,14 @@ class ClientProtocolHandler(ProtocolHandler):
|
||||||
self._pingresp_queue = asyncio.Queue()
|
self._pingresp_queue = asyncio.Queue()
|
||||||
self._subscriptions_waiter = dict()
|
self._subscriptions_waiter = dict()
|
||||||
self._unsubscriptions_waiter = dict()
|
self._unsubscriptions_waiter = dict()
|
||||||
|
self._disconnect_waiter = None
|
||||||
|
self._pingresp_waiter = None
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
yield from super().start()
|
yield from super().start()
|
||||||
|
if self._disconnect_waiter is None:
|
||||||
|
self._disconnect_waiter = futures.Future(loop=self._loop)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -38,6 +42,8 @@ class ClientProtocolHandler(ProtocolHandler):
|
||||||
self._ping_task.cancel()
|
self._ping_task.cancel()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
if self._pingresp_waiter:
|
||||||
|
self._pingresp_waiter.cancel()
|
||||||
|
|
||||||
def handle_write_timeout(self):
|
def handle_write_timeout(self):
|
||||||
self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping())
|
self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping())
|
||||||
|
@ -147,8 +153,19 @@ class ClientProtocolHandler(ProtocolHandler):
|
||||||
yield from self.outgoing_queue.put(ping_packet)
|
yield from self.outgoing_queue.put(ping_packet)
|
||||||
self._pingresp_waiter = futures.Future(loop=self._loop)
|
self._pingresp_waiter = futures.Future(loop=self._loop)
|
||||||
resp = yield from self._pingresp_queue.get()
|
resp = yield from self._pingresp_queue.get()
|
||||||
|
self._pingresp_waiter = None
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_pingresp(self, pingresp: PingRespPacket):
|
def handle_pingresp(self, pingresp: PingRespPacket):
|
||||||
yield from self._pingresp_queue.put(pingresp)
|
yield from self._pingresp_queue.put(pingresp)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def handle_connection_closed(self):
|
||||||
|
self.logger.debug("Broker closed connection")
|
||||||
|
if not self._disconnect_waiter.done():
|
||||||
|
self._disconnect_waiter.set_result(None)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def wait_disconnect(self):
|
||||||
|
yield from self._disconnect_waiter
|
||||||
|
|
|
@ -229,9 +229,11 @@ class ProtocolHandler:
|
||||||
self.logger.debug("Output queue get timeout")
|
self.logger.debug("Output queue get timeout")
|
||||||
if self._running:
|
if self._running:
|
||||||
self.handle_write_timeout()
|
self.handle_write_timeout()
|
||||||
|
except ConnectionResetError as cre:
|
||||||
|
yield from self.handle_connection_closed()
|
||||||
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("Unhandled exception in writer coro: %s" % e)
|
self.logger.warn("Unhandled exception in writer coro: %s" % e)
|
||||||
yield from self.handle_connection_closed()
|
|
||||||
break
|
break
|
||||||
self.logger.debug("Writer coro stopping")
|
self.logger.debug("Writer coro stopping")
|
||||||
# Flush queue before stopping
|
# Flush queue before stopping
|
||||||
|
|
Ładowanie…
Reference in New Issue