kopia lustrzana https://github.com/Yakifo/amqtt
Implement client take-over
rodzic
63ecb375db
commit
b4a27c97b6
|
@ -453,9 +453,6 @@ class Broker:
|
|||
client_session.keep_alive += self.config["timeout-disconnect-delay"]
|
||||
self.logger.debug("Keep-alive timeout=%d" % client_session.keep_alive)
|
||||
|
||||
handler.attach(client_session, reader, writer)
|
||||
self._sessions[client_session.client_id] = (client_session, handler)
|
||||
|
||||
authenticated = await self.authenticate(
|
||||
client_session, self.listeners_config[listener_name]
|
||||
)
|
||||
|
@ -470,12 +467,25 @@ class Broker:
|
|||
break
|
||||
except (MachineError, ValueError):
|
||||
# Backwards compat: MachineError is raised by transitions < 0.5.0.
|
||||
self.logger.warning(
|
||||
"Client %s is reconnecting too quickly, make it wait"
|
||||
% client_session.client_id
|
||||
)
|
||||
# Wait a bit may be client is reconnecting too fast
|
||||
await asyncio.sleep(1)
|
||||
if client_session.transitions.is_connected():
|
||||
self.logger.warning(
|
||||
"Client %s is already connected, performing take-over.",
|
||||
client_session.client_id,
|
||||
)
|
||||
old_session = self._sessions[client_session.client_id]
|
||||
await old_session[1].stop()
|
||||
break
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Client %s is reconnecting too quickly, make it wait"
|
||||
% client_session.client_id
|
||||
)
|
||||
# Wait a bit may be client is reconnecting too fast
|
||||
await asyncio.sleep(1)
|
||||
|
||||
handler.attach(client_session, reader, writer)
|
||||
self._sessions[client_session.client_id] = (client_session, handler)
|
||||
|
||||
await handler.mqtt_connack_authorize(authenticated)
|
||||
|
||||
await self.plugins_manager.fire_event(
|
||||
|
|
Ładowanie…
Reference in New Issue