kopia lustrzana https://github.com/Yakifo/amqtt
Merge pull request #173 from d21d3q/bugfix/synchronize-write
Add protocol write synchronization.pull/8/head
commit
b6973e06cf
|
@ -71,6 +71,8 @@ class ProtocolHandler:
|
|||
self._pubrel_waiters = dict()
|
||||
self._pubcomp_waiters = dict()
|
||||
|
||||
self._write_lock = asyncio.Lock(loop=self._loop)
|
||||
|
||||
def _init_session(self, session: Session):
|
||||
assert session
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -440,7 +442,8 @@ class ProtocolHandler:
|
|||
@asyncio.coroutine
|
||||
def _send_packet(self, packet):
|
||||
try:
|
||||
yield from packet.to_stream(self.writer)
|
||||
with (yield from self._write_lock):
|
||||
yield from packet.to_stream(self.writer)
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
||||
|
|
Ładowanie…
Reference in New Issue