kopia lustrzana https://github.com/Yakifo/amqtt
Add session deletion method used on clean_session to delete session and subscriptions
HBMQTT-22pull/8/head
rodzic
df317319d4
commit
2a3032927d
|
|
@ -29,6 +29,9 @@ class Subscription:
|
|||
self.session = session
|
||||
self.qos = qos
|
||||
|
||||
def __repr__(self):
|
||||
return type(self).__name__ + '(client_id={0}, qos={1!r})'.format(self.session.client_id, self.qos)
|
||||
|
||||
|
||||
class RetainedApplicationMessage:
|
||||
def __init__(self, source_session, topic, data, qos=None):
|
||||
|
|
@ -158,9 +161,8 @@ class Broker:
|
|||
self.logger.debug("known sessions={0}".format(self._sessions))
|
||||
if connect.variable_header.clean_session_flag:
|
||||
client_id = connect.payload.client_id
|
||||
if client_id is not None and client_id in self._sessions:
|
||||
# Delete existing session
|
||||
del self._sessions[client_id]
|
||||
if client_id is not None:
|
||||
self.delete_session(client_id)
|
||||
client_session = Session()
|
||||
client_session.parent = 0
|
||||
self._sessions[client_id] = client_session
|
||||
|
|
@ -215,6 +217,7 @@ class Broker:
|
|||
handler.attach_to_session(client_session)
|
||||
self.logger.debug("%s Start messages handling" % client_session.client_id)
|
||||
yield from handler.start()
|
||||
self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
|
||||
yield from self.publish_session_retained_messages(client_session)
|
||||
self.logger.debug("%s Wait for disconnect" % client_session.client_id)
|
||||
|
||||
|
|
@ -400,6 +403,7 @@ class Broker:
|
|||
publish_tasks.append(
|
||||
asyncio.Task(target_session.retained_messages.put(retained_message))
|
||||
)
|
||||
|
||||
if len(publish_tasks) > 0:
|
||||
asyncio.wait(publish_tasks)
|
||||
except Exception as e:
|
||||
|
|
@ -407,6 +411,9 @@ class Broker:
|
|||
self.logger.debug("End Broadcasting message from %s on topic %s" %
|
||||
(format_client_message(session=source_session), topic)
|
||||
)
|
||||
for client_id in self._sessions:
|
||||
self.logger.debug("%s Retained messages queue size: %d" %
|
||||
(client_id, self._sessions[client_id].retained_messages.qsize()))
|
||||
|
||||
@asyncio.coroutine
|
||||
def publish_session_retained_messages(self, session):
|
||||
|
|
@ -439,3 +446,28 @@ class Broker:
|
|||
asyncio.wait(publish_tasks)
|
||||
self.logger.debug("End broadcasting messages retained due to subscription on '%s' from %s" %
|
||||
(subscription['filter'], format_client_message(session=session)))
|
||||
|
||||
def delete_session(self, client_id):
|
||||
"""
|
||||
Delete an existing session data, for example due to clean session set in CONNECT
|
||||
:param client_id:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
session = self._sessions[client_id]
|
||||
except KeyError:
|
||||
session = None
|
||||
if session is None:
|
||||
self.logger.warn("Delete session : session %s doesn't exist" % client_id)
|
||||
return
|
||||
|
||||
# Delete subscriptions
|
||||
self.logger.debug("deleting session %s subscriptions" % repr(session))
|
||||
nb_sub = 0
|
||||
for filter in self._subscriptions:
|
||||
self.del_subscription(filter, session)
|
||||
nb_sub += 1
|
||||
self.logger.debug("%d subscriptions deleted" % nb_sub)
|
||||
|
||||
self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id]))
|
||||
del self._sessions[client_id]
|
||||
|
|
|
|||
Ładowanie…
Reference in New Issue