From fd6ed3c42c6728139e2680dd1f4ac22def252976 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Wed, 12 Aug 2015 18:46:56 +0200 Subject: [PATCH] Fix error handling if broker can't be connected HBMQTT-24 --- hbmqtt/client.py | 43 ++++++++++++++++++++------------------- samples/client_publish.py | 18 ++++++++-------- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 062995c..e9e2158 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -7,8 +7,6 @@ import asyncio import ssl from urllib.parse import urlparse -from transitions import MachineError - from hbmqtt.utils import not_in_dict_or_none from hbmqtt.session import Session from hbmqtt.mqtt.connack import * @@ -30,8 +28,7 @@ class ClientException(BaseException): class ConnectException(ClientException): - def __init__(self, code): - self.return_code = code + pass class MQTTClient: @@ -236,33 +233,37 @@ class MQTTClient: except Exception as e: self.logger.warn("connection failed: %s" % e) self.session.transitions.disconnect() - raise ClientException("connection Failed: %s" % e) + raise ConnectException("connection Failed: %s" % e) - connect_packet = self.build_connect_packet() - yield from connect_packet.to_stream(writer) - self.logger.debug(" -out-> " + repr(connect_packet)) + return_code = None try : + connect_packet = self.build_connect_packet() + yield from connect_packet.to_stream(writer) + self.logger.debug(" -out-> " + repr(connect_packet)) + connack = yield from ConnackPacket.from_stream(reader) self.logger.debug(" <-in-- " + repr(connack)) return_code = connack.variable_header.return_code - - if return_code is not CONNECTION_ACCEPTED: - yield from self._handler.stop() - self.session.transitions.disconnect() - self.logger.warn("Connection rejected with code '%s'" % return_code) - raise ConnectException(return_code) - else: - # Handle MQTT protocol - self._handler = ClientProtocolHandler(reader, writer, loop=self._loop) - self._handler.attach_to_session(self.session) - yield from self._handler.start() - self.session.transitions.connect() - self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port)) except Exception as e: self.logger.warn("connection failed: %s" % e) self.session.transitions.disconnect() raise ClientException("connection Failed: %s" % e) + if return_code is not CONNECTION_ACCEPTED: + yield from self._handler.stop() + self.session.transitions.disconnect() + self.logger.warn("Connection rejected with code '%s'" % return_code) + exc = ConnectException("Connection rejected by broker") + exc.return_code = return_code + raise exc + else: + # Handle MQTT protocol + self._handler = ClientProtocolHandler(reader, writer, loop=self._loop) + self._handler.attach_to_session(self.session) + yield from self._handler.start() + self.session.transitions.connect() + self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port)) + def build_connect_packet(self): vh = ConnectVariableHeader() payload = ConnectPayload() diff --git a/samples/client_publish.py b/samples/client_publish.py index 7818700..f706657 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -1,7 +1,7 @@ import logging import asyncio -from hbmqtt.client import MQTTClient +from hbmqtt.client import MQTTClient, ConnectException @@ -44,13 +44,15 @@ def test_coro(): @asyncio.coroutine def test_coro2(): - future = yield from C.connect('mqtt://localhost:1883/') - future.add_done_callback(disconnected) - yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))]) - yield from asyncio.sleep(10) - yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))]) - logger.info("messages published") - yield from C.disconnect() + try: + future = yield from C.connect('mqtt://localhost:1883/') + future.add_done_callback(disconnected) + yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))]) + logger.info("messages published") + yield from C.disconnect() + except ConnectException as ce: + logger.error("Connection failed: %s" % ce) + asyncio.get_event_loop().stop() if __name__ == '__main__':