diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 70754ae..9106026 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -528,16 +528,16 @@ class Broker: self.logger.debug("%s Wait for disconnect" % client_session.client_id) connected = True - wait_disconnect = asyncio.Task(handler.wait_disconnect(), loop=self._loop) - wait_subscription = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop) - wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop) + disconnect_waiter = asyncio.Task(handler.wait_disconnect(), loop=self._loop) + subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop) + unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop) wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop) while connected: done, pending = yield from asyncio.wait( - [wait_disconnect, wait_subscription, wait_unsubscription, wait_deliver], + [disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver], return_when=asyncio.FIRST_COMPLETED, loop=self._loop) - if wait_disconnect in done: - result = wait_disconnect.result() + if disconnect_waiter in done: + result = disconnect_waiter.result() self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result)) if result is None: self.logger.debug("Will flag: %s" % client_session.will_flag) @@ -555,16 +555,16 @@ class Broker: client_session.will_message, client_session.will_qos) connected = False - if wait_unsubscription in done: + if unsubscribe_waiter in done: self.logger.debug("%s handling unsubscription" % client_session.client_id) - unsubscription = wait_unsubscription.result() + unsubscription = unsubscribe_waiter.result() for topic in unsubscription['topics']: self.del_subscription(topic, client_session) yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id']) - wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop) - if wait_subscription in done: + unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop) + if subscribe_waiter in done: self.logger.debug("%s handling subscription" % client_session.client_id) - subscriptions = wait_subscription.result() + subscriptions = subscribe_waiter.result() return_codes = [] for subscription in subscriptions['topics']: return_codes.append(self.add_subscription(subscription, client_session)) @@ -572,7 +572,7 @@ class Broker: for index, subscription in enumerate(subscriptions['topics']): if return_codes[index] != 0x80: yield from self.publish_retained_messages_for_subscription(subscription, client_session) - wait_subscription = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop) + subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop) self.logger.debug(repr(self._subscriptions)) if wait_deliver in done: self.logger.debug("%s handling message delivery" % client_session.client_id) @@ -586,8 +586,8 @@ class Broker: # Acknowledge message delivery yield from handler.mqtt_acknowledge_delivery(packet_id) wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop) - wait_subscription.cancel() - wait_unsubscription.cancel() + subscribe_waiter.cancel() + unsubscribe_waiter.cancel() wait_deliver.cancel() self.logger.debug("%s Client disconnecting" % client_session.client_id)