kopia lustrzana https://github.com/Yakifo/amqtt
Asyncio fixes
rodzic
69e3e0bc5b
commit
6d6dd997f1
|
@ -158,13 +158,16 @@ class StreamWriterAdapter(WriterAdapter):
|
|||
def __init__(self, writer: StreamWriter):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self._writer = writer
|
||||
self.is_closed = False # StreamWriter has no test for closed...we use our own
|
||||
|
||||
def write(self, data):
|
||||
self._writer.write(data)
|
||||
if not self.is_closed:
|
||||
self._writer.write(data)
|
||||
|
||||
@asyncio.coroutine
|
||||
def drain(self):
|
||||
yield from self._writer.drain()
|
||||
if not self.is_closed:
|
||||
yield from self._writer.drain()
|
||||
|
||||
def get_peer_info(self):
|
||||
extra_info = self._writer.get_extra_info('peername')
|
||||
|
@ -172,10 +175,14 @@ class StreamWriterAdapter(WriterAdapter):
|
|||
|
||||
@asyncio.coroutine
|
||||
def close(self):
|
||||
yield from self._writer.drain()
|
||||
if self._writer.can_write_eof():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
if not self.is_closed:
|
||||
yield from self._writer.drain()
|
||||
if self._writer.can_write_eof():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
try: yield from self._writer.wait_closed() # py37+
|
||||
except AttributeError: pass
|
||||
self.is_closed = True
|
||||
|
||||
|
||||
class BufferReader(ReaderAdapter):
|
||||
|
|
|
@ -692,7 +692,9 @@ class Broker:
|
|||
try:
|
||||
while True:
|
||||
while running_tasks and running_tasks[0].done():
|
||||
running_tasks.popleft()
|
||||
task = running_tasks.popleft()
|
||||
try: task.result() # make asyncio happy and collect results
|
||||
except Exception: pass
|
||||
broadcast = yield from self._broadcast_queue.get()
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("broadcasting %r" % broadcast)
|
||||
|
@ -724,6 +726,7 @@ class Broker:
|
|||
# Wait until current broadcasting tasks end
|
||||
if running_tasks:
|
||||
yield from asyncio.wait(running_tasks, loop=self._loop)
|
||||
raise # reraise per CancelledError semantics
|
||||
|
||||
@asyncio.coroutine
|
||||
def _broadcast_message(self, session, topic, data, force_qos=None):
|
||||
|
|
|
@ -49,7 +49,10 @@ def read_or_raise(reader, n=-1):
|
|||
:param n: number of bytes to read
|
||||
:return: bytes read
|
||||
"""
|
||||
data = yield from reader.read(n)
|
||||
try:
|
||||
data = yield from reader.read(n)
|
||||
except (asyncio.IncompleteReadError, ConnectionResetError):
|
||||
data = None
|
||||
if not data:
|
||||
raise NoDataException("No more data")
|
||||
return data
|
||||
|
|
|
@ -449,9 +449,8 @@ class ProtocolHandler:
|
|||
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
||||
|
||||
yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session)
|
||||
except ConnectionResetError as cre:
|
||||
except (ConnectionResetError, BrokenPipeError):
|
||||
yield from self.handle_connection_closed()
|
||||
raise
|
||||
except BaseException as e:
|
||||
self.logger.warning("Unhandled exception: %s" % e)
|
||||
raise
|
||||
|
|
Ładowanie…
Reference in New Issue