kopia lustrzana https://github.com/Yakifo/amqtt
Fix buffer management when feeding
rodzic
673dd9f293
commit
7ecf802e2d
|
@ -52,6 +52,7 @@ class WriterAdapter:
|
|||
Close the protocol connection
|
||||
"""
|
||||
|
||||
|
||||
class WebSocketsReader(ReaderAdapter):
|
||||
"""
|
||||
WebSockets API reader adapter
|
||||
|
@ -64,7 +65,8 @@ class WebSocketsReader(ReaderAdapter):
|
|||
@asyncio.coroutine
|
||||
def read(self, n=-1) -> bytes:
|
||||
yield from self._feed_buffer(n)
|
||||
return self._stream.read(n)
|
||||
data = self._stream.read(n)
|
||||
return data
|
||||
|
||||
@asyncio.coroutine
|
||||
def _feed_buffer(self, n=1):
|
||||
|
@ -72,13 +74,15 @@ class WebSocketsReader(ReaderAdapter):
|
|||
Feed the data buffer by reading a Websocket message.
|
||||
:param n: if given, feed buffer until it contains at least n bytes
|
||||
"""
|
||||
while len(self._stream.getbuffer()) < n:
|
||||
buffer = bytearray(self._stream.read())
|
||||
while len(buffer) < n:
|
||||
message = yield from self._protocol.recv()
|
||||
if message is None:
|
||||
break
|
||||
if not type(message, bytes):
|
||||
if not isinstance(message, bytes):
|
||||
raise TypeError("message must be bytes")
|
||||
self._stream.getbuffer().append(message)
|
||||
buffer.extend(message)
|
||||
self._stream = io.BytesIO(buffer)
|
||||
|
||||
|
||||
class WebSocketsWriter(WriterAdapter):
|
||||
|
@ -101,7 +105,9 @@ class WebSocketsWriter(WriterAdapter):
|
|||
"""
|
||||
Let the write buffer of the underlying transport a chance to be flushed.
|
||||
"""
|
||||
yield from self._protocol.send(self._stream.getbuffer())
|
||||
data = self._stream.getvalue()
|
||||
if len(data):
|
||||
yield from self._protocol.send(data)
|
||||
self._stream = io.BytesIO(b'')
|
||||
|
||||
def get_peer_info(self):
|
||||
|
@ -184,7 +190,7 @@ class BufferWriter(WriterAdapter):
|
|||
self._stream = io.BytesIO(b'')
|
||||
|
||||
def get_buffer(self):
|
||||
return self._stream.getbuffer()
|
||||
return self._stream.getvalue()
|
||||
|
||||
def get_peer_info(self):
|
||||
return "BufferWriter", 0
|
||||
|
|
Ładowanie…
Reference in New Issue