kopia lustrzana https://github.com/Yakifo/amqtt
Add packet logger plugin
rodzic
913dd77134
commit
18cba2f53d
|
@ -11,6 +11,7 @@ from functools import partial
|
||||||
from transitions import Machine, MachineError
|
from transitions import Machine, MachineError
|
||||||
from hbmqtt.session import Session
|
from hbmqtt.session import Session
|
||||||
from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
||||||
|
from hbmqtt.mqtt.protocol.handler import EVENT_MQTT_PACKET_RECEIVED, EVENT_MQTT_PACKET_SENT
|
||||||
from hbmqtt.mqtt.connect import ConnectPacket
|
from hbmqtt.mqtt.connect import ConnectPacket
|
||||||
from hbmqtt.mqtt.connack import *
|
from hbmqtt.mqtt.connack import *
|
||||||
from hbmqtt.errors import HBMQTTException
|
from hbmqtt.errors import HBMQTTException
|
||||||
|
@ -47,6 +48,7 @@ EVENT_BROKER_POST_START = 'broker_post_start'
|
||||||
EVENT_BROKER_PRE_SHUTDOWN = 'broker_pre_shutdown'
|
EVENT_BROKER_PRE_SHUTDOWN = 'broker_pre_shutdown'
|
||||||
EVENT_BROKER_POST_SHUTDOWN = 'broker_post_shutdown'
|
EVENT_BROKER_POST_SHUTDOWN = 'broker_post_shutdown'
|
||||||
|
|
||||||
|
|
||||||
class BrokerException(BaseException):
|
class BrokerException(BaseException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -399,7 +401,7 @@ class Broker:
|
||||||
connect = None
|
connect = None
|
||||||
try:
|
try:
|
||||||
connect = yield from ConnectPacket.from_stream(reader)
|
connect = yield from ConnectPacket.from_stream(reader)
|
||||||
self.logger.debug(" <-in-- " + repr(connect))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connect)
|
||||||
self.check_connect(connect)
|
self.check_connect(connect)
|
||||||
except HBMQTTException as exc:
|
except HBMQTTException as exc:
|
||||||
self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
|
self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
|
||||||
|
@ -437,9 +439,8 @@ class Broker:
|
||||||
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' %
|
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' %
|
||||||
format_client_message(address=remote_address, port=remote_port))
|
format_client_message(address=remote_address, port=remote_port))
|
||||||
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
||||||
self.logger.debug(" -out-> " + repr(connack))
|
|
||||||
if connack is not None:
|
if connack is not None:
|
||||||
self.logger.debug(" -out-> " + repr(connack))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack)
|
||||||
yield from connack.to_stream(writer)
|
yield from connack.to_stream(writer)
|
||||||
yield from writer.close()
|
yield from writer.close()
|
||||||
return
|
return
|
||||||
|
@ -494,12 +495,12 @@ class Broker:
|
||||||
if self.authenticate(client_session):
|
if self.authenticate(client_session):
|
||||||
connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED)
|
connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED)
|
||||||
self.logger.info('%s : connection accepted' % format_client_message(session=client_session))
|
self.logger.info('%s : connection accepted' % format_client_message(session=client_session))
|
||||||
self.logger.debug(" -out-> " + repr(connack))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack)
|
||||||
yield from connack.to_stream(writer)
|
yield from connack.to_stream(writer)
|
||||||
else:
|
else:
|
||||||
connack = ConnackPacket.build(client_session.parent, NOT_AUTHORIZED)
|
connack = ConnackPacket.build(client_session.parent, NOT_AUTHORIZED)
|
||||||
self.logger.info('%s : connection refused' % format_client_message(session=client_session))
|
self.logger.info('%s : connection refused' % format_client_message(session=client_session))
|
||||||
self.logger.debug(" -out-> " + repr(connack))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connack)
|
||||||
yield from connack.to_stream(writer)
|
yield from connack.to_stream(writer)
|
||||||
yield from writer.close()
|
yield from writer.close()
|
||||||
return
|
return
|
||||||
|
@ -587,7 +588,7 @@ class Broker:
|
||||||
Create a BrokerProtocolHandler and attach to a session
|
Create a BrokerProtocolHandler and attach to a session
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
handler = BrokerProtocolHandler(reader, writer, self._loop)
|
handler = BrokerProtocolHandler(reader, writer, self.plugins_manager, self._loop)
|
||||||
handler.attach_to_session(session)
|
handler.attach_to_session(session)
|
||||||
handler.on_packet_received.connect(self.sys_handle_packet_received)
|
handler.on_packet_received.connect(self.sys_handle_packet_received)
|
||||||
handler.on_packet_sent.connect(self.sys_handle_packet_sent)
|
handler.on_packet_sent.connect(self.sys_handle_packet_sent)
|
||||||
|
|
|
@ -13,6 +13,8 @@ from hbmqtt.mqtt.connack import *
|
||||||
from hbmqtt.mqtt.connect import *
|
from hbmqtt.mqtt.connect import *
|
||||||
from hbmqtt.mqtt.protocol.client_handler import ClientProtocolHandler
|
from hbmqtt.mqtt.protocol.client_handler import ClientProtocolHandler
|
||||||
from hbmqtt.adapters import StreamReaderAdapter, StreamWriterAdapter, WebSocketsReader, WebSocketsWriter
|
from hbmqtt.adapters import StreamReaderAdapter, StreamWriterAdapter, WebSocketsReader, WebSocketsWriter
|
||||||
|
from hbmqtt.plugins.manager import PluginManager, BaseContext
|
||||||
|
from hbmqtt.mqtt.protocol.handler import EVENT_MQTT_PACKET_SENT, EVENT_MQTT_PACKET_RECEIVED
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
_defaults = {
|
_defaults = {
|
||||||
|
@ -31,6 +33,15 @@ class ConnectException(ClientException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ClientContext(BaseContext):
|
||||||
|
"""
|
||||||
|
ClientContext is used as the context passed to plugins interacting with the client.
|
||||||
|
It act as an adapter to client services from plugins
|
||||||
|
"""
|
||||||
|
def __init__(self, loop=None):
|
||||||
|
super().__init__(loop)
|
||||||
|
|
||||||
|
|
||||||
class MQTTClient:
|
class MQTTClient:
|
||||||
def __init__(self, client_id=None, config=None, loop=None):
|
def __init__(self, client_id=None, config=None, loop=None):
|
||||||
"""
|
"""
|
||||||
|
@ -77,6 +88,9 @@ class MQTTClient:
|
||||||
self._disconnect_task = None
|
self._disconnect_task = None
|
||||||
self._connection_closed_future = None
|
self._connection_closed_future = None
|
||||||
|
|
||||||
|
# Init plugins manager
|
||||||
|
self.plugins_manager = PluginManager('hbmqtt.client', ClientContext(self._loop), self._loop)
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def connect(self,
|
def connect(self,
|
||||||
|
@ -239,10 +253,10 @@ class MQTTClient:
|
||||||
try :
|
try :
|
||||||
connect_packet = self.build_connect_packet()
|
connect_packet = self.build_connect_packet()
|
||||||
yield from connect_packet.to_stream(writer)
|
yield from connect_packet.to_stream(writer)
|
||||||
self.logger.debug(" -out-> " + repr(connect_packet))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=connect_packet)
|
||||||
|
|
||||||
connack = yield from ConnackPacket.from_stream(reader)
|
connack = yield from ConnackPacket.from_stream(reader)
|
||||||
self.logger.debug(" <-in-- " + repr(connack))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_RECEIVED, packet=connack)
|
||||||
return_code = connack.variable_header.return_code
|
return_code = connack.variable_header.return_code
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("connection failed: %s" % e)
|
self.logger.warn("connection failed: %s" % e)
|
||||||
|
@ -258,7 +272,7 @@ class MQTTClient:
|
||||||
raise exc
|
raise exc
|
||||||
else:
|
else:
|
||||||
# Handle MQTT protocol
|
# Handle MQTT protocol
|
||||||
self._handler = ClientProtocolHandler(reader, writer, loop=self._loop)
|
self._handler = ClientProtocolHandler(reader, writer, self.plugins_manager, loop=self._loop)
|
||||||
self._handler.attach_to_session(self.session)
|
self._handler.attach_to_session(self.session)
|
||||||
yield from self._handler.start()
|
yield from self._handler.start()
|
||||||
self.session.transitions.connect()
|
self.session.transitions.connect()
|
||||||
|
|
|
@ -13,11 +13,12 @@ from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
|
||||||
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
||||||
from hbmqtt.utils import format_client_message
|
from hbmqtt.utils import format_client_message
|
||||||
from hbmqtt.adapters import WriterAdapter, ReaderAdapter
|
from hbmqtt.adapters import WriterAdapter, ReaderAdapter
|
||||||
|
from hbmqtt.plugins.manager import PluginManager
|
||||||
|
|
||||||
|
|
||||||
class BrokerProtocolHandler(ProtocolHandler):
|
class BrokerProtocolHandler(ProtocolHandler):
|
||||||
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
|
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None):
|
||||||
super().__init__(reader, writer, loop)
|
super().__init__(reader, writer, plugins_manager, loop)
|
||||||
self._disconnect_waiter = None
|
self._disconnect_waiter = None
|
||||||
self._pending_subscriptions = asyncio.Queue()
|
self._pending_subscriptions = asyncio.Queue()
|
||||||
self._pending_unsubscriptions = asyncio.Queue()
|
self._pending_unsubscriptions = asyncio.Queue()
|
||||||
|
|
|
@ -14,11 +14,12 @@ from hbmqtt.mqtt.suback import SubackPacket
|
||||||
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
|
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
|
||||||
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
||||||
from hbmqtt.adapters import ReaderAdapter, WriterAdapter
|
from hbmqtt.adapters import ReaderAdapter, WriterAdapter
|
||||||
|
from hbmqtt.plugins.manager import PluginManager
|
||||||
|
|
||||||
|
|
||||||
class ClientProtocolHandler(ProtocolHandler):
|
class ClientProtocolHandler(ProtocolHandler):
|
||||||
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
|
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None):
|
||||||
super().__init__(reader, writer, loop)
|
super().__init__(reader, writer, plugins_manager, loop=loop)
|
||||||
self._ping_task = None
|
self._ping_task = None
|
||||||
self._pingresp_queue = asyncio.Queue()
|
self._pingresp_queue = asyncio.Queue()
|
||||||
self._subscriptions_waiter = dict()
|
self._subscriptions_waiter = dict()
|
||||||
|
|
|
@ -23,6 +23,11 @@ from hbmqtt.adapters import ReaderAdapter, WriterAdapter
|
||||||
from hbmqtt.session import Session
|
from hbmqtt.session import Session
|
||||||
from hbmqtt.specs import *
|
from hbmqtt.specs import *
|
||||||
from hbmqtt.mqtt.protocol.inflight import *
|
from hbmqtt.mqtt.protocol.inflight import *
|
||||||
|
from hbmqtt.plugins.manager import PluginManager
|
||||||
|
|
||||||
|
|
||||||
|
EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent'
|
||||||
|
EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received'
|
||||||
|
|
||||||
|
|
||||||
class ProtocolHandler:
|
class ProtocolHandler:
|
||||||
|
@ -33,11 +38,12 @@ class ProtocolHandler:
|
||||||
on_packet_sent = Signal()
|
on_packet_sent = Signal()
|
||||||
on_packet_received = Signal()
|
on_packet_received = Signal()
|
||||||
|
|
||||||
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
|
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, plugins_manager: PluginManager, loop=None):
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.session = None
|
self.session = None
|
||||||
self.reader = reader
|
self.reader = reader
|
||||||
self.writer = writer
|
self.writer = writer
|
||||||
|
self.plugins_manager = plugins_manager
|
||||||
if loop is None:
|
if loop is None:
|
||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
else:
|
else:
|
||||||
|
@ -153,7 +159,8 @@ class ProtocolHandler:
|
||||||
else:
|
else:
|
||||||
cls = packet_class(fixed_header)
|
cls = packet_class(fixed_header)
|
||||||
packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header)
|
packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header)
|
||||||
self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet)))
|
yield from self.plugins_manager.fire_event(
|
||||||
|
EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session)
|
||||||
self._loop.call_soon(self.on_packet_received.send, packet)
|
self._loop.call_soon(self.on_packet_received.send, packet)
|
||||||
|
|
||||||
task = None
|
task = None
|
||||||
|
@ -219,7 +226,7 @@ class ProtocolHandler:
|
||||||
self.logger.debug("%s Writer interruption" % self.session.client_id)
|
self.logger.debug("%s Writer interruption" % self.session.client_id)
|
||||||
break
|
break
|
||||||
yield from packet.to_stream(self.writer)
|
yield from packet.to_stream(self.writer)
|
||||||
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
|
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session)
|
||||||
self._loop.call_soon(self.on_packet_sent.send, packet)
|
self._loop.call_soon(self.on_packet_sent.send, packet)
|
||||||
except asyncio.TimeoutError as ce:
|
except asyncio.TimeoutError as ce:
|
||||||
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
|
||||||
|
|
|
@ -3,3 +3,4 @@
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
|
|
||||||
from .event_logger import EventLoggerPlugin
|
from .event_logger import EventLoggerPlugin
|
||||||
|
from .packet_logger import PacketLoggerPlugin
|
|
@ -13,8 +13,8 @@ class EventLoggerPlugin:
|
||||||
self.context = context
|
self.context = context
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def log_event(self, event_name):
|
def log_event(self, *args, **kwargs):
|
||||||
self.logger.info("### '%s' EVENT FIRED ###" % event_name.replace('old', ''))
|
self.logger.info("### '%s' EVENT FIRED ###" % kwargs['event_name'].replace('old', ''))
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
if name.startswith("on_"):
|
if name.startswith("on_"):
|
||||||
|
|
6
setup.py
6
setup.py
|
@ -37,7 +37,11 @@ setup(
|
||||||
'event_plugin = tests.plugins.test_manager:EventTestPlugin',
|
'event_plugin = tests.plugins.test_manager:EventTestPlugin',
|
||||||
],
|
],
|
||||||
'hbmqtt.broker': [
|
'hbmqtt.broker': [
|
||||||
'event_logger_plugin = hbmqtt.plugins:EventLoggerPlugin',
|
# 'event_logger_plugin = hbmqtt.plugins:EventLoggerPlugin',
|
||||||
|
'packet_logger_plugin = hbmqtt.plugins:PacketLoggerPlugin',
|
||||||
|
],
|
||||||
|
'hbmqtt.client': [
|
||||||
|
'packet_logger_plugin = hbmqtt.plugins:PacketLoggerPlugin',
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
)
|
)
|
Ładowanie…
Reference in New Issue