kopia lustrzana https://github.com/Yakifo/amqtt
broker: release server connection upon exception
rodzic
4944b0deae
commit
7fcdb0c13b
|
@ -15,7 +15,7 @@ from functools import partial
|
||||||
from transitions import Machine, MachineError
|
from transitions import Machine, MachineError
|
||||||
from amqtt.session import Session
|
from amqtt.session import Session
|
||||||
from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
||||||
from amqtt.errors import AMQTTException, MQTTException
|
from amqtt.errors import AMQTTException, MQTTException, NoDataException
|
||||||
from amqtt.utils import format_client_message, gen_client_id
|
from amqtt.utils import format_client_message, gen_client_id
|
||||||
from amqtt.adapters import (
|
from amqtt.adapters import (
|
||||||
StreamReaderAdapter,
|
StreamReaderAdapter,
|
||||||
|
@ -416,6 +416,7 @@ class Broker:
|
||||||
)
|
)
|
||||||
# await writer.close()
|
# await writer.close()
|
||||||
self.logger.debug("Connection closed")
|
self.logger.debug("Connection closed")
|
||||||
|
server.release_connection()
|
||||||
return
|
return
|
||||||
except MQTTException as me:
|
except MQTTException as me:
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
|
@ -423,8 +424,16 @@ class Broker:
|
||||||
% (format_client_message(address=remote_address, port=remote_port), me)
|
% (format_client_message(address=remote_address, port=remote_port), me)
|
||||||
)
|
)
|
||||||
await writer.close()
|
await writer.close()
|
||||||
|
server.release_connection()
|
||||||
self.logger.debug("Connection closed")
|
self.logger.debug("Connection closed")
|
||||||
return
|
return
|
||||||
|
except NoDataException as ne:
|
||||||
|
self.logger.error(
|
||||||
|
"No data from %s : %s"
|
||||||
|
% (format_client_message(address=remote_address, port=remote_port), ne)
|
||||||
|
)
|
||||||
|
server.release_connection()
|
||||||
|
return
|
||||||
|
|
||||||
if client_session.clean_session:
|
if client_session.clean_session:
|
||||||
# Delete existing session and create a new one
|
# Delete existing session and create a new one
|
||||||
|
|
|
@ -13,7 +13,7 @@ class AMQTTException(Exception):
|
||||||
|
|
||||||
class MQTTException(Exception):
|
class MQTTException(Exception):
|
||||||
"""
|
"""
|
||||||
Base class for all errors refering to MQTT specifications
|
Base class for all errors referring to MQTT specifications
|
||||||
"""
|
"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -130,6 +130,7 @@ async def test_connect_tcp(broker):
|
||||||
if conn.status == "ESTABLISHED":
|
if conn.status == "ESTABLISHED":
|
||||||
open_connections.append(conn)
|
open_connections.append(conn)
|
||||||
assert len(open_connections) == 1
|
assert len(open_connections) == 1
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
assert broker._servers["default"].conn_count == 1
|
assert broker._servers["default"].conn_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
Ładowanie…
Reference in New Issue