kopia lustrzana https://github.com/Yakifo/amqtt
connect() method returns a Future which has result set if the session is disconnected from the broker.
rodzic
1e563ee257
commit
74a66632cf
|
@ -29,6 +29,11 @@ class ClientException(BaseException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectException(ClientException):
|
||||||
|
def __init__(self, code):
|
||||||
|
self.return_code = code
|
||||||
|
|
||||||
|
|
||||||
class MQTTClient:
|
class MQTTClient:
|
||||||
def __init__(self, client_id=None, config=None, loop=None):
|
def __init__(self, client_id=None, config=None, loop=None):
|
||||||
"""
|
"""
|
||||||
|
@ -73,6 +78,8 @@ class MQTTClient:
|
||||||
self.session = None
|
self.session = None
|
||||||
self._handler = None
|
self._handler = None
|
||||||
self._disconnect_task = None
|
self._disconnect_task = None
|
||||||
|
self._connection_closed_future = None
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def connect(self,
|
def connect(self,
|
||||||
|
@ -88,64 +95,40 @@ class MQTTClient:
|
||||||
:param cafile: server certificate authority file
|
:param cafile: server certificate authority file
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
try:
|
self.session = self._initsession(uri, cleansession, cafile, capath, cadata)
|
||||||
self.session = self._initsession(uri, cleansession, cafile, capath, cadata)
|
self.logger.debug("Connect to: %s" % uri)
|
||||||
self.logger.debug("Connect to: %s" % uri)
|
|
||||||
|
|
||||||
return_code = yield from self._connect_coro()
|
return_code = yield from self._connect_coro()
|
||||||
self._disconnect_task = asyncio.Task(self.handle_connection_close())
|
self._connection_closed_future = asyncio.Future(loop=self._loop)
|
||||||
return return_code
|
self._disconnect_task = asyncio.Task(self.handle_connection_close())
|
||||||
except MachineError:
|
return self._connection_closed_future
|
||||||
msg = "Connect call incompatible with client current state '%s'" % self.session.transitions.state
|
|
||||||
self.logger.warn(msg)
|
|
||||||
self.session.transitions.connect_fail()
|
|
||||||
raise ClientException(msg)
|
|
||||||
except Exception as e:
|
|
||||||
self.session.transitions.disconnect()
|
|
||||||
self.logger.warn("Connection failed: %s " % e)
|
|
||||||
raise ClientException("Connection failed: %s " % e)
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
try:
|
if self.session.transitions.is_connected():
|
||||||
self.session.transitions.disconnect()
|
|
||||||
if not self._disconnect_task.done():
|
if not self._disconnect_task.done():
|
||||||
self._disconnect_task.cancel()
|
self._disconnect_task.cancel()
|
||||||
yield from self._handler.mqtt_disconnect()
|
yield from self._handler.mqtt_disconnect()
|
||||||
yield from self._handler.stop()
|
yield from self._handler.stop()
|
||||||
self._handler.detach_from_session()
|
self._handler.detach_from_session()
|
||||||
except MachineError as me:
|
self.session.transitions.disconnect()
|
||||||
if self.session.transitions.state == "disconnected":
|
self._connection_closed_future.set_result(None)
|
||||||
self.logger.warn("Client session is already disconnected")
|
else:
|
||||||
else:
|
self.logger.warn("Client session is not currently connected, ignoring call")
|
||||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
|
||||||
raise ClientException("Client instance can't be disconnected: %s" % me)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warn("Unhandled exception: %s" % e)
|
|
||||||
raise ClientException("Unhandled exception: %s" % e)
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def reconnect(self, cleansession=False):
|
def reconnect(self, cleansession=False):
|
||||||
if self.session.transitions.state == 'connected':
|
if self.session.transitions.is_connected():
|
||||||
self.logger.warn("Client already connected")
|
self.logger.warn("Client already connected")
|
||||||
return CONNECTION_ACCEPTED
|
return CONNECTION_ACCEPTED
|
||||||
|
|
||||||
try:
|
self.session.clean_session = cleansession
|
||||||
self.session.clean_session = cleansession
|
self.logger.debug("Reconnecting with session parameters: %s" % self.session)
|
||||||
self.logger.debug("Reconnecting with session parameters: %s" % self.session)
|
|
||||||
|
|
||||||
return_code = yield from self._connect_coro()
|
return_code = yield from self._connect_coro()
|
||||||
self._disconnect_task = asyncio.Task(self.handle_connection_close())
|
self._connection_closed_future = asyncio.Future(loop=self._loop)
|
||||||
return return_code
|
self._disconnect_task = asyncio.Task(self.handle_connection_close())
|
||||||
except MachineError:
|
return self._connection_closed_future
|
||||||
msg = "Connect call incompatible with client current state '%s'" % self.session.transitions.state
|
|
||||||
self.logger.warn(msg)
|
|
||||||
self.session.transitions.disconnect()
|
|
||||||
raise ClientException(msg)
|
|
||||||
except Exception as e:
|
|
||||||
self.session.transitions.disconnect()
|
|
||||||
self.logger.warn("Connection failed: %s " % e)
|
|
||||||
raise ClientException("Connection failed: %s " % e)
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def ping(self):
|
def ping(self):
|
||||||
|
@ -153,7 +136,7 @@ class MQTTClient:
|
||||||
Send a MQTT ping request and wait for response
|
Send a MQTT ping request and wait for response
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
if self.session.transitions.state == 'connected':
|
if self.session.transitions.is_connected():
|
||||||
yield from self._handler.mqtt_ping()
|
yield from self._handler.mqtt_ping()
|
||||||
else:
|
else:
|
||||||
self.logger.warn("MQTT PING request incompatible with current session state '%s'" %
|
self.logger.warn("MQTT PING request incompatible with current session state '%s'" %
|
||||||
|
@ -179,6 +162,8 @@ class MQTTClient:
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
return _qos, _retain
|
return _qos, _retain
|
||||||
|
if not self.session.transitions.is_connected():
|
||||||
|
self.logger.warn("publish MQTT message while not connected to broker, message may be lost")
|
||||||
(app_qos, app_retain) = get_retain_and_qos()
|
(app_qos, app_retain) = get_retain_and_qos()
|
||||||
if app_qos == 0:
|
if app_qos == 0:
|
||||||
yield from self._handler.mqtt_publish(topic, message, 0x00, app_retain)
|
yield from self._handler.mqtt_publish(topic, message, 0x00, app_retain)
|
||||||
|
@ -189,10 +174,14 @@ class MQTTClient:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def subscribe(self, topics):
|
def subscribe(self, topics):
|
||||||
|
if not self.session.transitions.is_connected():
|
||||||
|
self.logger.warn("subscribe while not connected to broker, message may be lost")
|
||||||
return (yield from self._handler.mqtt_subscribe(topics, self.session.next_packet_id))
|
return (yield from self._handler.mqtt_subscribe(topics, self.session.next_packet_id))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def unsubscribe(self, topics):
|
def unsubscribe(self, topics):
|
||||||
|
if not self.session.transitions.is_connected():
|
||||||
|
self.logger.warn("unsubscribe while not connected to broker, message may be lost")
|
||||||
yield from self._handler.mqtt_unsubscribe(topics, self.session.next_packet_id)
|
yield from self._handler.mqtt_unsubscribe(topics, self.session.next_packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -261,6 +250,7 @@ class MQTTClient:
|
||||||
yield from self._handler.stop()
|
yield from self._handler.stop()
|
||||||
self.session.transitions.disconnect()
|
self.session.transitions.disconnect()
|
||||||
self.logger.warn("Connection rejected with code '%s'" % return_code)
|
self.logger.warn("Connection rejected with code '%s'" % return_code)
|
||||||
|
raise ConnectException(return_code)
|
||||||
else:
|
else:
|
||||||
# Handle MQTT protocol
|
# Handle MQTT protocol
|
||||||
self._handler = ClientProtocolHandler(reader, writer, loop=self._loop)
|
self._handler = ClientProtocolHandler(reader, writer, loop=self._loop)
|
||||||
|
@ -268,7 +258,6 @@ class MQTTClient:
|
||||||
yield from self._handler.start()
|
yield from self._handler.start()
|
||||||
self.session.transitions.connect()
|
self.session.transitions.connect()
|
||||||
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
||||||
return return_code
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("connection failed: %s" % e)
|
self.logger.warn("connection failed: %s" % e)
|
||||||
self.session.transitions.disconnect()
|
self.session.transitions.disconnect()
|
||||||
|
@ -313,13 +302,7 @@ class MQTTClient:
|
||||||
self.logger.debug("Handle broker disconnection")
|
self.logger.debug("Handle broker disconnection")
|
||||||
yield from self._handler.stop()
|
yield from self._handler.stop()
|
||||||
self.session.transitions.disconnect()
|
self.session.transitions.disconnect()
|
||||||
# while self.session.transitions.state != 'connected':
|
self._connection_closed_future.set_result(None)
|
||||||
# yield from asyncio.sleep(2)
|
|
||||||
# self.logger.debug("Trying reconnect")
|
|
||||||
# try:
|
|
||||||
# yield from self.reconnect()
|
|
||||||
# except ClientException:
|
|
||||||
# self.logger.warn("Reconnect failed")
|
|
||||||
|
|
||||||
def _initsession(
|
def _initsession(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -23,6 +23,12 @@ config = {
|
||||||
#C = MQTTClient(config=config)
|
#C = MQTTClient(config=config)
|
||||||
C = MQTTClient()
|
C = MQTTClient()
|
||||||
|
|
||||||
|
|
||||||
|
def disconnected(future):
|
||||||
|
print("DISCONNECTED")
|
||||||
|
asyncio.get_event_loop().stop()
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_coro():
|
def test_coro():
|
||||||
yield from C.connect('mqtt://localhost:1883/')
|
yield from C.connect('mqtt://localhost:1883/')
|
||||||
|
@ -36,7 +42,22 @@ def test_coro():
|
||||||
yield from C.disconnect()
|
yield from C.disconnect()
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro2():
|
||||||
|
future = yield from C.connect('mqtt://localhost:1883/')
|
||||||
|
future.add_done_callback(disconnected)
|
||||||
|
yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))])
|
||||||
|
yield from asyncio.sleep(10)
|
||||||
|
yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))])
|
||||||
|
logger.info("messages published")
|
||||||
|
yield from C.disconnect()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||||
asyncio.get_event_loop().run_until_complete(test_coro())
|
asyncio.async(test_coro2())
|
||||||
|
try:
|
||||||
|
asyncio.get_event_loop().run_forever()
|
||||||
|
finally:
|
||||||
|
asyncio.get_event_loop().close()
|
Ładowanie…
Reference in New Issue