diff --git a/amqtt/broker.py b/amqtt/broker.py index a957fda..6eba2ec 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -343,8 +343,7 @@ class Broker: await self.plugins_manager.fire_event(EVENT_BROKER_POST_START) # Start broadcast loop - self._broadcast_task = asyncio.ensure_future( - self._broadcast_loop()) + self._broadcast_task = asyncio.ensure_future(self._broadcast_loop()) self.logger.debug("Broker started") except Exception as e: @@ -418,7 +417,8 @@ class Broker: # Wait for first packet and expect a CONNECT try: handler, client_session = await BrokerProtocolHandler.init_from_connect( - reader, writer, self.plugins_manager) + reader, writer, self.plugins_manager + ) except AMQTTException as exc: self.logger.warning( "[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" @@ -496,14 +496,14 @@ class Broker: await self.publish_session_retained_messages(client_session) # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect) - disconnect_waiter = asyncio.ensure_future( - handler.wait_disconnect()) + disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect()) subscribe_waiter = asyncio.ensure_future( - handler.get_next_pending_subscription()) + handler.get_next_pending_subscription() + ) unsubscribe_waiter = asyncio.ensure_future( - handler.get_next_pending_unsubscription()) - wait_deliver = asyncio.ensure_future( - handler.mqtt_deliver_next_message()) + handler.get_next_pending_unsubscription() + ) + wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message()) connected = True while connected: try: @@ -569,7 +569,8 @@ class Broker: unsubscription["packet_id"] ) unsubscribe_waiter = asyncio.Task( - handler.get_next_pending_unsubscription()) + handler.get_next_pending_unsubscription() + ) if subscribe_waiter in done: self.logger.debug( "%s handling subscription" % client_session.client_id @@ -596,7 +597,8 @@ class Broker: subscription, client_session ) subscribe_waiter = asyncio.Task( - handler.get_next_pending_subscription()) + handler.get_next_pending_subscription() + ) self.logger.debug(repr(self._subscriptions)) if wait_deliver in done: if self.logger.isEnabledFor(logging.DEBUG): @@ -643,8 +645,7 @@ class Broker: app_message.data, app_message.qos, ) - wait_deliver = asyncio.Task( - handler.mqtt_deliver_next_message()) + wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message()) except asyncio.CancelledError: self.logger.debug("Client loop cancelled") break diff --git a/amqtt/client.py b/amqtt/client.py index 258615f..3c5d153 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -123,8 +123,7 @@ class MQTTClient: # Init plugins manager context = ClientContext() context.config = self.config - self.plugins_manager = PluginManager( - "amqtt.client.plugins", context) + self.plugins_manager = PluginManager("amqtt.client.plugins", context) self.client_tasks = deque() async def connect( @@ -251,8 +250,7 @@ class MQTTClient: async def _do_connect(self): return_code = await self._connect_coro() - self._disconnect_task = asyncio.ensure_future( - self.handle_connection_close()) + self._disconnect_task = asyncio.ensure_future(self.handle_connection_close()) return return_code @mqtt_connected @@ -365,8 +363,7 @@ class MQTTClient: :return: instance of :class:`amqtt.session.ApplicationMessage` containing received message information flow. :raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered """ - deliver_task = asyncio.ensure_future( - self._handler.mqtt_deliver_next_message()) + deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message()) self.client_tasks.append(deliver_task) self.logger.debug("Waiting message delivery") done, pending = await asyncio.wait( @@ -442,9 +439,7 @@ class MQTTClient: # Open connection if scheme in ("mqtt", "mqtts"): conn_reader, conn_writer = await asyncio.open_connection( - self.session.remote_address, - self.session.remote_port, - **kwargs + self.session.remote_address, self.session.remote_port, **kwargs ) reader = StreamReaderAdapter(conn_reader) writer = StreamWriterAdapter(conn_writer) diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index 8089e4a..e00c93f 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -176,10 +176,7 @@ class ProtocolHandler: for message in itertools.chain( self.session.inflight_in.values(), self.session.inflight_out.values() ): - tasks.append( - asyncio.wait_for( - self._handle_message_flow(message), 10) - ) + tasks.append(asyncio.wait_for(self._handle_message_flow(message), 10)) if tasks: done, pending = await asyncio.wait(tasks) self.logger.debug("%d messages redelivered" % len(done)) @@ -213,8 +210,7 @@ class ProtocolHandler: message = OutgoingApplicationMessage(packet_id, topic, qos, data, retain) # Handle message flow if ack_timeout is not None and ack_timeout > 0: - await asyncio.wait_for( - self._handle_message_flow(message), ack_timeout) + await asyncio.wait_for(self._handle_message_flow(message), ack_timeout) else: await self._handle_message_flow(message) @@ -440,44 +436,33 @@ class ProtocolHandler: ) task = None if packet.fixed_header.packet_type == CONNACK: - task = asyncio.ensure_future( - self.handle_connack(packet)) + task = asyncio.ensure_future(self.handle_connack(packet)) elif packet.fixed_header.packet_type == SUBSCRIBE: - task = asyncio.ensure_future( - self.handle_subscribe(packet)) + task = asyncio.ensure_future(self.handle_subscribe(packet)) elif packet.fixed_header.packet_type == UNSUBSCRIBE: task = asyncio.ensure_future( - self.handle_unsubscribe(packet)) + self.handle_unsubscribe(packet) + ) elif packet.fixed_header.packet_type == SUBACK: - task = asyncio.ensure_future( - self.handle_suback(packet)) + task = asyncio.ensure_future(self.handle_suback(packet)) elif packet.fixed_header.packet_type == UNSUBACK: - task = asyncio.ensure_future( - self.handle_unsuback(packet)) + task = asyncio.ensure_future(self.handle_unsuback(packet)) elif packet.fixed_header.packet_type == PUBACK: - task = asyncio.ensure_future( - self.handle_puback(packet)) + task = asyncio.ensure_future(self.handle_puback(packet)) elif packet.fixed_header.packet_type == PUBREC: - task = asyncio.ensure_future( - self.handle_pubrec(packet)) + task = asyncio.ensure_future(self.handle_pubrec(packet)) elif packet.fixed_header.packet_type == PUBREL: - task = asyncio.ensure_future( - self.handle_pubrel(packet)) + task = asyncio.ensure_future(self.handle_pubrel(packet)) elif packet.fixed_header.packet_type == PUBCOMP: - task = asyncio.ensure_future( - self.handle_pubcomp(packet)) + task = asyncio.ensure_future(self.handle_pubcomp(packet)) elif packet.fixed_header.packet_type == PINGREQ: - task = asyncio.ensure_future( - self.handle_pingreq(packet)) + task = asyncio.ensure_future(self.handle_pingreq(packet)) elif packet.fixed_header.packet_type == PINGRESP: - task = asyncio.ensure_future( - self.handle_pingresp(packet)) + task = asyncio.ensure_future(self.handle_pingresp(packet)) elif packet.fixed_header.packet_type == PUBLISH: - task = asyncio.ensure_future( - self.handle_publish(packet)) + task = asyncio.ensure_future(self.handle_publish(packet)) elif packet.fixed_header.packet_type == DISCONNECT: - task = asyncio.ensure_future( - self.handle_disconnect(packet)) + task = asyncio.ensure_future(self.handle_disconnect(packet)) elif packet.fixed_header.packet_type == CONNECT: self.handle_connect(packet) else: