diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index ea79b3d..9e0b004 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -2,6 +2,7 @@ # # See the file license.txt for copying permission. import logging +from blinker import Signal from hbmqtt.mqtt import packet_class from hbmqtt.mqtt.packet import * from hbmqtt.mqtt.connack import ConnackPacket @@ -29,6 +30,9 @@ class ProtocolHandler: Class implementing the MQTT communication protocol using asyncio features """ + on_packet_sent = Signal() + on_packet_received = Signal() + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): self.logger = logging.getLogger(__name__) self.session = None @@ -151,6 +155,7 @@ class ProtocolHandler: cls = packet_class(fixed_header) packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header) self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_received.send, packet) task = None if packet.fixed_header.packet_type == CONNACK: @@ -214,9 +219,9 @@ class ProtocolHandler: if not isinstance(packet, MQTTPacket): self.logger.debug("%s Writer interruption" % self.session.client_id) break - yield from packet.to_stream(self.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) - yield from self.writer.drain() + yield from packet.to_stream(self.writer) + self._loop.call_soon(self.on_packet_sent.send, packet) except asyncio.TimeoutError as ce: self.logger.debug("%s Output queue get timeout" % self.session.client_id) if self._running: @@ -237,6 +242,7 @@ class ProtocolHandler: break yield from packet.to_stream(self.session.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_sent, packet) except asyncio.QueueEmpty: break except Exception as e: