kopia lustrzana https://github.com/Yakifo/amqtt
Various fixes
rodzic
6daa752800
commit
36c086df30
|
@ -353,8 +353,8 @@ class ProtocolHandler:
|
|||
running_tasks.popleft()
|
||||
if len(running_tasks) > 1:
|
||||
self.logger.debug("handler running tasks: %d" % len(running_tasks))
|
||||
fixed_header = await asyncio.wait_for(
|
||||
MQTTFixedHeader.from_stream(self.reader),
|
||||
|
||||
fixed_header = await asyncio.wait_for(MQTTFixedHeader.from_stream(self.reader),
|
||||
keepalive_timeout, loop=self._loop)
|
||||
if fixed_header:
|
||||
if fixed_header.packet_type == RESERVED_0 or fixed_header.packet_type == RESERVED_15:
|
||||
|
@ -403,6 +403,8 @@ class ProtocolHandler:
|
|||
else:
|
||||
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
|
||||
break
|
||||
except MQTTException:
|
||||
self.logger.debug("Message discarded")
|
||||
except asyncio.CancelledError:
|
||||
self.logger.debug("Task cancelled, reader loop ending")
|
||||
break
|
||||
|
@ -412,7 +414,7 @@ class ProtocolHandler:
|
|||
except NoDataException:
|
||||
self.logger.debug("%s No data available" % self.session.client_id)
|
||||
except BaseException as e:
|
||||
self.logger.warning("%s Unhandled exception in reader coro: %s" % (type(self).__name__, e))
|
||||
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
|
||||
break
|
||||
while running_tasks:
|
||||
running_tasks.popleft().cancel()
|
||||
|
|
|
@ -25,7 +25,8 @@ async def uptime_coro():
|
|||
logger.info("Subscribed")
|
||||
try:
|
||||
for i in range(1, 100):
|
||||
packet = await C.deliver_message()
|
||||
message = await C.deliver_message()
|
||||
packet = message.publish_packet
|
||||
print("%d %s : %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
|
||||
await C.unsubscribe(['$SYS/broker/uptime'])
|
||||
logger.info("UnSubscribed")
|
||||
|
|
Ładowanie…
Reference in New Issue