diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 2b7457e..150afa5 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -15,6 +15,7 @@ from hbmqtt.mqtt.connect import ConnectPacket from hbmqtt.mqtt.connack import * from hbmqtt.errors import HBMQTTException from hbmqtt.utils import format_client_message, gen_client_id +from hbmqtt.mqtt.packet import PUBLISH from hbmqtt.codecs import int_to_bytes_str from hbmqtt.adapters import ( StreamReaderAdapter, @@ -35,7 +36,10 @@ STAT_BYTES_SENT = 'bytes_sent' STAT_BYTES_RECEIVED = 'bytes_received' STAT_MSG_SENT = 'messages_sent' STAT_MSG_RECEIVED = 'messages_received' +STAT_PUBLISH_SENT = 'publish_sent' +STAT_PUBLISH_RECEIVED = 'publish_received' STAT_UPTIME = 'uptime' +STAT_CLIENTS_MAXIMUM = 'clients_maximum' class BrokerException(BaseException): pass @@ -274,7 +278,9 @@ class Broker: STAT_BYTES_SENT, STAT_MSG_RECEIVED, STAT_MSG_SENT, - 'clients_connected'): + STAT_CLIENTS_MAXIMUM, + STAT_PUBLISH_RECEIVED, + STAT_PUBLISH_SENT): self._stats[stat] = 0 self._stats[STAT_UPTIME] = datetime.now() @@ -296,7 +302,21 @@ class Broker: # Update stats uptime = datetime.now() - self._stats[STAT_UPTIME] client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected') + if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]: + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected') + inflight_in = 0 + inflight_out = 0 + messages_stored = 0 + for k, session in self._sessions.items(): + inflight_in += session.inflight_in_count + inflight_out += session.inflight_out_count + messages_stored += session.retained_messages_count + messages_stored += len(self._global_retained_messages) + subscriptions_count = 0 + for topic in self._subscriptions: + subscriptions_count += len(self._subscriptions[topic]) + # Broadcast updates tasks = [ self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])), @@ -308,6 +328,16 @@ class Broker: self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')), self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)), self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)), + self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])), + self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)), + self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)), + self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)), + self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)), + self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)), + self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])), + self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])), + self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))), + self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)), ] # Wait until broadcasting tasks end @@ -627,7 +657,6 @@ class Broker: self.logger.debug("Broadcasting message from %s on topic %s" % (format_client_message(session=source_session), topic) ) - self.logger.debug("Current subscriptions: %s" % repr(self._subscriptions)) publish_tasks = [] try: for k_filter in self._subscriptions: @@ -728,9 +757,13 @@ class Broker: packet_size = packet.bytes_length self._stats[STAT_BYTES_RECEIVED] += packet_size self._stats[STAT_MSG_RECEIVED] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_RECEIVED] += 1 def sys_handle_packet_sent(self, packet): if packet: packet_size = packet.bytes_length self._stats[STAT_BYTES_SENT] += packet_size self._stats[STAT_MSG_SENT] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_SENT] += 1