test_init_handler: protocolhandler gets the current loop, if not provided and self.loop isn't the current loop, it's a new loop; ProtocolHandler.__init__: handler was using the session's keepalive, even though if the keepalive is zero, it needs to bypass

pull/182/head
Andrew Mirsky 2025-05-26 08:48:10 -04:00
rodzic 255e38a947
commit a7f2ae5746
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
3 zmienionych plików z 2 dodań i 8 usunięć

Wyświetl plik

@ -1,4 +1,3 @@
import logging
from abc import ABC, abstractmethod
import asyncio
@ -34,9 +33,6 @@ DISCONNECT = 0x0E
RESERVED_15 = 0x0F
logger = logging.getLogger(__name__)
class MQTTFixedHeader:
"""Represents the fixed header of an MQTT packet."""
@ -210,7 +206,6 @@ class MQTTPacket(Generic[_VH, _P, _FH]):
async def to_stream(self, writer: WriterAdapter) -> None:
"""Write the entire packet to the stream."""
logger.debug(f">> writing packet to stream: {self}")
writer.write(self.to_bytes())
await writer.drain()
self.protocol_ts = datetime.now(UTC)
@ -253,7 +248,6 @@ class MQTTPacket(Generic[_VH, _P, _FH]):
else:
instance = cls(fixed_header, variable_header, payload)
instance.protocol_ts = datetime.now(UTC)
logger.debug(f">> read packet from stream: {instance!r}")
return instance
@property

Wyświetl plik

@ -446,7 +446,7 @@ class ProtocolHandler:
self.logger.debug(f"{self.session.client_id} Starting reader coro")
running_tasks: collections.deque[asyncio.Task[None]] = collections.deque()
keepalive_timeout: int | None = self.session.keep_alive
if keepalive_timeout and keepalive_timeout <= 0:
if keepalive_timeout is not None and keepalive_timeout <= 0:
keepalive_timeout = None
while True:
try:

Wyświetl plik

@ -39,7 +39,7 @@ class ProtocolHandlerTest(unittest.TestCase):
def test_init_handler(self):
Session()
handler = ProtocolHandler(self.plugin_manager)
handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
assert handler.session is None
assert handler._loop is self.loop
self.check_empty_waiters(handler)