diff --git a/hbmqtt/client/_client.py b/hbmqtt/client/_client.py index 4642093..a8b2592 100644 --- a/hbmqtt/client/_client.py +++ b/hbmqtt/client/_client.py @@ -84,43 +84,42 @@ class MQTTClient: self.machine.add_transition(trigger='disconnect', source='idle', dest='disconnected') self.machine.add_transition(trigger='disconnect', source='connected', dest='disconnected') + def connect(self, **kwargs): + try: + self._loop.run_until_complete(self.connect_async(**kwargs)) + except Exception as e: + self.logger.warn(e) - def connect(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None): + @asyncio.coroutine + def connect_async(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None): try: self.machine.connect() - self._session = self.init_session(host, port, username, password, uri, clean_session) + self._session = self._init_session(host, port, username, password, uri, clean_session) self.logger.debug("Connect with session parameters: %s" % self._session) - thread_event = threading.Event() - self._loop_thread = threading.Thread(target=self._start_client_loop, args=(thread_event, self._loop)) - self._loop_thread.setDaemon(True) - self._loop_thread.start() - thread_event.wait() - if self._session.state == SessionState.CONNECTED: - self.machine.connect_success() - else: - self.machine.connect_fail() - self.logger.warn("Connection failed: %s " % self._session._last_exception) - raise ClientException("Connection failed: %s " % self._session._last_exception) + yield from self._connect_coro() + self.machine.connect_success() except MachineError: msg = "Connect call incompatible with client current state '%s'" % self.machine.current_state self.logger.warn(msg) self.machine.connect_fail() raise ClientException(msg) + except Exception as e: + self.machine.connect_fail() + self.logger.warn("Connection failed: %s " % e) + raise ClientException("Connection failed: %s " % e) def disconnect(self): try: self.machine.disconnect() if self._loop.is_running(): - self._loop.call_soon_threadsafe(asyncio.async, self._disconnect_coro()) + self._loop.call_soon(asyncio.async, self._disconnect_coro()) else: self._loop.run_until_complete(self._disconnect_coro()) except MachineError as me: self.logger.debug("Invalid method call at this moment: %s" % me) raise ClientException("Client instance can't be disconnected: %s" % me) - self._loop_thread.join() - self._loop.call_soon_threadsafe(self._loop.stop) - self.logger.warn(self._session._last_exception) + self._loop.stop() self._session = None @asyncio.coroutine @@ -150,48 +149,19 @@ class MQTTClient: # Send CONNECT packet and wait for CONNACK packet = ConnectPacket.build_request_from_session(self._session) yield from packet.to_stream(self._session.writer) - print(packet) + self.logger.debug(packet) connack = yield from ConnackPacket.from_stream(self._session.reader) + self.logger.debug(connack) if connack.variable_header.return_code is not ReturnCode.CONNECTION_ACCEPTED: raise ClientException("Connection rejected with code '%s'" % hex(connack.variable_header.return_code)) - print(connack) self._session.state = SessionState.CONNECTED self.logger.debug("connected to %s:%s" % (self._session.remote_address, self._session.remote_port)) except Exception as e: self.logger.error("Connection failed: %s" % e) self._session.state = SessionState.DISCONNECTED - raise ClientException("Connection failed: %s" % e) + raise - @asyncio.coroutine - def _message_loop(self): - while True: - try: - header = yield from MQTTFixedHeader.from_stream(self._session.reader) - print(header) - except NoDataException as e: - self.logger.info("Message loop ending") - # absorb exception - self._session._last_exception = HBMQTTException("Message loop ending due to disconnection") - break - except BaseException as e: - # absorb exception - self._session._last_exception = e - break - - def _start_client_loop(self, connect_event:threading.Event, loop: asyncio.BaseEventLoop): - asyncio.set_event_loop(loop) - try: - self.logger.debug("Connecting to broker") - loop.run_until_complete(self._connect_coro()) - connect_event.set() - self.logger.debug("Starting message loop") - loop.run_until_complete(self._message_loop()) - except BaseException as e: - # absorb exception - self._session._last_exception = e - connect_event.set() - - def init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict: + def _init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict: # Load config broker_conf = self.config.get('broker', dict()).copy() if 'mqtt' not in broker_conf: diff --git a/hbmqtt/session.py b/hbmqtt/session.py index 97e8c4f..3ce1731 100644 --- a/hbmqtt/session.py +++ b/hbmqtt/session.py @@ -28,4 +28,3 @@ class Session: self.username = None self.password = None self.scheme = None - self._last_exception = None