diff --git a/hbmqtt/mqtt/protocol.py b/hbmqtt/mqtt/protocol.py index a731fa5..7e8eb27 100644 --- a/hbmqtt/mqtt/protocol.py +++ b/hbmqtt/mqtt/protocol.py @@ -12,6 +12,7 @@ from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPay from hbmqtt.mqtt.connack import ConnackPacket from hbmqtt.mqtt.disconnect import DisconnectPacket from hbmqtt.mqtt.pingreq import PingReqPacket +from hbmqtt.mqtt.pingresp import PingRespPacket from hbmqtt.mqtt.publish import PublishPacket from hbmqtt.mqtt.pubrel import PubrelPacket from hbmqtt.mqtt.puback import PubackPacket @@ -153,6 +154,8 @@ class ProtocolHandler: yield from self.handle_pubrec(packet) if packet.fixed_header.packet_type == PacketType.PUBCOMP: yield from self.handle_pubcomp(packet) + if packet.fixed_header.packet_type == PacketType.PINGRESP: + yield from self.handle_pingresp(packet) else: yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet) else: @@ -238,6 +241,10 @@ class ProtocolHandler: def handle_unsuback(self, unsuback: UnsubackPacket): pass + @asyncio.coroutine + def handle_pingresp(self, pingresp: PingRespPacket): + pass + @asyncio.coroutine def handle_puback(self, puback: PubackPacket): packet_id = puback.variable_header.packet_id @@ -271,6 +278,7 @@ class ClientProtocolHandler(ProtocolHandler): super().__init__(session, config, loop) self._ping_task = None self._connack_waiter = None + self._pingresp_waiter = None self._subscriptions_waiter = dict() self._unsubscriptions_waiter = dict() @@ -390,4 +398,12 @@ class ClientProtocolHandler(ProtocolHandler): def mqtt_ping(self): ping_packet = PingReqPacket() yield from self.outgoing_queue.put(ping_packet) - yield from self.incoming_queues[PacketType.PINGRESP].get() + self._pingresp_waiter = futures.Future(loop=self._loop) + yield from self._pingresp_waiter + resp = self._pingresp_waiter.result() + + return resp + + @asyncio.coroutine + def handle_pingresp(self, pingresp: PingRespPacket): + self._pingresp_waiter.set_result(pingresp)