From 3afb7ec2f2dbd8ab68829f2dc0fa8194420bbf16 Mon Sep 17 00:00:00 2001 From: Ciro Date: Sun, 19 Mar 2023 13:18:29 -0300 Subject: [PATCH] fix several bugs, segfault and memory leak --- bench/asgi_wsgi/raw-asgi.py | 2 +- bench/asgi_wsgi/raw-wsgi.py | 18 +- bench/asgi_wsgi/stress.sh | 4 + bench/asgi_wsgi/test.py | 83 +++++++++ bench/socketify_plaintext.py | 2 +- examples/helpers/graphiql.py | 3 +- src/socketify/__init__.py | 2 +- src/socketify/asgi.py | 84 ++++++--- src/socketify/loop.py | 8 +- src/socketify/native.py | 10 +- src/socketify/native/src/libsocketify.cpp | 58 +++--- src/socketify/native/src/libsocketify.h | 214 +++++++++++----------- src/socketify/tasks.py | 23 --- src/socketify/uWebSockets | 2 +- src/socketify/wsgi.py | 28 ++- 15 files changed, 325 insertions(+), 216 deletions(-) create mode 100755 bench/asgi_wsgi/stress.sh create mode 100644 bench/asgi_wsgi/test.py diff --git a/bench/asgi_wsgi/raw-asgi.py b/bench/asgi_wsgi/raw-asgi.py index bb7ad95..4f27673 100644 --- a/bench/asgi_wsgi/raw-asgi.py +++ b/bench/asgi_wsgi/raw-asgi.py @@ -19,4 +19,4 @@ async def app(scope, receive, send): if __name__ == "__main__": - ASGI(app, lifespan=False).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(8) + ASGI(app, lifespan=False).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(1) diff --git a/bench/asgi_wsgi/raw-wsgi.py b/bench/asgi_wsgi/raw-wsgi.py index 738226e..5bf556b 100644 --- a/bench/asgi_wsgi/raw-wsgi.py +++ b/bench/asgi_wsgi/raw-wsgi.py @@ -24,14 +24,24 @@ def app(environ, start_response): yield payload[sended:end] sended = end +# import gc + +# gc.collect() +# gc.set_threshold(50, 3, 3) + +# import tracemalloc + +# tracemalloc.start() + def app_hello(environ, start_response): - start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Length', '13')]) - - yield b'Hello, World!' + # start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Length', '13')]) + start_response('200 OK', [('Content-Type', 'text/plain')]) + + return [ b'Hello, World!'] if __name__ == "__main__": # import fastwsgi - # fastwsgi.run(wsgi_app=app_hello, host='127.0.0.1', port=8000) + # fastwsgi.run(wsgi_app=app_hello, host='127.0.0.1', port=8000, loglevel=0) # from meinheld import server # server.listen(("0.0.0.0", 8000)) # server.run(app_hello) diff --git a/bench/asgi_wsgi/stress.sh b/bench/asgi_wsgi/stress.sh new file mode 100755 index 0000000..c3b5eda --- /dev/null +++ b/bench/asgi_wsgi/stress.sh @@ -0,0 +1,4 @@ +while true +do + wrk -t1 -c200 -d1 -H 'Connection: keep-alive' http://127.0.0.1:8000 > /dev/null +done diff --git a/bench/asgi_wsgi/test.py b/bench/asgi_wsgi/test.py new file mode 100644 index 0000000..1eec24b --- /dev/null +++ b/bench/asgi_wsgi/test.py @@ -0,0 +1,83 @@ +import sys +import io +import time +import datetime +import socket +import optparse + +parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False) +parser.add_option("-h", "--host", dest="host", default='127.0.0.1', type="string") +parser.add_option("-p", "--port", dest="port", default=3000, type="int") +(opt, args) = parser.parse_args() + +def get_request(path = r'/', host = '127.0.0.1', port = 3000): + req = f'GET {path}' + r' HTTP/1.1' + '\r\n' + req += f'Host: {host}:{port}\r\n' + req += r'User-Agent: curl/7.66.0' + '\r\n' + req += r'Accept: */*' + '\r\n' + req += '\r\n' + return req + +payload_tiny = get_request(host = opt.host, port = opt.port) +payload_tiny = payload_tiny.encode('utf-8') + +def create_sock(timeout = 0.001): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((opt.host, opt.port)) + return sock + +sock = create_sock() +sock.sendall(payload_tiny) +time.sleep(0.020) +resp = sock.recv(4096) +print('====== response ========') +print(resp.decode('utf-8')) +print('========================') +sock.close() + +start_time = datetime.datetime.now() +test1_limit = start_time + datetime.timedelta(seconds = 1) +test2_limit = test1_limit + datetime.timedelta(seconds = 10) + +sock = create_sock() +while True: + if datetime.datetime.now() >= test1_limit: + break + sock.sendall(payload_tiny) + try: + resp = sock.recv(4096) + except socket.timeout: + pass + +print(f'Test 1 completed!') +sock.close() + +req_num = 1000*1000 +payload_huge = payload_tiny * req_num +#print(len(payload_huge)) +print(f'Run test 2 ...') +totalsent = 0 +totalresp = b'' +sock = create_sock() +while True: + if datetime.datetime.now() >= test2_limit: + print(f'Test 2: Timeout exceeded!') + break + try: + rc = sock.send(payload_huge[totalsent:]) + if rc == 0: + #raise RuntimeError("socket connection broken") + pass + totalsent += rc + resp = sock.recv(65*1024) + totalresp += resp + except socket.timeout: + pass + except ConnectionResetError: + print(f'totalsent = {totalsent}, totalrecv = {len(totalresp)}') + print(f'LastResp: {totalresp[-256:]}') + raise + +sock.close() +print("==== Test Finish =====") diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index 320e49a..1fd40d9 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -8,7 +8,7 @@ def run_app(): router = app.router() @router.get("/") - def home(res, req): + async def home(res, req): res.send(b"Hello, World!") app.listen( diff --git a/examples/helpers/graphiql.py b/examples/helpers/graphiql.py index 62cae48..699391f 100644 --- a/examples/helpers/graphiql.py +++ b/examples/helpers/graphiql.py @@ -27,11 +27,12 @@ def graphiql_from(Query, Mutation=None): async def graph_ql(res, body, context_value): query = body["query"] + variables = body.get("variables", None) root_value = body.get("rootValue", None) operation_name = body.get("operationName", None) - + data = await schema.execute( query, variables, diff --git a/src/socketify/__init__.py b/src/socketify/__init__.py index ecb974e..f9e5bdc 100644 --- a/src/socketify/__init__.py +++ b/src/socketify/__init__.py @@ -1,7 +1,7 @@ import asyncio from .dataclasses import AppListenOptions, AppOptions - +from .tasks import TaskFactory, create_task, RequestTask from .socketify import ( App, OpCode, diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 24375f2..94bf2b5 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -10,7 +10,13 @@ import uuid import asyncio is_pypy = platform.python_implementation() == "PyPy" - +@ffi.callback("void(uws_res_t*, void*)") +def asgi_on_abort_handler(res, user_data): + ctx = ffi.from_handle(user_data) + ctx.aborted = True + if ctx.abort_future is not None: + ctx.abort_future.set_result(True) + ctx.abort_future = None async def task_wrapper(task): try: @@ -49,9 +55,9 @@ def ws_open(ws, user_data): @ffi.callback( - "void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*, bool*)" + "void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*)" ) -def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): +def ws_upgrade(ssl, response, info, socket_context, user_data): app = ffi.from_handle(user_data) headers = [] next_header = info.header_list @@ -82,6 +88,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): extensions = ffi.unpack(info.extensions, info.extensions_size).decode("utf8") compress = app.ws_compression ws = ASGIWebSocket(app.server.loop) + lib.uws_res_on_aborted(ssl, response, asgi_on_abort_handler, ws._ptr) scope = { "type": "websocket", @@ -108,7 +115,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): } async def send(options): - if bool(aborted[0]): + if ws.aborted: return False type = options["type"] if type == "websocket.send": @@ -258,6 +265,22 @@ class ASGIDataQueue: self.is_end = False self.next_data_future = loop.create_future() +class ASGIContext: + def __init__(self, ssl, response, loop): + self._ptr = ffi.new_handle(self) + self.aborted = False + self.sended_empty = False + self.data_queue = None + self.ssl = ssl + self.response = response + self.loop = loop + self.abort_future = None + + async def wait_disconnect(self): + if not self.aborted: + if self.abort_future is None: + self.abort_future = self.loop.create_future() + await self.abort_future class ASGIWebSocket: def __init__(self, loop): @@ -272,6 +295,14 @@ class ASGIWebSocket: self._message = None self._ptr = ffi.new_handle(self) self.unregister = None + self.aborted = False + self.abort_future = None + + async def wait_disconnect(self): + if not self.aborted: + if self.abort_future is None: + self.abort_future = self.loop.create_future() + await self.abort_future def accept(self): self.accept_future = self.loop.create_future() @@ -404,8 +435,8 @@ def uws_asgi_corked_403_handler(res, user_data): lib.uws_res_end_without_body(ssl, res, 0) -@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*, bool*)") -def asgi(ssl, response, info, user_data, aborted): +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*)") +def asgi(ssl, response, info, user_data): app = ffi.from_handle(user_data) headers = [] @@ -436,18 +467,21 @@ def asgi(ssl, response, info, user_data, aborted): "raw_path": url, "query_string": ffi.unpack(info.query_string, info.query_string_size), "headers": headers, + } + loop = app.server.loop + ctx = ASGIContext(ssl, response, loop) if bool(info.has_content): - data_queue = ASGIDataQueue(app.server.loop) + data_queue = ASGIDataQueue(loop) lib.uws_res_on_data(ssl, response, asgi_on_data_handler, data_queue._ptr) - else: - data_queue = None + ctx.data_queue = data_queue + + lib.uws_res_on_aborted(ssl, response, asgi_on_abort_handler, ctx._ptr) - sended_empty = False async def receive(): - nonlocal sended_empty - if bool(aborted[0]): + if ctx.aborted: return {"type": "http.disconnect"} + data_queue = ctx.data_queue if data_queue: if data_queue.queue.empty(): if not data_queue.is_end: @@ -460,19 +494,21 @@ def asgi(ssl, response, info, user_data, aborted): return data_queue.queue.get(False) # consume queue # no more body, just EMPTY RESPONSE - if not sended_empty: - sended_empty = True + if not ctx.sended_empty: + ctx.sended_empty = True return EMPTY_RESPONSE # already sended empty body so wait for aborted request - while not bool(aborted[0]): - await asyncio.sleep(0.01) #10ms is good enought + if not ctx.aborted: + await ctx.wait_disconnect() return {"type": "http.disconnect"} async def send(options): - if bool(aborted[0]): + if ctx.aborted: return False type = options["type"] + ssl = ctx.ssl + response = ctx.response if type == "http.response.start": # can also be more native optimized to do it in one GIL call # try socketify_res_write_int_status_with_headers and create and socketify_res_cork_write_int_status_with_headers @@ -501,6 +537,12 @@ def asgi(ssl, response, info, user_data, aborted): elif isinstance(message, str): data = message.encode("utf-8") lib.socketify_res_cork_end(ssl, response, data, len(data), 0) + + + if ctx.abort_future is not None: + ctx.aborted = True + ctx.abort_future.set_result(False) + ctx.abort_future = None return True return False @@ -546,14 +588,6 @@ class _ASGI: self._run_task = run_task else: - if sys.version_info >= (3, 8): # name fixed to avoid dynamic name - - def run_task(task): - future = create_task(loop, task_wrapper(task)) - future._log_destroy_pending = False - - self._run_task = run_task - else: def run_task(task): future = create_task(loop, task_wrapper(task)) diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 86bbc37..e1cb096 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -78,8 +78,12 @@ class Loop: def _keep_alive(self): if self.started: - self.uv_loop.run_nowait() - self.loop.call_soon(self._keep_alive) + if int(self.uv_loop.run_nowait()) > 1: + # be more agressive when needed + self.loop.call_soon(self._keep_alive) + else: + # this will relax CPU usage a lot when idle + self.loop.call_later(0.001, self._keep_alive) def create_task(self, *args, **kwargs): # this is not using optimized create_task yet diff --git a/src/socketify/native.py b/src/socketify/native.py index 65742f4..c80d09a 100644 --- a/src/socketify/native.py +++ b/src/socketify/native.py @@ -352,21 +352,21 @@ typedef struct { socketify_header* header_list; } socketify_asgi_ws_data; -typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data, bool* aborted); +typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data); typedef struct { int ssl; uws_app_t* app; socketify_asgi_method_handler handler; void * user_data; } socksocketify_asgi_app_info; -typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t* socket, void *user_data, bool* aborted); +typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t* socket, void *user_data); typedef struct { int ssl; uws_app_t* app; socketify_asgi_ws_method_handler handler; uws_socket_behavior_t behavior; void * user_data; -} socksocketify_asgi_ws_app_info; +} socketify_asgi_ws_app_info; socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res); void socketify_destroy_headers(socketify_header* headers); @@ -376,8 +376,8 @@ socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_re bool socketify_res_write_int_status(int ssl, uws_res_t* res, int code); socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data); void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app); -socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data); -void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app); +socketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data); +void socketify_destroy_asgi_ws_app_info(socketify_asgi_ws_app_info* app); void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length); void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection); diff --git a/src/socketify/native/src/libsocketify.cpp b/src/socketify/native/src/libsocketify.cpp index ce41c83..813983d 100644 --- a/src/socketify/native/src/libsocketify.cpp +++ b/src/socketify/native/src/libsocketify.cpp @@ -114,7 +114,8 @@ extern "C" if (ssl) { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->cork([=](){ + uwsRes->cork([=]() + { socketify_res_write_int_status(ssl, res, code); if (content_type && content_type_size) { @@ -128,13 +129,13 @@ extern "C" else { uws_res_end_without_body(ssl, res, close_connection); - } - }); + } }); } else { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->cork([=](){ + uwsRes->cork([=]() + { socketify_res_write_int_status(ssl, res, code); @@ -150,8 +151,7 @@ extern "C" else { uws_res_end_without_body(ssl, res, close_connection); - } - }); + } }); } } @@ -161,7 +161,8 @@ extern "C" if (ssl) { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->cork([=](){ + uwsRes->cork([=]() + { uws_res_write_status(ssl, res, status_code, status_code_size); @@ -177,13 +178,13 @@ extern "C" else { uws_res_end_without_body(ssl, res, close_connection); - } - }); + } }); } else { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->cork([=](){ + uwsRes->cork([=]() + { uws_res_write_status(ssl, res, status_code, status_code_size); @@ -199,8 +200,7 @@ extern "C" else { uws_res_end_without_body(ssl, res, close_connection); - } - }); + } }); } } @@ -415,15 +415,7 @@ extern "C" { socksocketify_asgi_app_info *info = ((socksocketify_asgi_app_info *)user_data); socketify_asgi_data data = socketify_asgi_request(info->ssl, request, response); - bool *aborted = (bool *)malloc(sizeof(aborted)); - *aborted = false; - uws_res_on_aborted( - info->ssl, response, [](uws_res_t *res, void *opcional_data) - { - bool* aborted = (bool*)opcional_data; - *aborted = true; }, - aborted); - info->handler(info->ssl, response, data, info->user_data, aborted); + info->handler(info->ssl, response, data, info->user_data); socketify_destroy_headers(data.header_list); } @@ -490,9 +482,9 @@ extern "C" } } - socksocketify_asgi_ws_app_info *socketify_add_asgi_ws_handler(int ssl, uws_app_t *app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void *user_data) + socketify_asgi_ws_app_info *socketify_add_asgi_ws_handler(int ssl, uws_app_t *app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void *user_data) { - socksocketify_asgi_ws_app_info *info = (socksocketify_asgi_ws_app_info *)malloc(sizeof(socksocketify_asgi_ws_app_info)); + socketify_asgi_ws_app_info *info = (socketify_asgi_ws_app_info *)malloc(sizeof(socketify_asgi_ws_app_info)); info->ssl = ssl; info->app = app; info->handler = handler; @@ -505,35 +497,27 @@ extern "C" ws_behavior.upgrade = [](uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void *user_data) { - socksocketify_asgi_ws_app_info *info = ((socksocketify_asgi_ws_app_info *)user_data); + socketify_asgi_ws_app_info *info = ((socketify_asgi_ws_app_info *)user_data); socketify_asgi_ws_data data = socketify_asgi_ws_request(info->ssl, request, response); - bool *aborted = (bool *)malloc(sizeof(aborted)); - *aborted = false; - uws_res_on_aborted( - info->ssl, response, [](uws_res_t *res, void *opcional_data) - { - bool* aborted = (bool*)opcional_data; - *aborted = true; }, - aborted); - info->handler(info->ssl, response, data, context, info->user_data, aborted); + info->handler(info->ssl, response, data, context, info->user_data); socketify_destroy_headers(data.header_list); }; ws_behavior.open = [](uws_websocket_t *ws, void *user_data) { - socksocketify_asgi_ws_app_info *info = ((socksocketify_asgi_ws_app_info *)user_data); + socketify_asgi_ws_app_info *info = ((socketify_asgi_ws_app_info *)user_data); auto socket_data = uws_ws_get_user_data(info->ssl, ws); info->behavior.open(ws, socket_data); }; ws_behavior.message = [](uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void *user_data) { - socksocketify_asgi_ws_app_info *info = ((socksocketify_asgi_ws_app_info *)user_data); + socketify_asgi_ws_app_info *info = ((socketify_asgi_ws_app_info *)user_data); auto socket_data = uws_ws_get_user_data(info->ssl, ws); info->behavior.message(ws, message, length, opcode, socket_data); }; ws_behavior.close = [](uws_websocket_t *ws, int code, const char *message, size_t length, void *user_data) { - socksocketify_asgi_ws_app_info *info = ((socksocketify_asgi_ws_app_info *)user_data); + socketify_asgi_ws_app_info *info = ((socketify_asgi_ws_app_info *)user_data); auto socket_data = uws_ws_get_user_data(info->ssl, ws); info->behavior.close(ws, code, message, length, socket_data); }; @@ -559,7 +543,7 @@ extern "C" { free(app); } - void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info *app) + void socketify_destroy_asgi_ws_app_info(socketify_asgi_ws_app_info *app) { free(app); } diff --git a/src/socketify/native/src/libsocketify.h b/src/socketify/native/src/libsocketify.h index ca53c0d..79cdf1c 100644 --- a/src/socketify/native/src/libsocketify.h +++ b/src/socketify/native/src/libsocketify.h @@ -8,140 +8,144 @@ extern "C" { #endif -DLL_EXPORT typedef void (*socketify_prepare_handler)(void* user_data); -DLL_EXPORT typedef void (*socketify_timer_handler)(void* user_data); + DLL_EXPORT typedef void (*socketify_prepare_handler)(void *user_data); + DLL_EXPORT typedef void (*socketify_timer_handler)(void *user_data); -DLL_EXPORT typedef enum { - SOCKETIFY_RUN_DEFAULT = 0, - SOCKETIFY_RUN_ONCE, - SOCKETIFY_RUN_NOWAIT -} socketify_run_mode; + DLL_EXPORT typedef enum { + SOCKETIFY_RUN_DEFAULT = 0, + SOCKETIFY_RUN_ONCE, + SOCKETIFY_RUN_NOWAIT + } socketify_run_mode; -DLL_EXPORT typedef struct { - void* uv_prepare_ptr; + DLL_EXPORT typedef struct + { + void *uv_prepare_ptr; socketify_prepare_handler on_prepare_handler; - void* on_prepare_data; - void* uv_loop; -} socketify_loop; + void *on_prepare_data; + void *uv_loop; + } socketify_loop; -DLL_EXPORT typedef struct{ - void* uv_timer_ptr; + DLL_EXPORT typedef struct + { + void *uv_timer_ptr; socketify_timer_handler handler; - void* user_data; -} socketify_timer; + void *user_data; + } socketify_timer; + DLL_EXPORT typedef struct + { -DLL_EXPORT typedef struct { + const char *name; + const char *value; - const char* name; - const char* value; - - size_t name_size; - size_t value_size; - - void* next; -} socketify_header; + size_t name_size; + size_t value_size; + void *next; + } socketify_header; -DLL_EXPORT typedef struct { + DLL_EXPORT typedef struct + { - const char* full_url; - const char* url; - const char* query_string; - const char* method; - const char* remote_address; + const char *full_url; + const char *url; + const char *query_string; + const char *method; + const char *remote_address; - size_t full_url_size; - size_t url_size; - size_t query_string_size; - size_t method_size; - size_t remote_address_size; + size_t full_url_size; + size_t url_size; + size_t query_string_size; + size_t method_size; + size_t remote_address_size; - socketify_header* header_list; + socketify_header *header_list; - bool has_content; -} socketify_asgi_data; + bool has_content; + } socketify_asgi_data; -DLL_EXPORT typedef struct { + DLL_EXPORT typedef struct + { - const char* full_url; - const char* url; - const char* query_string; - const char* method; - const char* remote_address; + const char *full_url; + const char *url; + const char *query_string; + const char *method; + const char *remote_address; - size_t full_url_size; - size_t url_size; - size_t query_string_size; - size_t method_size; - size_t remote_address_size; + size_t full_url_size; + size_t url_size; + size_t query_string_size; + size_t method_size; + size_t remote_address_size; - const char* protocol; - const char* key; - const char* extensions; - size_t protocol_size; - size_t key_size; - size_t extensions_size; + const char *protocol; + const char *key; + const char *extensions; + size_t protocol_size; + size_t key_size; + size_t extensions_size; - socketify_header* header_list; -} socketify_asgi_ws_data; + socketify_header *header_list; + } socketify_asgi_ws_data; -DLL_EXPORT typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data, bool* aborted); -DLL_EXPORT typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t* socket, void *user_data, bool* aborted); -DLL_EXPORT typedef struct { - int ssl; - uws_app_t* app; - socketify_asgi_method_handler handler; - void * user_data; -} socksocketify_asgi_app_info; + DLL_EXPORT typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data); + DLL_EXPORT typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t *socket, void *user_data); + DLL_EXPORT typedef struct + { + int ssl; + uws_app_t *app; + socketify_asgi_method_handler handler; + void *user_data; + } socksocketify_asgi_app_info; -DLL_EXPORT typedef struct { - int ssl; - uws_app_t* app; - socketify_asgi_ws_method_handler handler; - uws_socket_behavior_t behavior; - void * user_data; -} socksocketify_asgi_ws_app_info; + DLL_EXPORT typedef struct + { + int ssl; + uws_app_t *app; + socketify_asgi_ws_method_handler handler; + uws_socket_behavior_t behavior; + void *user_data; + } socketify_asgi_ws_app_info; + DLL_EXPORT socketify_loop *socketify_create_loop(); + DLL_EXPORT bool socketify_constructor_failed(socketify_loop *loop); + DLL_EXPORT bool socketify_on_prepare(socketify_loop *loop, socketify_prepare_handler handler, void *user_data); + DLL_EXPORT bool socketify_prepare_unbind(socketify_loop *loop); + DLL_EXPORT void socketify_destroy_loop(socketify_loop *loop); + DLL_EXPORT void *socketify_get_native_loop(socketify_loop *loop); -DLL_EXPORT socketify_loop * socketify_create_loop(); -DLL_EXPORT bool socketify_constructor_failed(socketify_loop* loop); -DLL_EXPORT bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data); -DLL_EXPORT bool socketify_prepare_unbind(socketify_loop* loop); -DLL_EXPORT void socketify_destroy_loop(socketify_loop* loop); -DLL_EXPORT void* socketify_get_native_loop(socketify_loop* loop); + DLL_EXPORT int socketify_loop_run(socketify_loop *loop, socketify_run_mode mode); + DLL_EXPORT void socketify_loop_stop(socketify_loop *loop); -DLL_EXPORT int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode); -DLL_EXPORT void socketify_loop_stop(socketify_loop* loop); + DLL_EXPORT socketify_timer *socketify_create_timer(socketify_loop *loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void *user_data); + DLL_EXPORT void socketify_timer_destroy(socketify_timer *timer); + DLL_EXPORT void socketify_timer_set_repeat(socketify_timer *timer, uint64_t repeat); -DLL_EXPORT socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void* user_data); -DLL_EXPORT void socketify_timer_destroy(socketify_timer* timer); -DLL_EXPORT void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat); + DLL_EXPORT socketify_timer *socketify_create_check(socketify_loop *loop, socketify_timer_handler handler, void *user_data); + DLL_EXPORT void socketify_check_destroy(socketify_timer *timer); -DLL_EXPORT socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data); -DLL_EXPORT void socketify_check_destroy(socketify_timer* timer); + DLL_EXPORT socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res); + DLL_EXPORT void socketify_destroy_headers(socketify_header *headers); + DLL_EXPORT bool socketify_res_write_int_status_with_headers(int ssl, uws_res_t *res, int code, socketify_header *headers); + DLL_EXPORT void socketify_res_write_headers(int ssl, uws_res_t *res, socketify_header *headers); + DLL_EXPORT bool socketify_res_write_int_status(int ssl, uws_res_t *res, int code); + DLL_EXPORT socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_res_t *res); -DLL_EXPORT socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res); -DLL_EXPORT void socketify_destroy_headers(socketify_header* headers); -DLL_EXPORT bool socketify_res_write_int_status_with_headers(int ssl, uws_res_t* res, int code, socketify_header* headers); -DLL_EXPORT void socketify_res_write_headers(int ssl, uws_res_t* res, socketify_header* headers); -DLL_EXPORT bool socketify_res_write_int_status(int ssl, uws_res_t* res, int code); -DLL_EXPORT socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_res_t *res); + DLL_EXPORT socksocketify_asgi_app_info *socketify_add_asgi_http_handler(int ssl, uws_app_t *app, socketify_asgi_method_handler handler, void *user_data); + DLL_EXPORT void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info *app); -DLL_EXPORT socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data); -DLL_EXPORT void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app); + DLL_EXPORT void socketify_res_cork_write(int ssl, uws_res_t *response, const char *data, size_t length); + DLL_EXPORT void socketify_res_cork_end(int ssl, uws_res_t *response, const char *data, size_t length, bool close_connection); -DLL_EXPORT void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length); -DLL_EXPORT void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection); - -DLL_EXPORT socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data); -DLL_EXPORT void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app); -DLL_EXPORT void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode); -DLL_EXPORT void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool fin); -DLL_EXPORT void socketify_res_send_int_code(int ssl, uws_res_t *res, const char* content_data, size_t content_data_size, int code, const char *content_type, size_t content_type_size, bool close_connection); -DLL_EXPORT void socketify_res_send(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, const char *status_code, size_t status_code_size, const char *content_type, size_t content_type_size, bool close_connection); -DLL_EXPORT void socketify_res_cork_send_int_code(int ssl, uws_res_t *res, const char* content_data, size_t content_data_size, int code, const char *content_type, size_t content_type_size, bool close_connection); -DLL_EXPORT void socketify_res_cork_send(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, const char *status_code, size_t status_code_size, const char *content_type, size_t content_type_size, bool close_connection); + DLL_EXPORT socketify_asgi_ws_app_info *socketify_add_asgi_ws_handler(int ssl, uws_app_t *app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void *user_data); + DLL_EXPORT void socketify_destroy_asgi_ws_app_info(socketify_asgi_ws_app_info *app); + DLL_EXPORT void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char *data, size_t length, uws_opcode_t opcode); + DLL_EXPORT void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char *data, size_t length, uws_opcode_t opcode, bool compress, bool fin); + DLL_EXPORT void socketify_res_send_int_code(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, int code, const char *content_type, size_t content_type_size, bool close_connection); + DLL_EXPORT void socketify_res_send(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, const char *status_code, size_t status_code_size, const char *content_type, size_t content_type_size, bool close_connection); + DLL_EXPORT void socketify_res_cork_send_int_code(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, int code, const char *content_type, size_t content_type_size, bool close_connection); + DLL_EXPORT void socketify_res_cork_send(int ssl, uws_res_t *res, const char *content_data, size_t content_data_size, const char *status_code, size_t status_code_size, const char *content_type, size_t content_type_size, bool close_connection); #endif #ifdef __cplusplus } diff --git a/src/socketify/tasks.py b/src/socketify/tasks.py index b56c52d..aa6cefc 100644 --- a/src/socketify/tasks.py +++ b/src/socketify/tasks.py @@ -590,29 +590,6 @@ class RequestTask: __iter__ = __await__ # make compatible with 'yield from'. -# def create_task_with_factory(task_factory_max_items=100_000): -# items = [] -# for _ in range(0, task_factory_max_items): -# task = RequestTask(None, None, None, True) -# if task._source_traceback: -# del task._source_traceback[-1] -# items.append(task) - -# def factory(loop, coro, default_done_callback=None): -# if len(items) == 0: -# return create_task(loop, coro, default_done_callback) -# task = items.pop() - -# def done(f): -# if default_done_callback is not None: -# default_done_callback(f) -# items.append(f) - -# task._reuse(coro, loop, done) -# return task - -# return factory - async def factory_task_wrapper(task, dispose): try: await task diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index 216c8b9..623ee82 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit 216c8b9ea20ec8719175b136cf86cb51b316382d +Subproject commit 623ee82841d32f85e54873135637e9a5ddf15618 diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index b233059..3e6e99c 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -221,9 +221,11 @@ def wsgi_corked_response_start_handler(res, user_data): data_response.on_data(data_response, res) -@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") -def wsgi(ssl, response, info, user_data, aborted): + +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*)") +def wsgi(ssl, response, info, user_data): app = ffi.from_handle(user_data) + # reusing the dict is slower than cloning because we need to clear HTTP headers environ = dict(app.BASIC_ENVIRON) @@ -355,16 +357,19 @@ def wsgi(ssl, response, info, user_data, aborted): return write - failed_chunks = None - last_offset = -1 - data_retry = None + + # check for body if bool(info.has_content): WSGI_INPUT = BytesIO() environ["wsgi.input"] = WSGIBody(WSGI_INPUT) def on_data(data_response, response): - nonlocal failed_chunks, last_offset, data_retry + last_offset = -1 + data_retry = None + failed_chunks = None + + if data_response.aborted: return @@ -372,15 +377,18 @@ def wsgi(ssl, response, info, user_data, aborted): data_response.environ["CONTENT_LENGTH"] = str( data_response.buffer.getbuffer().nbytes ) + if data_response.id is not None: + data_response.app._data_refs.pop(data_response.id, None) + app_iter = data_response.app.wsgi( data_response.environ, data_response.start_response ) + try: for data in app_iter: if data: if not headers_written: write_headers(headers_set) - content_length = ffi.cast("uintmax_t", content_length) if is_chunked: if isinstance(data, bytes): @@ -434,8 +442,6 @@ def wsgi(ssl, response, info, user_data, aborted): ssl, response ) - - data_response = WSGIDataResponse( app, environ, start_response, WSGI_INPUT, on_data ) @@ -445,6 +451,9 @@ def wsgi(ssl, response, info, user_data, aborted): lib.uws_res_on_aborted(ssl, response, wsgi_on_data_ref_abort_handler, data_response._ptr) lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr) else: + failed_chunks = None + last_offset = -1 + data_retry = None environ["wsgi.input"] = None app_iter = app.wsgi(environ, start_response) result = None @@ -509,7 +518,6 @@ def wsgi(ssl, response, info, user_data, aborted): ) - def is_asgi(module): return ( hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3