kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
280f6979c9
commit
d43ffdce27
|
@ -7,7 +7,7 @@ from urllib.parse import urlparse
|
|||
from transitions import Machine, MachineError
|
||||
|
||||
from hbmqtt.utils import not_in_dict_or_none
|
||||
from hbmqtt.mqtt.protocol import Session, SessionState
|
||||
from hbmqtt.session import Session, SessionState
|
||||
from hbmqtt.mqtt.connack import ConnackPacket, ReturnCode
|
||||
from hbmqtt.mqtt.disconnect import DisconnectPacket
|
||||
from hbmqtt.mqtt.publish import PublishPacket
|
||||
|
@ -20,6 +20,7 @@ from hbmqtt.mqtt.pingresp import PingRespPacket
|
|||
from hbmqtt.mqtt.subscribe import SubscribePacket
|
||||
from hbmqtt.mqtt.suback import SubackPacket
|
||||
from hbmqtt.errors import MQTTException
|
||||
from hbmqtt.mqtt.protocol import ClientProtocolHandler
|
||||
|
||||
_defaults = {
|
||||
'keep_alive': 60,
|
||||
|
@ -55,7 +56,7 @@ class MQTTClient:
|
|||
uri: mqtt:xxx@yyy//localhost:1883/
|
||||
# OR a mix ot both
|
||||
keep_alive: 60
|
||||
clean_session: true
|
||||
cleansession: true
|
||||
will:
|
||||
retain: false
|
||||
topic: some/topic
|
||||
|
@ -85,7 +86,8 @@ class MQTTClient:
|
|||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._ping_handle = None
|
||||
self._session = None
|
||||
self.session = None
|
||||
self._handler = None
|
||||
|
||||
def _init_states(self):
|
||||
self.machine = Machine(states=MQTTClient.states, initial='new')
|
||||
|
@ -97,11 +99,11 @@ class MQTTClient:
|
|||
self.machine.add_transition(trigger='disconnect', source='connected', dest='disconnected')
|
||||
|
||||
@asyncio.coroutine
|
||||
def connect(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None):
|
||||
def connect(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None):
|
||||
try:
|
||||
self.machine.connect()
|
||||
self._session = self._init_session(host, port, username, password, uri, clean_session)
|
||||
self.logger.debug("Connect with session parameters: %s" % self._session)
|
||||
self.session = self._initsession(host, port, username, password, uri, cleansession)
|
||||
self.logger.debug("Connect with session parameters: %s" % self.session)
|
||||
|
||||
yield from self._connect_coro()
|
||||
self.machine.connect_success()
|
||||
|
@ -119,23 +121,22 @@ class MQTTClient:
|
|||
@asyncio.coroutine
|
||||
def disconnect(self):
|
||||
try:
|
||||
self.machine.disconnect()
|
||||
disconnect_packet = DisconnectPacket()
|
||||
self.logger.debug(" -out-> " + repr(disconnect_packet))
|
||||
yield from disconnect_packet.to_stream(self._session.writer)
|
||||
self._session.writer.close()
|
||||
yield from self._handler.mqtt_disconnect()
|
||||
yield from self._handler.stop()
|
||||
except Exception as e:
|
||||
self.logger.warn("Unhandled exception: %s" % e)
|
||||
raise ClientException("Unhandled exception: %s" % e)
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise ClientException("Client instance can't be disconnected: %s" % me)
|
||||
self._loop.stop()
|
||||
self._session = None
|
||||
self.session = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def ping(self):
|
||||
ping_packet = PingReqPacket()
|
||||
self.logger.debug(" -out-> " + repr(ping_packet))
|
||||
yield from ping_packet.to_stream(self._session.writer)
|
||||
response = yield from PingRespPacket.from_stream(self._session.reader)
|
||||
yield from ping_packet.to_stream(self.session.writer)
|
||||
response = yield from PingRespPacket.from_stream(self.session.reader)
|
||||
self.logger.debug(" <-in-- " + repr(response))
|
||||
self._keep_alive()
|
||||
|
||||
|
@ -146,7 +147,7 @@ class MQTTClient:
|
|||
self.logger.debug('Cancel pending ping')
|
||||
except Exception:
|
||||
pass
|
||||
next_ping = self._session.keep_alive-self.config['ping_delay']
|
||||
next_ping = self.session.keep_alive-self.config['ping_delay']
|
||||
if next_ping > 0:
|
||||
self.logger.debug('Next ping in %d seconds' % next_ping)
|
||||
self._ping_handle = self._loop.call_later(next_ping, asyncio.async, self.ping())
|
||||
|
@ -181,18 +182,18 @@ class MQTTClient:
|
|||
|
||||
@asyncio.coroutine
|
||||
def _publish_qos_0(self, topic, message, dup, retain):
|
||||
packet = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x00, retain)
|
||||
packet = PublishPacket.build(topic, message, self.session.next_packet_id, dup, 0x00, retain)
|
||||
self.logger.debug(" -out-> " + repr(packet))
|
||||
yield from packet.to_stream(self._session.writer)
|
||||
yield from packet.to_stream(self.session.writer)
|
||||
self._keep_alive()
|
||||
|
||||
@asyncio.coroutine
|
||||
def _publish_qos_1(self, topic, message, dup, retain):
|
||||
packet = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x01, retain)
|
||||
packet = PublishPacket.build(topic, message, self.session.next_packet_id, dup, 0x01, retain)
|
||||
self.logger.debug(" -out-> " + repr(packet))
|
||||
yield from packet.to_stream(self._session.writer)
|
||||
yield from packet.to_stream(self.session.writer)
|
||||
|
||||
puback = yield from PubackPacket.from_stream(self._session.reader)
|
||||
puback = yield from PubackPacket.from_stream(self.session.reader)
|
||||
self.logger.debug(" <-in-- " + repr(puback))
|
||||
self._keep_alive()
|
||||
|
||||
|
@ -201,20 +202,20 @@ class MQTTClient:
|
|||
|
||||
@asyncio.coroutine
|
||||
def _publish_qos_2(self, topic, message, dup, retain):
|
||||
publish = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x02, retain)
|
||||
publish = PublishPacket.build(topic, message, self.session.next_packet_id, dup, 0x02, retain)
|
||||
self.logger.debug(" -out-> " + repr(publish))
|
||||
yield from publish.to_stream(self._session.writer)
|
||||
yield from publish.to_stream(self.session.writer)
|
||||
|
||||
pubrec = yield from PubrecPacket.from_stream(self._session.reader)
|
||||
pubrec = yield from PubrecPacket.from_stream(self.session.reader)
|
||||
if publish.variable_header.packet_id != pubrec.variable_header.packet_id:
|
||||
raise MQTTException("[MQTT-4.3.2-2] Puback packet packet_id doesn't match publish packet")
|
||||
self.logger.debug(" <-in-- " + repr(pubrec))
|
||||
|
||||
pubrel = PubrelPacket.build(pubrec.variable_header.packet_id)
|
||||
yield from pubrel.to_stream(self._session.writer)
|
||||
yield from pubrel.to_stream(self.session.writer)
|
||||
self.logger.debug(" -out-> " + repr(pubrel))
|
||||
|
||||
pubcomp = yield from PubcompPacket.from_stream(self._session.reader)
|
||||
pubcomp = yield from PubcompPacket.from_stream(self.session.reader)
|
||||
self.logger.debug(" <-in-- " + repr(pubcomp))
|
||||
if pubrel.variable_header.packet_id != pubcomp.variable_header.packet_id:
|
||||
raise MQTTException("[MQTT-4.3.2-2] Pubcomp packet packet_id doesn't match pubrel packet")
|
||||
|
@ -222,11 +223,11 @@ class MQTTClient:
|
|||
|
||||
@asyncio.coroutine
|
||||
def subscribe(self, topics):
|
||||
subscribe = SubscribePacket.build(topics, self._session.next_packet_id)
|
||||
yield from subscribe.to_stream(self._session.writer)
|
||||
subscribe = SubscribePacket.build(topics, self.session.next_packet_id)
|
||||
yield from subscribe.to_stream(self.session.writer)
|
||||
self.logger.debug(" -out-> " + repr(subscribe))
|
||||
|
||||
suback = yield from SubackPacket.from_stream(self._session.reader)
|
||||
suback = yield from SubackPacket.from_stream(self.session.reader)
|
||||
self.logger.debug(" <-in-- " + repr(suback))
|
||||
if suback.variable_header.packet_id != subscribe.variable_header.packet_id:
|
||||
raise MQTTException("[MQTT-4.3.2-2] Suback packet packet_id doesn't match subscribe packet")
|
||||
|
@ -235,26 +236,23 @@ class MQTTClient:
|
|||
@asyncio.coroutine
|
||||
def _connect_coro(self):
|
||||
try:
|
||||
reader, writer = yield from asyncio.open_connection(self._session.remote_address, self._session.remote_port)
|
||||
self._session.open(reader, writer)
|
||||
self.session.reader, self.session.writer = \
|
||||
yield from asyncio.open_connection(self.session.remote_address, self.session.remote_port)
|
||||
self._handler = ClientProtocolHandler(self.session)
|
||||
yield from self._handler.start()
|
||||
|
||||
# Send CONNECT packet and wait for CONNACK
|
||||
packet = self._session.build_connect_packet()
|
||||
yield from packet.to_stream(self._session.writer)
|
||||
self.logger.debug(" -out-> " + repr(packet))
|
||||
return_code = yield from self._handler.mqtt_connect()
|
||||
|
||||
connack = yield from ConnackPacket.from_stream(self._session.reader)
|
||||
self.logger.debug(" <-in-- " + repr(connack))
|
||||
if connack.variable_header.return_code is not ReturnCode.CONNECTION_ACCEPTED:
|
||||
raise ClientException("Connection rejected with code '%s'" % hex(connack.variable_header.return_code))
|
||||
if return_code is not ReturnCode.CONNECTION_ACCEPTED:
|
||||
raise ClientException("Connection rejected with code '%s'" % hex(return_code))
|
||||
|
||||
self._session.state = SessionState.CONNECTED
|
||||
self.logger.debug("connected to %s:%s" % (self._session.remote_address, self._session.remote_port))
|
||||
self.session.state = SessionState.CONNECTED
|
||||
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
||||
except Exception as e:
|
||||
self._session.state = SessionState.DISCONNECTED
|
||||
self.session.state = SessionState.DISCONNECTED
|
||||
raise e
|
||||
|
||||
def _init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict:
|
||||
def _initsession(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None) -> dict:
|
||||
# Load config
|
||||
broker_conf = self.config.get('broker', dict()).copy()
|
||||
if 'mqtt' not in broker_conf:
|
||||
|
@ -284,8 +282,8 @@ class MQTTClient:
|
|||
broker_conf['username'] = username
|
||||
if password:
|
||||
broker_conf['password'] = password
|
||||
if clean_session is not None:
|
||||
broker_conf['clean_session'] = clean_session
|
||||
if cleansession is not None:
|
||||
broker_conf['cleansession'] = cleansession
|
||||
|
||||
for key in ['scheme', 'host', 'port']:
|
||||
if not_in_dict_or_none(broker_conf, key):
|
||||
|
@ -298,10 +296,10 @@ class MQTTClient:
|
|||
s.username = broker_conf['username']
|
||||
s.password = broker_conf['password']
|
||||
s.scheme = broker_conf['scheme']
|
||||
if clean_session is not None:
|
||||
s.clean_session = clean_session
|
||||
if cleansession is not None:
|
||||
s.cleansession = cleansession
|
||||
else:
|
||||
s.clean_session = self.config.get('clean_session', True)
|
||||
s.cleansession = self.config.get('cleansession', True)
|
||||
s.keep_alive = self.config['keep_alive']
|
||||
if 'will' in self.config:
|
||||
s.will_flag = True
|
||||
|
|
|
@ -3,99 +3,13 @@
|
|||
# See the file license.txt for copying permission.
|
||||
import logging
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
from hbmqtt.mqtt.packet import MQTTFixedHeader
|
||||
from hbmqtt.mqtt import packet_class
|
||||
from hbmqtt.errors import NoDataException
|
||||
from hbmqtt.mqtt.packet import PacketType
|
||||
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload
|
||||
|
||||
class SessionState(Enum):
|
||||
NEW = 0
|
||||
CONNECTED = 1
|
||||
DISCONNECTED = 2
|
||||
|
||||
class Session:
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
self.state = SessionState.NEW
|
||||
self.reader = None
|
||||
self.writer = None
|
||||
self.remote_address = None
|
||||
self.remote_port = None
|
||||
self.local_address = None
|
||||
self.local_port = None
|
||||
self.client_id = None
|
||||
self.clean_session = None
|
||||
self.will_flag = False
|
||||
self.will_message = None
|
||||
self.will_qos = None
|
||||
self.will_retain = None
|
||||
self.will_topic = None
|
||||
self.keep_alive = None
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.scheme = None
|
||||
self._packet_id = 0
|
||||
|
||||
self.incoming_queues = dict()
|
||||
for p in PacketType:
|
||||
self.incoming_queues[p] = asyncio.Queue()
|
||||
self.outgoing_queue = asyncio.Queue()
|
||||
|
||||
self.handler = ProtocolHandler(self)
|
||||
|
||||
@asyncio.coroutine
|
||||
def open(self, reader: asyncio.StreamReader, writer:asyncio.StreamWriter):
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
self.local_address, self.local_port = self.writer.get_extra_info('sockname')
|
||||
|
||||
yield from self.handler.start()
|
||||
|
||||
@asyncio.coroutine
|
||||
def close(self):
|
||||
yield from self.handler.stop()
|
||||
self.writer.close()
|
||||
|
||||
def build_connect_packet(self):
|
||||
vh = ConnectVariableHeader()
|
||||
payload = ConnectPayload()
|
||||
|
||||
vh.keep_alive = self.keep_alive
|
||||
vh.clean_session_flag = self.clean_session
|
||||
vh.will_retain_flag = self.will_retain
|
||||
payload.client_id = self.client_id
|
||||
|
||||
if self.username:
|
||||
vh.username_flag = True
|
||||
payload.username = self.username
|
||||
else:
|
||||
vh.username_flag = False
|
||||
|
||||
if self.password:
|
||||
vh.password_flag = True
|
||||
payload.password = self.password
|
||||
else:
|
||||
vh.password_flag = False
|
||||
if self.will_flag:
|
||||
vh.will_flag = True
|
||||
vh.will_qos = self.will_qos
|
||||
payload.will_message = self.will_message
|
||||
payload.will_topic = self.will_topic
|
||||
else:
|
||||
vh.will_flag = False
|
||||
|
||||
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
|
||||
packet = ConnectPacket(header, vh, payload)
|
||||
return packet
|
||||
|
||||
|
||||
@property
|
||||
def next_packet_id(self):
|
||||
self._packet_id += 1
|
||||
return self._packet_id
|
||||
from hbmqtt.mqtt.disconnect import DisconnectPacket
|
||||
from hbmqtt.session import Session
|
||||
|
||||
class ProtocolHandler:
|
||||
"""
|
||||
|
@ -114,6 +28,12 @@ class ProtocolHandler:
|
|||
self._writer_ready = asyncio.Event(loop=self._loop)
|
||||
self._running = False
|
||||
|
||||
self.session.local_address, self.session.local_port = self.session.writer.get_extra_info('sockname')
|
||||
self.incoming_queues = dict()
|
||||
for p in PacketType:
|
||||
self.incoming_queues[p] = asyncio.Queue()
|
||||
self.outgoing_queue = asyncio.Queue()
|
||||
|
||||
@asyncio.coroutine
|
||||
def start(self):
|
||||
self._running = True
|
||||
|
@ -122,11 +42,12 @@ class ProtocolHandler:
|
|||
yield from asyncio.wait([self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop)
|
||||
self.logger.debug("Handler tasks started")
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
self._running = False
|
||||
yield from asyncio.wait([self._writer_task], loop=self._loop)
|
||||
|
||||
self.session.reader.feed_eof()
|
||||
yield from asyncio.wait([self._writer_task, self._reader_task], loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def _reader_coro(self):
|
||||
|
@ -138,7 +59,8 @@ class ProtocolHandler:
|
|||
if fixed_header:
|
||||
cls = packet_class(fixed_header)
|
||||
packet = yield from cls.from_stream(self.session.reader, fixed_header=fixed_header)
|
||||
yield from self.session.incoming_queues[packet.fixed_header.packet_type].put(packet)
|
||||
self.logger.debug(" <-in-- " + repr(packet))
|
||||
yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet)
|
||||
else:
|
||||
self.logger.debug("No data")
|
||||
except asyncio.TimeoutError:
|
||||
|
@ -146,8 +68,8 @@ class ProtocolHandler:
|
|||
except NoDataException as nde:
|
||||
self.logger.debug("No data available")
|
||||
#break
|
||||
except BaseException as e:
|
||||
self.logger.warn("Exception in reader coro: %s" % e)
|
||||
except Exception as e:
|
||||
self.logger.warn("Unhandled exception in reader coro: %s" % e)
|
||||
break
|
||||
self.logger.debug("Reader coro stopped")
|
||||
|
||||
|
@ -155,28 +77,81 @@ class ProtocolHandler:
|
|||
@asyncio.coroutine
|
||||
def _writer_coro(self):
|
||||
self.logger.debug("Starting writer coro")
|
||||
out_queue = self.session.outgoing_queue
|
||||
packet = None
|
||||
while self._running:
|
||||
try:
|
||||
self._writer_ready.set()
|
||||
packet = yield from asyncio.wait_for(out_queue.get(), 5)
|
||||
self.logger.debug(packet)
|
||||
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), 5)
|
||||
self.logger.debug(" -out-> " + repr(packet))
|
||||
yield from packet.to_stream(self.session.writer)
|
||||
yield from self.session.writer.drain()
|
||||
except asyncio.TimeoutError as ce:
|
||||
self.logger.warn("Output queue get timeout")
|
||||
except Exception as e:
|
||||
self.logger.warn("Exception in writer coro: %s" % e)
|
||||
self.logger.warn("Unhandled exception in writer coro: %s" % e)
|
||||
break
|
||||
self.logger.debug("Writer coro stopping")
|
||||
# Flush queue before stopping
|
||||
if not out_queue.empty():
|
||||
if not self.outgoing_queue.empty():
|
||||
while True:
|
||||
try:
|
||||
packet = out_queue.get_nowait()
|
||||
packet = self.outgoing_queue.get_nowait()
|
||||
self.logger.debug(packet)
|
||||
yield from packet.to_stream(self.session.writer)
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
except Exception as e:
|
||||
self.logger.warn("Unhandled exception in writer coro: %s" % e)
|
||||
self.logger.debug("Writer coro stopped")
|
||||
|
||||
|
||||
class ClientProtocolHandler(ProtocolHandler):
|
||||
def __init__(self, session: Session, loop=None):
|
||||
super().__init__(session, loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def mqtt_connect(self):
|
||||
def build_connect_packet(session):
|
||||
vh = ConnectVariableHeader()
|
||||
payload = ConnectPayload()
|
||||
|
||||
vh.keep_alive = session.keep_alive
|
||||
vh.clean_session_flag = session.clean_session
|
||||
vh.will_retain_flag = session.will_retain
|
||||
payload.client_id = session.client_id
|
||||
|
||||
if session.username:
|
||||
vh.username_flag = True
|
||||
payload.username = session.username
|
||||
else:
|
||||
vh.username_flag = False
|
||||
|
||||
if session.password:
|
||||
vh.password_flag = True
|
||||
payload.password = session.password
|
||||
else:
|
||||
vh.password_flag = False
|
||||
if session.will_flag:
|
||||
vh.will_flag = True
|
||||
vh.will_qos = session.will_qos
|
||||
payload.will_message = session.will_message
|
||||
payload.will_topic = session.will_topic
|
||||
else:
|
||||
vh.will_flag = False
|
||||
|
||||
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
|
||||
packet = ConnectPacket(header, vh, payload)
|
||||
return packet
|
||||
|
||||
packet = build_connect_packet(self.session)
|
||||
yield from self.outgoing_queue.put(packet)
|
||||
connack = yield from self.incoming_queues[PacketType.CONNACK].get()
|
||||
return connack.variable_header.return_code
|
||||
|
||||
@asyncio.coroutine
|
||||
def mqtt_disconnect(self):
|
||||
disconnect_packet = DisconnectPacket()
|
||||
yield from self.outgoing_queue.put(disconnect_packet)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
from enum import Enum
|
||||
|
||||
class SessionState(Enum):
|
||||
NEW = 0
|
||||
CONNECTED = 1
|
||||
DISCONNECTED = 2
|
||||
|
||||
class Session:
|
||||
def __init__(self):
|
||||
self.state = SessionState.NEW
|
||||
self.reader = None
|
||||
self.writer = None
|
||||
self.remote_address = None
|
||||
self.remote_port = None
|
||||
self.local_address = None
|
||||
self.local_port = None
|
||||
self.client_id = None
|
||||
self.clean_session = None
|
||||
self.will_flag = False
|
||||
self.will_message = None
|
||||
self.will_qos = None
|
||||
self.will_retain = None
|
||||
self.will_topic = None
|
||||
self.keep_alive = None
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.scheme = None
|
||||
self._packet_id = 0
|
||||
|
||||
@property
|
||||
def next_packet_id(self):
|
||||
self._packet_id += 1
|
||||
return self._packet_id
|
|
@ -1,13 +1,13 @@
|
|||
#import logging
|
||||
#from hbmqtt.client._client import MQTTClient
|
||||
#import asyncio
|
||||
import logging
|
||||
from hbmqtt.client._client import MQTTClient
|
||||
import asyncio
|
||||
|
||||
# C=MQTTClient()
|
||||
#
|
||||
# @asyncio.coroutine
|
||||
# def test_coro():
|
||||
# yield from C.connect(uri='mqtt://iot.eclipse.org:1883/', username='testuser', password="passwd")
|
||||
# yield from asyncio.sleep(1)
|
||||
C = MQTTClient()
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
yield from C.connect(uri='mqtt://iot.eclipse.org:1883/', username='testuser', password="passwd")
|
||||
yield from asyncio.sleep(1)
|
||||
# yield from C.publish('a/b', b'0123456789')
|
||||
# yield from C.publish('a/b', b'0123456789', qos=0x01)
|
||||
# yield from C.publish('a/b', b'0123456789', qos=0x02)
|
||||
|
@ -16,8 +16,9 @@
|
|||
# {'filter': 'c/d', 'qos': 0x02}
|
||||
# ])
|
||||
# #yield from asyncio.sleep(10)
|
||||
# yield from C.disconnect()
|
||||
#
|
||||
# if __name__ == '__main__':
|
||||
# logging.basicConfig(level=logging.DEBUG)
|
||||
# asyncio.get_event_loop().run_until_complete(test_coro())
|
||||
yield from C.disconnect()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
asyncio.get_event_loop().run_until_complete(test_coro())
|
|
@ -5,7 +5,8 @@ import unittest
|
|||
import asyncio
|
||||
|
||||
from hbmqtt.mqtt.connect import ConnectPacket, ConnectVariableHeader, ConnectPayload
|
||||
from hbmqtt.mqtt.protocol import Session
|
||||
from hbmqtt.mqtt.protocol import ProtocolHandler
|
||||
from hbmqtt.session import Session
|
||||
from hbmqtt.mqtt.packet import PacketType
|
||||
import logging
|
||||
|
||||
|
@ -33,11 +34,12 @@ class ConnectPacketTest(unittest.TestCase):
|
|||
@asyncio.coroutine
|
||||
def client():
|
||||
S = Session()
|
||||
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
|
||||
S.reader, S.writer = yield from asyncio.open_connection('127.0.0.1', 8888,
|
||||
loop=loop)
|
||||
yield from S.open(reader, writer)
|
||||
incoming_packet = yield from S.incoming_queues[PacketType.CONNECT].get()
|
||||
yield from S.close()
|
||||
h = ProtocolHandler(S)
|
||||
yield from h.start()
|
||||
incoming_packet = yield from h.incoming_queues[PacketType.CONNECT].get()
|
||||
yield from h.stop()
|
||||
return incoming_packet
|
||||
|
||||
packet = loop.run_until_complete(client())
|
||||
|
@ -59,10 +61,11 @@ class ConnectPacketTest(unittest.TestCase):
|
|||
@asyncio.coroutine
|
||||
def client():
|
||||
S = Session()
|
||||
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=loop)
|
||||
yield from S.open(reader, writer)
|
||||
yield from S.outgoing_queue.put(test_packet)
|
||||
yield from S.close()
|
||||
S.reader, S.writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=loop)
|
||||
h = ProtocolHandler(S)
|
||||
yield from h.start()
|
||||
yield from h.outgoing_queue.put(test_packet)
|
||||
yield from h.stop()
|
||||
|
||||
# Start server
|
||||
loop = asyncio.get_event_loop()
|
||||
|
|
Ładowanie…
Reference in New Issue