kopia lustrzana https://github.com/Yakifo/amqtt
run black
rodzic
5d3691454d
commit
035cd6267b
|
@ -343,8 +343,7 @@ class Broker:
|
||||||
await self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
|
await self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
|
||||||
|
|
||||||
# Start broadcast loop
|
# Start broadcast loop
|
||||||
self._broadcast_task = asyncio.ensure_future(
|
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop())
|
||||||
self._broadcast_loop())
|
|
||||||
|
|
||||||
self.logger.debug("Broker started")
|
self.logger.debug("Broker started")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -418,7 +417,8 @@ class Broker:
|
||||||
# Wait for first packet and expect a CONNECT
|
# Wait for first packet and expect a CONNECT
|
||||||
try:
|
try:
|
||||||
handler, client_session = await BrokerProtocolHandler.init_from_connect(
|
handler, client_session = await BrokerProtocolHandler.init_from_connect(
|
||||||
reader, writer, self.plugins_manager)
|
reader, writer, self.plugins_manager
|
||||||
|
)
|
||||||
except AMQTTException as exc:
|
except AMQTTException as exc:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
"[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s"
|
"[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s"
|
||||||
|
@ -496,14 +496,14 @@ class Broker:
|
||||||
await self.publish_session_retained_messages(client_session)
|
await self.publish_session_retained_messages(client_session)
|
||||||
|
|
||||||
# Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
|
# Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
|
||||||
disconnect_waiter = asyncio.ensure_future(
|
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect())
|
||||||
handler.wait_disconnect())
|
|
||||||
subscribe_waiter = asyncio.ensure_future(
|
subscribe_waiter = asyncio.ensure_future(
|
||||||
handler.get_next_pending_subscription())
|
handler.get_next_pending_subscription()
|
||||||
|
)
|
||||||
unsubscribe_waiter = asyncio.ensure_future(
|
unsubscribe_waiter = asyncio.ensure_future(
|
||||||
handler.get_next_pending_unsubscription())
|
handler.get_next_pending_unsubscription()
|
||||||
wait_deliver = asyncio.ensure_future(
|
)
|
||||||
handler.mqtt_deliver_next_message())
|
wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message())
|
||||||
connected = True
|
connected = True
|
||||||
while connected:
|
while connected:
|
||||||
try:
|
try:
|
||||||
|
@ -569,7 +569,8 @@ class Broker:
|
||||||
unsubscription["packet_id"]
|
unsubscription["packet_id"]
|
||||||
)
|
)
|
||||||
unsubscribe_waiter = asyncio.Task(
|
unsubscribe_waiter = asyncio.Task(
|
||||||
handler.get_next_pending_unsubscription())
|
handler.get_next_pending_unsubscription()
|
||||||
|
)
|
||||||
if subscribe_waiter in done:
|
if subscribe_waiter in done:
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
"%s handling subscription" % client_session.client_id
|
"%s handling subscription" % client_session.client_id
|
||||||
|
@ -596,7 +597,8 @@ class Broker:
|
||||||
subscription, client_session
|
subscription, client_session
|
||||||
)
|
)
|
||||||
subscribe_waiter = asyncio.Task(
|
subscribe_waiter = asyncio.Task(
|
||||||
handler.get_next_pending_subscription())
|
handler.get_next_pending_subscription()
|
||||||
|
)
|
||||||
self.logger.debug(repr(self._subscriptions))
|
self.logger.debug(repr(self._subscriptions))
|
||||||
if wait_deliver in done:
|
if wait_deliver in done:
|
||||||
if self.logger.isEnabledFor(logging.DEBUG):
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
|
@ -643,8 +645,7 @@ class Broker:
|
||||||
app_message.data,
|
app_message.data,
|
||||||
app_message.qos,
|
app_message.qos,
|
||||||
)
|
)
|
||||||
wait_deliver = asyncio.Task(
|
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message())
|
||||||
handler.mqtt_deliver_next_message())
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
self.logger.debug("Client loop cancelled")
|
self.logger.debug("Client loop cancelled")
|
||||||
break
|
break
|
||||||
|
|
|
@ -123,8 +123,7 @@ class MQTTClient:
|
||||||
# Init plugins manager
|
# Init plugins manager
|
||||||
context = ClientContext()
|
context = ClientContext()
|
||||||
context.config = self.config
|
context.config = self.config
|
||||||
self.plugins_manager = PluginManager(
|
self.plugins_manager = PluginManager("amqtt.client.plugins", context)
|
||||||
"amqtt.client.plugins", context)
|
|
||||||
self.client_tasks = deque()
|
self.client_tasks = deque()
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
|
@ -251,8 +250,7 @@ class MQTTClient:
|
||||||
|
|
||||||
async def _do_connect(self):
|
async def _do_connect(self):
|
||||||
return_code = await self._connect_coro()
|
return_code = await self._connect_coro()
|
||||||
self._disconnect_task = asyncio.ensure_future(
|
self._disconnect_task = asyncio.ensure_future(self.handle_connection_close())
|
||||||
self.handle_connection_close())
|
|
||||||
return return_code
|
return return_code
|
||||||
|
|
||||||
@mqtt_connected
|
@mqtt_connected
|
||||||
|
@ -365,8 +363,7 @@ class MQTTClient:
|
||||||
:return: instance of :class:`amqtt.session.ApplicationMessage` containing received message information flow.
|
:return: instance of :class:`amqtt.session.ApplicationMessage` containing received message information flow.
|
||||||
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
|
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
|
||||||
"""
|
"""
|
||||||
deliver_task = asyncio.ensure_future(
|
deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message())
|
||||||
self._handler.mqtt_deliver_next_message())
|
|
||||||
self.client_tasks.append(deliver_task)
|
self.client_tasks.append(deliver_task)
|
||||||
self.logger.debug("Waiting message delivery")
|
self.logger.debug("Waiting message delivery")
|
||||||
done, pending = await asyncio.wait(
|
done, pending = await asyncio.wait(
|
||||||
|
@ -442,9 +439,7 @@ class MQTTClient:
|
||||||
# Open connection
|
# Open connection
|
||||||
if scheme in ("mqtt", "mqtts"):
|
if scheme in ("mqtt", "mqtts"):
|
||||||
conn_reader, conn_writer = await asyncio.open_connection(
|
conn_reader, conn_writer = await asyncio.open_connection(
|
||||||
self.session.remote_address,
|
self.session.remote_address, self.session.remote_port, **kwargs
|
||||||
self.session.remote_port,
|
|
||||||
**kwargs
|
|
||||||
)
|
)
|
||||||
reader = StreamReaderAdapter(conn_reader)
|
reader = StreamReaderAdapter(conn_reader)
|
||||||
writer = StreamWriterAdapter(conn_writer)
|
writer = StreamWriterAdapter(conn_writer)
|
||||||
|
|
|
@ -176,10 +176,7 @@ class ProtocolHandler:
|
||||||
for message in itertools.chain(
|
for message in itertools.chain(
|
||||||
self.session.inflight_in.values(), self.session.inflight_out.values()
|
self.session.inflight_in.values(), self.session.inflight_out.values()
|
||||||
):
|
):
|
||||||
tasks.append(
|
tasks.append(asyncio.wait_for(self._handle_message_flow(message), 10))
|
||||||
asyncio.wait_for(
|
|
||||||
self._handle_message_flow(message), 10)
|
|
||||||
)
|
|
||||||
if tasks:
|
if tasks:
|
||||||
done, pending = await asyncio.wait(tasks)
|
done, pending = await asyncio.wait(tasks)
|
||||||
self.logger.debug("%d messages redelivered" % len(done))
|
self.logger.debug("%d messages redelivered" % len(done))
|
||||||
|
@ -213,8 +210,7 @@ class ProtocolHandler:
|
||||||
message = OutgoingApplicationMessage(packet_id, topic, qos, data, retain)
|
message = OutgoingApplicationMessage(packet_id, topic, qos, data, retain)
|
||||||
# Handle message flow
|
# Handle message flow
|
||||||
if ack_timeout is not None and ack_timeout > 0:
|
if ack_timeout is not None and ack_timeout > 0:
|
||||||
await asyncio.wait_for(
|
await asyncio.wait_for(self._handle_message_flow(message), ack_timeout)
|
||||||
self._handle_message_flow(message), ack_timeout)
|
|
||||||
else:
|
else:
|
||||||
await self._handle_message_flow(message)
|
await self._handle_message_flow(message)
|
||||||
|
|
||||||
|
@ -440,44 +436,33 @@ class ProtocolHandler:
|
||||||
)
|
)
|
||||||
task = None
|
task = None
|
||||||
if packet.fixed_header.packet_type == CONNACK:
|
if packet.fixed_header.packet_type == CONNACK:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_connack(packet))
|
||||||
self.handle_connack(packet))
|
|
||||||
elif packet.fixed_header.packet_type == SUBSCRIBE:
|
elif packet.fixed_header.packet_type == SUBSCRIBE:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_subscribe(packet))
|
||||||
self.handle_subscribe(packet))
|
|
||||||
elif packet.fixed_header.packet_type == UNSUBSCRIBE:
|
elif packet.fixed_header.packet_type == UNSUBSCRIBE:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(
|
||||||
self.handle_unsubscribe(packet))
|
self.handle_unsubscribe(packet)
|
||||||
|
)
|
||||||
elif packet.fixed_header.packet_type == SUBACK:
|
elif packet.fixed_header.packet_type == SUBACK:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_suback(packet))
|
||||||
self.handle_suback(packet))
|
|
||||||
elif packet.fixed_header.packet_type == UNSUBACK:
|
elif packet.fixed_header.packet_type == UNSUBACK:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_unsuback(packet))
|
||||||
self.handle_unsuback(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PUBACK:
|
elif packet.fixed_header.packet_type == PUBACK:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_puback(packet))
|
||||||
self.handle_puback(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PUBREC:
|
elif packet.fixed_header.packet_type == PUBREC:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_pubrec(packet))
|
||||||
self.handle_pubrec(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PUBREL:
|
elif packet.fixed_header.packet_type == PUBREL:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_pubrel(packet))
|
||||||
self.handle_pubrel(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PUBCOMP:
|
elif packet.fixed_header.packet_type == PUBCOMP:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_pubcomp(packet))
|
||||||
self.handle_pubcomp(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PINGREQ:
|
elif packet.fixed_header.packet_type == PINGREQ:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_pingreq(packet))
|
||||||
self.handle_pingreq(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PINGRESP:
|
elif packet.fixed_header.packet_type == PINGRESP:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_pingresp(packet))
|
||||||
self.handle_pingresp(packet))
|
|
||||||
elif packet.fixed_header.packet_type == PUBLISH:
|
elif packet.fixed_header.packet_type == PUBLISH:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_publish(packet))
|
||||||
self.handle_publish(packet))
|
|
||||||
elif packet.fixed_header.packet_type == DISCONNECT:
|
elif packet.fixed_header.packet_type == DISCONNECT:
|
||||||
task = asyncio.ensure_future(
|
task = asyncio.ensure_future(self.handle_disconnect(packet))
|
||||||
self.handle_disconnect(packet))
|
|
||||||
elif packet.fixed_header.packet_type == CONNECT:
|
elif packet.fixed_header.packet_type == CONNECT:
|
||||||
self.handle_connect(packet)
|
self.handle_connect(packet)
|
||||||
else:
|
else:
|
||||||
|
|
Ładowanie…
Reference in New Issue