From 32c7c7242dda744000d32791f3f29fac9e37853f Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 28 Sep 2015 22:51:18 +0200 Subject: [PATCH] Add attach_session() detach_session() methods. (will be used for reconnection handling) --- hbmqtt/mqtt/protocol/client_handler.py | 54 +++++++++++++++++++++++++- hbmqtt/mqtt/protocol/handler.py | 19 +++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/protocol/client_handler.py b/hbmqtt/mqtt/protocol/client_handler.py index ba8c345..11790cc 100644 --- a/hbmqtt/mqtt/protocol/client_handler.py +++ b/hbmqtt/mqtt/protocol/client_handler.py @@ -2,7 +2,7 @@ # # See the file license.txt for copying permission. from asyncio import futures -from hbmqtt.mqtt.protocol.handler import ProtocolHandler +from hbmqtt.mqtt.protocol.handler import ProtocolHandler, ProtocolHandlerException from hbmqtt.mqtt.packet import * from hbmqtt.mqtt.disconnect import DisconnectPacket from hbmqtt.mqtt.pingreq import PingReqPacket @@ -11,6 +11,8 @@ from hbmqtt.mqtt.subscribe import SubscribePacket from hbmqtt.mqtt.suback import SubackPacket from hbmqtt.mqtt.unsubscribe import UnsubscribePacket from hbmqtt.mqtt.unsuback import UnsubackPacket +from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload, ConnectPacket +from hbmqtt.mqtt.connack import ConnackPacket from hbmqtt.session import Session from hbmqtt.plugins.manager import PluginManager @@ -24,6 +26,7 @@ class ClientProtocolHandler(ProtocolHandler): self._unsubscriptions_waiter = dict() self._disconnect_waiter = None self._pingresp_waiter = None + self._connack_waiter = None @asyncio.coroutine def start(self): @@ -42,6 +45,55 @@ class ClientProtocolHandler(ProtocolHandler): if self._pingresp_waiter: self._pingresp_waiter.cancel() + def _build_connect_packet(self): + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = self.session.keep_alive + vh.clean_session_flag = self.session.clean_session + vh.will_retain_flag = self.session.will_retain + payload.client_id = self.session.client_id + + if self.session.username: + vh.username_flag = True + payload.username = self.session.username + else: + vh.username_flag = False + + if self.session.password: + vh.password_flag = True + payload.password = self.session.password + else: + vh.password_flag = False + if self.session.will_flag: + vh.will_flag = True + vh.will_qos = self.session.will_qos + payload.will_message = self.session.will_message + payload.will_topic = self.session.will_topic + else: + vh.will_flag = False + + packet = ConnectPacket(vh=vh, payload=payload) + return packet + + @asyncio.coroutine + def mqtt_connect(self): + if self._connack_waiter and not self._connack_waiter.done(): + raise ProtocolHandlerException("A CONNECT request is already pending") + connect_packet = self._build_connect_packet() + yield from self._send_packet(connect_packet) + self._connack_waiter = futures.Future(loop=self._loop) + yield from self._connack_waiter + connack = self._connack_waiter.result() + return connack.return_code + + @asyncio.coroutine + def handle_connack(self, connack: ConnackPacket): + if not self._connack_waiter or self._connack_waiter.done(): + self.logger.warning("Unexpected CONNACK received") + else: + self._connack_waiter.set_result(connack) + def handle_write_timeout(self): self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping()) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index a7b3b59..69b260d 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -37,6 +37,10 @@ EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent' EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received' +class ProtocolHandlerException(BaseException): + pass + + class ProtocolHandler: """ Class implementing the MQTT communication protocol using asyncio features @@ -71,6 +75,21 @@ class ProtocolHandler: self._pubrel_waiters = dict() self._pubcomp_waiters = dict() + def attach_session(self, session: Session, reader:ReaderAdapter, writer:WriterAdapter): + if self.session: + raise ProtocolHandlerException("Handler already attached to session '%s'" % self.session.client_id) + self.session = session + self.reader = reader + self.writer = writer + + def detach_session(self): + if not self.session: + self.logger.warning("detach_session() called while no session attached to handler") + else: + self.session = None + self.reader = None + self.writer = None + @asyncio.coroutine def start(self): self._reader_ready = asyncio.Event(loop=self._loop)