kopia lustrzana https://github.com/Yakifo/amqtt
make client async without threading
rodzic
2307d3b13b
commit
9869de1aa5
|
@ -84,43 +84,42 @@ class MQTTClient:
|
|||
self.machine.add_transition(trigger='disconnect', source='idle', dest='disconnected')
|
||||
self.machine.add_transition(trigger='disconnect', source='connected', dest='disconnected')
|
||||
|
||||
def connect(self, **kwargs):
|
||||
try:
|
||||
self._loop.run_until_complete(self.connect_async(**kwargs))
|
||||
except Exception as e:
|
||||
self.logger.warn(e)
|
||||
|
||||
def connect(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None):
|
||||
@asyncio.coroutine
|
||||
def connect_async(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None):
|
||||
try:
|
||||
self.machine.connect()
|
||||
self._session = self.init_session(host, port, username, password, uri, clean_session)
|
||||
self._session = self._init_session(host, port, username, password, uri, clean_session)
|
||||
self.logger.debug("Connect with session parameters: %s" % self._session)
|
||||
|
||||
thread_event = threading.Event()
|
||||
self._loop_thread = threading.Thread(target=self._start_client_loop, args=(thread_event, self._loop))
|
||||
self._loop_thread.setDaemon(True)
|
||||
self._loop_thread.start()
|
||||
thread_event.wait()
|
||||
if self._session.state == SessionState.CONNECTED:
|
||||
self.machine.connect_success()
|
||||
else:
|
||||
self.machine.connect_fail()
|
||||
self.logger.warn("Connection failed: %s " % self._session._last_exception)
|
||||
raise ClientException("Connection failed: %s " % self._session._last_exception)
|
||||
yield from self._connect_coro()
|
||||
self.machine.connect_success()
|
||||
except MachineError:
|
||||
msg = "Connect call incompatible with client current state '%s'" % self.machine.current_state
|
||||
self.logger.warn(msg)
|
||||
self.machine.connect_fail()
|
||||
raise ClientException(msg)
|
||||
except Exception as e:
|
||||
self.machine.connect_fail()
|
||||
self.logger.warn("Connection failed: %s " % e)
|
||||
raise ClientException("Connection failed: %s " % e)
|
||||
|
||||
def disconnect(self):
|
||||
try:
|
||||
self.machine.disconnect()
|
||||
if self._loop.is_running():
|
||||
self._loop.call_soon_threadsafe(asyncio.async, self._disconnect_coro())
|
||||
self._loop.call_soon(asyncio.async, self._disconnect_coro())
|
||||
else:
|
||||
self._loop.run_until_complete(self._disconnect_coro())
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise ClientException("Client instance can't be disconnected: %s" % me)
|
||||
self._loop_thread.join()
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
self.logger.warn(self._session._last_exception)
|
||||
self._loop.stop()
|
||||
self._session = None
|
||||
|
||||
@asyncio.coroutine
|
||||
|
@ -150,48 +149,19 @@ class MQTTClient:
|
|||
# Send CONNECT packet and wait for CONNACK
|
||||
packet = ConnectPacket.build_request_from_session(self._session)
|
||||
yield from packet.to_stream(self._session.writer)
|
||||
print(packet)
|
||||
self.logger.debug(packet)
|
||||
connack = yield from ConnackPacket.from_stream(self._session.reader)
|
||||
self.logger.debug(connack)
|
||||
if connack.variable_header.return_code is not ReturnCode.CONNECTION_ACCEPTED:
|
||||
raise ClientException("Connection rejected with code '%s'" % hex(connack.variable_header.return_code))
|
||||
print(connack)
|
||||
self._session.state = SessionState.CONNECTED
|
||||
self.logger.debug("connected to %s:%s" % (self._session.remote_address, self._session.remote_port))
|
||||
except Exception as e:
|
||||
self.logger.error("Connection failed: %s" % e)
|
||||
self._session.state = SessionState.DISCONNECTED
|
||||
raise ClientException("Connection failed: %s" % e)
|
||||
raise
|
||||
|
||||
@asyncio.coroutine
|
||||
def _message_loop(self):
|
||||
while True:
|
||||
try:
|
||||
header = yield from MQTTFixedHeader.from_stream(self._session.reader)
|
||||
print(header)
|
||||
except NoDataException as e:
|
||||
self.logger.info("Message loop ending")
|
||||
# absorb exception
|
||||
self._session._last_exception = HBMQTTException("Message loop ending due to disconnection")
|
||||
break
|
||||
except BaseException as e:
|
||||
# absorb exception
|
||||
self._session._last_exception = e
|
||||
break
|
||||
|
||||
def _start_client_loop(self, connect_event:threading.Event, loop: asyncio.BaseEventLoop):
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
self.logger.debug("Connecting to broker")
|
||||
loop.run_until_complete(self._connect_coro())
|
||||
connect_event.set()
|
||||
self.logger.debug("Starting message loop")
|
||||
loop.run_until_complete(self._message_loop())
|
||||
except BaseException as e:
|
||||
# absorb exception
|
||||
self._session._last_exception = e
|
||||
connect_event.set()
|
||||
|
||||
def init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict:
|
||||
def _init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict:
|
||||
# Load config
|
||||
broker_conf = self.config.get('broker', dict()).copy()
|
||||
if 'mqtt' not in broker_conf:
|
||||
|
|
|
@ -28,4 +28,3 @@ class Session:
|
|||
self.username = None
|
||||
self.password = None
|
||||
self.scheme = None
|
||||
self._last_exception = None
|
||||
|
|
Ładowanie…
Reference in New Issue