kopia lustrzana https://github.com/Yakifo/amqtt
remove deprecated/not used loop argument
rodzic
f95f97be01
commit
1d6acb9934
|
@ -65,15 +65,11 @@ class RetainedApplicationMessage:
|
||||||
|
|
||||||
|
|
||||||
class Server:
|
class Server:
|
||||||
def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
|
def __init__(self, listener_name, server_instance, max_connections=-1):
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.instance = server_instance
|
self.instance = server_instance
|
||||||
self.conn_count = 0
|
self.conn_count = 0
|
||||||
self.listener_name = listener_name
|
self.listener_name = listener_name
|
||||||
if loop is not None:
|
|
||||||
self._loop = loop
|
|
||||||
else:
|
|
||||||
self._loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
self.max_connections = max_connections
|
self.max_connections = max_connections
|
||||||
if self.max_connections > 0:
|
if self.max_connections > 0:
|
||||||
|
@ -318,7 +314,7 @@ class Broker:
|
||||||
ssl=sc,
|
ssl=sc,
|
||||||
)
|
)
|
||||||
self._servers[listener_name] = Server(
|
self._servers[listener_name] = Server(
|
||||||
listener_name, instance, max_connections, self._loop
|
listener_name, instance, max_connections
|
||||||
)
|
)
|
||||||
elif listener["type"] == "ws":
|
elif listener["type"] == "ws":
|
||||||
cb_partial = partial(self.ws_connected, listener_name=listener_name)
|
cb_partial = partial(self.ws_connected, listener_name=listener_name)
|
||||||
|
@ -327,11 +323,10 @@ class Broker:
|
||||||
address,
|
address,
|
||||||
port,
|
port,
|
||||||
ssl=sc,
|
ssl=sc,
|
||||||
loop=self._loop,
|
|
||||||
subprotocols=["mqtt"],
|
subprotocols=["mqtt"],
|
||||||
)
|
)
|
||||||
self._servers[listener_name] = Server(
|
self._servers[listener_name] = Server(
|
||||||
listener_name, instance, max_connections, self._loop
|
listener_name, instance, max_connections
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
|
|
|
@ -95,11 +95,10 @@ class MQTTClient:
|
||||||
|
|
||||||
:param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`amqtt.utils.gen_client_id`
|
:param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`amqtt.utils.gen_client_id`
|
||||||
:param config: Client configuration
|
:param config: Client configuration
|
||||||
:param loop: asynio loop to use
|
|
||||||
:return: class instance
|
:return: class instance
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, client_id=None, config=None, loop=None):
|
def __init__(self, client_id=None, config=None):
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.config = copy.deepcopy(_defaults)
|
self.config = copy.deepcopy(_defaults)
|
||||||
if config is not None:
|
if config is not None:
|
||||||
|
@ -112,10 +111,6 @@ class MQTTClient:
|
||||||
self.client_id = gen_client_id()
|
self.client_id = gen_client_id()
|
||||||
self.logger.debug("Using generated client ID : %s" % self.client_id)
|
self.logger.debug("Using generated client ID : %s" % self.client_id)
|
||||||
|
|
||||||
if loop is not None:
|
|
||||||
self._loop = loop
|
|
||||||
else:
|
|
||||||
self._loop = asyncio.get_event_loop()
|
|
||||||
self.session = None
|
self.session = None
|
||||||
self._handler = None
|
self._handler = None
|
||||||
self._disconnect_task = None
|
self._disconnect_task = None
|
||||||
|
@ -450,7 +445,6 @@ class MQTTClient:
|
||||||
websocket = await websockets.connect(
|
websocket = await websockets.connect(
|
||||||
self.session.broker_uri,
|
self.session.broker_uri,
|
||||||
subprotocols=["mqtt"],
|
subprotocols=["mqtt"],
|
||||||
loop=self._loop,
|
|
||||||
extra_headers=self.extra_headers,
|
extra_headers=self.extra_headers,
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
|
@ -191,7 +191,7 @@ class BrokerProtocolHandler(ProtocolHandler):
|
||||||
await writer.close()
|
await writer.close()
|
||||||
raise MQTTException(error_msg)
|
raise MQTTException(error_msg)
|
||||||
|
|
||||||
incoming_session = Session(loop)
|
incoming_session = Session()
|
||||||
incoming_session.client_id = connect.client_id
|
incoming_session.client_id = connect.client_id
|
||||||
incoming_session.clean_session = connect.clean_session_flag
|
incoming_session.clean_session = connect.clean_session_flag
|
||||||
incoming_session.will_flag = connect.will_flag
|
incoming_session.will_flag = connect.will_flag
|
||||||
|
|
|
@ -177,7 +177,7 @@ def main(*args, **kwargs):
|
||||||
config["will"]["qos"] = int(arguments["--will-qos"])
|
config["will"]["qos"] = int(arguments["--will-qos"])
|
||||||
config["will"]["retain"] = arguments["--will-retain"]
|
config["will"]["retain"] = arguments["--will-retain"]
|
||||||
|
|
||||||
client = MQTTClient(client_id=client_id, config=config, loop=loop)
|
client = MQTTClient(client_id=client_id, config=config)
|
||||||
loop.run_until_complete(do_pub(client, arguments))
|
loop.run_until_complete(do_pub(client, arguments))
|
||||||
loop.close()
|
loop.close()
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ def main(*args, **kwargs):
|
||||||
config["will"]["qos"] = int(arguments["--will-qos"])
|
config["will"]["qos"] = int(arguments["--will-qos"])
|
||||||
config["will"]["retain"] = arguments["--will-retain"]
|
config["will"]["retain"] = arguments["--will-retain"]
|
||||||
|
|
||||||
client = MQTTClient(client_id=client_id, config=config, loop=loop)
|
client = MQTTClient(client_id=client_id, config=config)
|
||||||
loop.run_until_complete(do_sub(client, arguments))
|
loop.run_until_complete(do_sub(client, arguments))
|
||||||
loop.close()
|
loop.close()
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
# Copyright (c) 2015 Nicolas JOUANIN
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
#
|
#
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
import asyncio
|
|
||||||
from transitions import Machine
|
from transitions import Machine
|
||||||
from asyncio import Queue
|
from asyncio import Queue
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
@ -106,7 +105,7 @@ class OutgoingApplicationMessage(ApplicationMessage):
|
||||||
class Session:
|
class Session:
|
||||||
states = ["new", "connected", "disconnected"]
|
states = ["new", "connected", "disconnected"]
|
||||||
|
|
||||||
def __init__(self, loop=None):
|
def __init__(self):
|
||||||
self._init_states()
|
self._init_states()
|
||||||
self.remote_address = None
|
self.remote_address = None
|
||||||
self.remote_port = None
|
self.remote_port = None
|
||||||
|
@ -127,10 +126,6 @@ class Session:
|
||||||
self.cadata = None
|
self.cadata = None
|
||||||
self._packet_id = 0
|
self._packet_id = 0
|
||||||
self.parent = 0
|
self.parent = 0
|
||||||
if loop is not None:
|
|
||||||
self._loop = loop
|
|
||||||
else:
|
|
||||||
self._loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
# Used to store outgoing ApplicationMessage while publish protocol flows
|
# Used to store outgoing ApplicationMessage while publish protocol flows
|
||||||
self.inflight_out = OrderedDict()
|
self.inflight_out = OrderedDict()
|
||||||
|
|
Ładowanie…
Reference in New Issue