kopia lustrzana https://github.com/Yakifo/amqtt
Refactoring
rodzic
b1caea4d6a
commit
a8c88b98af
|
@ -221,7 +221,7 @@ class Broker:
|
|||
self.transitions.start()
|
||||
self.logger.debug("Broker starting")
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
self.logger.warn("[WARN-0001] Invalid method call at this moment: %s" % me)
|
||||
raise BrokerException("Broker instance can't be started: %s" % me)
|
||||
|
||||
yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
|
||||
|
@ -231,7 +231,6 @@ class Broker:
|
|||
# Start network listeners
|
||||
for listener_name in self.listeners_config:
|
||||
listener = self.listeners_config[listener_name]
|
||||
self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind']))
|
||||
|
||||
# Max connections
|
||||
try:
|
||||
|
@ -267,6 +266,8 @@ class Broker:
|
|||
instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop)
|
||||
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
||||
|
||||
self.logger.info("Listener '%s' bind to %s" % (listener_name, listener['bind']))
|
||||
|
||||
# Start $SYS topics management
|
||||
try:
|
||||
sys_interval = int(self.config.get('sys_interval', 0))
|
||||
|
@ -428,27 +429,27 @@ class Broker:
|
|||
yield from writer.close()
|
||||
self.logger.debug("Connection closed")
|
||||
return
|
||||
if connect.variable_header.proto_name != "MQTT":
|
||||
if connect.proto_name != "MQTT":
|
||||
self.logger.warn('[MQTT-3.1.2-1] Incorrect protocol name: "%s"' % connect.variable_header.protocol_name)
|
||||
yield from writer.close()
|
||||
self.logger.debug("Connection closed")
|
||||
return
|
||||
|
||||
connack = None
|
||||
if connect.variable_header.proto_level != 4:
|
||||
if connect.proto_level != 4:
|
||||
# only MQTT 3.1.1 supported
|
||||
self.logger.error('Invalid protocol from %s: %d' %
|
||||
(format_client_message(address=remote_address, port=remote_port),
|
||||
connect.variable_header.protocol_level))
|
||||
connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0
|
||||
elif connect.variable_header.username_flag and connect.payload.username is None:
|
||||
elif connect.username_flag and connect.username is None:
|
||||
self.logger.error('Invalid username from %s' %
|
||||
(format_client_message(address=remote_address, port=remote_port)))
|
||||
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
|
||||
elif connect.variable_header.password_flag and connect.payload.password is None:
|
||||
elif connect.password_flag and connect.password is None:
|
||||
self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)))
|
||||
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
|
||||
elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None:
|
||||
elif connect.clean_session_flag is False and connect.payload.client_id is None:
|
||||
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' %
|
||||
format_client_message(address=remote_address, port=remote_port))
|
||||
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
||||
|
@ -459,17 +460,18 @@ class Broker:
|
|||
return
|
||||
|
||||
client_session = None
|
||||
self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag))
|
||||
self.logger.debug("Clean session={0}".format(connect.clean_session_flag))
|
||||
self.logger.debug("known sessions={0}".format(self._sessions))
|
||||
client_id = connect.payload.client_id
|
||||
if connect.variable_header.clean_session_flag:
|
||||
client_id = connect.client_id
|
||||
if connect.clean_session_flag:
|
||||
# Delete existing session and create a new one
|
||||
if client_id is not None:
|
||||
self.delete_session(client_id)
|
||||
else:
|
||||
client_id = gen_client_id()
|
||||
client_session = Session()
|
||||
client_session.parent = 0
|
||||
client_session.client_id = client_id
|
||||
self._sessions[client_id] = client_session
|
||||
else:
|
||||
# Get session from cache
|
||||
if client_id in self._sessions:
|
||||
|
@ -479,32 +481,26 @@ class Broker:
|
|||
else:
|
||||
client_session = Session()
|
||||
client_session.client_id = client_id
|
||||
self._sessions[client_id] = client_session
|
||||
client_session.parent = 0
|
||||
|
||||
if client_session.client_id is None:
|
||||
# Generate client ID
|
||||
client_session.client_id = gen_client_id()
|
||||
self._sessions[client_id] = client_session
|
||||
|
||||
client_session.remote_address = remote_address
|
||||
client_session.remote_port = remote_port
|
||||
client_session.clean_session = connect.variable_header.clean_session_flag
|
||||
client_session.will_flag = connect.variable_header.will_flag
|
||||
client_session.will_retain = connect.variable_header.will_retain_flag
|
||||
client_session.will_qos = connect.variable_header.will_qos
|
||||
client_session.will_topic = connect.payload.will_topic
|
||||
client_session.will_message = connect.payload.will_message
|
||||
client_session.username = connect.payload.username
|
||||
client_session.password = connect.payload.password
|
||||
client_session.client_id = connect.payload.client_id
|
||||
if connect.variable_header.keep_alive > 0:
|
||||
client_session.keep_alive = connect.variable_header.keep_alive + self.config['timeout-disconnect-delay']
|
||||
client_session.clean_session = connect.clean_session_flag
|
||||
client_session.will_flag = connect.will_flag
|
||||
client_session.will_retain = connect.will_retain_flag
|
||||
client_session.will_qos = connect.will_qos
|
||||
client_session.will_topic = connect.will_topic
|
||||
client_session.will_message = connect.will_message
|
||||
client_session.username = connect.username
|
||||
client_session.password = connect.password
|
||||
if connect.keep_alive > 0:
|
||||
client_session.keep_alive = connect.keep_alive + self.config['timeout-disconnect-delay']
|
||||
else:
|
||||
client_session.keep_alive = 0
|
||||
client_session.publish_retry_delay = self.config['publish-retry-delay']
|
||||
|
||||
client_session.reader = reader
|
||||
client_session.writer = writer
|
||||
|
||||
authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name])
|
||||
if authenticated:
|
||||
connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED)
|
||||
|
@ -520,6 +516,8 @@ class Broker:
|
|||
return
|
||||
|
||||
client_session.transitions.connect()
|
||||
client_session.reader = reader
|
||||
client_session.writer = writer
|
||||
handler = self._init_handler(reader, writer, client_session)
|
||||
self.logger.debug("%s Start messages handling" % client_session.client_id)
|
||||
yield from handler.start()
|
||||
|
|
|
@ -40,7 +40,8 @@ def test_coro():
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
|
||||
#formatter = "%(asctime)s :: %(levelname)s :: %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
asyncio.get_event_loop().run_until_complete(test_coro())
|
||||
asyncio.get_event_loop().run_forever()
|
Ładowanie…
Reference in New Issue