kopia lustrzana https://github.com/Yakifo/amqtt
Use queue for managing PINRESP
rodzic
b8d2c3b0a6
commit
1f084d5ca1
|
@ -155,7 +155,7 @@ class ProtocolHandler:
|
|||
if packet.fixed_header.packet_type == PacketType.PUBCOMP:
|
||||
yield from self.handle_pubcomp(packet)
|
||||
if packet.fixed_header.packet_type == PacketType.PINGRESP:
|
||||
yield from self.handle_pingresp(packet)
|
||||
asyncio.Task(self.handle_pingresp(packet))
|
||||
else:
|
||||
yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet)
|
||||
else:
|
||||
|
@ -278,7 +278,7 @@ class ClientProtocolHandler(ProtocolHandler):
|
|||
super().__init__(session, config, loop)
|
||||
self._ping_task = None
|
||||
self._connack_waiter = None
|
||||
self._pingresp_waiter = None
|
||||
self._pingresp_queue = asyncio.Queue()
|
||||
self._subscriptions_waiter = dict()
|
||||
self._unsubscriptions_waiter = dict()
|
||||
|
||||
|
@ -399,11 +399,9 @@ class ClientProtocolHandler(ProtocolHandler):
|
|||
ping_packet = PingReqPacket()
|
||||
yield from self.outgoing_queue.put(ping_packet)
|
||||
self._pingresp_waiter = futures.Future(loop=self._loop)
|
||||
yield from self._pingresp_waiter
|
||||
resp = self._pingresp_waiter.result()
|
||||
|
||||
resp = yield from self._pingresp_queue.get()
|
||||
return resp
|
||||
|
||||
@asyncio.coroutine
|
||||
def handle_pingresp(self, pingresp: PingRespPacket):
|
||||
self._pingresp_waiter.set_result(pingresp)
|
||||
yield from self._pingresp_queue.put(pingresp)
|
||||
|
|
Ładowanie…
Reference in New Issue