From 7b5145da7205097f64761a24a9fa010534c0d71b Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 13:57:17 +0200 Subject: [PATCH 01/13] Add to_bytes() method on Packet --- hbmqtt/mqtt/packet.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/packet.py b/hbmqtt/mqtt/packet.py index 18ea334..39cbe36 100644 --- a/hbmqtt/mqtt/packet.py +++ b/hbmqtt/mqtt/packet.py @@ -171,7 +171,7 @@ class MQTTPacket: writer.write(self.to_bytes()) yield from writer.drain() - def to_bytes(self): + def to_bytes(self) -> bytes: if self.variable_header: variable_header_bytes = self.variable_header.to_bytes() else: @@ -208,6 +208,10 @@ class MQTTPacket: else: return cls(fixed_header, variable_header, payload) + @property + def bytes_length(self): + return len(self.to_bytes()) + def __repr__(self): return type(self).__name__ + '(fixed={0!r}, variable={1!r}, payload={2!r})'.\ format(self.fixed_header, self.variable_header, self.payload) From 73a739525704583244f284c563f3da8e3bf3bdc9 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 13:57:48 +0200 Subject: [PATCH 02/13] Add $SYS publish on 10s interval --- samples/broker_start.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samples/broker_start.py b/samples/broker_start.py index 1b3151e..4fd204a 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -17,7 +17,8 @@ config = { 'bind': '127.0.0.1:8080', 'type': 'ws' }, - } + }, + 'sys_interval': 10, } broker = Broker(config) From 9adbbdfc3a8fc894eaf2f32ba7a3c080a24a0778 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 13:58:21 +0200 Subject: [PATCH 03/13] Remove unused --- hbmqtt/mqtt/packet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/packet.py b/hbmqtt/mqtt/packet.py index 39cbe36..7d3f0b0 100644 --- a/hbmqtt/mqtt/packet.py +++ b/hbmqtt/mqtt/packet.py @@ -1,7 +1,7 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. -from hbmqtt.errors import CodecException, MQTTException, HBMQTTException +from hbmqtt.errors import CodecException, MQTTException from hbmqtt.codecs import * from hbmqtt.adapters import ReaderAdapter, WriterAdapter import abc From afed8acf7200ad5f8a7a6831f1daef5f61163d26 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 22:01:08 +0200 Subject: [PATCH 04/13] Generate packet ID only for QOS > 0 messages. --- hbmqtt/mqtt/protocol/handler.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 4e48994..ea79b3d 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -97,9 +97,12 @@ class ProtocolHandler: @asyncio.coroutine def mqtt_publish(self, topic, message, qos, retain): - packet_id = self.session.next_packet_id - if packet_id in self.session.outgoing_msg: - self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id) + if qos: + packet_id = self.session.next_packet_id + if packet_id in self.session.outgoing_msg: + self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id) + else: + packet_id = None packet = PublishPacket.build(topic, message, packet_id, False, qos, retain) yield from self.outgoing_queue.put(packet) if qos != QOS_0: From 1fc3d5094d23306dad5e472a052d863086a29e7a Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 22:04:17 +0200 Subject: [PATCH 05/13] Fix formatting --- hbmqtt/utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hbmqtt/utils.py b/hbmqtt/utils.py index 65ea6cb..728980c 100644 --- a/hbmqtt/utils.py +++ b/hbmqtt/utils.py @@ -16,11 +16,13 @@ def not_in_dict_or_none(dict, key): return False -def format_client_message(session=None, address=None, port=None, id=None): +def format_client_message(session=None, address=None, port=None): if session: return "(client @=%s:%d id=%s)" % (session.remote_address, session.remote_port, session.client_id) + elif address is not None and port is not None: + return "(client @=%s:%d)" % (address, port) else: - return "(client @=%s:%d id=%s)" % (address, port, id) + return "(unknown client)" def gen_client_id(): From c8705a059fc89ecbedfad7d154aedee03b8103d1 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 8 Aug 2015 22:04:55 +0200 Subject: [PATCH 06/13] Add packet sent/received signals --- hbmqtt/mqtt/protocol/handler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index ea79b3d..9e0b004 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -2,6 +2,7 @@ # # See the file license.txt for copying permission. import logging +from blinker import Signal from hbmqtt.mqtt import packet_class from hbmqtt.mqtt.packet import * from hbmqtt.mqtt.connack import ConnackPacket @@ -29,6 +30,9 @@ class ProtocolHandler: Class implementing the MQTT communication protocol using asyncio features """ + on_packet_sent = Signal() + on_packet_received = Signal() + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): self.logger = logging.getLogger(__name__) self.session = None @@ -151,6 +155,7 @@ class ProtocolHandler: cls = packet_class(fixed_header) packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header) self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_received.send, packet) task = None if packet.fixed_header.packet_type == CONNACK: @@ -214,9 +219,9 @@ class ProtocolHandler: if not isinstance(packet, MQTTPacket): self.logger.debug("%s Writer interruption" % self.session.client_id) break - yield from packet.to_stream(self.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) - yield from self.writer.drain() + yield from packet.to_stream(self.writer) + self._loop.call_soon(self.on_packet_sent.send, packet) except asyncio.TimeoutError as ce: self.logger.debug("%s Output queue get timeout" % self.session.client_id) if self._running: @@ -237,6 +242,7 @@ class ProtocolHandler: break yield from packet.to_stream(self.session.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_sent, packet) except asyncio.QueueEmpty: break except Exception as e: From 4a5a702cf8fe3dd13c32fd1d2eb0508e8ed1e42d Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sun, 9 Aug 2015 17:48:47 +0200 Subject: [PATCH 07/13] Clean warnings --- hbmqtt/broker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index afb5643..d591aad 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -282,7 +282,7 @@ class Broker: elif connect.variable_header.password_flag and connect.payload.password is None: self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port))) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.variable_header.clean_session_flag == False and connect.payload.client_id is None: + elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None: self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) @@ -502,9 +502,9 @@ class Broker: # Unsubscribe topic not found in current subscribed topics pass - def matches(self, topic, filter): + def matches(self, topic, a_filter): import re - match_pattern = re.compile(filter.replace('#', '.*').replace('+', '[\s\w\d]+')) + match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+')) if match_pattern.match(topic): return True else: From 986046b80bcb5c20245cfe550f5129f4600cb644 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sun, 9 Aug 2015 22:58:09 +0200 Subject: [PATCH 08/13] Fix clean_session init --- hbmqtt/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 7022904..3082d5d 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -308,9 +308,9 @@ class MQTTClient: s.capath = broker_conf['capath'] s.cadata = broker_conf['cadata'] if cleansession is not None: - s.cleansession = cleansession + s.clean_session = cleansession else: - s.cleansession = self.config.get('cleansession', True) + s.clean_session = self.config.get('cleansession', True) s.keep_alive = self.config['keep_alive'] - self.config['ping_delay'] if 'will' in self.config: s.will_flag = True From 568bdd9cb31b3e2ee98fc945f3066b9fbdee3130 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sun, 9 Aug 2015 23:00:30 +0200 Subject: [PATCH 09/13] Fix client session initialisation --- hbmqtt/broker.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index d591aad..83de39e 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -296,22 +296,25 @@ class Broker: client_session = None self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag)) self.logger.debug("known sessions={0}".format(self._sessions)) + client_id = connect.payload.client_id if connect.variable_header.clean_session_flag: - client_id = connect.payload.client_id + # Delete existing session and create a new one if client_id is not None: self.delete_session(client_id) client_session = Session() client_session.parent = 0 + client_session.client_id = client_id self._sessions[client_id] = client_session else: # Get session from cache - client_id = connect.payload.client_id if client_id in self._sessions: self.logger.debug("Found old session %s" % repr(self._sessions[client_id])) client_session = self._sessions[client_id] client_session.parent = 1 else: client_session = Session() + client_session.client_id = client_id + self._sessions[client_id] = client_session client_session.parent = 0 if client_session.client_id is None: From 592952f1f05d971df92ada1b6656253e0ed84d45 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sun, 9 Aug 2015 23:07:55 +0200 Subject: [PATCH 10/13] Add first $SYS topics --- hbmqtt/broker.py | 159 +++++++++++++++++++++++++++++++++++++++++------ hbmqtt/codecs.py | 10 +++ 2 files changed, 150 insertions(+), 19 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 83de39e..2b7457e 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -5,6 +5,7 @@ import logging import ssl import websockets import asyncio +from datetime import datetime from functools import partial from transitions import Machine, MachineError @@ -14,6 +15,7 @@ from hbmqtt.mqtt.connect import ConnectPacket from hbmqtt.mqtt.connack import * from hbmqtt.errors import HBMQTTException from hbmqtt.utils import format_client_message, gen_client_id +from hbmqtt.codecs import int_to_bytes_str from hbmqtt.adapters import ( StreamReaderAdapter, StreamWriterAdapter, @@ -28,6 +30,12 @@ _defaults = { 'publish-retry-delay': 5, } +DOLLAR_SYS_ROOT = '$SYS/broker/' +STAT_BYTES_SENT = 'bytes_sent' +STAT_BYTES_RECEIVED = 'bytes_received' +STAT_MSG_SENT = 'messages_sent' +STAT_MSG_RECEIVED = 'messages_received' +STAT_UPTIME = 'uptime' class BrokerException(BaseException): pass @@ -141,6 +149,12 @@ class Broker: self._subscriptions = dict() self._global_retained_messages = dict() + # Broker statistics initialization + self._stats = dict() + + # $SYS tree task handle + self.sys_handle = None + def _build_listeners_config(self, broker_config): self.listeners_config = dict() try: @@ -173,12 +187,15 @@ class Broker: self.logger.debug("Invalid method call at this moment: %s" % me) raise BrokerException("Broker instance can't be started: %s" % me) + # Clear broker stats + self._clear_stats() try: + # Start network listeners for listener_name in self.listeners_config: listener = self.listeners_config[listener_name] self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind'])) - #Max connections + # Max connections try: max_connections = listener['max_connections'] except KeyError: @@ -195,22 +212,34 @@ class Broker: raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) except FileNotFoundError as fnfe: raise BrokerException("Can't read cert files '%s' or '%s' : %s" % - (listener['certfile'], listener['keyfile'], fnfe)) + (listener['certfile'], listener['keyfile'], fnfe)) if listener['type'] == 'tcp': address, port = listener['bind'].split(':') cb_partial = partial(self.stream_connected, listener_name=listener_name) instance = yield from asyncio.start_server(cb_partial, - address, - port, - ssl=sc, - loop=self._loop) + address, + port, + ssl=sc, + loop=self._loop) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) elif listener['type'] == 'ws': address, port = listener['bind'].split(':') cb_partial = partial(self.ws_connected, listener_name=listener_name) instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + + # Start $SYS topics management + self._init_dollar_sys() + try: + sys_interval = int(self.config['sys_interval']) + if sys_interval: + self.logger.debug("Setup $SYS broadcasting every %d secondes" % sys_interval) + self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics) + except KeyError: + pass + # 'sys_internal' config parameter not found + self.machine.starting_success() self.logger.debug("Broker started") except Exception as e: @@ -225,6 +254,11 @@ class Broker: except MachineError as me: self.logger.debug("Invalid method call at this moment: %s" % me) raise BrokerException("Broker instance can't be stopped: %s" % me) + + # Stop $SYS topics broadcasting + if self.sys_handle: + self.sys_handle.cancel() + for listener_name in self._servers: server = self._servers[listener_name] yield from server.close_instance() @@ -232,6 +266,64 @@ class Broker: self.logger.info("Broker closed") self.machine.stopping_success() + def _clear_stats(self): + """ + Initializes broker statistics data structures + """ + for stat in (STAT_BYTES_RECEIVED, + STAT_BYTES_SENT, + STAT_MSG_RECEIVED, + STAT_MSG_SENT, + 'clients_connected'): + self._stats[stat] = 0 + self._stats[STAT_UPTIME] = datetime.now() + + def _init_dollar_sys(self): + """ + Initializes and publish $SYS static topics + """ + from hbmqtt.version import get_version + self.sys_handle = None + version = 'HBMQTT version ' + get_version() + self.retain_message(None, DOLLAR_SYS_ROOT + 'version', version.encode()) + + def broadcast_dollar_sys_topics(self): + """ + Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config + parameter. + """ + + # Update stats + uptime = datetime.now() - self._stats[STAT_UPTIME] + client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected') + client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected') + # Broadcast updates + tasks = [ + self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])), + self._broadcast_sys_topic('load/bytes/sent', int_to_bytes_str(self._stats[STAT_BYTES_SENT])), + self._broadcast_sys_topic('messages/received', int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])), + self._broadcast_sys_topic('messages/sent', int_to_bytes_str(self._stats[STAT_MSG_SENT])), + self._broadcast_sys_topic('time', str(datetime.now()).encode('utf-8')), + self._broadcast_sys_topic('uptime', int_to_bytes_str(int(uptime.total_seconds()))), + self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')), + self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)), + self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)), + ] + + # Wait until broadcasting tasks end + if len(tasks) > 0: + asyncio.wait(tasks) + # Reschedule + sys_interval = int(self.config['sys_interval']) + self.logger.debug("Broadcasting $SYS topics") + self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics) + + def _broadcast_sys_topic(self, topic_basename, data): + return self._internal_message_broadcast(DOLLAR_SYS_ROOT + topic_basename, data) + + def _internal_message_broadcast(self, topic, data): + return asyncio.Task(self.broadcast_application_message(None, topic, data)) + @asyncio.coroutine def ws_connected(self, websocket, uri, listener_name): yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket)) @@ -354,8 +446,7 @@ class Broker: return client_session.machine.connect() - handler = BrokerProtocolHandler(reader, writer, self._loop) - handler.attach_to_session(client_session) + handler = self._init_handler(reader, writer, client_session) self.logger.debug("%s Start messages handling" % client_session.client_id) yield from handler.start() self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize()) @@ -376,7 +467,7 @@ class Broker: self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result)) if result is None: self.logger.debug("Will flag: %s" % client_session.will_flag) - #Connection closed anormally, send will message + # Connection closed anormally, send will message if client_session.will_flag: self.logger.debug("Client %s disconnected abnormally, sending will message" % format_client_message(client_session)) @@ -426,17 +517,36 @@ class Broker: wait_deliver.cancel() self.logger.debug("%s Client disconnecting" % client_session.client_id) + yield from self._stop_handler(handler) + client_session.machine.disconnect() + yield from writer.close() + self.logger.debug("%s Session disconnected" % client_session.client_id) + server.release_connection() + + def _init_handler(self, reader, writer, session): + """ + Create a BrokerProtocolHandler and attach to a session + :return: + """ + handler = BrokerProtocolHandler(reader, writer, self._loop) + handler.attach_to_session(session) + handler.on_packet_received.connect(self.sys_handle_packet_received) + handler.on_packet_sent.connect(self.sys_handle_packet_sent) + return handler + + @asyncio.coroutine + def _stop_handler(self, handler): + """ + Stop a running handler and detach if from the session + :param handler: + :return: + """ try: yield from handler.stop() except Exception as e: self.logger.error(e) finally: handler.detach_from_session() - handler = None - client_session.machine.disconnect() - yield from writer.close() - self.logger.debug("%s Session disconnected" % client_session.client_id) - server.release_connection() @asyncio.coroutine def check_connect(self, connect: ConnectPacket): @@ -457,17 +567,16 @@ class Broker: def retain_message(self, source_session, topic_name, data, qos=None): if data is not None and data != b'': # If retained flag set, store the message for further subscriptions - self.logger.debug("%s Retaining message on topic %s" % (source_session.client_id, topic_name)) + self.logger.debug("Retaining message on topic %s" % topic_name) retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos) self._global_retained_messages[topic_name] = retained_message else: # [MQTT-3.3.1-10] - self.logger.debug("%s Clear retained messages for topic '%s'" % (source_session.client_id, topic_name)) + self.logger.debug("Clear retained messages for topic '%s'" % topic_name) del self._global_retained_messages[topic_name] def add_subscription(self, subscription, session): import re - #wildcard_pattern = re.compile('(/.+?\+)|(/\+.+?)|(/.+?\+.+?)') wildcard_pattern = re.compile('.*?/?\+/?.*?') try: a_filter = subscription['filter'] @@ -606,10 +715,22 @@ class Broker: # Delete subscriptions self.logger.debug("deleting session %s subscriptions" % repr(session)) nb_sub = 0 - for filter in self._subscriptions: - self.del_subscription(filter, session) + for a_filter in self._subscriptions: + self.del_subscription(a_filter, session) nb_sub += 1 self.logger.debug("%d subscriptions deleted" % nb_sub) self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id])) del self._sessions[client_id] + + def sys_handle_packet_received(self, packet): + if packet: + packet_size = packet.bytes_length + self._stats[STAT_BYTES_RECEIVED] += packet_size + self._stats[STAT_MSG_RECEIVED] += 1 + + def sys_handle_packet_sent(self, packet): + if packet: + packet_size = packet.bytes_length + self._stats[STAT_BYTES_SENT] += packet_size + self._stats[STAT_MSG_SENT] += 1 diff --git a/hbmqtt/codecs.py b/hbmqtt/codecs.py index b62fa3a..46a0446 100644 --- a/hbmqtt/codecs.py +++ b/hbmqtt/codecs.py @@ -102,3 +102,13 @@ def decode_packet_id(reader) -> int: """ packet_id_bytes = yield from read_or_raise(reader, 2) return bytes_to_int(packet_id_bytes) + + +def int_to_bytes_str(value: int) -> bytes: + """ + Converts a int value to a bytes array containing the numeric character. + Ex: 123 -> b'123' + :param value: int value to convert + :return: bytes array + """ + return str(value).encode('utf-8') From bc248f9fc5d597814af496182e82e488cd5e5d5d Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sun, 9 Aug 2015 23:19:17 +0200 Subject: [PATCH 11/13] Move debug trace to get packet length --- hbmqtt/mqtt/protocol/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 9e0b004..552a5bb 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -219,8 +219,8 @@ class ProtocolHandler: if not isinstance(packet, MQTTPacket): self.logger.debug("%s Writer interruption" % self.session.client_id) break - self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) yield from packet.to_stream(self.writer) + self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) self._loop.call_soon(self.on_packet_sent.send, packet) except asyncio.TimeoutError as ce: self.logger.debug("%s Output queue get timeout" % self.session.client_id) From 83ab0c318a6d7abdf9101b3d89d7917dece80154 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Mon, 10 Aug 2015 21:29:40 +0200 Subject: [PATCH 12/13] Add message count properties --- hbmqtt/session.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hbmqtt/session.py b/hbmqtt/session.py index 9da3acc..17e1c9f 100644 --- a/hbmqtt/session.py +++ b/hbmqtt/session.py @@ -57,5 +57,17 @@ class Session: self._packet_id += 1 return self._packet_id + @property + def inflight_in_count(self): + return len(self.incoming_msg) + + @property + def inflight_out_count(self): + return len(self.outgoing_msg) + + @property + def retained_messages_count(self): + return self.retained_messages.qsize() + def __repr__(self): return type(self).__name__ + '(clientId={0}, state={1})'.format(self.client_id, self.machine.state) From 4d5074aced03029b34bf97fd2e18812d0815b92d Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Mon, 10 Aug 2015 21:30:03 +0200 Subject: [PATCH 13/13] Implement basic $SYS tree --- hbmqtt/broker.py | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 2b7457e..150afa5 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -15,6 +15,7 @@ from hbmqtt.mqtt.connect import ConnectPacket from hbmqtt.mqtt.connack import * from hbmqtt.errors import HBMQTTException from hbmqtt.utils import format_client_message, gen_client_id +from hbmqtt.mqtt.packet import PUBLISH from hbmqtt.codecs import int_to_bytes_str from hbmqtt.adapters import ( StreamReaderAdapter, @@ -35,7 +36,10 @@ STAT_BYTES_SENT = 'bytes_sent' STAT_BYTES_RECEIVED = 'bytes_received' STAT_MSG_SENT = 'messages_sent' STAT_MSG_RECEIVED = 'messages_received' +STAT_PUBLISH_SENT = 'publish_sent' +STAT_PUBLISH_RECEIVED = 'publish_received' STAT_UPTIME = 'uptime' +STAT_CLIENTS_MAXIMUM = 'clients_maximum' class BrokerException(BaseException): pass @@ -274,7 +278,9 @@ class Broker: STAT_BYTES_SENT, STAT_MSG_RECEIVED, STAT_MSG_SENT, - 'clients_connected'): + STAT_CLIENTS_MAXIMUM, + STAT_PUBLISH_RECEIVED, + STAT_PUBLISH_SENT): self._stats[stat] = 0 self._stats[STAT_UPTIME] = datetime.now() @@ -296,7 +302,21 @@ class Broker: # Update stats uptime = datetime.now() - self._stats[STAT_UPTIME] client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected') + if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]: + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected') + inflight_in = 0 + inflight_out = 0 + messages_stored = 0 + for k, session in self._sessions.items(): + inflight_in += session.inflight_in_count + inflight_out += session.inflight_out_count + messages_stored += session.retained_messages_count + messages_stored += len(self._global_retained_messages) + subscriptions_count = 0 + for topic in self._subscriptions: + subscriptions_count += len(self._subscriptions[topic]) + # Broadcast updates tasks = [ self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])), @@ -308,6 +328,16 @@ class Broker: self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')), self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)), self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)), + self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])), + self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)), + self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)), + self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)), + self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)), + self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)), + self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])), + self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])), + self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))), + self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)), ] # Wait until broadcasting tasks end @@ -627,7 +657,6 @@ class Broker: self.logger.debug("Broadcasting message from %s on topic %s" % (format_client_message(session=source_session), topic) ) - self.logger.debug("Current subscriptions: %s" % repr(self._subscriptions)) publish_tasks = [] try: for k_filter in self._subscriptions: @@ -728,9 +757,13 @@ class Broker: packet_size = packet.bytes_length self._stats[STAT_BYTES_RECEIVED] += packet_size self._stats[STAT_MSG_RECEIVED] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_RECEIVED] += 1 def sys_handle_packet_sent(self, packet): if packet: packet_size = packet.bytes_length self._stats[STAT_BYTES_SENT] += packet_size self._stats[STAT_MSG_SENT] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_SENT] += 1