From a8c88b98affb345f41932656db62b435cd0a7e37 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Thu, 3 Sep 2015 22:20:31 +0200 Subject: [PATCH] Refactoring --- hbmqtt/broker.py | 56 ++++++++++++++++++++--------------------- samples/broker_start.py | 3 ++- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 9106026..93bc507 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -221,7 +221,7 @@ class Broker: self.transitions.start() self.logger.debug("Broker starting") except MachineError as me: - self.logger.debug("Invalid method call at this moment: %s" % me) + self.logger.warn("[WARN-0001] Invalid method call at this moment: %s" % me) raise BrokerException("Broker instance can't be started: %s" % me) yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START) @@ -231,7 +231,6 @@ class Broker: # Start network listeners for listener_name in self.listeners_config: listener = self.listeners_config[listener_name] - self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind'])) # Max connections try: @@ -267,6 +266,8 @@ class Broker: instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + self.logger.info("Listener '%s' bind to %s" % (listener_name, listener['bind'])) + # Start $SYS topics management try: sys_interval = int(self.config.get('sys_interval', 0)) @@ -428,27 +429,27 @@ class Broker: yield from writer.close() self.logger.debug("Connection closed") return - if connect.variable_header.proto_name != "MQTT": + if connect.proto_name != "MQTT": self.logger.warn('[MQTT-3.1.2-1] Incorrect protocol name: "%s"' % connect.variable_header.protocol_name) yield from writer.close() self.logger.debug("Connection closed") return connack = None - if connect.variable_header.proto_level != 4: + if connect.proto_level != 4: # only MQTT 3.1.1 supported self.logger.error('Invalid protocol from %s: %d' % (format_client_message(address=remote_address, port=remote_port), connect.variable_header.protocol_level)) connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0 - elif connect.variable_header.username_flag and connect.payload.username is None: + elif connect.username_flag and connect.username is None: self.logger.error('Invalid username from %s' % (format_client_message(address=remote_address, port=remote_port))) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.variable_header.password_flag and connect.payload.password is None: + elif connect.password_flag and connect.password is None: self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port))) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None: + elif connect.clean_session_flag is False and connect.payload.client_id is None: self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) @@ -459,17 +460,18 @@ class Broker: return client_session = None - self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag)) + self.logger.debug("Clean session={0}".format(connect.clean_session_flag)) self.logger.debug("known sessions={0}".format(self._sessions)) - client_id = connect.payload.client_id - if connect.variable_header.clean_session_flag: + client_id = connect.client_id + if connect.clean_session_flag: # Delete existing session and create a new one if client_id is not None: self.delete_session(client_id) + else: + client_id = gen_client_id() client_session = Session() client_session.parent = 0 client_session.client_id = client_id - self._sessions[client_id] = client_session else: # Get session from cache if client_id in self._sessions: @@ -479,32 +481,26 @@ class Broker: else: client_session = Session() client_session.client_id = client_id - self._sessions[client_id] = client_session client_session.parent = 0 - if client_session.client_id is None: - # Generate client ID - client_session.client_id = gen_client_id() + self._sessions[client_id] = client_session + client_session.remote_address = remote_address client_session.remote_port = remote_port - client_session.clean_session = connect.variable_header.clean_session_flag - client_session.will_flag = connect.variable_header.will_flag - client_session.will_retain = connect.variable_header.will_retain_flag - client_session.will_qos = connect.variable_header.will_qos - client_session.will_topic = connect.payload.will_topic - client_session.will_message = connect.payload.will_message - client_session.username = connect.payload.username - client_session.password = connect.payload.password - client_session.client_id = connect.payload.client_id - if connect.variable_header.keep_alive > 0: - client_session.keep_alive = connect.variable_header.keep_alive + self.config['timeout-disconnect-delay'] + client_session.clean_session = connect.clean_session_flag + client_session.will_flag = connect.will_flag + client_session.will_retain = connect.will_retain_flag + client_session.will_qos = connect.will_qos + client_session.will_topic = connect.will_topic + client_session.will_message = connect.will_message + client_session.username = connect.username + client_session.password = connect.password + if connect.keep_alive > 0: + client_session.keep_alive = connect.keep_alive + self.config['timeout-disconnect-delay'] else: client_session.keep_alive = 0 client_session.publish_retry_delay = self.config['publish-retry-delay'] - client_session.reader = reader - client_session.writer = writer - authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name]) if authenticated: connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED) @@ -520,6 +516,8 @@ class Broker: return client_session.transitions.connect() + client_session.reader = reader + client_session.writer = writer handler = self._init_handler(reader, writer, client_session) self.logger.debug("%s Start messages handling" % client_session.client_id) yield from handler.start() diff --git a/samples/broker_start.py b/samples/broker_start.py index ab09e12..8d56e3b 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -40,7 +40,8 @@ def test_coro(): if __name__ == '__main__': - formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + #formatter = "%(asctime)s :: %(levelname)s :: %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) asyncio.get_event_loop().run_until_complete(test_coro()) asyncio.get_event_loop().run_forever() \ No newline at end of file