diff --git a/amqtt/plugins/sys/broker.py b/amqtt/plugins/sys/broker.py index b0ba07f..1ce7abe 100644 --- a/amqtt/plugins/sys/broker.py +++ b/amqtt/plugins/sys/broker.py @@ -42,9 +42,8 @@ STAT_START_TIME = "start_time" STAT_CLIENTS_MAXIMUM = "clients_maximum" STAT_CLIENTS_CONNECTED = "clients_connected" STAT_CLIENTS_DISCONNECTED = "clients_disconnected" -MEMORY_USAGE_MAXIMUM = "memory_maximum" -CPU_USAGE_MAXIMUM = "cpu_usage_maximum" -CPU_USAGE_LAST = "cpu_usage_last" +STAT_MEMORY_USAGE_MAXIMUM = "memory_maximum" +STAT_CPU_USAGE_MAXIMUM = "cpu_usage_maximum" PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] @@ -75,17 +74,17 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): def _clear_stats(self) -> None: """Initialize broker statistics data structures.""" for stat in ( - STAT_BYTES_RECEIVED, - STAT_BYTES_SENT, - STAT_MSG_RECEIVED, - STAT_MSG_SENT, - STAT_CLIENTS_MAXIMUM, - STAT_CLIENTS_CONNECTED, - STAT_CLIENTS_DISCONNECTED, - STAT_PUBLISH_RECEIVED, - STAT_PUBLISH_SENT, - MEMORY_USAGE_MAXIMUM, - CPU_USAGE_MAXIMUM + STAT_BYTES_RECEIVED, + STAT_BYTES_SENT, + STAT_MSG_RECEIVED, + STAT_MSG_SENT, + STAT_CLIENTS_MAXIMUM, + STAT_CLIENTS_CONNECTED, + STAT_CLIENTS_DISCONNECTED, + STAT_PUBLISH_RECEIVED, + STAT_PUBLISH_SENT, + STAT_MEMORY_USAGE_MAXIMUM, + STAT_CPU_USAGE_MAXIMUM ): self._stats[stat] = 0 @@ -149,14 +148,15 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): messages_stored += session.retained_messages_count messages_stored += len(self.context.retained_messages) subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values()) - self._stats[STAT_CLIENTS_MAXIMUM] = client_connected + + self._stats[STAT_CLIENTS_MAXIMUM] = max(self._stats[STAT_CLIENTS_MAXIMUM], client_connected) cpu_usage = self._current_process.cpu_percent(interval=0) - self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage) + self._stats[STAT_CPU_USAGE_MAXIMUM] = max(self._stats[STAT_CPU_USAGE_MAXIMUM], cpu_usage) mem_info_usage = self._current_process.memory_full_info() mem_size = mem_info_usage.rss / (1024 ** 2) - self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size) + self._stats[STAT_MEMORY_USAGE_MAXIMUM] = max(self._stats[STAT_MEMORY_USAGE_MAXIMUM], mem_size) # Broadcast updates tasks: deque[asyncio.Task[None]] = deque() @@ -181,9 +181,9 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): "messages/retained/count": len(self.context.retained_messages), "messages/subscriptions/count": subscriptions_count, "heap/size": mem_size, - "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM], + "heap/maximum": self._stats[STAT_MEMORY_USAGE_MAXIMUM], "cpu/percent": cpu_usage, - "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM], + "cpu/maximum": self._stats[STAT_CPU_USAGE_MAXIMUM], } for stat_name, stat_value in stats.items(): data: bytes = val_to_bytes_str(stat_value)