From a8e240a2b2e17b013df04ab4daf10128700ed0bc Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Mon, 13 Jul 2015 22:07:12 +0200 Subject: [PATCH] Handle unsubscription HBMQTT-16 --- hbmqtt/broker.py | 30 +++++++++++++++++++++----- hbmqtt/mqtt/protocol/broker_handler.py | 10 +++++++-- hbmqtt/mqtt/protocol/handler.py | 6 +++--- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 5e878be..a19be5a 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -211,23 +211,31 @@ class Broker: connected = True wait_disconnect = asyncio.Task(handler.wait_disconnect()) wait_subscription = asyncio.Task(handler.get_next_pending_subscription()) + wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription()) wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message()) while connected: - done, pending = yield from asyncio.wait([wait_disconnect, wait_subscription, wait_deliver], - return_when=asyncio.FIRST_COMPLETED) + done, pending = yield from asyncio.wait( + [wait_disconnect, wait_subscription, wait_unsubscription, wait_deliver], + return_when=asyncio.FIRST_COMPLETED) if wait_disconnect in done: connected = False wait_subscription.cancel() + wait_unsubscription.cancel() wait_deliver.cancel() + elif wait_unsubscription in done: + unsubscription = wait_unsubscription.result() + for topic in unsubscription.topics: + self.del_subscription(topic, client_session) + yield from handler.mqtt_acknowledge_unsubscription(unsubscription.packet_id) + wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription()) elif wait_subscription in done: subscription = wait_subscription.result() return_codes = [] for topic in subscription.topics: return_codes.append(self.add_subscription(topic, client_session)) yield from handler.mqtt_acknowledge_subscription(subscription.packet_id, return_codes) - i=0 - for topic in subscription.topics: - if return_codes[i] != 0x80: + for index, topic in enumerate(subscription.topics): + if return_codes[index] != 0x80: yield from self.publish_retained_messages_for_subscription(topic, client_session) wait_subscription = asyncio.Task(handler.get_next_pending_subscription()) elif wait_deliver in done: @@ -297,6 +305,18 @@ class Broker: except KeyError: return 0x80 + def del_subscription(self, a_filter, session): + try: + sessions = self._topics[a_filter] + for index, s in enumerate(sessions): + if s['session'].client_id == session.client_id: + self.logger.debug("Removing subscription on topic '%s' for client %s" % + (a_filter, format_client_message(session=session))) + sessions.pop(index) + except KeyError: + # Unsubscribe topic not found in current subscribed topics + pass + def matches(self, topic, filter): import re match_pattern = re.compile(filter.replace('#', '.*').replace('+', '[\s\w\d]+')) diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index 4166029..d2a79ed 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -12,6 +12,7 @@ from hbmqtt.mqtt.pingresp import PingRespPacket from hbmqtt.mqtt.subscribe import SubscribePacket from hbmqtt.mqtt.suback import SubackPacket from hbmqtt.mqtt.unsubscribe import UnsubscribePacket +from hbmqtt.mqtt.unsuback import UnsubackPacket from hbmqtt.utils import format_client_message @@ -82,7 +83,7 @@ class BrokerProtocolHandler(ProtocolHandler): @asyncio.coroutine def handle_unsubscribe(self, unsubscribe: UnsubscribePacket): unsubscription = UnSubscription(unsubscribe.variable_header.packet_id, unsubscribe.payload.topics) - yield from self._pending_subscriptions.put(unsubscription) + yield from self._pending_unsubscriptions.put(unsubscription) @asyncio.coroutine def get_next_pending_subscription(self): @@ -97,4 +98,9 @@ class BrokerProtocolHandler(ProtocolHandler): @asyncio.coroutine def mqtt_acknowledge_subscription(self, packet_id, return_codes): suback = SubackPacket.build(packet_id, return_codes) - yield from self.outgoing_queue.put(suback) \ No newline at end of file + yield from self.outgoing_queue.put(suback) + + @asyncio.coroutine + def mqtt_acknowledge_unsubscription(self, packet_id): + unsuback = UnsubackPacket.build(packet_id) + yield from self.outgoing_queue.put(unsuback) \ No newline at end of file diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index bb4ae90..89f046c 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -79,7 +79,7 @@ class ProtocolHandler: self._pubcomp_waiters = dict() self.delivered_message = asyncio.Queue() - def attach_to_session(self, session:Session): + def attach_to_session(self, session: Session): self.session = session self.session.handler = self extra_info = self.session.writer.get_extra_info('sockname') @@ -93,8 +93,8 @@ class ProtocolHandler: @asyncio.coroutine def start(self): self._running = True - self._reader_task = asyncio.async(self._reader_coro(), loop=self._loop) - self._writer_task = asyncio.async(self._writer_coro(), loop=self._loop) + self._reader_task = asyncio.Task(self._reader_coro(), loop=self._loop) + self._writer_task = asyncio.Task(self._writer_coro(), loop=self._loop) yield from asyncio.wait( [self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop) self.logger.debug("Handler tasks started")