kopia lustrzana https://github.com/Yakifo/amqtt
Remove config argument to handler
rodzic
efd88b50fe
commit
f53ae9e10a
|
@ -18,7 +18,6 @@ _defaults = {
|
||||||
'ping_delay': 1,
|
'ping_delay': 1,
|
||||||
'default_qos': 0,
|
'default_qos': 0,
|
||||||
'default_retain': False,
|
'default_retain': False,
|
||||||
'subscriptions-polling-interval': 1,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -188,7 +187,7 @@ class MQTTClient:
|
||||||
try:
|
try:
|
||||||
self.session.reader, self.session.writer = \
|
self.session.reader, self.session.writer = \
|
||||||
yield from asyncio.open_connection(self.session.remote_address, self.session.remote_port)
|
yield from asyncio.open_connection(self.session.remote_address, self.session.remote_port)
|
||||||
self._handler = ClientProtocolHandler(self.session, self.config)
|
self._handler = ClientProtocolHandler(self.session)
|
||||||
yield from self._handler.start()
|
yield from self._handler.start()
|
||||||
|
|
||||||
return_code = yield from self._handler.mqtt_connect()
|
return_code = yield from self._handler.mqtt_connect()
|
||||||
|
@ -251,7 +250,7 @@ class MQTTClient:
|
||||||
s.cleansession = cleansession
|
s.cleansession = cleansession
|
||||||
else:
|
else:
|
||||||
s.cleansession = self.config.get('cleansession', True)
|
s.cleansession = self.config.get('cleansession', True)
|
||||||
s.keep_alive = self.config['keep_alive']
|
s.keep_alive = self.config['keep_alive'] - self.config['ping_delay']
|
||||||
if 'will' in self.config:
|
if 'will' in self.config:
|
||||||
s.will_flag = True
|
s.will_flag = True
|
||||||
s.will_retain = self.config['will']['retain']
|
s.will_retain = self.config['will']['retain']
|
||||||
|
|
|
@ -19,8 +19,8 @@ from hbmqtt.mqtt.unsuback import UnsubackPacket
|
||||||
from hbmqtt.session import Session
|
from hbmqtt.session import Session
|
||||||
|
|
||||||
class ClientProtocolHandler(ProtocolHandler):
|
class ClientProtocolHandler(ProtocolHandler):
|
||||||
def __init__(self, session: Session, config, loop=None):
|
def __init__(self, session: Session, loop=None):
|
||||||
super().__init__(session, config, loop)
|
super().__init__(session, loop)
|
||||||
self._ping_task = None
|
self._ping_task = None
|
||||||
self._connack_waiter = None
|
self._connack_waiter = None
|
||||||
self._pingresp_queue = asyncio.Queue()
|
self._pingresp_queue = asyncio.Queue()
|
||||||
|
|
|
@ -49,10 +49,9 @@ class ProtocolHandler:
|
||||||
Class implementing the MQTT communication protocol using asyncio features
|
Class implementing the MQTT communication protocol using asyncio features
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, session: Session, config, loop=None):
|
def __init__(self, session: Session, loop=None):
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.session = session
|
self.session = session
|
||||||
self.config = config
|
|
||||||
if loop is None:
|
if loop is None:
|
||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
else:
|
else:
|
||||||
|
@ -182,10 +181,12 @@ class ProtocolHandler:
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _writer_coro(self):
|
def _writer_coro(self):
|
||||||
self.logger.debug("Starting writer coro")
|
self.logger.debug("Starting writer coro")
|
||||||
keepalive_timeout = self.session.keep_alive - self.config['ping_delay']
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
self._writer_ready.set()
|
self._writer_ready.set()
|
||||||
|
keepalive_timeout = self.session.keep_alive
|
||||||
|
if keepalive_timeout <= 0:
|
||||||
|
keepalive_timeout = None
|
||||||
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), keepalive_timeout)
|
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), keepalive_timeout)
|
||||||
yield from packet.to_stream(self.session.writer)
|
yield from packet.to_stream(self.session.writer)
|
||||||
self.logger.debug(" -out-> " + repr(packet))
|
self.logger.debug(" -out-> " + repr(packet))
|
||||||
|
|
Ładowanie…
Reference in New Issue