kopia lustrzana https://github.com/Yakifo/amqtt
Add packet sent/received signals
rodzic
1fc3d5094d
commit
c8705a059f
|
@ -2,6 +2,7 @@
|
||||||
#
|
#
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
import logging
|
import logging
|
||||||
|
from blinker import Signal
|
||||||
from hbmqtt.mqtt import packet_class
|
from hbmqtt.mqtt import packet_class
|
||||||
from hbmqtt.mqtt.packet import *
|
from hbmqtt.mqtt.packet import *
|
||||||
from hbmqtt.mqtt.connack import ConnackPacket
|
from hbmqtt.mqtt.connack import ConnackPacket
|
||||||
|
@ -29,6 +30,9 @@ class ProtocolHandler:
|
||||||
Class implementing the MQTT communication protocol using asyncio features
|
Class implementing the MQTT communication protocol using asyncio features
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
on_packet_sent = Signal()
|
||||||
|
on_packet_received = Signal()
|
||||||
|
|
||||||
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
|
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.session = None
|
self.session = None
|
||||||
|
@ -151,6 +155,7 @@ class ProtocolHandler:
|
||||||
cls = packet_class(fixed_header)
|
cls = packet_class(fixed_header)
|
||||||
packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header)
|
packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header)
|
||||||
self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet)))
|
self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet)))
|
||||||
|
self._loop.call_soon(self.on_packet_received.send, packet)
|
||||||
|
|
||||||
task = None
|
task = None
|
||||||
if packet.fixed_header.packet_type == CONNACK:
|
if packet.fixed_header.packet_type == CONNACK:
|
||||||
|
@ -214,9 +219,9 @@ class ProtocolHandler:
|
||||||
if not isinstance(packet, MQTTPacket):
|
if not isinstance(packet, MQTTPacket):
|
||||||
self.logger.debug("%s Writer interruption" % self.session.client_id)
|
self.logger.debug("%s Writer interruption" % self.session.client_id)
|
||||||
break
|
break
|
||||||
yield from packet.to_stream(self.writer)
|
|
||||||
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
|
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
|
||||||
yield from self.writer.drain()
|
yield from packet.to_stream(self.writer)
|
||||||
|
self._loop.call_soon(self.on_packet_sent.send, packet)
|
||||||
except asyncio.TimeoutError as ce:
|
except asyncio.TimeoutError as ce:
|
||||||
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
||||||
if self._running:
|
if self._running:
|
||||||
|
@ -237,6 +242,7 @@ class ProtocolHandler:
|
||||||
break
|
break
|
||||||
yield from packet.to_stream(self.session.writer)
|
yield from packet.to_stream(self.session.writer)
|
||||||
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
|
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
|
||||||
|
self._loop.call_soon(self.on_packet_sent, packet)
|
||||||
except asyncio.QueueEmpty:
|
except asyncio.QueueEmpty:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
Ładowanie…
Reference in New Issue