kopia lustrzana https://github.com/Yakifo/amqtt
Implement basic $SYS tree
rodzic
83ab0c318a
commit
4d5074aced
|
@ -15,6 +15,7 @@ from hbmqtt.mqtt.connect import ConnectPacket
|
|||
from hbmqtt.mqtt.connack import *
|
||||
from hbmqtt.errors import HBMQTTException
|
||||
from hbmqtt.utils import format_client_message, gen_client_id
|
||||
from hbmqtt.mqtt.packet import PUBLISH
|
||||
from hbmqtt.codecs import int_to_bytes_str
|
||||
from hbmqtt.adapters import (
|
||||
StreamReaderAdapter,
|
||||
|
@ -35,7 +36,10 @@ STAT_BYTES_SENT = 'bytes_sent'
|
|||
STAT_BYTES_RECEIVED = 'bytes_received'
|
||||
STAT_MSG_SENT = 'messages_sent'
|
||||
STAT_MSG_RECEIVED = 'messages_received'
|
||||
STAT_PUBLISH_SENT = 'publish_sent'
|
||||
STAT_PUBLISH_RECEIVED = 'publish_received'
|
||||
STAT_UPTIME = 'uptime'
|
||||
STAT_CLIENTS_MAXIMUM = 'clients_maximum'
|
||||
|
||||
class BrokerException(BaseException):
|
||||
pass
|
||||
|
@ -274,7 +278,9 @@ class Broker:
|
|||
STAT_BYTES_SENT,
|
||||
STAT_MSG_RECEIVED,
|
||||
STAT_MSG_SENT,
|
||||
'clients_connected'):
|
||||
STAT_CLIENTS_MAXIMUM,
|
||||
STAT_PUBLISH_RECEIVED,
|
||||
STAT_PUBLISH_SENT):
|
||||
self._stats[stat] = 0
|
||||
self._stats[STAT_UPTIME] = datetime.now()
|
||||
|
||||
|
@ -296,7 +302,21 @@ class Broker:
|
|||
# Update stats
|
||||
uptime = datetime.now() - self._stats[STAT_UPTIME]
|
||||
client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected')
|
||||
if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]:
|
||||
self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
|
||||
client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected')
|
||||
inflight_in = 0
|
||||
inflight_out = 0
|
||||
messages_stored = 0
|
||||
for k, session in self._sessions.items():
|
||||
inflight_in += session.inflight_in_count
|
||||
inflight_out += session.inflight_out_count
|
||||
messages_stored += session.retained_messages_count
|
||||
messages_stored += len(self._global_retained_messages)
|
||||
subscriptions_count = 0
|
||||
for topic in self._subscriptions:
|
||||
subscriptions_count += len(self._subscriptions[topic])
|
||||
|
||||
# Broadcast updates
|
||||
tasks = [
|
||||
self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])),
|
||||
|
@ -308,6 +328,16 @@ class Broker:
|
|||
self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')),
|
||||
self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)),
|
||||
self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)),
|
||||
self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])),
|
||||
self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)),
|
||||
self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)),
|
||||
self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)),
|
||||
self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)),
|
||||
self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)),
|
||||
self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])),
|
||||
self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])),
|
||||
self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))),
|
||||
self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)),
|
||||
]
|
||||
|
||||
# Wait until broadcasting tasks end
|
||||
|
@ -627,7 +657,6 @@ class Broker:
|
|||
self.logger.debug("Broadcasting message from %s on topic %s" %
|
||||
(format_client_message(session=source_session), topic)
|
||||
)
|
||||
self.logger.debug("Current subscriptions: %s" % repr(self._subscriptions))
|
||||
publish_tasks = []
|
||||
try:
|
||||
for k_filter in self._subscriptions:
|
||||
|
@ -728,9 +757,13 @@ class Broker:
|
|||
packet_size = packet.bytes_length
|
||||
self._stats[STAT_BYTES_RECEIVED] += packet_size
|
||||
self._stats[STAT_MSG_RECEIVED] += 1
|
||||
if packet.fixed_header.packet_type == PUBLISH:
|
||||
self._stats[STAT_PUBLISH_RECEIVED] += 1
|
||||
|
||||
def sys_handle_packet_sent(self, packet):
|
||||
if packet:
|
||||
packet_size = packet.bytes_length
|
||||
self._stats[STAT_BYTES_SENT] += packet_size
|
||||
self._stats[STAT_MSG_SENT] += 1
|
||||
if packet.fixed_header.packet_type == PUBLISH:
|
||||
self._stats[STAT_PUBLISH_SENT] += 1
|
||||
|
|
Ładowanie…
Reference in New Issue