Merge pull request #237 from ajmirsky/connection_timeout

client connection timeout
pull/236/head
Andrew Mirsky 2025-06-26 13:17:20 -04:00 zatwierdzone przez GitHub
commit aac3bdbfab
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
3 zmienionych plików z 30 dodań i 11 usunięć

Wyświetl plik

@ -456,6 +456,8 @@ class MQTTClient:
# if not self._handler: # if not self._handler:
self._handler = ClientProtocolHandler(self.plugins_manager) self._handler = ClientProtocolHandler(self.plugins_manager)
connection_timeout = self.config.get("connection_timeout", None)
if secure: if secure:
sc = ssl.create_default_context( sc = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH, ssl.Purpose.SERVER_AUTH,
@ -476,21 +478,24 @@ class MQTTClient:
# Open connection # Open connection
if scheme in ("mqtt", "mqtts"): if scheme in ("mqtt", "mqtts"):
conn_reader, conn_writer = await asyncio.open_connection( conn_reader, conn_writer = await asyncio.wait_for(
self.session.remote_address, asyncio.open_connection(
self.session.remote_port, self.session.remote_address,
**kwargs, self.session.remote_port,
) **kwargs,
), timeout=connection_timeout)
reader = StreamReaderAdapter(conn_reader) reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer) writer = StreamWriterAdapter(conn_writer)
elif scheme in ("ws", "wss") and self.session.broker_uri: elif scheme in ("ws", "wss") and self.session.broker_uri:
websocket: ClientConnection = await websockets.connect( websocket: ClientConnection = await asyncio.wait_for(
self.session.broker_uri, websockets.connect(
subprotocols=[websockets.Subprotocol("mqtt")], self.session.broker_uri,
additional_headers=self.additional_headers, subprotocols=[websockets.Subprotocol("mqtt")],
**kwargs, additional_headers=self.additional_headers,
) **kwargs,
), timeout=connection_timeout)
reader = WebSocketsReader(websocket) reader = WebSocketsReader(websocket)
writer = WebSocketsWriter(websocket) writer = WebSocketsWriter(websocket)
elif not self.session.broker_uri: elif not self.session.broker_uri:

Wyświetl plik

@ -25,6 +25,11 @@ Default retain value to messages published. Defaults to `false`.
Enable or disable auto-reconnect if connection with the broker is interrupted. Defaults to `false`. Enable or disable auto-reconnect if connection with the broker is interrupted. Defaults to `false`.
### `connect_timeout` *(int)*
If specified, the number of seconds before a connection times out
### `reconnect_retries` *(int)* ### `reconnect_retries` *(int)*
Maximum reconnection retries. Defaults to `2`. Negative value will cause client to reconnect infinitely. Maximum reconnection retries. Defaults to `2`. Negative value will cause client to reconnect infinitely.

Wyświetl plik

@ -446,6 +446,15 @@ async def test_connect_incorrect_scheme():
await client.connect('"mq://someplace') await client.connect('"mq://someplace')
@pytest.mark.asyncio
@pytest.mark.timeout(3)
async def test_connect_timeout():
config = {"auto_reconnect": False, "connection_timeout": 2}
client = MQTTClient(config=config)
with pytest.raises(ClientError):
await client.connect("mqtt://localhost:8888")
async def test_client_no_auth(): async def test_client_no_auth():
class MockEntryPoints: class MockEntryPoints: