amqtt/hbmqtt/plugins/sys/broker.py

250 wiersze
8.6 KiB
Python

# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from datetime import datetime
from collections import deque
import asyncio
import hbmqtt
from hbmqtt.mqtt.packet import PUBLISH
from hbmqtt.codecs import int_to_bytes_str
DOLLAR_SYS_ROOT = "$SYS/broker/"
STAT_BYTES_SENT = "bytes_sent"
STAT_BYTES_RECEIVED = "bytes_received"
STAT_MSG_SENT = "messages_sent"
STAT_MSG_RECEIVED = "messages_received"
STAT_PUBLISH_SENT = "publish_sent"
STAT_PUBLISH_RECEIVED = "publish_received"
STAT_START_TIME = "start_time"
STAT_CLIENTS_MAXIMUM = "clients_maximum"
STAT_CLIENTS_CONNECTED = "clients_connected"
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
class BrokerSysPlugin:
def __init__(self, context):
self.context = context
# Broker statistics initialization
self._stats = dict()
self._sys_handle = None
def _clear_stats(self):
"""
Initializes broker statistics data structures
"""
for stat in (
STAT_BYTES_RECEIVED,
STAT_BYTES_SENT,
STAT_MSG_RECEIVED,
STAT_MSG_SENT,
STAT_CLIENTS_MAXIMUM,
STAT_CLIENTS_CONNECTED,
STAT_CLIENTS_DISCONNECTED,
STAT_PUBLISH_RECEIVED,
STAT_PUBLISH_SENT,
):
self._stats[stat] = 0
async def _broadcast_sys_topic(self, topic_basename, data):
return await self.context.broadcast_message(topic_basename, data)
def schedule_broadcast_sys_topic(self, topic_basename, data):
return asyncio.ensure_future(
self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
loop=self.context.loop,
)
async def on_broker_pre_start(self, *args, **kwargs):
self._clear_stats()
async def on_broker_post_start(self, *args, **kwargs):
self._stats[STAT_START_TIME] = datetime.now()
version = f"HBMQTT version {hbmqtt.__version__}"
self.context.retain_message(DOLLAR_SYS_ROOT + "version", version.encode())
# Start $SYS topics management
try:
sys_interval = int(self.context.config.get("sys_interval", 0))
if sys_interval > 0:
self.context.logger.debug(
"Setup $SYS broadcasting every %d seconds" % sys_interval
)
self.sys_handle = self.context.loop.call_later(
sys_interval, self.broadcast_dollar_sys_topics
)
else:
self.context.logger.debug("$SYS disabled")
except KeyError:
pass
# 'sys_internal' config parameter not found
async def on_broker_pre_stop(self, *args, **kwargs):
# Stop $SYS topics broadcasting
if self.sys_handle:
self.sys_handle.cancel()
def broadcast_dollar_sys_topics(self):
"""
Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config
parameter.
"""
# Update stats
uptime = datetime.now() - self._stats[STAT_START_TIME]
client_connected = self._stats[STAT_CLIENTS_CONNECTED]
client_disconnected = self._stats[STAT_CLIENTS_DISCONNECTED]
inflight_in = 0
inflight_out = 0
messages_stored = 0
for session in self.context.sessions:
inflight_in += session.inflight_in_count
inflight_out += session.inflight_out_count
messages_stored += session.retained_messages_count
messages_stored += len(self.context.retained_messages)
subscriptions_count = 0
for topic in self.context.subscriptions:
subscriptions_count += len(self.context.subscriptions[topic])
# Broadcast updates
tasks = deque()
tasks.append(
self.schedule_broadcast_sys_topic(
"load/bytes/received",
int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED]),
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"load/bytes/sent", int_to_bytes_str(self._stats[STAT_BYTES_SENT])
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/received", int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/sent", int_to_bytes_str(self._stats[STAT_MSG_SENT])
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"time", str(datetime.now()).encode("utf-8")
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"uptime", int_to_bytes_str(int(uptime.total_seconds()))
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"uptime/formated", str(uptime).encode("utf-8")
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"clients/connected", int_to_bytes_str(client_connected)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"clients/disconnected", int_to_bytes_str(client_disconnected)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"clients/maximum", int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"clients/total",
int_to_bytes_str(client_connected + client_disconnected),
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/inflight", int_to_bytes_str(inflight_in + inflight_out)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/inflight/in", int_to_bytes_str(inflight_in)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/inflight/out", int_to_bytes_str(inflight_out)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/inflight/stored", int_to_bytes_str(messages_stored)
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/publish/received",
int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED]),
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/publish/sent",
int_to_bytes_str(self._stats[STAT_PUBLISH_SENT]),
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/retained/count",
int_to_bytes_str(len(self.context.retained_messages)),
)
)
tasks.append(
self.schedule_broadcast_sys_topic(
"messages/subscriptions/count", int_to_bytes_str(subscriptions_count)
)
)
# Wait until broadcasting tasks end
while tasks and tasks[0].done():
tasks.popleft()
# Reschedule
sys_interval = int(self.context.config["sys_interval"])
self.context.logger.debug("Broadcasting $SYS topics")
self.sys_handle = self.context.loop.call_later(
sys_interval, self.broadcast_dollar_sys_topics
)
async def on_mqtt_packet_received(self, *args, **kwargs):
packet = kwargs.get("packet")
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_RECEIVED] += packet_size
self._stats[STAT_MSG_RECEIVED] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_RECEIVED] += 1
async def on_mqtt_packet_sent(self, *args, **kwargs):
packet = kwargs.get("packet")
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_SENT] += packet_size
self._stats[STAT_MSG_SENT] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_SENT] += 1
async def on_broker_client_connected(self, *args, **kwargs):
self._stats[STAT_CLIENTS_CONNECTED] += 1
self._stats[STAT_CLIENTS_MAXIMUM] = max(
self._stats[STAT_CLIENTS_MAXIMUM], self._stats[STAT_CLIENTS_CONNECTED]
)
async def on_broker_client_disconnected(self, *args, **kwargs):
self._stats[STAT_CLIENTS_CONNECTED] -= 1
self._stats[STAT_CLIENTS_DISCONNECTED] += 1