From 18cba2f53d19a472152d03ab72985454d6af19d6 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Mon, 17 Aug 2015 23:15:40 +0200 Subject: [PATCH] Add packet logger plugin --- hbmqtt/broker.py | 13 +++++++------ hbmqtt/client.py | 20 +++++++++++++++++--- hbmqtt/mqtt/protocol/broker_handler.py | 5 +++-- hbmqtt/mqtt/protocol/client_handler.py | 5 +++-- hbmqtt/mqtt/protocol/handler.py | 13 ++++++++++--- hbmqtt/plugins/__init__.py | 3 ++- hbmqtt/plugins/event_logger.py | 6 +++--- setup.py | 6 +++++- 8 files changed, 50 insertions(+), 21 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index e104d32..fb31a3c 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -11,6 +11,7 @@ from functools import partial from transitions import Machine, MachineError from hbmqtt.session import Session from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler +from hbmqtt.mqtt.protocol.handler import EVENT_MQTT_PACKET_RECEIVED, EVENT_MQTT_PACKET_SENT from hbmqtt.mqtt.connect import ConnectPacket from hbmqtt.mqtt.connack import * from hbmqtt.errors import HBMQTTException @@ -47,6 +48,7 @@ EVENT_BROKER_POST_START = 'broker_post_start' EVENT_BROKER_PRE_SHUTDOWN = 'broker_pre_shutdown' EVENT_BROKER_POST_SHUTDOWN = 'broker_post_shutdown' + class BrokerException(BaseException): pass @@ -399,7 +401,7 @@ class Broker: connect = None try: connect = yield from ConnectPacket.from_stream(reader) - self.logger.debug(" <-in-- " + repr(connect)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connect) self.check_connect(connect) except HBMQTTException as exc: self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" % @@ -437,9 +439,8 @@ class Broker: self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) - self.logger.debug(" -out-> " + repr(connack)) if connack is not None: - self.logger.debug(" -out-> " + repr(connack)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack) yield from connack.to_stream(writer) yield from writer.close() return @@ -494,12 +495,12 @@ class Broker: if self.authenticate(client_session): connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED) self.logger.info('%s : connection accepted' % format_client_message(session=client_session)) - self.logger.debug(" -out-> " + repr(connack)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack) yield from connack.to_stream(writer) else: connack = ConnackPacket.build(client_session.parent, NOT_AUTHORIZED) self.logger.info('%s : connection refused' % format_client_message(session=client_session)) - self.logger.debug(" -out-> " + repr(connack)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack) yield from connack.to_stream(writer) yield from writer.close() return @@ -587,7 +588,7 @@ class Broker: Create a BrokerProtocolHandler and attach to a session :return: """ - handler = BrokerProtocolHandler(reader, writer, self._loop) + handler = BrokerProtocolHandler(reader, writer, self.plugins_manager, self._loop) handler.attach_to_session(session) handler.on_packet_received.connect(self.sys_handle_packet_received) handler.on_packet_sent.connect(self.sys_handle_packet_sent) diff --git a/hbmqtt/client.py b/hbmqtt/client.py index e9e2158..fb78873 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -13,6 +13,8 @@ from hbmqtt.mqtt.connack import * from hbmqtt.mqtt.connect import * from hbmqtt.mqtt.protocol.client_handler import ClientProtocolHandler from hbmqtt.adapters import StreamReaderAdapter, StreamWriterAdapter, WebSocketsReader, WebSocketsWriter +from hbmqtt.plugins.manager import PluginManager, BaseContext +from hbmqtt.mqtt.protocol.handler import EVENT_MQTT_PACKET_SENT, EVENT_MQTT_PACKET_RECEIVED import websockets _defaults = { @@ -31,6 +33,15 @@ class ConnectException(ClientException): pass +class ClientContext(BaseContext): + """ + ClientContext is used as the context passed to plugins interacting with the client. + It act as an adapter to client services from plugins + """ + def __init__(self, loop=None): + super().__init__(loop) + + class MQTTClient: def __init__(self, client_id=None, config=None, loop=None): """ @@ -77,6 +88,9 @@ class MQTTClient: self._disconnect_task = None self._connection_closed_future = None + # Init plugins manager + self.plugins_manager = PluginManager('hbmqtt.client', ClientContext(self._loop), self._loop) + @asyncio.coroutine def connect(self, @@ -239,10 +253,10 @@ class MQTTClient: try : connect_packet = self.build_connect_packet() yield from connect_packet.to_stream(writer) - self.logger.debug(" -out-> " + repr(connect_packet)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connect_packet) connack = yield from ConnackPacket.from_stream(reader) - self.logger.debug(" <-in-- " + repr(connack)) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack) return_code = connack.variable_header.return_code except Exception as e: self.logger.warn("connection failed: %s" % e) @@ -258,7 +272,7 @@ class MQTTClient: raise exc else: # Handle MQTT protocol - self._handler = ClientProtocolHandler(reader, writer, loop=self._loop) + self._handler = ClientProtocolHandler(reader, writer, self.plugins_manager, loop=self._loop) self._handler.attach_to_session(self.session) yield from self._handler.start() self.session.transitions.connect() diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index 253e768..664e2f4 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -13,11 +13,12 @@ from hbmqtt.mqtt.unsubscribe import UnsubscribePacket from hbmqtt.mqtt.unsuback import UnsubackPacket from hbmqtt.utils import format_client_message from hbmqtt.adapters import WriterAdapter, ReaderAdapter +from hbmqtt.plugins.manager import PluginManager class BrokerProtocolHandler(ProtocolHandler): - def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): - super().__init__(reader, writer, loop) + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None): + super().__init__(reader, writer, plugins_manager, loop) self._disconnect_waiter = None self._pending_subscriptions = asyncio.Queue() self._pending_unsubscriptions = asyncio.Queue() diff --git a/hbmqtt/mqtt/protocol/client_handler.py b/hbmqtt/mqtt/protocol/client_handler.py index 7bdb5d5..aa7aa78 100644 --- a/hbmqtt/mqtt/protocol/client_handler.py +++ b/hbmqtt/mqtt/protocol/client_handler.py @@ -14,11 +14,12 @@ from hbmqtt.mqtt.suback import SubackPacket from hbmqtt.mqtt.unsubscribe import UnsubscribePacket from hbmqtt.mqtt.unsuback import UnsubackPacket from hbmqtt.adapters import ReaderAdapter, WriterAdapter +from hbmqtt.plugins.manager import PluginManager class ClientProtocolHandler(ProtocolHandler): - def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): - super().__init__(reader, writer, loop) + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None): + super().__init__(reader, writer, plugins_manager, loop=loop) self._ping_task = None self._pingresp_queue = asyncio.Queue() self._subscriptions_waiter = dict() diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 208f358..e4aba14 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -23,6 +23,11 @@ from hbmqtt.adapters import ReaderAdapter, WriterAdapter from hbmqtt.session import Session from hbmqtt.specs import * from hbmqtt.mqtt.protocol.inflight import * +from hbmqtt.plugins.manager import PluginManager + + +EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent' +EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received' class ProtocolHandler: @@ -33,11 +38,12 @@ class ProtocolHandler: on_packet_sent = Signal() on_packet_received = Signal() - def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None): self.logger = logging.getLogger(__name__) self.session = None self.reader = reader self.writer = writer + self.plugins_manager = plugins_manager if loop is None: self._loop = asyncio.get_event_loop() else: @@ -153,7 +159,8 @@ class ProtocolHandler: else: cls = packet_class(fixed_header) packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header) - self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet))) + yield from self.plugins_manager.fire_event( + EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session) self._loop.call_soon(self.on_packet_received.send, packet) task = None @@ -219,7 +226,7 @@ class ProtocolHandler: self.logger.debug("%s Writer interruption" % self.session.client_id) break yield from packet.to_stream(self.writer) - self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) + yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session) self._loop.call_soon(self.on_packet_sent.send, packet) except asyncio.TimeoutError as ce: self.logger.debug("%s Output queue get timeout" % self.session.client_id) diff --git a/hbmqtt/plugins/__init__.py b/hbmqtt/plugins/__init__.py index 395d1de..a8927f1 100644 --- a/hbmqtt/plugins/__init__.py +++ b/hbmqtt/plugins/__init__.py @@ -2,4 +2,5 @@ # # See the file license.txt for copying permission. -from .event_logger import EventLoggerPlugin \ No newline at end of file +from .event_logger import EventLoggerPlugin +from .packet_logger import PacketLoggerPlugin \ No newline at end of file diff --git a/hbmqtt/plugins/event_logger.py b/hbmqtt/plugins/event_logger.py index 39360b0..2046425 100644 --- a/hbmqtt/plugins/event_logger.py +++ b/hbmqtt/plugins/event_logger.py @@ -13,9 +13,9 @@ class EventLoggerPlugin: self.context = context @asyncio.coroutine - def log_event(self, event_name): - self.logger.info("### '%s' EVENT FIRED ###" % event_name.replace('old', '')) + def log_event(self, *args, **kwargs): + self.logger.info("### '%s' EVENT FIRED ###" % kwargs['event_name'].replace('old', '')) def __getattr__(self, name): if name.startswith("on_"): - return partial(self.log_event, event_name=name) \ No newline at end of file + return partial(self.log_event, event_name=name) diff --git a/setup.py b/setup.py index 3f8f576..e60984c 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,11 @@ setup( 'event_plugin = tests.plugins.test_manager:EventTestPlugin', ], 'hbmqtt.broker': [ - 'event_logger_plugin = hbmqtt.plugins:EventLoggerPlugin', +# 'event_logger_plugin = hbmqtt.plugins:EventLoggerPlugin', + 'packet_logger_plugin = hbmqtt.plugins:PacketLoggerPlugin', + ], + 'hbmqtt.client': [ + 'packet_logger_plugin = hbmqtt.plugins:PacketLoggerPlugin', ] } ) \ No newline at end of file