From 31888f8e691b0c51d61fc1a201ff99daa7a0eb8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Desv=C3=A9?= Date: Fri, 29 Jun 2018 15:56:27 +0200 Subject: [PATCH 1/3] python3.7 support --- hbmqtt/broker.py | 18 ++++++--------- hbmqtt/client.py | 10 +++------ hbmqtt/mqtt/protocol/client_handler.py | 7 +----- hbmqtt/mqtt/protocol/handler.py | 31 +++++++++++--------------- hbmqtt/plugins/manager.py | 6 +---- hbmqtt/plugins/sys/broker.py | 10 ++++----- scripts/pub_script.py | 6 +---- tests/test_broker.py | 7 +----- 8 files changed, 31 insertions(+), 64 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index d4c3610..bbc4b36 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -25,10 +25,6 @@ from hbmqtt.adapters import ( WebSocketsWriter) from .plugins.manager import PluginManager, BaseContext -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future _defaults = { 'timeout-disconnect-delay': 2, @@ -287,7 +283,7 @@ class Broker: yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START) #Start broadcast loop - self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop) + self._broadcast_task = asyncio.ensure_future(self._broadcast_loop(), loop=self._loop) self.logger.debug("Broker started") except Exception as e: @@ -415,10 +411,10 @@ class Broker: yield from self.publish_session_retained_messages(client_session) # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect) - disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop) - subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop) - unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop) - wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop) + disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect(), loop=self._loop) + subscribe_waiter = asyncio.ensure_future(handler.get_next_pending_subscription(), loop=self._loop) + unsubscribe_waiter = asyncio.ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop) + wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop) connected = True while connected: try: @@ -707,7 +703,7 @@ class Broker: (format_client_message(session=broadcast['session']), broadcast['topic'], format_client_message(session=target_session))) handler = self._get_handler(target_session) - task = ensure_future( + task = asyncio.ensure_future( handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False), loop=self._loop) running_tasks.append(task) @@ -743,7 +739,7 @@ class Broker: handler = self._get_handler(session) while not session.retained_messages.empty(): retained = yield from session.retained_messages.get() - publish_tasks.append(ensure_future( + publish_tasks.append(asyncio.ensure_future( handler.mqtt_publish( retained.topic, retained.data, retained.qos, True), loop=self._loop)) if publish_tasks: diff --git a/hbmqtt/client.py b/hbmqtt/client.py index dbf47e3..3524003 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -20,11 +20,7 @@ import websockets from websockets.uri import InvalidURI from websockets.exceptions import InvalidHandshake from collections import deque -import sys -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future + _defaults = { 'keep_alive': 10, @@ -213,7 +209,7 @@ class MQTTClient: @asyncio.coroutine def _do_connect(self): return_code = yield from self._connect_coro() - self._disconnect_task = ensure_future(self.handle_connection_close(), loop=self._loop) + self._disconnect_task = asyncio.ensure_future(self.handle_connection_close(), loop=self._loop) return return_code @mqtt_connected @@ -326,7 +322,7 @@ class MQTTClient: :return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow. :raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered """ - deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop) + deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop) self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout) diff --git a/hbmqtt/mqtt/protocol/client_handler.py b/hbmqtt/mqtt/protocol/client_handler.py index 5e37f2f..627b689 100644 --- a/hbmqtt/mqtt/protocol/client_handler.py +++ b/hbmqtt/mqtt/protocol/client_handler.py @@ -17,11 +17,6 @@ from hbmqtt.mqtt.connack import ConnackPacket from hbmqtt.session import Session from hbmqtt.plugins.manager import PluginManager -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future - class ClientProtocolHandler(ProtocolHandler): def __init__(self, plugins_manager: PluginManager, session: Session=None, loop=None): @@ -94,7 +89,7 @@ class ClientProtocolHandler(ProtocolHandler): try: if not self._ping_task: self.logger.debug("Scheduling Ping") - self._ping_task = ensure_future(self.mqtt_ping()) + self._ping_task = asyncio.ensure_future(self.mqtt_ping()) except BaseException as be: self.logger.debug("Exception ignored in ping task: %r" % be) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index d1e8b24..dbbc8e1 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -33,11 +33,6 @@ from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from hbmqtt.plugins.manager import PluginManager from hbmqtt.errors import HBMQTTException, MQTTException, NoDataException -import sys -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent' EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received' @@ -387,31 +382,31 @@ class ProtocolHandler: EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session) task = None if packet.fixed_header.packet_type == CONNACK: - task = ensure_future(self.handle_connack(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_connack(packet), loop=self._loop) elif packet.fixed_header.packet_type == SUBSCRIBE: - task = ensure_future(self.handle_subscribe(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_subscribe(packet), loop=self._loop) elif packet.fixed_header.packet_type == UNSUBSCRIBE: - task = ensure_future(self.handle_unsubscribe(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_unsubscribe(packet), loop=self._loop) elif packet.fixed_header.packet_type == SUBACK: - task = ensure_future(self.handle_suback(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_suback(packet), loop=self._loop) elif packet.fixed_header.packet_type == UNSUBACK: - task = ensure_future(self.handle_unsuback(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_unsuback(packet), loop=self._loop) elif packet.fixed_header.packet_type == PUBACK: - task = ensure_future(self.handle_puback(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_puback(packet), loop=self._loop) elif packet.fixed_header.packet_type == PUBREC: - task = ensure_future(self.handle_pubrec(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_pubrec(packet), loop=self._loop) elif packet.fixed_header.packet_type == PUBREL: - task = ensure_future(self.handle_pubrel(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_pubrel(packet), loop=self._loop) elif packet.fixed_header.packet_type == PUBCOMP: - task = ensure_future(self.handle_pubcomp(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_pubcomp(packet), loop=self._loop) elif packet.fixed_header.packet_type == PINGREQ: - task = ensure_future(self.handle_pingreq(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_pingreq(packet), loop=self._loop) elif packet.fixed_header.packet_type == PINGRESP: - task = ensure_future(self.handle_pingresp(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_pingresp(packet), loop=self._loop) elif packet.fixed_header.packet_type == PUBLISH: - task = ensure_future(self.handle_publish(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_publish(packet), loop=self._loop) elif packet.fixed_header.packet_type == DISCONNECT: - task = ensure_future(self.handle_disconnect(packet), loop=self._loop) + task = asyncio.ensure_future(self.handle_disconnect(packet), loop=self._loop) elif packet.fixed_header.packet_type == CONNECT: self.handle_connect(packet) else: diff --git a/hbmqtt/plugins/manager.py b/hbmqtt/plugins/manager.py index c6d725b..953b360 100644 --- a/hbmqtt/plugins/manager.py +++ b/hbmqtt/plugins/manager.py @@ -12,10 +12,6 @@ import sys from collections import namedtuple -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future Plugin = namedtuple('Plugin', ['name', 'ep', 'object']) @@ -113,7 +109,7 @@ class PluginManager: return self._plugins def _schedule_coro(self, coro): - return ensure_future(coro, loop=self._loop) + return asyncio.ensure_future(coro, loop=self._loop) @asyncio.coroutine def fire_event(self, event_name, wait=False, *args, **kwargs): diff --git a/hbmqtt/plugins/sys/broker.py b/hbmqtt/plugins/sys/broker.py index 722a9ef..1af9786 100644 --- a/hbmqtt/plugins/sys/broker.py +++ b/hbmqtt/plugins/sys/broker.py @@ -8,10 +8,6 @@ import asyncio import sys from collections import deque -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future DOLLAR_SYS_ROOT = '$SYS/broker/' STAT_BYTES_SENT = 'bytes_sent' @@ -53,8 +49,10 @@ class BrokerSysPlugin: return (yield from self.context.broadcast_message(topic_basename, data)) def schedule_broadcast_sys_topic(self, topic_basename, data): - return ensure_future(self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data), - loop=self.context.loop) + return asyncio.ensure_future( + self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data), + loop=self.context.loop + ) @asyncio.coroutine def on_broker_pre_start(self, *args, **kwargs): diff --git a/scripts/pub_script.py b/scripts/pub_script.py index d68fa26..2ad0fee 100644 --- a/scripts/pub_script.py +++ b/scripts/pub_script.py @@ -42,10 +42,6 @@ from hbmqtt.version import get_version from docopt import docopt from hbmqtt.utils import read_yaml_config -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future logger = logging.getLogger(__name__) @@ -107,7 +103,7 @@ def do_pub(client, arguments): retain = arguments['-r'] for message in _get_message(arguments): logger.info("%s Publishing to '%s'" % (client.client_id, topic)) - task = ensure_future(client.publish(topic, message, qos, retain)) + task = asyncio.ensure_future(client.publish(topic, message, qos, retain)) running_tasks.append(task) if running_tasks: yield from asyncio.wait(running_tasks) diff --git a/tests/test_broker.py b/tests/test_broker.py index 44bf269..66717e5 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -25,11 +25,6 @@ from hbmqtt.mqtt import ( from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 -import sys -if sys.version_info < (3, 5): - from asyncio import async as ensure_future -else: - from asyncio import ensure_future formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) @@ -387,7 +382,7 @@ class BrokerTest(unittest.TestCase): publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False) yield from publish_1.to_stream(writer) - ensure_future(PubrecPacket.from_stream(reader), loop=self.loop) + asyncio.ensure_future(PubrecPacket.from_stream(reader), loop=self.loop) yield from asyncio.sleep(2) From 229bad2d84359c1e8fd77189843a06bed168fc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Desv=C3=A9?= Date: Fri, 6 Jul 2018 09:58:22 +0200 Subject: [PATCH 2/3] Add py37 in tox --- .travis.yml | 1 + tox.ini | 1 + 2 files changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index db12fdf..fd17896 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,7 @@ python: - "3.4" - "3.5" - "3.6" + - "3.7" - "nightly" matrix: allow_failures: diff --git a/tox.ini b/tox.ini index 18c9c7d..e959bf2 100644 --- a/tox.ini +++ b/tox.ini @@ -8,6 +8,7 @@ envlist = py34, py35, py36, + py37, coverage, flake8, check-manifest From acce3b1c510845738e48a07c62a0fac673b8dbbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20Desv=C3=A9?= Date: Sat, 7 Jul 2018 12:26:53 +0200 Subject: [PATCH 3/3] Update .travis.yml --- .travis.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index fd17896..65014cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,11 +6,15 @@ python: - "3.4" - "3.5" - "3.6" - - "3.7" - "nightly" matrix: allow_failures: - python: "nightly" + include: + - python: "3.7" + env: TOXENV=py37 + dist: xenial + sudo: true install: - pip install coveralls tox-travis