kopia lustrzana https://github.com/Yakifo/amqtt
Merge branch 'release/0.7'
commit
cbb55653b0
|
@ -1,6 +1,16 @@
|
|||
Changelog
|
||||
---------
|
||||
|
||||
0.7.0
|
||||
.....
|
||||
|
||||
* Fix a `serie of issues <https://github.com/beerfactory/hbmqtt/issues?q=milestone%3A0.7+is%3Aclosed>`_ reported by `Christoph Krey <https://github.com/ckrey>`_
|
||||
|
||||
0.6.3
|
||||
.....
|
||||
|
||||
* Fix issue `#22 <https://github.com/beerfactory/hbmqtt/issues/22>`_.
|
||||
|
||||
0.6.2
|
||||
.....
|
||||
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
#
|
||||
# See the file license.txt for copying permission.
|
||||
|
||||
VERSION = (0, 6, 3, 'final', 0)
|
||||
VERSION = (0, 7, 0, 'final', 0)
|
||||
|
|
|
@ -225,43 +225,46 @@ class Broker:
|
|||
for listener_name in self.listeners_config:
|
||||
listener = self.listeners_config[listener_name]
|
||||
|
||||
# Max connections
|
||||
try:
|
||||
max_connections = listener['max_connections']
|
||||
except KeyError:
|
||||
max_connections = -1
|
||||
|
||||
# SSL Context
|
||||
sc = None
|
||||
if 'ssl' in listener and listener['ssl'].upper() == 'ON':
|
||||
if 'bind' not in listener:
|
||||
self.logger.debug("Listener configuration '%s' is not bound" % listener_name)
|
||||
else:
|
||||
# Max connections
|
||||
try:
|
||||
sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
sc.load_cert_chain(listener['certfile'], listener['keyfile'])
|
||||
sc.verify_mode = ssl.CERT_OPTIONAL
|
||||
except KeyError as ke:
|
||||
raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
|
||||
except FileNotFoundError as fnfe:
|
||||
raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
|
||||
(listener['certfile'], listener['keyfile'], fnfe))
|
||||
max_connections = listener['max_connections']
|
||||
except KeyError:
|
||||
max_connections = -1
|
||||
|
||||
if listener['type'] == 'tcp':
|
||||
address, port = listener['bind'].split(':')
|
||||
cb_partial = partial(self.stream_connected, listener_name=listener_name)
|
||||
instance = yield from asyncio.start_server(cb_partial,
|
||||
address,
|
||||
port,
|
||||
ssl=sc,
|
||||
loop=self._loop)
|
||||
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
||||
elif listener['type'] == 'ws':
|
||||
address, port = listener['bind'].split(':')
|
||||
cb_partial = partial(self.ws_connected, listener_name=listener_name)
|
||||
instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
|
||||
subprotocols=['mqtt'])
|
||||
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
||||
# SSL Context
|
||||
sc = None
|
||||
if 'ssl' in listener and listener['ssl'].upper() == 'ON':
|
||||
try:
|
||||
sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
sc.load_cert_chain(listener['certfile'], listener['keyfile'])
|
||||
sc.verify_mode = ssl.CERT_OPTIONAL
|
||||
except KeyError as ke:
|
||||
raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
|
||||
except FileNotFoundError as fnfe:
|
||||
raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
|
||||
(listener['certfile'], listener['keyfile'], fnfe))
|
||||
|
||||
self.logger.info("Listener '%s' bind to %s (max_connecionts=%d)" %
|
||||
(listener_name, listener['bind'], max_connections))
|
||||
if listener['type'] == 'tcp':
|
||||
address, port = listener['bind'].split(':')
|
||||
cb_partial = partial(self.stream_connected, listener_name=listener_name)
|
||||
instance = yield from asyncio.start_server(cb_partial,
|
||||
address,
|
||||
port,
|
||||
ssl=sc,
|
||||
loop=self._loop)
|
||||
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
||||
elif listener['type'] == 'ws':
|
||||
address, port = listener['bind'].split(':')
|
||||
cb_partial = partial(self.ws_connected, listener_name=listener_name)
|
||||
instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
|
||||
subprotocols=['mqtt'])
|
||||
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
||||
|
||||
self.logger.info("Listener '%s' bind to %s (max_connections=%d)" %
|
||||
(listener_name, listener['bind'], max_connections))
|
||||
|
||||
self.transitions.starting_success()
|
||||
yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
|
||||
|
@ -337,7 +340,7 @@ class Broker:
|
|||
except HBMQTTException as exc:
|
||||
self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
|
||||
(format_client_message(address=remote_address, port=remote_port), exc))
|
||||
yield from writer.close()
|
||||
#yield from writer.close()
|
||||
self.logger.debug("Connection closed")
|
||||
return
|
||||
except MQTTException as me:
|
||||
|
@ -458,6 +461,12 @@ class Broker:
|
|||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("%s handling message delivery" % client_session.client_id)
|
||||
app_message = wait_deliver.result()
|
||||
if not app_message.topic:
|
||||
self.logger.warn("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
|
||||
break
|
||||
if "#" in app_message.topic or "+" in app_message.topic:
|
||||
self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
|
||||
break
|
||||
yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
|
||||
client_id=client_session.client_id,
|
||||
message=app_message)
|
||||
|
@ -539,8 +548,9 @@ class Broker:
|
|||
self._retained_messages[topic_name] = retained_message
|
||||
else:
|
||||
# [MQTT-3.3.1-10]
|
||||
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
|
||||
del self._retained_messages[topic_name]
|
||||
if topic_name in self._retained_messages:
|
||||
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
|
||||
del self._retained_messages[topic_name]
|
||||
|
||||
def add_subscription(self, subscription, session):
|
||||
import re
|
||||
|
@ -550,9 +560,11 @@ class Broker:
|
|||
if '#' in a_filter and not a_filter.endswith('#'):
|
||||
# [MQTT-4.7.1-2] Wildcard character '#' is only allowed as last character in filter
|
||||
return 0x80
|
||||
if '+' in a_filter and not wildcard_pattern.match(a_filter):
|
||||
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
|
||||
return 0x80
|
||||
if a_filter != "+":
|
||||
if '+' in a_filter:
|
||||
if "/+" not in a_filter and "+/" not in a_filter:
|
||||
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
|
||||
return 0x80
|
||||
|
||||
qos = subscription[1]
|
||||
if 'max-qos' in self.config and qos > self.config['max-qos']:
|
||||
|
@ -608,7 +620,7 @@ class Broker:
|
|||
|
||||
def matches(self, topic, a_filter):
|
||||
import re
|
||||
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+'))
|
||||
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+'))
|
||||
if match_pattern.match(topic):
|
||||
return True
|
||||
else:
|
||||
|
@ -625,7 +637,9 @@ class Broker:
|
|||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("broadcasting %r" % broadcast)
|
||||
for k_filter in self._subscriptions:
|
||||
if self.matches(broadcast['topic'], k_filter):
|
||||
if broadcast['topic'].startswith("$") and (k_filter.startswith("+") or k_filter.startswith("#")):
|
||||
self.logger.debug("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #")
|
||||
elif self.matches(broadcast['topic'], k_filter):
|
||||
subscriptions = self._subscriptions[k_filter]
|
||||
for (target_session, qos) in subscriptions:
|
||||
if 'qos' in broadcast:
|
||||
|
|
|
@ -149,7 +149,6 @@ class MQTTClient:
|
|||
else:
|
||||
return (yield from self.reconnect())
|
||||
|
||||
@mqtt_connected
|
||||
@asyncio.coroutine
|
||||
def disconnect(self):
|
||||
"""
|
||||
|
@ -330,7 +329,10 @@ class MQTTClient:
|
|||
deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
self.client_tasks.append(deliver_task)
|
||||
self.logger.debug("Waiting message delivery")
|
||||
yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
|
||||
done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
|
||||
if pending:
|
||||
#timeout occured before message received
|
||||
deliver_task.cancel()
|
||||
if deliver_task.exception():
|
||||
raise deliver_task.exception()
|
||||
self.client_tasks.pop()
|
||||
|
|
|
@ -143,9 +143,12 @@ class BrokerProtocolHandler(ProtocolHandler):
|
|||
if connect.proto_level != 4:
|
||||
# only MQTT 3.1.1 supported
|
||||
error_msg = 'Invalid protocol from %s: %d' % \
|
||||
(format_client_message(address=remote_address, port=remote_port),
|
||||
connect.variable_header.protocol_level)
|
||||
(format_client_message(address=remote_address, port=remote_port), connect.proto_level)
|
||||
connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0
|
||||
elif not connect.username_flag and connect.password_flag:
|
||||
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22]
|
||||
elif connect.username_flag and not connect.password_flag:
|
||||
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22]
|
||||
elif connect.username_flag and connect.username is None:
|
||||
error_msg = 'Invalid username from %s' % \
|
||||
(format_client_message(address=remote_address, port=remote_port))
|
||||
|
@ -153,7 +156,7 @@ class BrokerProtocolHandler(ProtocolHandler):
|
|||
elif connect.password_flag and connect.password is None:
|
||||
error_msg = 'Invalid password %s' % (format_client_message(address=remote_address, port=remote_port))
|
||||
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
|
||||
elif connect.clean_session_flag is False and connect.payload.client_id is None:
|
||||
elif connect.clean_session_flag is False and (connect.payload.client_id is None or connect.payload.client_id == ""):
|
||||
error_msg = '[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % \
|
||||
format_client_message(address=remote_address, port=remote_port)
|
||||
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
||||
|
|
|
@ -333,22 +333,26 @@ class ProtocolHandler:
|
|||
# Wait PUBREL
|
||||
if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done():
|
||||
# PUBREL waiter already exists for this packet ID
|
||||
message = "Can't add PUBREL waiter, a waiter already exists for message Id '%s'" \
|
||||
message = "A waiter already exists for message Id '%s', canceling it" \
|
||||
% app_message.packet_id
|
||||
self.logger.warning(message)
|
||||
raise HBMQTTException(message)
|
||||
waiter = asyncio.Future(loop=self._loop)
|
||||
self._pubrel_waiters[app_message.packet_id] = waiter
|
||||
yield from waiter
|
||||
del self._pubrel_waiters[app_message.packet_id]
|
||||
app_message.pubrel_packet = waiter.result()
|
||||
# Initiate delivery and discard message
|
||||
yield from self.session.delivered_message_queue.put(app_message)
|
||||
del self.session.inflight_in[app_message.packet_id]
|
||||
# Send pubcomp
|
||||
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
||||
yield from self._send_packet(pubcomp_packet)
|
||||
app_message.pubcomp_packet = pubcomp_packet
|
||||
self._pubrel_waiters[app_message.packet_id].cancel()
|
||||
try:
|
||||
waiter = asyncio.Future(loop=self._loop)
|
||||
self._pubrel_waiters[app_message.packet_id] = waiter
|
||||
yield from waiter
|
||||
del self._pubrel_waiters[app_message.packet_id]
|
||||
app_message.pubrel_packet = waiter.result()
|
||||
# Initiate delivery and discard message
|
||||
yield from self.session.delivered_message_queue.put(app_message)
|
||||
del self.session.inflight_in[app_message.packet_id]
|
||||
# Send pubcomp
|
||||
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
||||
yield from self._send_packet(pubcomp_packet)
|
||||
app_message.pubcomp_packet = pubcomp_packet
|
||||
except asyncio.CancelledError:
|
||||
self.logger.debug("Message flow cancelled")
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def _reader_loop(self):
|
||||
|
@ -454,7 +458,10 @@ class ProtocolHandler:
|
|||
def mqtt_deliver_next_message(self):
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize())
|
||||
message = yield from self.session.delivered_message_queue.get()
|
||||
try:
|
||||
message = yield from self.session.delivered_message_queue.get()
|
||||
except asyncio.CancelledError:
|
||||
message = None
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("Delivering message %s" % message)
|
||||
return message
|
||||
|
|
|
@ -48,7 +48,13 @@ class PublishPayload(MQTTPayload):
|
|||
@asyncio.coroutine
|
||||
def from_stream(cls, reader: asyncio.StreamReader, fixed_header: MQTTFixedHeader,
|
||||
variable_header: MQTTVariableHeader):
|
||||
data = yield from reader.read(fixed_header.remaining_length-variable_header.bytes_length)
|
||||
data = bytearray()
|
||||
data_length = fixed_header.remaining_length-variable_header.bytes_length
|
||||
length_read = 0
|
||||
while length_read < data_length:
|
||||
buffer = yield from reader.read(data_length - length_read)
|
||||
data.extend(buffer)
|
||||
length_read = len(data)
|
||||
return cls(data)
|
||||
|
||||
def __repr__(self):
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Copyright (c) 2015 Nicolas JOUANIN
|
||||
#
|
||||
# See the file license.txt for copying permission.
|
||||
import yaml
|
||||
|
||||
|
||||
def not_in_dict_or_none(dict, key):
|
||||
|
@ -36,3 +37,12 @@ def gen_client_id():
|
|||
for i in range(7, 23):
|
||||
gen_id += chr(random.randint(0, 74) + 48)
|
||||
return gen_id
|
||||
|
||||
def read_yaml_config(config_file):
|
||||
config = None
|
||||
try:
|
||||
with open(config_file, 'r') as stream:
|
||||
config = yaml.load(stream)
|
||||
except yaml.YAMLError as exc:
|
||||
logger.error("Invalid config_file %s: %s" % (config_file, exc))
|
||||
return config
|
|
@ -23,7 +23,7 @@ import os
|
|||
from hbmqtt.broker import Broker
|
||||
from hbmqtt.version import get_version
|
||||
from docopt import docopt
|
||||
from .utils import read_yaml_config
|
||||
from hbmqtt.utils import read_yaml_config
|
||||
|
||||
|
||||
default_config = {
|
||||
|
|
|
@ -40,10 +40,8 @@ import os
|
|||
from hbmqtt.client import MQTTClient, ConnectException
|
||||
from hbmqtt.version import get_version
|
||||
from docopt import docopt
|
||||
try:
|
||||
from .utils import read_yaml_config
|
||||
except:
|
||||
from utils import read_yaml_config
|
||||
from hbmqtt.utils import read_yaml_config
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
|
|
|
@ -39,10 +39,7 @@ from hbmqtt.errors import MQTTException
|
|||
from hbmqtt.version import get_version
|
||||
from docopt import docopt
|
||||
from hbmqtt.mqtt.constants import QOS_0
|
||||
try:
|
||||
from .utils import read_yaml_config
|
||||
except:
|
||||
from utils import read_yaml_config
|
||||
from hbmqtt.utils import read_yaml_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
# Copyright (c) 2015 Nicolas JOUANIN
|
||||
#
|
||||
# See the file license.txt for copying permission.
|
||||
import yaml
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def read_yaml_config(config_file):
|
||||
config = None
|
||||
try:
|
||||
with open(config_file, 'r') as stream:
|
||||
config = yaml.load(stream)
|
||||
except yaml.YAMLError as exc:
|
||||
logger.error("Invalid config_file %s: %s" % (config_file, exc))
|
||||
return config
|
|
@ -1,2 +1,2 @@
|
|||
[bdist_wheel]
|
||||
python-tag = py34
|
||||
python-tag = py34.py35
|
|
@ -5,7 +5,10 @@ import unittest
|
|||
from unittest.mock import patch, call, MagicMock
|
||||
from hbmqtt.broker import *
|
||||
from hbmqtt.mqtt.constants import *
|
||||
from hbmqtt.client import MQTTClient
|
||||
from hbmqtt.client import MQTTClient, ConnectException
|
||||
from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \
|
||||
PubrelPacket, PubcompPacket, DisconnectPacket
|
||||
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload
|
||||
|
||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
|
@ -99,6 +102,81 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_connect_will_flag(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
|
||||
conn_reader, conn_writer = \
|
||||
yield from asyncio.open_connection('localhost', 1883, loop=self.loop)
|
||||
reader = StreamReaderAdapter(conn_reader)
|
||||
writer = StreamWriterAdapter(conn_writer)
|
||||
|
||||
vh = ConnectVariableHeader()
|
||||
payload = ConnectPayload()
|
||||
|
||||
vh.keep_alive = 10
|
||||
vh.clean_session_flag = False
|
||||
vh.will_retain_flag = False
|
||||
vh.will_flag = True
|
||||
vh.will_qos = QOS_0
|
||||
payload.client_id = 'test_id'
|
||||
payload.will_message = b'test'
|
||||
payload.will_topic = '/topic'
|
||||
connect = ConnectPacket(vh=vh, payload=payload)
|
||||
yield from connect.to_stream(writer)
|
||||
yield from ConnackPacket.from_stream(reader)
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
|
||||
disconnect = DisconnectPacket()
|
||||
yield from disconnect.to_stream(writer)
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
self.assertDictEqual(broker._sessions, {})
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_connect_clean_session_false(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
client = MQTTClient(client_id="", config={'auto_reconnect': False})
|
||||
return_code=None
|
||||
try:
|
||||
yield from client.connect('mqtt://localhost/', cleansession=False)
|
||||
except ConnectException as ce:
|
||||
return_code = ce.return_code
|
||||
self.assertEqual(return_code, 0x02)
|
||||
self.assertNotIn(client.session.client_id, broker._sessions)
|
||||
yield from client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
@ -259,6 +337,119 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
#@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_dup(self):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
|
||||
conn_reader, conn_writer = \
|
||||
yield from asyncio.open_connection('localhost', 1883, loop=self.loop)
|
||||
reader = StreamReaderAdapter(conn_reader)
|
||||
writer = StreamWriterAdapter(conn_writer)
|
||||
|
||||
vh = ConnectVariableHeader()
|
||||
payload = ConnectPayload()
|
||||
|
||||
vh.keep_alive = 10
|
||||
vh.clean_session_flag = False
|
||||
vh.will_retain_flag = False
|
||||
payload.client_id = 'test_id'
|
||||
connect = ConnectPacket(vh=vh, payload=payload)
|
||||
yield from connect.to_stream(writer)
|
||||
yield from ConnackPacket.from_stream(reader)
|
||||
|
||||
publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False)
|
||||
yield from publish_1.to_stream(writer)
|
||||
ensure_future(PubrecPacket.from_stream(reader), loop=self.loop)
|
||||
|
||||
yield from asyncio.sleep(2)
|
||||
|
||||
publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False)
|
||||
yield from publish_dup.to_stream(writer)
|
||||
pubrec2 = yield from PubrecPacket.from_stream(reader)
|
||||
pubrel = PubrelPacket.build(1)
|
||||
yield from pubrel.to_stream(writer)
|
||||
pubcomp = yield from PubcompPacket.from_stream(reader)
|
||||
|
||||
disconnect = DisconnectPacket()
|
||||
yield from disconnect.to_stream(writer)
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_invalid_topic(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
pub_client = MQTTClient()
|
||||
ret = yield from pub_client.connect('mqtt://localhost/')
|
||||
self.assertEqual(ret, 0)
|
||||
|
||||
ret_message = yield from pub_client.publish('/+', b'data', QOS_0)
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from pub_client.disconnect()
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_big(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
pub_client = MQTTClient()
|
||||
ret = yield from pub_client.connect('mqtt://localhost/')
|
||||
self.assertEqual(ret, 0)
|
||||
|
||||
ret_message = yield from pub_client.publish('/topic', bytearray(b'\x99' * 256 * 1024), QOS_2)
|
||||
yield from pub_client.disconnect()
|
||||
self.assertEquals(broker._retained_messages, {})
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
MockPluginManager.assert_has_calls(
|
||||
[call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
|
||||
client_id=pub_client.session.client_id,
|
||||
message=ret_message),
|
||||
], any_order=True)
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_retain(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
@ -291,6 +482,33 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_retain_delete(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
|
||||
pub_client = MQTTClient()
|
||||
ret = yield from pub_client.connect('mqtt://localhost/')
|
||||
self.assertEqual(ret, 0)
|
||||
ret_message = yield from pub_client.publish('/topic', b'', QOS_0, retain=True)
|
||||
yield from pub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
self.assertNotIn('/topic', broker._retained_messages)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
@ -327,6 +545,110 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_invalid(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
sub_client = MQTTClient()
|
||||
yield from sub_client.connect('mqtt://localhost')
|
||||
ret = yield from sub_client.subscribe(
|
||||
[('+', QOS_0), ('+/tennis/#', QOS_0), ('sport+', QOS_0), ('sport/+/player1', QOS_0)])
|
||||
self.assertEquals(ret, [QOS_0, QOS_0, 0x80, QOS_0])
|
||||
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from sub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
sub_client = MQTTClient()
|
||||
yield from sub_client.connect('mqtt://localhost')
|
||||
ret = yield from sub_client.subscribe([('#', QOS_0)])
|
||||
self.assertEquals(ret, [QOS_0])
|
||||
|
||||
yield from self._client_publish('/topic', b'data', QOS_0)
|
||||
message = yield from sub_client.deliver_message()
|
||||
self.assertIsNotNone(message)
|
||||
|
||||
yield from self._client_publish('$topic', b'data', QOS_0)
|
||||
yield from asyncio.sleep(0.1)
|
||||
message = None
|
||||
try:
|
||||
message = yield from sub_client.deliver_message(timeout=2)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.assertIsNone(message)
|
||||
yield from sub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish_dollar_topic_2(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
sub_client = MQTTClient()
|
||||
yield from sub_client.connect('mqtt://localhost')
|
||||
ret = yield from sub_client.subscribe([('+/monitor/Clients', QOS_0)])
|
||||
self.assertEquals(ret, [QOS_0])
|
||||
|
||||
yield from self._client_publish('/test/monitor/Clients', b'data', QOS_0)
|
||||
message = yield from sub_client.deliver_message()
|
||||
self.assertIsNotNone(message)
|
||||
|
||||
yield from self._client_publish('$SYS/monitor/Clients', b'data', QOS_0)
|
||||
yield from asyncio.sleep(0.1)
|
||||
message = None
|
||||
try:
|
||||
message = yield from sub_client.deliver_message(timeout=2)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.assertIsNone(message)
|
||||
yield from sub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_retain_subscribe(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
|
2
tox.ini
2
tox.ini
|
@ -1,6 +1,6 @@
|
|||
[tox]
|
||||
envlist =
|
||||
#py34,
|
||||
py344,
|
||||
py35,
|
||||
coverage,
|
||||
#flake8
|
||||
|
|
Ładowanie…
Reference in New Issue