From 74a66632cf5761a0033f76b15adf717a2a0b7aec Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Tue, 11 Aug 2015 22:20:03 +0200 Subject: [PATCH] connect() method returns a Future which has result set if the session is disconnected from the broker. --- hbmqtt/client.py | 85 ++++++++++++++++----------------------- samples/client_publish.py | 23 ++++++++++- 2 files changed, 56 insertions(+), 52 deletions(-) diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 05a2420..062995c 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -29,6 +29,11 @@ class ClientException(BaseException): pass +class ConnectException(ClientException): + def __init__(self, code): + self.return_code = code + + class MQTTClient: def __init__(self, client_id=None, config=None, loop=None): """ @@ -73,6 +78,8 @@ class MQTTClient: self.session = None self._handler = None self._disconnect_task = None + self._connection_closed_future = None + @asyncio.coroutine def connect(self, @@ -88,64 +95,40 @@ class MQTTClient: :param cafile: server certificate authority file :return: """ - try: - self.session = self._initsession(uri, cleansession, cafile, capath, cadata) - self.logger.debug("Connect to: %s" % uri) + self.session = self._initsession(uri, cleansession, cafile, capath, cadata) + self.logger.debug("Connect to: %s" % uri) - return_code = yield from self._connect_coro() - self._disconnect_task = asyncio.Task(self.handle_connection_close()) - return return_code - except MachineError: - msg = "Connect call incompatible with client current state '%s'" % self.session.transitions.state - self.logger.warn(msg) - self.session.transitions.connect_fail() - raise ClientException(msg) - except Exception as e: - self.session.transitions.disconnect() - self.logger.warn("Connection failed: %s " % e) - raise ClientException("Connection failed: %s " % e) + return_code = yield from self._connect_coro() + self._connection_closed_future = asyncio.Future(loop=self._loop) + self._disconnect_task = asyncio.Task(self.handle_connection_close()) + return self._connection_closed_future @asyncio.coroutine def disconnect(self): - try: - self.session.transitions.disconnect() + if self.session.transitions.is_connected(): if not self._disconnect_task.done(): self._disconnect_task.cancel() yield from self._handler.mqtt_disconnect() yield from self._handler.stop() self._handler.detach_from_session() - except MachineError as me: - if self.session.transitions.state == "disconnected": - self.logger.warn("Client session is already disconnected") - else: - self.logger.debug("Invalid method call at this moment: %s" % me) - raise ClientException("Client instance can't be disconnected: %s" % me) - except Exception as e: - self.logger.warn("Unhandled exception: %s" % e) - raise ClientException("Unhandled exception: %s" % e) + self.session.transitions.disconnect() + self._connection_closed_future.set_result(None) + else: + self.logger.warn("Client session is not currently connected, ignoring call") @asyncio.coroutine def reconnect(self, cleansession=False): - if self.session.transitions.state == 'connected': + if self.session.transitions.is_connected(): self.logger.warn("Client already connected") return CONNECTION_ACCEPTED - try: - self.session.clean_session = cleansession - self.logger.debug("Reconnecting with session parameters: %s" % self.session) + self.session.clean_session = cleansession + self.logger.debug("Reconnecting with session parameters: %s" % self.session) - return_code = yield from self._connect_coro() - self._disconnect_task = asyncio.Task(self.handle_connection_close()) - return return_code - except MachineError: - msg = "Connect call incompatible with client current state '%s'" % self.session.transitions.state - self.logger.warn(msg) - self.session.transitions.disconnect() - raise ClientException(msg) - except Exception as e: - self.session.transitions.disconnect() - self.logger.warn("Connection failed: %s " % e) - raise ClientException("Connection failed: %s " % e) + return_code = yield from self._connect_coro() + self._connection_closed_future = asyncio.Future(loop=self._loop) + self._disconnect_task = asyncio.Task(self.handle_connection_close()) + return self._connection_closed_future @asyncio.coroutine def ping(self): @@ -153,7 +136,7 @@ class MQTTClient: Send a MQTT ping request and wait for response :return: None """ - if self.session.transitions.state == 'connected': + if self.session.transitions.is_connected(): yield from self._handler.mqtt_ping() else: self.logger.warn("MQTT PING request incompatible with current session state '%s'" % @@ -179,6 +162,8 @@ class MQTTClient: except KeyError: pass return _qos, _retain + if not self.session.transitions.is_connected(): + self.logger.warn("publish MQTT message while not connected to broker, message may be lost") (app_qos, app_retain) = get_retain_and_qos() if app_qos == 0: yield from self._handler.mqtt_publish(topic, message, 0x00, app_retain) @@ -189,10 +174,14 @@ class MQTTClient: @asyncio.coroutine def subscribe(self, topics): + if not self.session.transitions.is_connected(): + self.logger.warn("subscribe while not connected to broker, message may be lost") return (yield from self._handler.mqtt_subscribe(topics, self.session.next_packet_id)) @asyncio.coroutine def unsubscribe(self, topics): + if not self.session.transitions.is_connected(): + self.logger.warn("unsubscribe while not connected to broker, message may be lost") yield from self._handler.mqtt_unsubscribe(topics, self.session.next_packet_id) @asyncio.coroutine @@ -261,6 +250,7 @@ class MQTTClient: yield from self._handler.stop() self.session.transitions.disconnect() self.logger.warn("Connection rejected with code '%s'" % return_code) + raise ConnectException(return_code) else: # Handle MQTT protocol self._handler = ClientProtocolHandler(reader, writer, loop=self._loop) @@ -268,7 +258,6 @@ class MQTTClient: yield from self._handler.start() self.session.transitions.connect() self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port)) - return return_code except Exception as e: self.logger.warn("connection failed: %s" % e) self.session.transitions.disconnect() @@ -313,13 +302,7 @@ class MQTTClient: self.logger.debug("Handle broker disconnection") yield from self._handler.stop() self.session.transitions.disconnect() -# while self.session.transitions.state != 'connected': -# yield from asyncio.sleep(2) -# self.logger.debug("Trying reconnect") -# try: -# yield from self.reconnect() -# except ClientException: -# self.logger.warn("Reconnect failed") + self._connection_closed_future.set_result(None) def _initsession( self, diff --git a/samples/client_publish.py b/samples/client_publish.py index 53bcab5..7818700 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -23,6 +23,12 @@ config = { #C = MQTTClient(config=config) C = MQTTClient() + +def disconnected(future): + print("DISCONNECTED") + asyncio.get_event_loop().stop() + + @asyncio.coroutine def test_coro(): yield from C.connect('mqtt://localhost:1883/') @@ -36,7 +42,22 @@ def test_coro(): yield from C.disconnect() +@asyncio.coroutine +def test_coro2(): + future = yield from C.connect('mqtt://localhost:1883/') + future.add_done_callback(disconnected) + yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))]) + yield from asyncio.sleep(10) + yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))]) + logger.info("messages published") + yield from C.disconnect() + + if __name__ == '__main__': formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) - asyncio.get_event_loop().run_until_complete(test_coro()) \ No newline at end of file + asyncio.async(test_coro2()) + try: + asyncio.get_event_loop().run_forever() + finally: + asyncio.get_event_loop().close() \ No newline at end of file