kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
74a66632cf
commit
fd6ed3c42c
|
@ -7,8 +7,6 @@ import asyncio
|
||||||
import ssl
|
import ssl
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
from transitions import MachineError
|
|
||||||
|
|
||||||
from hbmqtt.utils import not_in_dict_or_none
|
from hbmqtt.utils import not_in_dict_or_none
|
||||||
from hbmqtt.session import Session
|
from hbmqtt.session import Session
|
||||||
from hbmqtt.mqtt.connack import *
|
from hbmqtt.mqtt.connack import *
|
||||||
|
@ -30,8 +28,7 @@ class ClientException(BaseException):
|
||||||
|
|
||||||
|
|
||||||
class ConnectException(ClientException):
|
class ConnectException(ClientException):
|
||||||
def __init__(self, code):
|
pass
|
||||||
self.return_code = code
|
|
||||||
|
|
||||||
|
|
||||||
class MQTTClient:
|
class MQTTClient:
|
||||||
|
@ -236,33 +233,37 @@ class MQTTClient:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("connection failed: %s" % e)
|
self.logger.warn("connection failed: %s" % e)
|
||||||
self.session.transitions.disconnect()
|
self.session.transitions.disconnect()
|
||||||
raise ClientException("connection Failed: %s" % e)
|
raise ConnectException("connection Failed: %s" % e)
|
||||||
|
|
||||||
connect_packet = self.build_connect_packet()
|
return_code = None
|
||||||
yield from connect_packet.to_stream(writer)
|
|
||||||
self.logger.debug(" -out-> " + repr(connect_packet))
|
|
||||||
try :
|
try :
|
||||||
|
connect_packet = self.build_connect_packet()
|
||||||
|
yield from connect_packet.to_stream(writer)
|
||||||
|
self.logger.debug(" -out-> " + repr(connect_packet))
|
||||||
|
|
||||||
connack = yield from ConnackPacket.from_stream(reader)
|
connack = yield from ConnackPacket.from_stream(reader)
|
||||||
self.logger.debug(" <-in-- " + repr(connack))
|
self.logger.debug(" <-in-- " + repr(connack))
|
||||||
return_code = connack.variable_header.return_code
|
return_code = connack.variable_header.return_code
|
||||||
|
|
||||||
if return_code is not CONNECTION_ACCEPTED:
|
|
||||||
yield from self._handler.stop()
|
|
||||||
self.session.transitions.disconnect()
|
|
||||||
self.logger.warn("Connection rejected with code '%s'" % return_code)
|
|
||||||
raise ConnectException(return_code)
|
|
||||||
else:
|
|
||||||
# Handle MQTT protocol
|
|
||||||
self._handler = ClientProtocolHandler(reader, writer, loop=self._loop)
|
|
||||||
self._handler.attach_to_session(self.session)
|
|
||||||
yield from self._handler.start()
|
|
||||||
self.session.transitions.connect()
|
|
||||||
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warn("connection failed: %s" % e)
|
self.logger.warn("connection failed: %s" % e)
|
||||||
self.session.transitions.disconnect()
|
self.session.transitions.disconnect()
|
||||||
raise ClientException("connection Failed: %s" % e)
|
raise ClientException("connection Failed: %s" % e)
|
||||||
|
|
||||||
|
if return_code is not CONNECTION_ACCEPTED:
|
||||||
|
yield from self._handler.stop()
|
||||||
|
self.session.transitions.disconnect()
|
||||||
|
self.logger.warn("Connection rejected with code '%s'" % return_code)
|
||||||
|
exc = ConnectException("Connection rejected by broker")
|
||||||
|
exc.return_code = return_code
|
||||||
|
raise exc
|
||||||
|
else:
|
||||||
|
# Handle MQTT protocol
|
||||||
|
self._handler = ClientProtocolHandler(reader, writer, loop=self._loop)
|
||||||
|
self._handler.attach_to_session(self.session)
|
||||||
|
yield from self._handler.start()
|
||||||
|
self.session.transitions.connect()
|
||||||
|
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
||||||
|
|
||||||
def build_connect_packet(self):
|
def build_connect_packet(self):
|
||||||
vh = ConnectVariableHeader()
|
vh = ConnectVariableHeader()
|
||||||
payload = ConnectPayload()
|
payload = ConnectPayload()
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from hbmqtt.client import MQTTClient
|
from hbmqtt.client import MQTTClient, ConnectException
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,13 +44,15 @@ def test_coro():
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_coro2():
|
def test_coro2():
|
||||||
future = yield from C.connect('mqtt://localhost:1883/')
|
try:
|
||||||
future.add_done_callback(disconnected)
|
future = yield from C.connect('mqtt://localhost:1883/')
|
||||||
yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))])
|
future.add_done_callback(disconnected)
|
||||||
yield from asyncio.sleep(10)
|
yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))])
|
||||||
yield from asyncio.wait([asyncio.async(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=0x01))])
|
logger.info("messages published")
|
||||||
logger.info("messages published")
|
yield from C.disconnect()
|
||||||
yield from C.disconnect()
|
except ConnectException as ce:
|
||||||
|
logger.error("Connection failed: %s" % ce)
|
||||||
|
asyncio.get_event_loop().stop()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
Ładowanie…
Reference in New Issue