Manage ping resp with Futures

pull/8/head
nico 2015-07-06 16:50:19 +02:00
rodzic 6009129cf3
commit 57df79e3a9
1 zmienionych plików z 17 dodań i 1 usunięć

Wyświetl plik

@ -12,6 +12,7 @@ from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPay
from hbmqtt.mqtt.connack import ConnackPacket from hbmqtt.mqtt.connack import ConnackPacket
from hbmqtt.mqtt.disconnect import DisconnectPacket from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.pingresp import PingRespPacket
from hbmqtt.mqtt.publish import PublishPacket from hbmqtt.mqtt.publish import PublishPacket
from hbmqtt.mqtt.pubrel import PubrelPacket from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.puback import PubackPacket from hbmqtt.mqtt.puback import PubackPacket
@ -153,6 +154,8 @@ class ProtocolHandler:
yield from self.handle_pubrec(packet) yield from self.handle_pubrec(packet)
if packet.fixed_header.packet_type == PacketType.PUBCOMP: if packet.fixed_header.packet_type == PacketType.PUBCOMP:
yield from self.handle_pubcomp(packet) yield from self.handle_pubcomp(packet)
if packet.fixed_header.packet_type == PacketType.PINGRESP:
yield from self.handle_pingresp(packet)
else: else:
yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet) yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet)
else: else:
@ -238,6 +241,10 @@ class ProtocolHandler:
def handle_unsuback(self, unsuback: UnsubackPacket): def handle_unsuback(self, unsuback: UnsubackPacket):
pass pass
@asyncio.coroutine
def handle_pingresp(self, pingresp: PingRespPacket):
pass
@asyncio.coroutine @asyncio.coroutine
def handle_puback(self, puback: PubackPacket): def handle_puback(self, puback: PubackPacket):
packet_id = puback.variable_header.packet_id packet_id = puback.variable_header.packet_id
@ -271,6 +278,7 @@ class ClientProtocolHandler(ProtocolHandler):
super().__init__(session, config, loop) super().__init__(session, config, loop)
self._ping_task = None self._ping_task = None
self._connack_waiter = None self._connack_waiter = None
self._pingresp_waiter = None
self._subscriptions_waiter = dict() self._subscriptions_waiter = dict()
self._unsubscriptions_waiter = dict() self._unsubscriptions_waiter = dict()
@ -390,4 +398,12 @@ class ClientProtocolHandler(ProtocolHandler):
def mqtt_ping(self): def mqtt_ping(self):
ping_packet = PingReqPacket() ping_packet = PingReqPacket()
yield from self.outgoing_queue.put(ping_packet) yield from self.outgoing_queue.put(ping_packet)
yield from self.incoming_queues[PacketType.PINGRESP].get() self._pingresp_waiter = futures.Future(loop=self._loop)
yield from self._pingresp_waiter
resp = self._pingresp_waiter.result()
return resp
@asyncio.coroutine
def handle_pingresp(self, pingresp: PingRespPacket):
self._pingresp_waiter.set_result(pingresp)