added try_end + send_chunk and fixed get_data

pull/39/head
Ciro 2022-10-25 22:08:13 -03:00
rodzic 8c4ffc918a
commit 39041a9453
3 zmienionych plików z 114 dodań i 53 usunięć

Wyświetl plik

@ -21,7 +21,8 @@ def future_handler(future, loop, exception_handler, response):
if response != None:
response.write_status(500).end("Internal Error")
finally:
return
return None
return None
class Loop:
def __init__(self, exception_handler=None):
@ -60,7 +61,8 @@ class Loop:
#run once asyncio
loop.run_once_asyncio()
#use check for calling asyncio once per tick
self.timer = self.uv_loop.create_check(tick, self)
self.timer = self.uv_loop.create_timer(0, 1, tick, self)
# self.timer = self.uv_loop.create_check(tick, self)
def run(self):
self.uv_loop.run()
@ -69,9 +71,11 @@ class Loop:
self.uv_loop.run_once()
def run_once_asyncio(self):
# with suppress(asyncio.CancelledError):
#run only one step
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
def stop(self):
if(self.started):
@ -98,7 +102,6 @@ class Loop:
#with threads
future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response))
#force asyncio run once to enable req in async functions before first await
self.run_once_asyncio()

Wyświetl plik

@ -289,7 +289,8 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data):
def uws_generic_on_writable_handler(res, offset, user_data):
if not user_data == ffi.NULL:
res = ffi.from_handle(user_data)
return res.trigger_writable_handler(offset)
result = res.trigger_writable_handler(offset)
return result
return False
@ -414,6 +415,10 @@ class AppResponse:
self._grabed_abort_handler_once = False
self._write_jar = None
self._cork_handler = None
self._lastChunkOffset = 0
self._chunkFuture = None
self._dataFuture = None
self._data = None
def cork(self, callback):
if not self.aborted:
@ -516,20 +521,59 @@ class AppResponse:
return None #invalid json
def get_data(self):
future = self.loop.create_future()
data = []
def is_aborted(res):
future.set_result(data)
def get_chunks(res, chunk, is_end):
data.append(chunk)
def send_chunk(self, buffer, total_size):
self._chunkFuture = self.loop.create_future()
self._lastChunkOffset = 0
def is_aborted(self):
self.aborted = True
try:
if not self._chunkFuture.done():
self._chunkFuture.set_result((False, True)) #if aborted set to done True and ok False
except:
pass
def on_writeble(self, offset):
# Here the timeout is off, we can spend as much time before calling try_end we want to
(ok, done) = self.try_end(buffer[offset - self._lastChunkOffset::], total_size)
if ok:
self._chunkFuture.set_result((ok, done))
return ok
self.on_writable(on_writeble)
self.on_aborted(is_aborted)
if self.aborted:
self._chunkFuture.set_result((False, True)) #if aborted set to done True and ok False
return self._chunkFuture
(ok, done) = self.try_end(buffer, total_size)
if ok:
self._chunkFuture.set_result((ok, done))
return self._chunkFuture
#failed to send chunk
self._lastChunkOffset = self.get_write_offset()
return self._chunkFuture
def get_data(self):
self._dataFuture = self.loop.create_future()
self._data = []
def is_aborted(self):
self.aborted = True
try:
if not self._dataFuture.done():
self._dataFuture.set_result(self._data)
except:
pass
def get_chunks(self, chunk, is_end):
self._data.append(chunk)
if is_end:
future.set_result(data)
self._dataFuture.set_result(self._data)
self._data = None
self.on_aborted(is_aborted)
self.on_data(get_chunks)
return future
return self._dataFuture
def grab_aborted_handler(self):
@ -543,24 +587,44 @@ class AppResponse:
self.write_header("Location", location)
self.end_without_body(False)
def try_end(self, message, total_size):
try:
if self.aborted:
return (False, False)
if self._write_jar != None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
self._write_jar = None
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
else:
return (False, False)
result = lib.uws_res_try_end(self.SSL, self.res, data, len(data),ffi.cast("uintmax_t", total_size))
return (bool(result.ok), bool(result.has_responded))
except:
return (False, False)
def end(self, message, end_connection=False):
if not self.aborted:
try:
if self._write_jar != None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
elif message == None:
self.end_without_body(end_connection)
return self
else:
self.write_header(b'Content-Type', b'application/json')
data = json.dumps(message).encode("utf-8")
lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0)
finally:
try:
if self.aborted:
return self
if self._write_jar != None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
self._write_jar = None
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
elif message == None:
self.end_without_body(end_connection)
return self
else:
self.write_header(b'Content-Type', b'application/json')
data = json.dumps(message).encode("utf-8")
lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0)
finally:
return self
def pause(self):
if not self.aborted:
@ -577,11 +641,6 @@ class AppResponse:
lib.uws_res_write_continue(self.SSL, self.res)
return self
# /* Try and end the response. Returns [true, true] on success.
# * Starts a timeout in some cases. Returns [ok, hasResponded] */
# std::pair<bool, bool> tryEnd(std::string_view data, uintmax_t totalSize = 0) {
# return {internalEnd(data, totalSize, true), hasResponded()};
# }
def write_status(self, status_or_status_text):
if not self.aborted:
@ -638,15 +697,9 @@ class AppResponse:
lib.uws_res_write(self.SSL, self.res, data, len(data))
return self
def get_write_offset(self, message):
def get_write_offset(self):
if not self.aborted:
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
else:
data = json.dumps(message).encode("utf-8")
return int(lib.uws_res_get_write_offset(self.SSL, self.res, data, len(data)))
return int(lib.uws_res_get_write_offset(self.SSL, self.res))
return 0
def has_responded(self):
@ -656,23 +709,23 @@ class AppResponse:
def on_aborted(self, handler):
if hasattr(handler, '__call__'):
self.grab_aborted_handler()
self._aborted_handler = handler
self.grab_aborted_handler()
return self
def on_data(self, handler):
if not self.aborted:
if hasattr(handler, '__call__'):
self.grab_aborted_handler()
self._data_handler = handler
self.grab_aborted_handler()
lib.uws_res_on_data(self.SSL, self.res, uws_generic_on_data_handler, self._ptr)
return self
def on_writable(self, handler):
if not self.aborted:
if hasattr(handler, '__call__'):
self.grab_aborted_handler()
self._writable_handler = handler
self.grab_aborted_handler()
lib.uws_res_on_writable(self.SSL, self.res, uws_generic_on_writable_handler, self._ptr)
return self

Wyświetl plik

@ -24,20 +24,25 @@ from socketify import App
import os
import multiprocessing
import asyncio
import aiofiles
#need to fix get_data using sel._data etc
async def home(res, req):
# res.write_header("Content-Type", "plain/text")
await asyncio.sleep(0)
# await asyncio.sleep(0)
# res.write_header("Content-Type", "audio/mpeg")
res.write_header("Content-Type", "application/octet-stream")
def corked(res):
res.write("Test ")
res.end("Hello, World!")
filename = "./file_example_MP3_5MG.mp3"
total = os.stat(filename).st_size
res.cork(corked)
# res.write("Test ")
# res.end("Hello, World!")
# res.end("Hello, World!")
async with aiofiles.open(filename, "rb") as fd:
while not res.aborted:
buffer = await fd.read(16*1024)
(ok, done) = await res.send_chunk(buffer, total)
if not ok or done: #if cannot send probably aborted
break
def run_app():
app = App()