diff --git a/amqtt/broker.py b/amqtt/broker.py index c2a63b8..a4f8ef1 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -15,7 +15,7 @@ from functools import partial from transitions import Machine, MachineError from amqtt.session import Session from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler -from amqtt.errors import AMQTTException, MQTTException +from amqtt.errors import AMQTTException, MQTTException, NoDataException from amqtt.utils import format_client_message, gen_client_id from amqtt.adapters import ( StreamReaderAdapter, @@ -416,6 +416,7 @@ class Broker: ) # await writer.close() self.logger.debug("Connection closed") + server.release_connection() return except MQTTException as me: self.logger.error( @@ -423,8 +424,16 @@ class Broker: % (format_client_message(address=remote_address, port=remote_port), me) ) await writer.close() + server.release_connection() self.logger.debug("Connection closed") return + except NoDataException as ne: + self.logger.error( + "No data from %s : %s" + % (format_client_message(address=remote_address, port=remote_port), ne) + ) + server.release_connection() + return if client_session.clean_session: # Delete existing session and create a new one diff --git a/amqtt/errors.py b/amqtt/errors.py index 40f011c..b22b248 100644 --- a/amqtt/errors.py +++ b/amqtt/errors.py @@ -13,7 +13,7 @@ class AMQTTException(Exception): class MQTTException(Exception): """ - Base class for all errors refering to MQTT specifications + Base class for all errors referring to MQTT specifications """ pass diff --git a/tests/test_broker.py b/tests/test_broker.py index e3db026..f9d06d3 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -130,6 +130,7 @@ async def test_connect_tcp(broker): if conn.status == "ESTABLISHED": open_connections.append(conn) assert len(open_connections) == 1 + await asyncio.sleep(0.1) assert broker._servers["default"].conn_count == 1