kopia lustrzana https://github.com/Yakifo/amqtt
Fixed critical bugs e.g., incorrectly retained messages on qos0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers.
rodzic
31165fb0e8
commit
1f5efd383f
|
@ -707,21 +707,25 @@ class Broker:
|
||||||
if 'qos' in broadcast:
|
if 'qos' in broadcast:
|
||||||
qos = broadcast['qos']
|
qos = broadcast['qos']
|
||||||
if target_session.transitions.state == 'connected':
|
if target_session.transitions.state == 'connected':
|
||||||
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
(format_client_message(session=broadcast['session']),
|
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
|
||||||
broadcast['topic'], format_client_message(session=target_session)))
|
(format_client_message(session=broadcast['session']),
|
||||||
|
broadcast['topic'], format_client_message(session=target_session)))
|
||||||
handler = self._get_handler(target_session)
|
handler = self._get_handler(target_session)
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(
|
||||||
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
|
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
|
||||||
loop=self._loop)
|
loop=self._loop)
|
||||||
running_tasks.append(task)
|
running_tasks.append(task)
|
||||||
else:
|
elif qos is not None and qos > 0:
|
||||||
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
(format_client_message(session=broadcast['session']),
|
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
|
||||||
broadcast['topic'], format_client_message(session=target_session)))
|
(format_client_message(session=broadcast['session']),
|
||||||
|
broadcast['topic'], format_client_message(session=target_session)))
|
||||||
retained_message = RetainedApplicationMessage(
|
retained_message = RetainedApplicationMessage(
|
||||||
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
|
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
|
||||||
yield from target_session.retained_messages.put(retained_message)
|
yield from target_session.retained_messages.put(retained_message)
|
||||||
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
|
self.logger.debug(f'target_session.retained_messages={target_session.retained_messages.qsize()}')
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
# Wait until current broadcasting tasks end
|
# Wait until current broadcasting tasks end
|
||||||
if running_tasks:
|
if running_tasks:
|
||||||
|
|
|
@ -456,7 +456,7 @@ class MQTTClient:
|
||||||
while self.client_tasks:
|
while self.client_tasks:
|
||||||
task = self.client_tasks.popleft()
|
task = self.client_tasks.popleft()
|
||||||
if not task.done():
|
if not task.done():
|
||||||
task.set_exception(ClientException("Connection lost"))
|
task.cancel()
|
||||||
|
|
||||||
self.logger.debug("Watch broker disconnection")
|
self.logger.debug("Watch broker disconnection")
|
||||||
# Wait for disconnection from broker (like connection lost)
|
# Wait for disconnection from broker (like connection lost)
|
||||||
|
|
|
@ -417,7 +417,7 @@ class ProtocolHandler:
|
||||||
if task:
|
if task:
|
||||||
running_tasks.append(task)
|
running_tasks.append(task)
|
||||||
else:
|
else:
|
||||||
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
|
self.logger.debug("No more data (EOF received), stopping reader coro")
|
||||||
break
|
break
|
||||||
except MQTTException:
|
except MQTTException:
|
||||||
self.logger.debug("Message discarded")
|
self.logger.debug("Message discarded")
|
||||||
|
@ -425,10 +425,10 @@ class ProtocolHandler:
|
||||||
self.logger.debug("Task cancelled, reader loop ending")
|
self.logger.debug("Task cancelled, reader loop ending")
|
||||||
break
|
break
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
|
self.logger.debug("Input stream read timeout")
|
||||||
self.handle_read_timeout()
|
self.handle_read_timeout()
|
||||||
except NoDataException:
|
except NoDataException:
|
||||||
self.logger.debug("%s No data available" % self.session.client_id)
|
self.logger.debug("No data available")
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
|
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
|
||||||
break
|
break
|
||||||
|
@ -436,7 +436,7 @@ class ProtocolHandler:
|
||||||
running_tasks.popleft().cancel()
|
running_tasks.popleft().cancel()
|
||||||
yield from self.handle_connection_closed()
|
yield from self.handle_connection_closed()
|
||||||
self._reader_stopped.set()
|
self._reader_stopped.set()
|
||||||
self.logger.debug("%s Reader coro stopped" % self.session.client_id)
|
self.logger.debug("Reader coro stopped")
|
||||||
yield from self.stop()
|
yield from self.stop()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -457,6 +457,8 @@ class ProtocolHandler:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def mqtt_deliver_next_message(self):
|
def mqtt_deliver_next_message(self):
|
||||||
|
if not self._is_attached():
|
||||||
|
return None
|
||||||
if self.logger.isEnabledFor(logging.DEBUG):
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize())
|
self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize())
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -149,7 +149,8 @@ class PluginManager:
|
||||||
if wait:
|
if wait:
|
||||||
if tasks:
|
if tasks:
|
||||||
yield from asyncio.wait(tasks, loop=self._loop)
|
yield from asyncio.wait(tasks, loop=self._loop)
|
||||||
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
|
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def map(self, coro, *args, **kwargs):
|
def map(self, coro, *args, **kwargs):
|
||||||
|
|
Ładowanie…
Reference in New Issue