kopia lustrzana https://github.com/Yakifo/amqtt
Add protocol write synchronization.
Concurrent write to protocol was causing library to crash when drain() function was called simltaneously.pull/8/head
rodzic
6f2cd7f83c
commit
6f282187d8
|
@ -71,6 +71,8 @@ class ProtocolHandler:
|
||||||
self._pubrel_waiters = dict()
|
self._pubrel_waiters = dict()
|
||||||
self._pubcomp_waiters = dict()
|
self._pubcomp_waiters = dict()
|
||||||
|
|
||||||
|
self._write_lock = asyncio.Lock(loop=self._loop)
|
||||||
|
|
||||||
def _init_session(self, session: Session):
|
def _init_session(self, session: Session):
|
||||||
assert session
|
assert session
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -440,6 +442,7 @@ class ProtocolHandler:
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _send_packet(self, packet):
|
def _send_packet(self, packet):
|
||||||
try:
|
try:
|
||||||
|
with (yield from self._write_lock):
|
||||||
yield from packet.to_stream(self.writer)
|
yield from packet.to_stream(self.writer)
|
||||||
if self._keepalive_task:
|
if self._keepalive_task:
|
||||||
self._keepalive_task.cancel()
|
self._keepalive_task.cancel()
|
||||||
|
|
Ładowanie…
Reference in New Issue