pull/8/head
Nicolas Jouanin 2015-09-01 22:53:17 +02:00
rodzic bcf14ef6b8
commit fb6c2784bb
1 zmienionych plików z 14 dodań i 14 usunięć

Wyświetl plik

@ -528,16 +528,16 @@ class Broker:
self.logger.debug("%s Wait for disconnect" % client_session.client_id)
connected = True
wait_disconnect = asyncio.Task(handler.wait_disconnect(), loop=self._loop)
wait_subscription = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
disconnect_waiter = asyncio.Task(handler.wait_disconnect(), loop=self._loop)
subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
while connected:
done, pending = yield from asyncio.wait(
[wait_disconnect, wait_subscription, wait_unsubscription, wait_deliver],
[disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver],
return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
if wait_disconnect in done:
result = wait_disconnect.result()
if disconnect_waiter in done:
result = disconnect_waiter.result()
self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
if result is None:
self.logger.debug("Will flag: %s" % client_session.will_flag)
@ -555,16 +555,16 @@ class Broker:
client_session.will_message,
client_session.will_qos)
connected = False
if wait_unsubscription in done:
if unsubscribe_waiter in done:
self.logger.debug("%s handling unsubscription" % client_session.client_id)
unsubscription = wait_unsubscription.result()
unsubscription = unsubscribe_waiter.result()
for topic in unsubscription['topics']:
self.del_subscription(topic, client_session)
yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
if wait_subscription in done:
unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
if subscribe_waiter in done:
self.logger.debug("%s handling subscription" % client_session.client_id)
subscriptions = wait_subscription.result()
subscriptions = subscribe_waiter.result()
return_codes = []
for subscription in subscriptions['topics']:
return_codes.append(self.add_subscription(subscription, client_session))
@ -572,7 +572,7 @@ class Broker:
for index, subscription in enumerate(subscriptions['topics']):
if return_codes[index] != 0x80:
yield from self.publish_retained_messages_for_subscription(subscription, client_session)
wait_subscription = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
self.logger.debug(repr(self._subscriptions))
if wait_deliver in done:
self.logger.debug("%s handling message delivery" % client_session.client_id)
@ -586,8 +586,8 @@ class Broker:
# Acknowledge message delivery
yield from handler.mqtt_acknowledge_delivery(packet_id)
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
wait_subscription.cancel()
wait_unsubscription.cancel()
subscribe_waiter.cancel()
unsubscribe_waiter.cancel()
wait_deliver.cancel()
self.logger.debug("%s Client disconnecting" % client_session.client_id)