diff --git a/docs/changelog.rst b/docs/changelog.rst index d3238e3..0c60f4c 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,16 @@ Changelog --------- +0.7.0 +..... + +* Fix a `serie of issues `_ reported by `Christoph Krey `_ + +0.6.3 +..... + +* Fix issue `#22 `_. + 0.6.2 ..... diff --git a/hbmqtt/__init__.py b/hbmqtt/__init__.py index 58c6a7e..4854123 100644 --- a/hbmqtt/__init__.py +++ b/hbmqtt/__init__.py @@ -2,4 +2,4 @@ # # See the file license.txt for copying permission. -VERSION = (0, 6, 3, 'final', 0) +VERSION = (0, 7, 0, 'final', 0) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index aab3e35..c5de0d3 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -225,43 +225,46 @@ class Broker: for listener_name in self.listeners_config: listener = self.listeners_config[listener_name] - # Max connections - try: - max_connections = listener['max_connections'] - except KeyError: - max_connections = -1 - - # SSL Context - sc = None - if 'ssl' in listener and listener['ssl'].upper() == 'ON': + if 'bind' not in listener: + self.logger.debug("Listener configuration '%s' is not bound" % listener_name) + else: + # Max connections try: - sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - sc.load_cert_chain(listener['certfile'], listener['keyfile']) - sc.verify_mode = ssl.CERT_OPTIONAL - except KeyError as ke: - raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) - except FileNotFoundError as fnfe: - raise BrokerException("Can't read cert files '%s' or '%s' : %s" % - (listener['certfile'], listener['keyfile'], fnfe)) + max_connections = listener['max_connections'] + except KeyError: + max_connections = -1 - if listener['type'] == 'tcp': - address, port = listener['bind'].split(':') - cb_partial = partial(self.stream_connected, listener_name=listener_name) - instance = yield from asyncio.start_server(cb_partial, - address, - port, - ssl=sc, - loop=self._loop) - self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) - elif listener['type'] == 'ws': - address, port = listener['bind'].split(':') - cb_partial = partial(self.ws_connected, listener_name=listener_name) - instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop, - subprotocols=['mqtt']) - self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + # SSL Context + sc = None + if 'ssl' in listener and listener['ssl'].upper() == 'ON': + try: + sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + sc.load_cert_chain(listener['certfile'], listener['keyfile']) + sc.verify_mode = ssl.CERT_OPTIONAL + except KeyError as ke: + raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) + except FileNotFoundError as fnfe: + raise BrokerException("Can't read cert files '%s' or '%s' : %s" % + (listener['certfile'], listener['keyfile'], fnfe)) - self.logger.info("Listener '%s' bind to %s (max_connecionts=%d)" % - (listener_name, listener['bind'], max_connections)) + if listener['type'] == 'tcp': + address, port = listener['bind'].split(':') + cb_partial = partial(self.stream_connected, listener_name=listener_name) + instance = yield from asyncio.start_server(cb_partial, + address, + port, + ssl=sc, + loop=self._loop) + self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + elif listener['type'] == 'ws': + address, port = listener['bind'].split(':') + cb_partial = partial(self.ws_connected, listener_name=listener_name) + instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop, + subprotocols=['mqtt']) + self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + + self.logger.info("Listener '%s' bind to %s (max_connections=%d)" % + (listener_name, listener['bind'], max_connections)) self.transitions.starting_success() yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START) @@ -337,7 +340,7 @@ class Broker: except HBMQTTException as exc: self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" % (format_client_message(address=remote_address, port=remote_port), exc)) - yield from writer.close() + #yield from writer.close() self.logger.debug("Connection closed") return except MQTTException as me: @@ -458,6 +461,12 @@ class Broker: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%s handling message delivery" % client_session.client_id) app_message = wait_deliver.result() + if not app_message.topic: + self.logger.warn("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id) + break + if "#" in app_message.topic or "+" in app_message.topic: + self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id) + break yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED, client_id=client_session.client_id, message=app_message) @@ -539,8 +548,9 @@ class Broker: self._retained_messages[topic_name] = retained_message else: # [MQTT-3.3.1-10] - self.logger.debug("Clear retained messages for topic '%s'" % topic_name) - del self._retained_messages[topic_name] + if topic_name in self._retained_messages: + self.logger.debug("Clear retained messages for topic '%s'" % topic_name) + del self._retained_messages[topic_name] def add_subscription(self, subscription, session): import re @@ -550,9 +560,11 @@ class Broker: if '#' in a_filter and not a_filter.endswith('#'): # [MQTT-4.7.1-2] Wildcard character '#' is only allowed as last character in filter return 0x80 - if '+' in a_filter and not wildcard_pattern.match(a_filter): - # [MQTT-4.7.1-3] + wildcard character must occupy entire level - return 0x80 + if a_filter != "+": + if '+' in a_filter: + if "/+" not in a_filter and "+/" not in a_filter: + # [MQTT-4.7.1-3] + wildcard character must occupy entire level + return 0x80 qos = subscription[1] if 'max-qos' in self.config and qos > self.config['max-qos']: @@ -608,7 +620,7 @@ class Broker: def matches(self, topic, a_filter): import re - match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+')) + match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+')) if match_pattern.match(topic): return True else: @@ -625,7 +637,9 @@ class Broker: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("broadcasting %r" % broadcast) for k_filter in self._subscriptions: - if self.matches(broadcast['topic'], k_filter): + if broadcast['topic'].startswith("$") and (k_filter.startswith("+") or k_filter.startswith("#")): + self.logger.debug("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #") + elif self.matches(broadcast['topic'], k_filter): subscriptions = self._subscriptions[k_filter] for (target_session, qos) in subscriptions: if 'qos' in broadcast: diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 8cebfa3..47e9541 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -149,7 +149,6 @@ class MQTTClient: else: return (yield from self.reconnect()) - @mqtt_connected @asyncio.coroutine def disconnect(self): """ @@ -330,7 +329,10 @@ class MQTTClient: deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop) self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") - yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + if pending: + #timeout occured before message received + deliver_task.cancel() if deliver_task.exception(): raise deliver_task.exception() self.client_tasks.pop() diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index 12566d4..39b2a2a 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -143,9 +143,12 @@ class BrokerProtocolHandler(ProtocolHandler): if connect.proto_level != 4: # only MQTT 3.1.1 supported error_msg = 'Invalid protocol from %s: %d' % \ - (format_client_message(address=remote_address, port=remote_port), - connect.variable_header.protocol_level) + (format_client_message(address=remote_address, port=remote_port), connect.proto_level) connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0 + elif not connect.username_flag and connect.password_flag: + connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22] + elif connect.username_flag and not connect.password_flag: + connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22] elif connect.username_flag and connect.username is None: error_msg = 'Invalid username from %s' % \ (format_client_message(address=remote_address, port=remote_port)) @@ -153,7 +156,7 @@ class BrokerProtocolHandler(ProtocolHandler): elif connect.password_flag and connect.password is None: error_msg = 'Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.clean_session_flag is False and connect.payload.client_id is None: + elif connect.clean_session_flag is False and (connect.payload.client_id is None or connect.payload.client_id == ""): error_msg = '[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % \ format_client_message(address=remote_address, port=remote_port) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index c57c082..5004592 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -333,22 +333,26 @@ class ProtocolHandler: # Wait PUBREL if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done(): # PUBREL waiter already exists for this packet ID - message = "Can't add PUBREL waiter, a waiter already exists for message Id '%s'" \ + message = "A waiter already exists for message Id '%s', canceling it" \ % app_message.packet_id self.logger.warning(message) - raise HBMQTTException(message) - waiter = asyncio.Future(loop=self._loop) - self._pubrel_waiters[app_message.packet_id] = waiter - yield from waiter - del self._pubrel_waiters[app_message.packet_id] - app_message.pubrel_packet = waiter.result() - # Initiate delivery and discard message - yield from self.session.delivered_message_queue.put(app_message) - del self.session.inflight_in[app_message.packet_id] - # Send pubcomp - pubcomp_packet = PubcompPacket.build(app_message.packet_id) - yield from self._send_packet(pubcomp_packet) - app_message.pubcomp_packet = pubcomp_packet + self._pubrel_waiters[app_message.packet_id].cancel() + try: + waiter = asyncio.Future(loop=self._loop) + self._pubrel_waiters[app_message.packet_id] = waiter + yield from waiter + del self._pubrel_waiters[app_message.packet_id] + app_message.pubrel_packet = waiter.result() + # Initiate delivery and discard message + yield from self.session.delivered_message_queue.put(app_message) + del self.session.inflight_in[app_message.packet_id] + # Send pubcomp + pubcomp_packet = PubcompPacket.build(app_message.packet_id) + yield from self._send_packet(pubcomp_packet) + app_message.pubcomp_packet = pubcomp_packet + except asyncio.CancelledError: + self.logger.debug("Message flow cancelled") + @asyncio.coroutine def _reader_loop(self): @@ -454,7 +458,10 @@ class ProtocolHandler: def mqtt_deliver_next_message(self): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize()) - message = yield from self.session.delivered_message_queue.get() + try: + message = yield from self.session.delivered_message_queue.get() + except asyncio.CancelledError: + message = None if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Delivering message %s" % message) return message diff --git a/hbmqtt/mqtt/publish.py b/hbmqtt/mqtt/publish.py index 16b5e3d..a356519 100644 --- a/hbmqtt/mqtt/publish.py +++ b/hbmqtt/mqtt/publish.py @@ -48,7 +48,13 @@ class PublishPayload(MQTTPayload): @asyncio.coroutine def from_stream(cls, reader: asyncio.StreamReader, fixed_header: MQTTFixedHeader, variable_header: MQTTVariableHeader): - data = yield from reader.read(fixed_header.remaining_length-variable_header.bytes_length) + data = bytearray() + data_length = fixed_header.remaining_length-variable_header.bytes_length + length_read = 0 + while length_read < data_length: + buffer = yield from reader.read(data_length - length_read) + data.extend(buffer) + length_read = len(data) return cls(data) def __repr__(self): diff --git a/hbmqtt/utils.py b/hbmqtt/utils.py index 549bbe7..44fda79 100644 --- a/hbmqtt/utils.py +++ b/hbmqtt/utils.py @@ -1,6 +1,7 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. +import yaml def not_in_dict_or_none(dict, key): @@ -36,3 +37,12 @@ def gen_client_id(): for i in range(7, 23): gen_id += chr(random.randint(0, 74) + 48) return gen_id + +def read_yaml_config(config_file): + config = None + try: + with open(config_file, 'r') as stream: + config = yaml.load(stream) + except yaml.YAMLError as exc: + logger.error("Invalid config_file %s: %s" % (config_file, exc)) + return config \ No newline at end of file diff --git a/scripts/broker_script.py b/scripts/broker_script.py index cb06f69..4d3a95d 100644 --- a/scripts/broker_script.py +++ b/scripts/broker_script.py @@ -23,7 +23,7 @@ import os from hbmqtt.broker import Broker from hbmqtt.version import get_version from docopt import docopt -from .utils import read_yaml_config +from hbmqtt.utils import read_yaml_config default_config = { diff --git a/scripts/pub_script.py b/scripts/pub_script.py index 426c49d..1a64017 100644 --- a/scripts/pub_script.py +++ b/scripts/pub_script.py @@ -40,10 +40,8 @@ import os from hbmqtt.client import MQTTClient, ConnectException from hbmqtt.version import get_version from docopt import docopt -try: - from .utils import read_yaml_config -except: - from utils import read_yaml_config +from hbmqtt.utils import read_yaml_config + if sys.version_info < (3, 5): from asyncio import async as ensure_future else: diff --git a/scripts/sub_script.py b/scripts/sub_script.py index 8d6cc04..7a639d5 100644 --- a/scripts/sub_script.py +++ b/scripts/sub_script.py @@ -39,10 +39,7 @@ from hbmqtt.errors import MQTTException from hbmqtt.version import get_version from docopt import docopt from hbmqtt.mqtt.constants import QOS_0 -try: - from .utils import read_yaml_config -except: - from utils import read_yaml_config +from hbmqtt.utils import read_yaml_config logger = logging.getLogger(__name__) diff --git a/scripts/utils.py b/scripts/utils.py deleted file mode 100644 index 70fa14e..0000000 --- a/scripts/utils.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) 2015 Nicolas JOUANIN -# -# See the file license.txt for copying permission. -import yaml -import logging - -logger = logging.getLogger(__name__) - - -def read_yaml_config(config_file): - config = None - try: - with open(config_file, 'r') as stream: - config = yaml.load(stream) - except yaml.YAMLError as exc: - logger.error("Invalid config_file %s: %s" % (config_file, exc)) - return config diff --git a/setup.cfg b/setup.cfg index 39f5e8f..bf807a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [bdist_wheel] -python-tag = py34 +python-tag = py34.py35 \ No newline at end of file diff --git a/tests/test_broker.py b/tests/test_broker.py index 2616dd8..b588735 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -5,7 +5,10 @@ import unittest from unittest.mock import patch, call, MagicMock from hbmqtt.broker import * from hbmqtt.mqtt.constants import * -from hbmqtt.client import MQTTClient +from hbmqtt.client import MQTTClient, ConnectException +from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \ + PubrelPacket, PubcompPacket, DisconnectPacket +from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) @@ -99,6 +102,81 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_connect_will_flag(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + conn_reader, conn_writer = \ + yield from asyncio.open_connection('localhost', 1883, loop=self.loop) + reader = StreamReaderAdapter(conn_reader) + writer = StreamWriterAdapter(conn_writer) + + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = 10 + vh.clean_session_flag = False + vh.will_retain_flag = False + vh.will_flag = True + vh.will_qos = QOS_0 + payload.client_id = 'test_id' + payload.will_message = b'test' + payload.will_topic = '/topic' + connect = ConnectPacket(vh=vh, payload=payload) + yield from connect.to_stream(writer) + yield from ConnackPacket.from_stream(reader) + + yield from asyncio.sleep(0.1) + + disconnect = DisconnectPacket() + yield from disconnect.to_stream(writer) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + self.assertDictEqual(broker._sessions, {}) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_connect_clean_session_false(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + client = MQTTClient(client_id="", config={'auto_reconnect': False}) + return_code=None + try: + yield from client.connect('mqtt://localhost/', cleansession=False) + except ConnectException as ce: + return_code = ce.return_code + self.assertEqual(return_code, 0x02) + self.assertNotIn(client.session.client_id, broker._sessions) + yield from client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_subscribe(self, MockPluginManager): @asyncio.coroutine @@ -259,6 +337,119 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + #@patch('hbmqtt.broker.PluginManager') + def test_client_publish_dup(self): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + conn_reader, conn_writer = \ + yield from asyncio.open_connection('localhost', 1883, loop=self.loop) + reader = StreamReaderAdapter(conn_reader) + writer = StreamWriterAdapter(conn_writer) + + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = 10 + vh.clean_session_flag = False + vh.will_retain_flag = False + payload.client_id = 'test_id' + connect = ConnectPacket(vh=vh, payload=payload) + yield from connect.to_stream(writer) + yield from ConnackPacket.from_stream(reader) + + publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False) + yield from publish_1.to_stream(writer) + ensure_future(PubrecPacket.from_stream(reader), loop=self.loop) + + yield from asyncio.sleep(2) + + publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False) + yield from publish_dup.to_stream(writer) + pubrec2 = yield from PubrecPacket.from_stream(reader) + pubrel = PubrelPacket.build(1) + yield from pubrel.to_stream(writer) + pubcomp = yield from PubcompPacket.from_stream(reader) + + disconnect = DisconnectPacket() + yield from disconnect.to_stream(writer) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_invalid_topic(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + + ret_message = yield from pub_client.publish('/+', b'data', QOS_0) + yield from asyncio.sleep(0.1) + yield from pub_client.disconnect() + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_big(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + + ret_message = yield from pub_client.publish('/topic', bytearray(b'\x99' * 256 * 1024), QOS_2) + yield from pub_client.disconnect() + self.assertEquals(broker._retained_messages, {}) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + MockPluginManager.assert_has_calls( + [call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED, + client_id=pub_client.session.client_id, + message=ret_message), + ], any_order=True) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_retain(self, MockPluginManager): @asyncio.coroutine @@ -291,6 +482,33 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_retain_delete(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + ret_message = yield from pub_client.publish('/topic', b'', QOS_0, retain=True) + yield from pub_client.disconnect() + yield from asyncio.sleep(0.1) + self.assertNotIn('/topic', broker._retained_messages) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_subscribe_publish(self, MockPluginManager): @asyncio.coroutine @@ -327,6 +545,110 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_invalid(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe( + [('+', QOS_0), ('+/tennis/#', QOS_0), ('sport+', QOS_0), ('sport/+/player1', QOS_0)]) + self.assertEquals(ret, [QOS_0, QOS_0, 0x80, QOS_0]) + + yield from asyncio.sleep(0.1) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe([('#', QOS_0)]) + self.assertEquals(ret, [QOS_0]) + + yield from self._client_publish('/topic', b'data', QOS_0) + message = yield from sub_client.deliver_message() + self.assertIsNotNone(message) + + yield from self._client_publish('$topic', b'data', QOS_0) + yield from asyncio.sleep(0.1) + message = None + try: + message = yield from sub_client.deliver_message(timeout=2) + except Exception as e: + pass + self.assertIsNone(message) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_publish_dollar_topic_2(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe([('+/monitor/Clients', QOS_0)]) + self.assertEquals(ret, [QOS_0]) + + yield from self._client_publish('/test/monitor/Clients', b'data', QOS_0) + message = yield from sub_client.deliver_message() + self.assertIsNotNone(message) + + yield from self._client_publish('$SYS/monitor/Clients', b'data', QOS_0) + yield from asyncio.sleep(0.1) + message = None + try: + message = yield from sub_client.deliver_message(timeout=2) + except Exception as e: + pass + self.assertIsNone(message) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_retain_subscribe(self, MockPluginManager): @asyncio.coroutine diff --git a/tox.ini b/tox.ini index 4bd25ee..7a41479 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - #py34, + py344, py35, coverage, #flake8