Add attach_session() detach_session() methods. (will be used for reconnection handling)

pull/8/head
Nico 2015-09-28 22:51:18 +02:00
rodzic 05a61581fe
commit 32c7c7242d
2 zmienionych plików z 72 dodań i 1 usunięć

Wyświetl plik

@ -2,7 +2,7 @@
#
# See the file license.txt for copying permission.
from asyncio import futures
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
from hbmqtt.mqtt.protocol.handler import ProtocolHandler, ProtocolHandlerException
from hbmqtt.mqtt.packet import *
from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket
@ -11,6 +11,8 @@ from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.suback import SubackPacket
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.mqtt.unsuback import UnsubackPacket
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload, ConnectPacket
from hbmqtt.mqtt.connack import ConnackPacket
from hbmqtt.session import Session
from hbmqtt.plugins.manager import PluginManager
@ -24,6 +26,7 @@ class ClientProtocolHandler(ProtocolHandler):
self._unsubscriptions_waiter = dict()
self._disconnect_waiter = None
self._pingresp_waiter = None
self._connack_waiter = None
@asyncio.coroutine
def start(self):
@ -42,6 +45,55 @@ class ClientProtocolHandler(ProtocolHandler):
if self._pingresp_waiter:
self._pingresp_waiter.cancel()
def _build_connect_packet(self):
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = self.session.keep_alive
vh.clean_session_flag = self.session.clean_session
vh.will_retain_flag = self.session.will_retain
payload.client_id = self.session.client_id
if self.session.username:
vh.username_flag = True
payload.username = self.session.username
else:
vh.username_flag = False
if self.session.password:
vh.password_flag = True
payload.password = self.session.password
else:
vh.password_flag = False
if self.session.will_flag:
vh.will_flag = True
vh.will_qos = self.session.will_qos
payload.will_message = self.session.will_message
payload.will_topic = self.session.will_topic
else:
vh.will_flag = False
packet = ConnectPacket(vh=vh, payload=payload)
return packet
@asyncio.coroutine
def mqtt_connect(self):
if self._connack_waiter and not self._connack_waiter.done():
raise ProtocolHandlerException("A CONNECT request is already pending")
connect_packet = self._build_connect_packet()
yield from self._send_packet(connect_packet)
self._connack_waiter = futures.Future(loop=self._loop)
yield from self._connack_waiter
connack = self._connack_waiter.result()
return connack.return_code
@asyncio.coroutine
def handle_connack(self, connack: ConnackPacket):
if not self._connack_waiter or self._connack_waiter.done():
self.logger.warning("Unexpected CONNACK received")
else:
self._connack_waiter.set_result(connack)
def handle_write_timeout(self):
self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping())

Wyświetl plik

@ -37,6 +37,10 @@ EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent'
EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received'
class ProtocolHandlerException(BaseException):
pass
class ProtocolHandler:
"""
Class implementing the MQTT communication protocol using asyncio features
@ -71,6 +75,21 @@ class ProtocolHandler:
self._pubrel_waiters = dict()
self._pubcomp_waiters = dict()
def attach_session(self, session: Session, reader:ReaderAdapter, writer:WriterAdapter):
if self.session:
raise ProtocolHandlerException("Handler already attached to session '%s'" % self.session.client_id)
self.session = session
self.reader = reader
self.writer = writer
def detach_session(self):
if not self.session:
self.logger.warning("detach_session() called while no session attached to handler")
else:
self.session = None
self.reader = None
self.writer = None
@asyncio.coroutine
def start(self):
self._reader_ready = asyncio.Event(loop=self._loop)