Revert "Fixed critical bugs e.g., incorrectly retained messages on qos0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers."

This reverts commit 1f5efd383f.
pull/19/head
Marius Kriegerowski 2021-01-10 22:41:28 +01:00 zatwierdzone przez Florian Ludwig
rodzic 7b7d17b789
commit 30582f4cac
4 zmienionych plików z 12 dodań i 15 usunięć

Wyświetl plik

@ -693,20 +693,18 @@ class Broker:
if 'qos' in broadcast:
qos = broadcast['qos']
if target_session.transitions.state == 'connected':
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
handler = self._get_handler(target_session)
task = asyncio.ensure_future(
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
loop=self._loop)
running_tasks.append(task)
elif qos is not None and qos > 0:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
else:
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
retained_message = RetainedApplicationMessage(
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
await target_session.retained_messages.put(retained_message)

Wyświetl plik

@ -443,7 +443,7 @@ class MQTTClient:
while self.client_tasks:
task = self.client_tasks.popleft()
if not task.done():
task.cancel()
task.set_exception(ClientException("Connection lost"))
self.logger.debug("Watch broker disconnection")
# Wait for disconnection from broker (like connection lost)

Wyświetl plik

@ -408,7 +408,7 @@ class ProtocolHandler:
if task:
running_tasks.append(task)
else:
self.logger.debug("No more data (EOF received), stopping reader coro")
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
break
except MQTTException:
self.logger.debug("Message discarded")
@ -416,10 +416,10 @@ class ProtocolHandler:
self.logger.debug("Task cancelled, reader loop ending")
break
except asyncio.TimeoutError:
self.logger.debug("Input stream read timeout")
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
self.handle_read_timeout()
except NoDataException:
self.logger.debug("No data available")
self.logger.debug("%s No data available" % self.session.client_id)
except BaseException as e:
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
break

Wyświetl plik

@ -147,8 +147,7 @@ class PluginManager:
if wait:
if tasks:
await asyncio.wait(tasks, loop=self._loop)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
async def map(self, coro, *args, **kwargs):
"""