kopia lustrzana https://github.com/Yakifo/amqtt
Fix client/session state management
rodzic
5aa358d17c
commit
821f53459e
|
@ -26,8 +26,6 @@ class ClientException(BaseException):
|
|||
|
||||
|
||||
class MQTTClient:
|
||||
states = ['new', 'connecting', 'connected', 'disconnected']
|
||||
|
||||
def __init__(self, client_id=None, config=None, loop=None):
|
||||
"""
|
||||
|
||||
|
@ -68,55 +66,73 @@ class MQTTClient:
|
|||
self.client_id = gen_client_id()
|
||||
self.logger.debug("Using generated client ID : %s" % self.client_id)
|
||||
|
||||
self._init_states()
|
||||
if loop is not None:
|
||||
self._loop = loop
|
||||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self.session = None
|
||||
self._handler = None
|
||||
|
||||
def _init_states(self):
|
||||
self.machine = Machine(states=MQTTClient.states, initial='new')
|
||||
self.machine.add_transition(trigger='connect', source='new', dest='connecting')
|
||||
self.machine.add_transition(trigger='connect', source='disconnected', dest='connecting')
|
||||
self.machine.add_transition(trigger='connect_fail', source='connecting', dest='disconnected')
|
||||
self.machine.add_transition(trigger='connect_success', source='connecting', dest='connected')
|
||||
self.machine.add_transition(trigger='disconnect', source='idle', dest='disconnected')
|
||||
self.machine.add_transition(trigger='disconnect', source='connected', dest='disconnected')
|
||||
self._disconnect_task = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def connect(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None):
|
||||
try:
|
||||
self.machine.connect()
|
||||
self.session = self._initsession(host, port, username, password, uri, cleansession)
|
||||
self.logger.debug("Connect with session parameters: %s" % self.session)
|
||||
|
||||
return_code = yield from self._connect_coro()
|
||||
self.machine.connect_success()
|
||||
self._disconnect_task = asyncio.Task(self.handle_connection_close())
|
||||
return return_code
|
||||
except MachineError:
|
||||
msg = "Connect call incompatible with client current state '%s'" % self.machine.current_state
|
||||
self.logger.warn(msg)
|
||||
self.machine.connect_fail()
|
||||
self.session.machine.connect_fail()
|
||||
raise ClientException(msg)
|
||||
except Exception as e:
|
||||
self.machine.connect_fail()
|
||||
self.session.machine.connect_fail()
|
||||
self.logger.warn("Connection failed: %s " % e)
|
||||
raise ClientException("Connection failed: %s " % e)
|
||||
|
||||
@asyncio.coroutine
|
||||
def disconnect(self):
|
||||
try:
|
||||
self.session.machine.disconnect()
|
||||
if not self._disconnect_task.done():
|
||||
self._disconnect_task.cancel()
|
||||
yield from self._handler.mqtt_disconnect()
|
||||
yield from self._handler.stop()
|
||||
self._handler.detach_from_session()
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise ClientException("Client instance can't be disconnected: %s" % me)
|
||||
if self.session.machine.state == "disconnected":
|
||||
self.logger.warn("Client session is already disconnected")
|
||||
else:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise ClientException("Client instance can't be disconnected: %s" % me)
|
||||
except Exception as e:
|
||||
self.logger.warn("Unhandled exception: %s" % e)
|
||||
raise ClientException("Unhandled exception: %s" % e)
|
||||
self.session.machine.disconnect()
|
||||
|
||||
@asyncio.coroutine
|
||||
def reconnect(self, cleansession=None):
|
||||
try:
|
||||
self.session.machine.connect()
|
||||
self.session.clclean_session = cleansession
|
||||
self.logger.debug("Reconnecting with session parameters: %s" % self.session)
|
||||
|
||||
return_code = yield from self._connect_coro()
|
||||
asyncio.Task(self.handle_connection_close())
|
||||
|
||||
self.session.machine.connect_success()
|
||||
return return_code
|
||||
except MachineError:
|
||||
msg = "Connect call incompatible with client current state '%s'" % self.machine.current_state
|
||||
self.logger.warn(msg)
|
||||
self.session.machine.connect_fail()
|
||||
raise ClientException(msg)
|
||||
except Exception as e:
|
||||
self.session.machine.connect_fail()
|
||||
self.logger.warn("Connection failed: %s " % e)
|
||||
raise ClientException("Connection failed: %s " % e)
|
||||
|
||||
@asyncio.coroutine
|
||||
def ping(self):
|
||||
|
@ -191,16 +207,25 @@ class MQTTClient:
|
|||
|
||||
if return_code is not ReturnCode.CONNECTION_ACCEPTED:
|
||||
yield from self._handler.stop()
|
||||
raise ClientException("Connection rejected with code '%s'" % return_code)
|
||||
|
||||
self.session.machine.connect()
|
||||
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
||||
self.session.machine.disconnect()
|
||||
self.logger.warn("Connection rejected with code '%s'" % return_code)
|
||||
else:
|
||||
self.session.machine.connect()
|
||||
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
||||
return return_code
|
||||
except Exception as e:
|
||||
self.session.state = SessionState.DISCONNECTED
|
||||
raise e
|
||||
|
||||
def _initsession(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None) -> dict:
|
||||
@asyncio.coroutine
|
||||
def handle_connection_close(self):
|
||||
self.logger.debug("Watch broker disconnection")
|
||||
yield from self._handler.wait_disconnect()
|
||||
self.logger.debug("Handle broker disconnection")
|
||||
yield from self._handler.stop()
|
||||
self._handler.detach_from_session()
|
||||
self.session.machine.disconnect()
|
||||
|
||||
def _initsession(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None) -> Session:
|
||||
# Load config
|
||||
broker_conf = self.config.get('broker', dict()).copy()
|
||||
if 'mqtt' not in broker_conf:
|
||||
|
@ -261,3 +286,9 @@ class MQTTClient:
|
|||
s.will_topic = None
|
||||
s.will_message = None
|
||||
return s
|
||||
|
||||
def session_state(self):
|
||||
if self.session:
|
||||
return self.session.machine.state
|
||||
else:
|
||||
return None
|
||||
|
|
Ładowanie…
Reference in New Issue