From b4a27c97b6c55b07b52ee54da0bbf373bf9206ae Mon Sep 17 00:00:00 2001 From: Marc Billow Date: Wed, 23 Dec 2020 13:11:53 -0600 Subject: [PATCH] Implement client take-over --- amqtt/broker.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index c91a9af..ad647bb 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -453,9 +453,6 @@ class Broker: client_session.keep_alive += self.config["timeout-disconnect-delay"] self.logger.debug("Keep-alive timeout=%d" % client_session.keep_alive) - handler.attach(client_session, reader, writer) - self._sessions[client_session.client_id] = (client_session, handler) - authenticated = await self.authenticate( client_session, self.listeners_config[listener_name] ) @@ -470,12 +467,25 @@ class Broker: break except (MachineError, ValueError): # Backwards compat: MachineError is raised by transitions < 0.5.0. - self.logger.warning( - "Client %s is reconnecting too quickly, make it wait" - % client_session.client_id - ) - # Wait a bit may be client is reconnecting too fast - await asyncio.sleep(1) + if client_session.transitions.is_connected(): + self.logger.warning( + "Client %s is already connected, performing take-over.", + client_session.client_id, + ) + old_session = self._sessions[client_session.client_id] + await old_session[1].stop() + break + else: + self.logger.warning( + "Client %s is reconnecting too quickly, make it wait" + % client_session.client_id + ) + # Wait a bit may be client is reconnecting too fast + await asyncio.sleep(1) + + handler.attach(client_session, reader, writer) + self._sessions[client_session.client_id] = (client_session, handler) + await handler.mqtt_connack_authorize(authenticated) await self.plugins_manager.fire_event(