diff --git a/hbmqtt/adapters.py b/hbmqtt/adapters.py index c99f2f9..496ad70 100644 --- a/hbmqtt/adapters.py +++ b/hbmqtt/adapters.py @@ -158,13 +158,16 @@ class StreamWriterAdapter(WriterAdapter): def __init__(self, writer: StreamWriter): self.logger = logging.getLogger(__name__) self._writer = writer + self.is_closed = False # StreamWriter has no test for closed...we use our own def write(self, data): - self._writer.write(data) + if not self.is_closed: + self._writer.write(data) @asyncio.coroutine def drain(self): - yield from self._writer.drain() + if not self.is_closed: + yield from self._writer.drain() def get_peer_info(self): extra_info = self._writer.get_extra_info('peername') @@ -172,10 +175,14 @@ class StreamWriterAdapter(WriterAdapter): @asyncio.coroutine def close(self): - yield from self._writer.drain() - if self._writer.can_write_eof(): - self._writer.write_eof() - self._writer.close() + if not self.is_closed: + self.is_closed = True # we first mark this closed so yields below don't cause races with waiting writes + yield from self._writer.drain() + if self._writer.can_write_eof(): + self._writer.write_eof() + self._writer.close() + try: yield from self._writer.wait_closed() # py37+ + except AttributeError: pass class BufferReader(ReaderAdapter): diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 98bab73..55beb95 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -692,7 +692,9 @@ class Broker: try: while True: while running_tasks and running_tasks[0].done(): - running_tasks.popleft() + task = running_tasks.popleft() + try: task.result() # make asyncio happy and collect results + except Exception: pass broadcast = yield from self._broadcast_queue.get() if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("broadcasting %r" % broadcast) @@ -705,25 +707,30 @@ class Broker: if 'qos' in broadcast: qos = broadcast['qos'] if target_session.transitions.state == 'connected': - self.logger.debug("broadcasting application message from %s on topic '%s' to %s" % - (format_client_message(session=broadcast['session']), - broadcast['topic'], format_client_message(session=target_session))) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("broadcasting application message from %s on topic '%s' to %s" % + (format_client_message(session=broadcast['session']), + broadcast['topic'], format_client_message(session=target_session))) handler = self._get_handler(target_session) task = asyncio.ensure_future( handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False), loop=self._loop) running_tasks.append(task) - else: - self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" % - (format_client_message(session=broadcast['session']), - broadcast['topic'], format_client_message(session=target_session))) + elif qos is not None and qos > 0: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" % + (format_client_message(session=broadcast['session']), + broadcast['topic'], format_client_message(session=target_session))) retained_message = RetainedApplicationMessage( broadcast['session'], broadcast['topic'], broadcast['data'], qos) yield from target_session.retained_messages.put(retained_message) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f'target_session.retained_messages={target_session.retained_messages.qsize()}') except CancelledError: # Wait until current broadcasting tasks end if running_tasks: yield from asyncio.wait(running_tasks, loop=self._loop) + raise # reraise per CancelledError semantics @asyncio.coroutine def _broadcast_message(self, session, topic, data, force_qos=None): diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 7eb8f8a..6846878 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -182,7 +182,7 @@ class MQTTClient: :return: """ try: - while True: + while self.client_tasks: task = self.client_tasks.pop() task.cancel() except IndexError as err: @@ -349,16 +349,16 @@ class MQTTClient: self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + if self.client_tasks: + self.client_tasks.pop() if deliver_task in done: if deliver_task.exception() is not None: # deliver_task raised an exception, pass it on to our caller raise deliver_task.exception() - self.client_tasks.pop() return deliver_task.result() else: #timeout occured before message received deliver_task.cancel() - self.client_tasks.pop() raise asyncio.TimeoutError @asyncio.coroutine @@ -456,7 +456,7 @@ class MQTTClient: while self.client_tasks: task = self.client_tasks.popleft() if not task.done(): - task.set_exception(ClientException("Connection lost")) + task.cancel() self.logger.debug("Watch broker disconnection") # Wait for disconnection from broker (like connection lost) diff --git a/hbmqtt/codecs.py b/hbmqtt/codecs.py index 404806d..a6af603 100644 --- a/hbmqtt/codecs.py +++ b/hbmqtt/codecs.py @@ -49,7 +49,10 @@ def read_or_raise(reader, n=-1): :param n: number of bytes to read :return: bytes read """ - data = yield from reader.read(n) + try: + data = yield from reader.read(n) + except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): + data = None if not data: raise NoDataException("No more data") return data diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 07745c4..a8f2ee7 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -417,7 +417,7 @@ class ProtocolHandler: if task: running_tasks.append(task) else: - self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id) + self.logger.debug("No more data (EOF received), stopping reader coro") break except MQTTException: self.logger.debug("Message discarded") @@ -425,10 +425,10 @@ class ProtocolHandler: self.logger.debug("Task cancelled, reader loop ending") break except asyncio.TimeoutError: - self.logger.debug("%s Input stream read timeout" % self.session.client_id) + self.logger.debug("Input stream read timeout") self.handle_read_timeout() except NoDataException: - self.logger.debug("%s No data available" % self.session.client_id) + self.logger.debug("No data available") except BaseException as e: self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e)) break @@ -436,7 +436,7 @@ class ProtocolHandler: running_tasks.popleft().cancel() yield from self.handle_connection_closed() self._reader_stopped.set() - self.logger.debug("%s Reader coro stopped" % self.session.client_id) + self.logger.debug("Reader coro stopped") yield from self.stop() @asyncio.coroutine @@ -449,8 +449,9 @@ class ProtocolHandler: self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout) yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session) - except ConnectionResetError as cre: + except (ConnectionResetError, BrokenPipeError): yield from self.handle_connection_closed() + except asyncio.CancelledError: raise except BaseException as e: self.logger.warning("Unhandled exception: %s" % e) @@ -458,6 +459,8 @@ class ProtocolHandler: @asyncio.coroutine def mqtt_deliver_next_message(self): + if not self._is_attached(): + return None if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize()) try: diff --git a/hbmqtt/plugins/manager.py b/hbmqtt/plugins/manager.py index 7a923a8..c96c5f1 100644 --- a/hbmqtt/plugins/manager.py +++ b/hbmqtt/plugins/manager.py @@ -149,7 +149,8 @@ class PluginManager: if wait: if tasks: yield from asyncio.wait(tasks, loop=self._loop) - self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events))) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events))) @asyncio.coroutine def map(self, coro, *args, **kwargs):