diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index f6e5009..fbda193 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -1,11 +1,11 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. +from typing import Optional, Dict, Union import logging import ssl import websockets import asyncio -import sys import re from asyncio import CancelledError from collections import deque @@ -90,7 +90,7 @@ class Server: % (self.listener_name, self.conn_count) ) - def release_connection(self): + def release_connection(self) -> None: if self.semaphore: self.semaphore.release() self.conn_count -= 1 @@ -117,7 +117,7 @@ class BrokerContext(BaseContext): It act as an adapter to broker services from plugins developed for HBMQTT broker """ - def __init__(self, broker): + def __init__(self, broker: "Broker") -> None: super().__init__() self.config = None self._broker_instance = broker @@ -205,7 +205,7 @@ class Broker: except KeyError as ke: raise BrokerException("Listener config not found invalid: %s" % ke) - def _init_states(self): + def _init_states(self) -> None: self.transitions = Machine(states=Broker.states, initial="new") self.transitions.add_transition(trigger="start", source="new", dest="starting") self.transitions.add_transition( @@ -741,7 +741,13 @@ class Broker: # If all plugins returned True, authentication is success return topic_result - def retain_message(self, source_session, topic_name, data, qos=None): + def retain_message( + self, + source_session: Session, + topic_name: str, + data: bytearray, + qos: Optional[int] = None, + ) -> None: if data is not None and data != b"": # If retained flag set, store the message for further subscriptions self.logger.debug("Retaining message on topic %s" % topic_name) @@ -794,7 +800,7 @@ class Broker: except KeyError: return 0x80 - def _del_subscription(self, a_filter, session): + def _del_subscription(self, a_filter: str, session: Session) -> int: """ Delete a session subscription on a given topic :param a_filter: @@ -819,7 +825,7 @@ class Broker: finally: return deleted - def _del_all_subscriptions(self, session): + def _del_all_subscriptions(self, session: Session) -> None: """ Delete all topic subscriptions for a given session :param session: @@ -980,7 +986,7 @@ class Broker: % (subscription[0], format_client_message(session=session)) ) - def delete_session(self, client_id): + def delete_session(self, client_id: str) -> None: """ Delete an existing session data, for example due to clean session set in CONNECT :param client_id: