diff --git a/src/socketify/loop.py b/src/socketify/loop.py index e0dd1bd..348642e 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -21,7 +21,8 @@ def future_handler(future, loop, exception_handler, response): if response != None: response.write_status(500).end("Internal Error") finally: - return + return None + return None class Loop: def __init__(self, exception_handler=None): @@ -60,7 +61,8 @@ class Loop: #run once asyncio loop.run_once_asyncio() #use check for calling asyncio once per tick - self.timer = self.uv_loop.create_check(tick, self) + self.timer = self.uv_loop.create_timer(0, 1, tick, self) + # self.timer = self.uv_loop.create_check(tick, self) def run(self): self.uv_loop.run() @@ -69,9 +71,11 @@ class Loop: self.uv_loop.run_once() def run_once_asyncio(self): + # with suppress(asyncio.CancelledError): #run only one step self.loop.call_soon(self.loop.stop) self.loop.run_forever() + def stop(self): if(self.started): @@ -98,7 +102,6 @@ class Loop: #with threads future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response)) - #force asyncio run once to enable req in async functions before first await self.run_once_asyncio() diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 96287e6..186eab6 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -289,7 +289,8 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data): def uws_generic_on_writable_handler(res, offset, user_data): if not user_data == ffi.NULL: res = ffi.from_handle(user_data) - return res.trigger_writable_handler(offset) + result = res.trigger_writable_handler(offset) + return result return False @@ -414,6 +415,10 @@ class AppResponse: self._grabed_abort_handler_once = False self._write_jar = None self._cork_handler = None + self._lastChunkOffset = 0 + self._chunkFuture = None + self._dataFuture = None + self._data = None def cork(self, callback): if not self.aborted: @@ -516,20 +521,59 @@ class AppResponse: return None #invalid json - def get_data(self): - future = self.loop.create_future() - data = [] - def is_aborted(res): - future.set_result(data) - def get_chunks(res, chunk, is_end): - data.append(chunk) + + def send_chunk(self, buffer, total_size): + self._chunkFuture = self.loop.create_future() + self._lastChunkOffset = 0 + def is_aborted(self): + self.aborted = True + try: + if not self._chunkFuture.done(): + self._chunkFuture.set_result((False, True)) #if aborted set to done True and ok False + except: + pass + def on_writeble(self, offset): + # Here the timeout is off, we can spend as much time before calling try_end we want to + (ok, done) = self.try_end(buffer[offset - self._lastChunkOffset::], total_size) + if ok: + self._chunkFuture.set_result((ok, done)) + return ok + self.on_writable(on_writeble) + self.on_aborted(is_aborted) + + if self.aborted: + self._chunkFuture.set_result((False, True)) #if aborted set to done True and ok False + return self._chunkFuture + + (ok, done) = self.try_end(buffer, total_size) + if ok: + self._chunkFuture.set_result((ok, done)) + return self._chunkFuture + #failed to send chunk + self._lastChunkOffset = self.get_write_offset() + + return self._chunkFuture + + def get_data(self): + self._dataFuture = self.loop.create_future() + self._data = [] + def is_aborted(self): + self.aborted = True + try: + if not self._dataFuture.done(): + self._dataFuture.set_result(self._data) + except: + pass + def get_chunks(self, chunk, is_end): + self._data.append(chunk) if is_end: - future.set_result(data) + self._dataFuture.set_result(self._data) + self._data = None self.on_aborted(is_aborted) self.on_data(get_chunks) - return future + return self._dataFuture def grab_aborted_handler(self): @@ -543,24 +587,44 @@ class AppResponse: self.write_header("Location", location) self.end_without_body(False) + def try_end(self, message, total_size): + try: + if self.aborted: + return (False, False) + if self._write_jar != None: + self.write_header("Set-Cookie", self._write_jar.output(header="")) + self._write_jar = None + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + return (False, False) + result = lib.uws_res_try_end(self.SSL, self.res, data, len(data),ffi.cast("uintmax_t", total_size)) + return (bool(result.ok), bool(result.has_responded)) + except: + return (False, False) + def end(self, message, end_connection=False): - if not self.aborted: - try: - if self._write_jar != None: - self.write_header("Set-Cookie", self._write_jar.output(header="")) - if isinstance(message, str): - data = message.encode("utf-8") - elif isinstance(message, bytes): - data = message - elif message == None: - self.end_without_body(end_connection) - return self - else: - self.write_header(b'Content-Type', b'application/json') - data = json.dumps(message).encode("utf-8") - lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0) - finally: + try: + if self.aborted: + return self + if self._write_jar != None: + self.write_header("Set-Cookie", self._write_jar.output(header="")) + self._write_jar = None + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + self.end_without_body(end_connection) return self + else: + self.write_header(b'Content-Type', b'application/json') + data = json.dumps(message).encode("utf-8") + lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0) + finally: + return self def pause(self): if not self.aborted: @@ -577,11 +641,6 @@ class AppResponse: lib.uws_res_write_continue(self.SSL, self.res) return self - # /* Try and end the response. Returns [true, true] on success. - # * Starts a timeout in some cases. Returns [ok, hasResponded] */ - # std::pair tryEnd(std::string_view data, uintmax_t totalSize = 0) { - # return {internalEnd(data, totalSize, true), hasResponded()}; - # } def write_status(self, status_or_status_text): if not self.aborted: @@ -638,15 +697,9 @@ class AppResponse: lib.uws_res_write(self.SSL, self.res, data, len(data)) return self - def get_write_offset(self, message): + def get_write_offset(self): if not self.aborted: - if isinstance(message, str): - data = message.encode("utf-8") - elif isinstance(message, bytes): - data = message - else: - data = json.dumps(message).encode("utf-8") - return int(lib.uws_res_get_write_offset(self.SSL, self.res, data, len(data))) + return int(lib.uws_res_get_write_offset(self.SSL, self.res)) return 0 def has_responded(self): @@ -656,23 +709,23 @@ class AppResponse: def on_aborted(self, handler): if hasattr(handler, '__call__'): - self.grab_aborted_handler() self._aborted_handler = handler + self.grab_aborted_handler() return self def on_data(self, handler): if not self.aborted: if hasattr(handler, '__call__'): - self.grab_aborted_handler() self._data_handler = handler + self.grab_aborted_handler() lib.uws_res_on_data(self.SSL, self.res, uws_generic_on_data_handler, self._ptr) return self def on_writable(self, handler): if not self.aborted: if hasattr(handler, '__call__'): - self.grab_aborted_handler() self._writable_handler = handler + self.grab_aborted_handler() lib.uws_res_on_writable(self.SSL, self.res, uws_generic_on_writable_handler, self._ptr) return self diff --git a/src/tests.py b/src/tests.py index 36b7570..461d1b8 100644 --- a/src/tests.py +++ b/src/tests.py @@ -24,20 +24,25 @@ from socketify import App import os import multiprocessing import asyncio +import aiofiles +#need to fix get_data using sel._data etc async def home(res, req): # res.write_header("Content-Type", "plain/text") - await asyncio.sleep(0) + # await asyncio.sleep(0) + # res.write_header("Content-Type", "audio/mpeg") + res.write_header("Content-Type", "application/octet-stream") - def corked(res): - res.write("Test ") - res.end("Hello, World!") + filename = "./file_example_MP3_5MG.mp3" + total = os.stat(filename).st_size - res.cork(corked) - # res.write("Test ") - # res.end("Hello, World!") - # res.end("Hello, World!") + async with aiofiles.open(filename, "rb") as fd: + while not res.aborted: + buffer = await fd.read(16*1024) + (ok, done) = await res.send_chunk(buffer, total) + if not ok or done: #if cannot send probably aborted + break def run_app(): app = App()