Add unsubscription management in handler
pull/8/head
Nicolas Jouanin 2015-07-12 13:37:40 +02:00
rodzic d2d843fec4
commit e124bbe949
1 zmienionych plików z 23 dodań i 3 usunięć

Wyświetl plik

@ -5,12 +5,13 @@ import logging
import asyncio import asyncio
from asyncio import futures from asyncio import futures
from hbmqtt.mqtt.protocol.handler import ProtocolHandler from hbmqtt.mqtt.protocol.handler import ProtocolHandler
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload from hbmqtt.mqtt.connect import ConnectPacket
from hbmqtt.mqtt.disconnect import DisconnectPacket from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.pingresp import PingRespPacket from hbmqtt.mqtt.pingresp import PingRespPacket
from hbmqtt.mqtt.subscribe import SubscribePacket from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.suback import SubackPacket from hbmqtt.mqtt.suback import SubackPacket
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.session import Session from hbmqtt.session import Session
from hbmqtt.utils import format_client_message from hbmqtt.utils import format_client_message
@ -21,11 +22,18 @@ class Subscription:
self.topics = topics self.topics = topics
class UnSubscription:
def __init__(self, packet_id, topics):
self.packet_id = packet_id
self.topics = topics
class BrokerProtocolHandler(ProtocolHandler): class BrokerProtocolHandler(ProtocolHandler):
def __init__(self, session: Session, loop=None): def __init__(self, session: Session, loop=None):
super().__init__(session, loop) super().__init__(session, loop)
self._disconnect_waiter = None self._disconnect_waiter = None
self._pending_subscriptions = asyncio.Queue() self._pending_subscriptions = asyncio.Queue()
self._pending_unsubscriptions = asyncio.Queue()
@asyncio.coroutine @asyncio.coroutine
def start(self): def start(self):
@ -47,7 +55,8 @@ class BrokerProtocolHandler(ProtocolHandler):
pass pass
def handle_read_timeout(self): def handle_read_timeout(self):
asyncio.Task(self.stop()) if self._disconnect_waiter is not None and not self._disconnect_waiter.done():
self._disconnect_waiter.set_result(None)
@asyncio.coroutine @asyncio.coroutine
def handle_disconnect(self, disconnect: DisconnectPacket): def handle_disconnect(self, disconnect: DisconnectPacket):
@ -59,7 +68,8 @@ class BrokerProtocolHandler(ProtocolHandler):
# Broker handler shouldn't received CONNECT message during messages handling # Broker handler shouldn't received CONNECT message during messages handling
# as CONNECT messages are managed by the broker on client connection # as CONNECT messages are managed by the broker on client connection
self.logger.error('[MQTT-3.1.0-2] %s : CONNECT message received during messages handling' % (format_client_message(self.session))) self.logger.error('[MQTT-3.1.0-2] %s : CONNECT message received during messages handling' % (format_client_message(self.session)))
self._disconnect_waiter.set_result(None) if self._disconnect_waiter is not None and not self._disconnect_waiter.done():
self._disconnect_waiter.set_result(None)
@asyncio.coroutine @asyncio.coroutine
def handle_pingreq(self, pingreq: PingReqPacket): def handle_pingreq(self, pingreq: PingReqPacket):
@ -70,11 +80,21 @@ class BrokerProtocolHandler(ProtocolHandler):
subscription = Subscription(subscribe.variable_header.packet_id, subscribe.payload.topics) subscription = Subscription(subscribe.variable_header.packet_id, subscribe.payload.topics)
yield from self._pending_subscriptions.put(subscription) yield from self._pending_subscriptions.put(subscription)
@asyncio.coroutine
def handle_unsubscribe(self, unsubscribe: UnsubscribePacket):
unsubscription = UnSubscription(unsubscribe.variable_header.packet_id, unsubscribe.payload.topics)
yield from self._pending_subscriptions.put(unsubscription)
@asyncio.coroutine @asyncio.coroutine
def get_next_pending_subscription(self): def get_next_pending_subscription(self):
subscription = yield from self._pending_subscriptions.get() subscription = yield from self._pending_subscriptions.get()
return subscription return subscription
@asyncio.coroutine
def get_next_pending_unsubscription(self):
unsubscription = yield from self._pending_unsubscriptions.get()
return unsubscription
@asyncio.coroutine @asyncio.coroutine
def mqtt_acknowledge_subscription(self, packet_id, return_codes): def mqtt_acknowledge_subscription(self, packet_id, return_codes):
suback = SubackPacket.build(packet_id, return_codes) suback = SubackPacket.build(packet_id, return_codes)