From 618cfb1a8f2a1fb5b94c830f2175537b40d0fe1b Mon Sep 17 00:00:00 2001 From: njouanin Date: Sat, 12 Mar 2016 22:01:59 +0100 Subject: [PATCH 01/22] Update changelog to 0.6.3 --- docs/changelog.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index d3238e3..bf46d3e 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,11 @@ Changelog --------- +0.6.3 +..... + +* Fix issue `#22 `_. + 0.6.2 ..... From 3fa35849e056c17f1d9c6a09d47067a60b8a911e Mon Sep 17 00:00:00 2001 From: njouanin Date: Sat, 12 Mar 2016 22:02:34 +0100 Subject: [PATCH 02/22] Bump version --- hbmqtt/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbmqtt/__init__.py b/hbmqtt/__init__.py index 58c6a7e..9f28847 100644 --- a/hbmqtt/__init__.py +++ b/hbmqtt/__init__.py @@ -2,4 +2,4 @@ # # See the file license.txt for copying permission. -VERSION = (0, 6, 3, 'final', 0) +VERSION = (0, 7, 0, 'alpha', 0) From 37eeb536e6fce2971487d9005d6aaecb6412baac Mon Sep 17 00:00:00 2001 From: njouanin Date: Sat, 12 Mar 2016 22:25:25 +0100 Subject: [PATCH 03/22] Update configuration --- setup.cfg | 2 +- tox.ini | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index 39f5e8f..bf807a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [bdist_wheel] -python-tag = py34 +python-tag = py34.py35 \ No newline at end of file diff --git a/tox.ini b/tox.ini index 4bd25ee..ad168e5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - #py34, + py34, py35, coverage, #flake8 From 340891a75f102ca6a3318c5f512752c7b388a347 Mon Sep 17 00:00:00 2001 From: njouanin Date: Sat, 12 Mar 2016 22:30:55 +0100 Subject: [PATCH 04/22] Update env to python 3.4.4 --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index ad168e5..7a41479 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - py34, + py344, py35, coverage, #flake8 From d380029cb5ce6d3a38180b36dac7f1cfc2c5f25b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 6 Apr 2016 14:33:13 +0200 Subject: [PATCH 05/22] Fix typo --- hbmqtt/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index aab3e35..cc15bc6 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -260,7 +260,7 @@ class Broker: subprotocols=['mqtt']) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) - self.logger.info("Listener '%s' bind to %s (max_connecionts=%d)" % + self.logger.info("Listener '%s' bind to %s (max_connections=%d)" % (listener_name, listener['bind'], max_connections)) self.transitions.starting_success() From 7a868bd66e7cd44c5470014f82ff57511149f9ee Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 15:33:48 +0200 Subject: [PATCH 06/22] Fix issue #24 Data was not read correctly until the end of the buffer --- hbmqtt/mqtt/publish.py | 8 +++++++- tests/test_broker.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/publish.py b/hbmqtt/mqtt/publish.py index 16b5e3d..a356519 100644 --- a/hbmqtt/mqtt/publish.py +++ b/hbmqtt/mqtt/publish.py @@ -48,7 +48,13 @@ class PublishPayload(MQTTPayload): @asyncio.coroutine def from_stream(cls, reader: asyncio.StreamReader, fixed_header: MQTTFixedHeader, variable_header: MQTTVariableHeader): - data = yield from reader.read(fixed_header.remaining_length-variable_header.bytes_length) + data = bytearray() + data_length = fixed_header.remaining_length-variable_header.bytes_length + length_read = 0 + while length_read < data_length: + buffer = yield from reader.read(data_length - length_read) + data.extend(buffer) + length_read = len(data) return cls(data) def __repr__(self): diff --git a/tests/test_broker.py b/tests/test_broker.py index 2616dd8..473402d 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -259,6 +259,39 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_big(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + + ret_message = yield from pub_client.publish('/topic', bytearray(b'\x99' * 256 * 1024), QOS_2) + yield from pub_client.disconnect() + self.assertEquals(broker._retained_messages, {}) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + MockPluginManager.assert_has_calls( + [call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED, + client_id=pub_client.session.client_id, + message=ret_message), + ], any_order=True) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_retain(self, MockPluginManager): @asyncio.coroutine From dd04b44a57f17ca173cf59bfb5f7c0afb9415c05 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 21:46:52 +0200 Subject: [PATCH 07/22] Fix timeout management on message delivery --- hbmqtt/client.py | 5 ++++- hbmqtt/mqtt/protocol/handler.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 8cebfa3..6a4b892 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -330,7 +330,10 @@ class MQTTClient: deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop) self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") - yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) + if pending: + #timeout occured before message received + deliver_task.cancel() if deliver_task.exception(): raise deliver_task.exception() self.client_tasks.pop() diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index c57c082..5dbb955 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -454,7 +454,10 @@ class ProtocolHandler: def mqtt_deliver_next_message(self): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize()) - message = yield from self.session.delivered_message_queue.get() + try: + message = yield from self.session.delivered_message_queue.get() + except asyncio.CancelledError: + message = None if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Delivering message %s" % message) return message From 360d5b03c70cf25bdc56baf9d1f1b46ded5e2b39 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 22:06:33 +0200 Subject: [PATCH 08/22] Fix #28 --- hbmqtt/broker.py | 6 ++-- tests/test_broker.py | 76 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index cc15bc6..a9f8c57 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -608,7 +608,7 @@ class Broker: def matches(self, topic, a_filter): import re - match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+')) + match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+')) if match_pattern.match(topic): return True else: @@ -625,7 +625,9 @@ class Broker: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("broadcasting %r" % broadcast) for k_filter in self._subscriptions: - if self.matches(broadcast['topic'], k_filter): + if broadcast['topic'].startswith("$") and (k_filter.startswith("+") or k_filter.startswith("#")): + self.logger.debug("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #") + elif self.matches(broadcast['topic'], k_filter): subscriptions = self._subscriptions[k_filter] for (target_session, qos) in subscriptions: if 'qos' in broadcast: diff --git a/tests/test_broker.py b/tests/test_broker.py index 473402d..779445f 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -360,6 +360,82 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe([('#', QOS_0)]) + self.assertEquals(ret, [QOS_0]) + + yield from self._client_publish('/topic', b'data', QOS_0) + message = yield from sub_client.deliver_message() + self.assertIsNotNone(message) + + yield from self._client_publish('$topic', b'data', QOS_0) + yield from asyncio.sleep(0.1) + message = None + try: + message = yield from sub_client.deliver_message(timeout=2) + except Exception as e: + pass + self.assertIsNone(message) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_publish_dollar_topic_2(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe([('+/monitor/Clients', QOS_0)]) + self.assertEquals(ret, [QOS_0]) + + yield from self._client_publish('/test/monitor/Clients', b'data', QOS_0) + message = yield from sub_client.deliver_message() + self.assertIsNotNone(message) + + yield from self._client_publish('$SYS/monitor/Clients', b'data', QOS_0) + yield from asyncio.sleep(0.1) + message = None + try: + message = yield from sub_client.deliver_message(timeout=2) + except Exception as e: + pass + self.assertIsNone(message) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_retain_subscribe(self, MockPluginManager): @asyncio.coroutine From 5dc5293063582b0fca1a4929e9b7062cb39e5e26 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 22:15:17 +0200 Subject: [PATCH 09/22] Fix #25 #26 --- hbmqtt/broker.py | 3 +++ tests/test_broker.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index a9f8c57..a990c81 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -458,6 +458,9 @@ class Broker: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%s handling message delivery" % client_session.client_id) app_message = wait_deliver.result() + if "#" in app_message.topic or "+" in app_message.topic: + self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id) + break yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED, client_id=client_session.client_id, message=app_message) diff --git a/tests/test_broker.py b/tests/test_broker.py index 779445f..f4f1ed2 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -259,6 +259,34 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_invalid_topic(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + + ret_message = yield from pub_client.publish('/+', b'data', QOS_0) + yield from asyncio.sleep(0.1) + yield from pub_client.disconnect() + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_big(self, MockPluginManager): @asyncio.coroutine From e391ccebaf3cb5c7793dd50c210fc770176d2dac Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 22:17:28 +0200 Subject: [PATCH 10/22] Fix #27 --- hbmqtt/broker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index a990c81..01d3a41 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -458,6 +458,9 @@ class Broker: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%s handling message delivery" % client_session.client_id) app_message = wait_deliver.result() + if not app_message.topic: + self.logger.warn("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id) + break if "#" in app_message.topic or "+" in app_message.topic: self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id) break From ea62725868cd91d80f42086710faeb284564a459 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 22:32:07 +0200 Subject: [PATCH 11/22] Fix #30 --- hbmqtt/broker.py | 8 +++++--- tests/test_broker.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 01d3a41..d819787 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -556,9 +556,11 @@ class Broker: if '#' in a_filter and not a_filter.endswith('#'): # [MQTT-4.7.1-2] Wildcard character '#' is only allowed as last character in filter return 0x80 - if '+' in a_filter and not wildcard_pattern.match(a_filter): - # [MQTT-4.7.1-3] + wildcard character must occupy entire level - return 0x80 + if a_filter != "+": + if '+' in a_filter: + if "/+" not in a_filter and "+/" not in a_filter: + # [MQTT-4.7.1-3] + wildcard character must occupy entire level + return 0x80 qos = subscription[1] if 'max-qos' in self.config and qos > self.config['max-qos']: diff --git a/tests/test_broker.py b/tests/test_broker.py index f4f1ed2..d44efff 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -388,6 +388,34 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_subscribe_invalid(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + sub_client = MQTTClient() + yield from sub_client.connect('mqtt://localhost') + ret = yield from sub_client.subscribe( + [('+', QOS_0), ('+/tennis/#', QOS_0), ('sport+', QOS_0), ('sport/+/player1', QOS_0)]) + self.assertEquals(ret, [QOS_0, QOS_0, 0x80, QOS_0]) + + yield from asyncio.sleep(0.1) + yield from sub_client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager): @asyncio.coroutine From e9b806348ee713fdbc09a0ba2463bca65532a22f Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 23:02:10 +0200 Subject: [PATCH 12/22] Fix #32 --- hbmqtt/broker.py | 2 +- hbmqtt/client.py | 1 - hbmqtt/mqtt/protocol/broker_handler.py | 2 +- tests/test_broker.py | 30 +++++++++++++++++++++++++- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index d819787..788745b 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -337,7 +337,7 @@ class Broker: except HBMQTTException as exc: self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" % (format_client_message(address=remote_address, port=remote_port), exc)) - yield from writer.close() + #yield from writer.close() self.logger.debug("Connection closed") return except MQTTException as me: diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 6a4b892..47e9541 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -149,7 +149,6 @@ class MQTTClient: else: return (yield from self.reconnect()) - @mqtt_connected @asyncio.coroutine def disconnect(self): """ diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index 12566d4..f7b5a07 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -153,7 +153,7 @@ class BrokerProtocolHandler(ProtocolHandler): elif connect.password_flag and connect.password is None: error_msg = 'Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.clean_session_flag is False and connect.payload.client_id is None: + elif connect.clean_session_flag is False and (connect.payload.client_id is None or connect.payload.client_id == ""): error_msg = '[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % \ format_client_message(address=remote_address, port=remote_port) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) diff --git a/tests/test_broker.py b/tests/test_broker.py index d44efff..5341398 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -5,7 +5,7 @@ import unittest from unittest.mock import patch, call, MagicMock from hbmqtt.broker import * from hbmqtt.mqtt.constants import * -from hbmqtt.client import MQTTClient +from hbmqtt.client import MQTTClient, ConnectException formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) @@ -99,6 +99,34 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_connect_clean_session_false(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + client = MQTTClient(client_id="", config={'auto_reconnect': False}) + return_code=None + try: + yield from client.connect('mqtt://localhost/', cleansession=False) + except ConnectException as ce: + return_code = ce.return_code + self.assertEqual(return_code, 0x02) + self.assertNotIn(client.session.client_id, broker._sessions) + yield from client.disconnect() + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_subscribe(self, MockPluginManager): @asyncio.coroutine From 8c3cd9be2aaa9df931df46de98d38ce1983a168b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 23:05:42 +0200 Subject: [PATCH 13/22] Fix #34 --- hbmqtt/mqtt/protocol/broker_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index f7b5a07..3f0935f 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -143,8 +143,7 @@ class BrokerProtocolHandler(ProtocolHandler): if connect.proto_level != 4: # only MQTT 3.1.1 supported error_msg = 'Invalid protocol from %s: %d' % \ - (format_client_message(address=remote_address, port=remote_port), - connect.variable_header.protocol_level) + (format_client_message(address=remote_address, port=remote_port), connect.proto_level) connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0 elif connect.username_flag and connect.username is None: error_msg = 'Invalid username from %s' % \ From ae7d13af82a808d971680ba3989affcf39f6fa0d Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Apr 2016 23:10:11 +0200 Subject: [PATCH 14/22] Fix #31 --- hbmqtt/mqtt/protocol/broker_handler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hbmqtt/mqtt/protocol/broker_handler.py b/hbmqtt/mqtt/protocol/broker_handler.py index 3f0935f..39b2a2a 100644 --- a/hbmqtt/mqtt/protocol/broker_handler.py +++ b/hbmqtt/mqtt/protocol/broker_handler.py @@ -145,6 +145,10 @@ class BrokerProtocolHandler(ProtocolHandler): error_msg = 'Invalid protocol from %s: %d' % \ (format_client_message(address=remote_address, port=remote_port), connect.proto_level) connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0 + elif not connect.username_flag and connect.password_flag: + connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22] + elif connect.username_flag and not connect.password_flag: + connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.1.2-22] elif connect.username_flag and connect.username is None: error_msg = 'Invalid username from %s' % \ (format_client_message(address=remote_address, port=remote_port)) From 6f7b438ebf9b2024af4c654a1304810adaa13747 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 21:32:01 +0200 Subject: [PATCH 15/22] Refactor scripts utils location --- hbmqtt/utils.py | 10 ++++++++++ scripts/broker_script.py | 2 +- scripts/pub_script.py | 6 ++---- scripts/sub_script.py | 5 +---- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/hbmqtt/utils.py b/hbmqtt/utils.py index 549bbe7..44fda79 100644 --- a/hbmqtt/utils.py +++ b/hbmqtt/utils.py @@ -1,6 +1,7 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. +import yaml def not_in_dict_or_none(dict, key): @@ -36,3 +37,12 @@ def gen_client_id(): for i in range(7, 23): gen_id += chr(random.randint(0, 74) + 48) return gen_id + +def read_yaml_config(config_file): + config = None + try: + with open(config_file, 'r') as stream: + config = yaml.load(stream) + except yaml.YAMLError as exc: + logger.error("Invalid config_file %s: %s" % (config_file, exc)) + return config \ No newline at end of file diff --git a/scripts/broker_script.py b/scripts/broker_script.py index cb06f69..4d3a95d 100644 --- a/scripts/broker_script.py +++ b/scripts/broker_script.py @@ -23,7 +23,7 @@ import os from hbmqtt.broker import Broker from hbmqtt.version import get_version from docopt import docopt -from .utils import read_yaml_config +from hbmqtt.utils import read_yaml_config default_config = { diff --git a/scripts/pub_script.py b/scripts/pub_script.py index 426c49d..1a64017 100644 --- a/scripts/pub_script.py +++ b/scripts/pub_script.py @@ -40,10 +40,8 @@ import os from hbmqtt.client import MQTTClient, ConnectException from hbmqtt.version import get_version from docopt import docopt -try: - from .utils import read_yaml_config -except: - from utils import read_yaml_config +from hbmqtt.utils import read_yaml_config + if sys.version_info < (3, 5): from asyncio import async as ensure_future else: diff --git a/scripts/sub_script.py b/scripts/sub_script.py index 8d6cc04..7a639d5 100644 --- a/scripts/sub_script.py +++ b/scripts/sub_script.py @@ -39,10 +39,7 @@ from hbmqtt.errors import MQTTException from hbmqtt.version import get_version from docopt import docopt from hbmqtt.mqtt.constants import QOS_0 -try: - from .utils import read_yaml_config -except: - from utils import read_yaml_config +from hbmqtt.utils import read_yaml_config logger = logging.getLogger(__name__) From 56f3c911dd143f323c81655ae93383631acb2b4b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 21:32:22 +0200 Subject: [PATCH 16/22] Refactor scripts utils location --- scripts/utils.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 scripts/utils.py diff --git a/scripts/utils.py b/scripts/utils.py deleted file mode 100644 index 70fa14e..0000000 --- a/scripts/utils.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) 2015 Nicolas JOUANIN -# -# See the file license.txt for copying permission. -import yaml -import logging - -logger = logging.getLogger(__name__) - - -def read_yaml_config(config_file): - config = None - try: - with open(config_file, 'r') as stream: - config = yaml.load(stream) - except yaml.YAMLError as exc: - logger.error("Invalid config_file %s: %s" % (config_file, exc)) - return config From 13c4a8a09feeb173f6b8502520ff93910fdcfc03 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 21:33:01 +0200 Subject: [PATCH 17/22] Fix #36 Don't start listener if it doesn't contain a 'bind' parameter --- hbmqtt/broker.py | 71 +++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 788745b..3ba1fcc 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -225,43 +225,46 @@ class Broker: for listener_name in self.listeners_config: listener = self.listeners_config[listener_name] - # Max connections - try: - max_connections = listener['max_connections'] - except KeyError: - max_connections = -1 - - # SSL Context - sc = None - if 'ssl' in listener and listener['ssl'].upper() == 'ON': + if 'bind' not in listener: + self.logger.debug("Listener configuration '%s' is not bound" % listener_name) + else: + # Max connections try: - sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - sc.load_cert_chain(listener['certfile'], listener['keyfile']) - sc.verify_mode = ssl.CERT_OPTIONAL - except KeyError as ke: - raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) - except FileNotFoundError as fnfe: - raise BrokerException("Can't read cert files '%s' or '%s' : %s" % - (listener['certfile'], listener['keyfile'], fnfe)) + max_connections = listener['max_connections'] + except KeyError: + max_connections = -1 - if listener['type'] == 'tcp': - address, port = listener['bind'].split(':') - cb_partial = partial(self.stream_connected, listener_name=listener_name) - instance = yield from asyncio.start_server(cb_partial, - address, - port, - ssl=sc, - loop=self._loop) - self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) - elif listener['type'] == 'ws': - address, port = listener['bind'].split(':') - cb_partial = partial(self.ws_connected, listener_name=listener_name) - instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop, - subprotocols=['mqtt']) - self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + # SSL Context + sc = None + if 'ssl' in listener and listener['ssl'].upper() == 'ON': + try: + sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + sc.load_cert_chain(listener['certfile'], listener['keyfile']) + sc.verify_mode = ssl.CERT_OPTIONAL + except KeyError as ke: + raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) + except FileNotFoundError as fnfe: + raise BrokerException("Can't read cert files '%s' or '%s' : %s" % + (listener['certfile'], listener['keyfile'], fnfe)) - self.logger.info("Listener '%s' bind to %s (max_connections=%d)" % - (listener_name, listener['bind'], max_connections)) + if listener['type'] == 'tcp': + address, port = listener['bind'].split(':') + cb_partial = partial(self.stream_connected, listener_name=listener_name) + instance = yield from asyncio.start_server(cb_partial, + address, + port, + ssl=sc, + loop=self._loop) + self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + elif listener['type'] == 'ws': + address, port = listener['bind'].split(':') + cb_partial = partial(self.ws_connected, listener_name=listener_name) + instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop, + subprotocols=['mqtt']) + self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + + self.logger.info("Listener '%s' bind to %s (max_connections=%d)" % + (listener_name, listener['bind'], max_connections)) self.transitions.starting_success() yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START) From 0e11d545d867524e9302643d6afa1be72fb49130 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 22:35:38 +0200 Subject: [PATCH 18/22] Fix #23 + add test case --- hbmqtt/mqtt/protocol/handler.py | 32 ++++++++++--------- tests/test_broker.py | 55 +++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 14 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 5dbb955..5004592 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -333,22 +333,26 @@ class ProtocolHandler: # Wait PUBREL if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done(): # PUBREL waiter already exists for this packet ID - message = "Can't add PUBREL waiter, a waiter already exists for message Id '%s'" \ + message = "A waiter already exists for message Id '%s', canceling it" \ % app_message.packet_id self.logger.warning(message) - raise HBMQTTException(message) - waiter = asyncio.Future(loop=self._loop) - self._pubrel_waiters[app_message.packet_id] = waiter - yield from waiter - del self._pubrel_waiters[app_message.packet_id] - app_message.pubrel_packet = waiter.result() - # Initiate delivery and discard message - yield from self.session.delivered_message_queue.put(app_message) - del self.session.inflight_in[app_message.packet_id] - # Send pubcomp - pubcomp_packet = PubcompPacket.build(app_message.packet_id) - yield from self._send_packet(pubcomp_packet) - app_message.pubcomp_packet = pubcomp_packet + self._pubrel_waiters[app_message.packet_id].cancel() + try: + waiter = asyncio.Future(loop=self._loop) + self._pubrel_waiters[app_message.packet_id] = waiter + yield from waiter + del self._pubrel_waiters[app_message.packet_id] + app_message.pubrel_packet = waiter.result() + # Initiate delivery and discard message + yield from self.session.delivered_message_queue.put(app_message) + del self.session.inflight_in[app_message.packet_id] + # Send pubcomp + pubcomp_packet = PubcompPacket.build(app_message.packet_id) + yield from self._send_packet(pubcomp_packet) + app_message.pubcomp_packet = pubcomp_packet + except asyncio.CancelledError: + self.logger.debug("Message flow cancelled") + @asyncio.coroutine def _reader_loop(self): diff --git a/tests/test_broker.py b/tests/test_broker.py index 5341398..9f1175f 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -6,6 +6,9 @@ from unittest.mock import patch, call, MagicMock from hbmqtt.broker import * from hbmqtt.mqtt.constants import * from hbmqtt.client import MQTTClient, ConnectException +from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \ + PubrelPacket, PubcompPacket, DisconnectPacket +from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) @@ -287,6 +290,58 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + #@patch('hbmqtt.broker.PluginManager') + def test_client_publish_dup(self): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + conn_reader, conn_writer = \ + yield from asyncio.open_connection('localhost', 1883, loop=self.loop) + reader = StreamReaderAdapter(conn_reader) + writer = StreamWriterAdapter(conn_writer) + + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = 10 + vh.clean_session_flag = False + vh.will_retain_flag = False + payload.client_id = 'test_id' + connect = ConnectPacket(vh=vh, payload=payload) + yield from connect.to_stream(writer) + yield from ConnackPacket.from_stream(reader) + + publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False) + yield from publish_1.to_stream(writer) + ensure_future(PubrecPacket.from_stream(reader), loop=self.loop) + + yield from asyncio.sleep(2) + + publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False) + yield from publish_dup.to_stream(writer) + pubrec2 = yield from PubrecPacket.from_stream(reader) + pubrel = PubrelPacket.build(1) + yield from pubrel.to_stream(writer) + pubcomp = yield from PubcompPacket.from_stream(reader) + + disconnect = DisconnectPacket() + yield from disconnect.to_stream(writer) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_publish_invalid_topic(self, MockPluginManager): @asyncio.coroutine From f298a4a5433eac0af0eb2b0ea0691b7a7ce6342f Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 22:43:06 +0200 Subject: [PATCH 19/22] Fix #35 Test retain message exists before trying to delete it. --- hbmqtt/broker.py | 5 +++-- tests/test_broker.py | 27 +++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 3ba1fcc..c5de0d3 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -548,8 +548,9 @@ class Broker: self._retained_messages[topic_name] = retained_message else: # [MQTT-3.3.1-10] - self.logger.debug("Clear retained messages for topic '%s'" % topic_name) - del self._retained_messages[topic_name] + if topic_name in self._retained_messages: + self.logger.debug("Clear retained messages for topic '%s'" % topic_name) + del self._retained_messages[topic_name] def add_subscription(self, subscription, session): import re diff --git a/tests/test_broker.py b/tests/test_broker.py index 9f1175f..436a53f 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -435,6 +435,33 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_publish_retain_delete(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + pub_client = MQTTClient() + ret = yield from pub_client.connect('mqtt://localhost/') + self.assertEqual(ret, 0) + ret_message = yield from pub_client.publish('/topic', b'', QOS_0, retain=True) + yield from pub_client.disconnect() + yield from asyncio.sleep(0.1) + self.assertNotIn('/topic', broker._retained_messages) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_subscribe_publish(self, MockPluginManager): @asyncio.coroutine From 89ea06246bee00d944d3695750fe9078ec8b6a8f Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 22:55:19 +0200 Subject: [PATCH 20/22] #33 Added test case --- tests/test_broker.py | 47 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/test_broker.py b/tests/test_broker.py index 436a53f..b588735 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -102,6 +102,53 @@ class BrokerTest(unittest.TestCase): if future.exception(): raise future.exception() + @patch('hbmqtt.broker.PluginManager') + def test_client_connect_will_flag(self, MockPluginManager): + @asyncio.coroutine + def test_coro(): + try: + broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins") + yield from broker.start() + self.assertTrue(broker.transitions.is_started()) + + conn_reader, conn_writer = \ + yield from asyncio.open_connection('localhost', 1883, loop=self.loop) + reader = StreamReaderAdapter(conn_reader) + writer = StreamWriterAdapter(conn_writer) + + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = 10 + vh.clean_session_flag = False + vh.will_retain_flag = False + vh.will_flag = True + vh.will_qos = QOS_0 + payload.client_id = 'test_id' + payload.will_message = b'test' + payload.will_topic = '/topic' + connect = ConnectPacket(vh=vh, payload=payload) + yield from connect.to_stream(writer) + yield from ConnackPacket.from_stream(reader) + + yield from asyncio.sleep(0.1) + + disconnect = DisconnectPacket() + yield from disconnect.to_stream(writer) + + yield from asyncio.sleep(0.1) + yield from broker.shutdown() + self.assertTrue(broker.transitions.is_stopped()) + self.assertDictEqual(broker._sessions, {}) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(test_coro()) + if future.exception(): + raise future.exception() + @patch('hbmqtt.broker.PluginManager') def test_client_connect_clean_session_false(self, MockPluginManager): @asyncio.coroutine From 1fc8b869e51744ed39cf871d1f7d7e14a2f18fad Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 23:00:34 +0200 Subject: [PATCH 21/22] Add next version change log --- docs/changelog.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index bf46d3e..0c60f4c 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,11 @@ Changelog --------- +0.7.0 +..... + +* Fix a `serie of issues `_ reported by `Christoph Krey `_ + 0.6.3 ..... From 966f82047c037a9c636d6e60082a300bad3f265e Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 4 May 2016 23:02:48 +0200 Subject: [PATCH 22/22] Bump version --- hbmqtt/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbmqtt/__init__.py b/hbmqtt/__init__.py index 9f28847..4854123 100644 --- a/hbmqtt/__init__.py +++ b/hbmqtt/__init__.py @@ -2,4 +2,4 @@ # # See the file license.txt for copying permission. -VERSION = (0, 7, 0, 'alpha', 0) +VERSION = (0, 7, 0, 'final', 0)