diff --git a/pyproject.toml b/pyproject.toml index 6303699..987b70c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "socketify" -version = "0.0.20" +version = "0.0.21" authors = [ { name="Ciro Spaciari", email="ciro.spaciari@gmail.com" }, ] diff --git a/setup.py b/setup.py index 48afb82..ff40a26 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ with open("README.md", "r", encoding="utf-8") as fh: setuptools.setup( name="socketify", - version="0.0.20", + version="0.0.21", platforms=["any"], author="Ciro Spaciari", author_email="ciro.spaciari@gmail.com", diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 94bf2b5..2ed0c3c 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -14,6 +14,8 @@ is_pypy = platform.python_implementation() == "PyPy" def asgi_on_abort_handler(res, user_data): ctx = ffi.from_handle(user_data) ctx.aborted = True + ctx.loop.is_idle = False + if ctx.abort_future is not None: ctx.abort_future.set_result(True) ctx.abort_future = None @@ -59,6 +61,7 @@ def ws_open(ws, user_data): ) def ws_upgrade(ssl, response, info, socket_context, user_data): app = ffi.from_handle(user_data) + app.server.loop.is_idle = False headers = [] next_header = info.header_list while next_header != ffi.NULL: @@ -117,6 +120,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data): async def send(options): if ws.aborted: return False + ws.loop.is_idle = False type = options["type"] if type == "websocket.send": data = options.get("bytes", None) @@ -244,6 +248,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data): @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response = ffi.from_handle(user_data) + data_response.loop.is_idle = False data_response.is_end = bool(is_end) more_body = not data_response.is_end result = { @@ -438,7 +443,8 @@ def uws_asgi_corked_403_handler(res, user_data): @ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*)") def asgi(ssl, response, info, user_data): app = ffi.from_handle(user_data) - + app.server.loop.is_idle = False + headers = [] next_header = info.header_list while next_header != ffi.NULL: @@ -481,6 +487,8 @@ def asgi(ssl, response, info, user_data): async def receive(): if ctx.aborted: return {"type": "http.disconnect"} + + ctx.loop.is_idle = False data_queue = ctx.data_queue if data_queue: if data_queue.queue.empty(): @@ -506,6 +514,8 @@ def asgi(ssl, response, info, user_data): async def send(options): if ctx.aborted: return False + + ctx.loop.is_idle = False type = options["type"] ssl = ctx.ssl response = ctx.response @@ -684,6 +694,7 @@ class _ASGI: async def send(options): nonlocal asgi_app + asgi_app.server.loop.is_idle = False type = options["type"] asgi_app.status_message = options.get("message", "") if type == "lifespan.startup.complete": @@ -701,6 +712,7 @@ class _ASGI: async def receive(): nonlocal asgi_app + asgi_app.server.loop.is_idle = False while not asgi_app.is_stopped: if asgi_app.is_starting: asgi_app.is_starting = False @@ -723,7 +735,7 @@ class _ASGI: asgi_app.server.listen(port_or_options, handler) finally: return None - + self.server.loop.is_idle = False # start lifespan self.server.loop.ensure_future(task_wrapper(self.app(scope, receive, send))) self.server.run() diff --git a/src/socketify/loop.py b/src/socketify/loop.py index bc87cad..9b74613 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -29,6 +29,8 @@ class Loop: # get the current running loop or create a new one without warnings self.loop = asyncio._get_running_loop() + self._idle_count = 0 + self.is_idle = False if self.loop is None: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) @@ -73,15 +75,27 @@ class Loop: def create_future(self): return self.loop.create_future() - def _keep_alive(self): if self.started: - self.uv_loop.run_nowait() - # be more agressive when needed - self.loop.call_soon(self._keep_alive) - - + relax = False + if not self.is_idle: + self._idle_count = 0 + elif self._idle_count < 10000: + self._idle_count += 1 + else: + relax = True + + self.is_idle = True + + if relax: + self.uv_loop.run_nowait() + self.loop.call_later(0.001, self._keep_alive) + else: + self.uv_loop.run_nowait() + # be more agressive when needed + self.loop.call_soon(self._keep_alive) + def create_task(self, *args, **kwargs): # this is not using optimized create_task yet return self.loop.create_task(*args, **kwargs) diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 45eecd5..c0ac643 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -41,6 +41,7 @@ def uws_missing_server_name(hostname, hostname_length, user_data): def uws_websocket_factory_drain_handler(ws, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances try: @@ -76,6 +77,7 @@ def uws_websocket_drain_handler_with_extension(ws, user_data): try: handlers, app = ffi.from_handle(user_data) ws = WebSocket(ws, app) + app.loop.is_idle = False # bind methods to websocket app._ws_extension.set_properties(ws) # set default value in properties @@ -97,6 +99,7 @@ def uws_websocket_drain_handler(ws, user_data): try: handlers, app = ffi.from_handle(user_data) ws = WebSocket(ws, app) + app.loop.is_idle = False handler = handlers.drain if inspect.iscoroutinefunction(handler): app.run_async(handler(ws)) @@ -119,6 +122,7 @@ def uws_websocket_factory_subscription_handler( ): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances try: @@ -201,6 +205,7 @@ def uws_websocket_subscription_handler( if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) handler = handlers.subscription @@ -243,6 +248,7 @@ def uws_websocket_subscription_handler_with_extension( if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) # bind methods to websocket app._ws_extension.set_properties(ws) @@ -281,6 +287,7 @@ def uws_websocket_subscription_handler_with_extension( def uws_websocket_factory_open_handler(ws, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances try: @@ -315,6 +322,7 @@ def uws_websocket_open_handler_with_extension(ws, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) # bind methods to websocket app._ws_extension.set_properties(ws) @@ -337,6 +345,7 @@ def uws_websocket_open_handler(ws, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) handler = handlers.open if inspect.iscoroutinefunction(handler): @@ -353,6 +362,7 @@ def uws_websocket_open_handler(ws, user_data): def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances try: @@ -397,6 +407,7 @@ def uws_websocket_message_handler_with_extension( if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) # bind methods to websocket app._ws_extension.set_properties(ws) @@ -428,6 +439,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) if message == ffi.NULL: @@ -454,6 +466,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data): def uws_websocket_factory_pong_handler(ws, message, length, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances try: @@ -493,6 +506,7 @@ def uws_websocket_pong_handler_with_extension(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) # bind methods to websocket app._ws_extension.set_properties(ws) @@ -519,6 +533,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) if message == ffi.NULL: data = None @@ -540,6 +555,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data): def uws_websocket_factory_ping_handler(ws, message, length, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances @@ -580,6 +596,7 @@ def uws_websocket_ping_handler_with_extension(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) # bind methods to websocket app._ws_extension.set_properties(ws) @@ -608,6 +625,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False ws = WebSocket(ws, app) if message == ffi.NULL: @@ -631,6 +649,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data): def uws_websocket_factory_close_handler(ws, code, message, length, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._ws_factory.get(app, ws) ws, dispose = instances @@ -681,6 +700,7 @@ def uws_websocket_close_handler_with_extension(ws, code, message, length, user_d if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False # pass to free data on WebSocket if needed ws = WebSocket(ws, app) # bind methods to websocket @@ -726,6 +746,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False # pass to free data on WebSocket if needed ws = WebSocket(ws, app) @@ -766,6 +787,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): def uws_generic_factory_method_handler(res, req, user_data): if user_data != ffi.NULL: (handler, app) = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._factory.get(app, res, req) (response, request, dispose) = instances try: @@ -800,6 +822,7 @@ def uws_generic_factory_method_handler(res, req, user_data): def uws_websocket_factory_upgrade_handler(res, req, context, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._factory.get(app, res, req) (response, request, dispose) = instances try: @@ -837,6 +860,7 @@ def uws_websocket_factory_upgrade_handler(res, req, context, user_data): def uws_websocket_upgrade_handler_with_extension(res, req, context, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False response = AppResponse(res, app) # set default value in properties app._response_extension.set_properties(response) @@ -864,6 +888,7 @@ def uws_websocket_upgrade_handler_with_extension(res, req, context, user_data): def uws_websocket_upgrade_handler(res, req, context, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) + app.loop.is_idle = False response = AppResponse(res, app) request = AppRequest(req, app) try: @@ -908,6 +933,7 @@ def uws_req_for_each_header_handler( def uws_generic_factory_method_handler(res, req, user_data): if user_data != ffi.NULL: (handler, app) = ffi.from_handle(user_data) + app.loop.is_idle = False instances = app._factory.get(app, res, req) (response, request, dispose) = instances try: @@ -943,6 +969,7 @@ def uws_generic_factory_method_handler(res, req, user_data): def uws_generic_method_handler_with_extension(res, req, user_data): if user_data != ffi.NULL: (handler, app) = ffi.from_handle(user_data) + app.loop.is_idle = False response = AppResponse(res, app) # set default value in properties app._response_extension.set_properties(response) @@ -969,6 +996,7 @@ def uws_generic_method_handler_with_extension(res, req, user_data): def uws_generic_method_handler(res, req, user_data): if user_data != ffi.NULL: (handler, app) = ffi.from_handle(user_data) + app.loop.is_idle = False response = AppResponse(res, app) request = AppRequest(req, app) @@ -1006,6 +1034,7 @@ def uws_generic_listen_handler(listen_socket, config, user_data): if user_data != ffi.NULL: app = ffi.from_handle(user_data) + app.loop.is_idle = False config.port = lib.us_socket_local_port(app.SSL, listen_socket) if hasattr(app, "_listen_handler") and hasattr(app._listen_handler, "__call__"): app.socket = listen_socket @@ -1041,6 +1070,7 @@ def uws_generic_aborted_handler(response, user_data): def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data): if user_data != ffi.NULL: res = ffi.from_handle(user_data) + res.app.loop.is_idle = False if chunk == ffi.NULL: data = None else: @@ -1053,6 +1083,7 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data): def uws_generic_on_writable_handler(res, offset, user_data): if user_data != ffi.NULL: res = ffi.from_handle(user_data) + res.app.loop.is_idle = False result = res.trigger_writable_handler(offset) return result return False @@ -1307,6 +1338,7 @@ class WebSocket: return None def send_fragment(self, message, compress=False): + self.app.loop.is_idle = False try: if isinstance(message, str): data = message.encode("utf-8") @@ -1327,6 +1359,7 @@ class WebSocket: return None def send_last_fragment(self, message, compress=False): + self.app.loop.is_idle = False try: if isinstance(message, str): data = message.encode("utf-8") @@ -1347,6 +1380,7 @@ class WebSocket: return None def send_first_fragment(self, message, opcode=OpCode.BINARY, compress=False): + self.app.loop.is_idle = False try: if isinstance(message, str): data = message.encode("utf-8") @@ -1373,6 +1407,7 @@ class WebSocket: return self def send(self, message, opcode=OpCode.BINARY, compress=False, fin=True): + self.app.loop.is_idle = False try: if isinstance(message, str): data = message.encode("utf-8") @@ -1399,6 +1434,7 @@ class WebSocket: return self def end(self, code=0, message=None): + self.app.loop.is_idle = False try: if not isinstance(code, int): raise RuntimeError("code must be an int") @@ -1447,6 +1483,8 @@ class AppResponse: self._data = None def cork(self, callback): + self.app.loop.is_idle = False + if not self.aborted: self.grab_aborted_handler() self._cork_handler = callback @@ -1645,6 +1683,7 @@ class AppResponse: return self def try_end(self, message, total_size, end_connection=False): + self.app.loop.is_idle = False try: if self.aborted: return False, True @@ -1752,7 +1791,8 @@ class AppResponse: headers = None, end_connection: bool = False, ): - + self.app.loop.is_idle = False + # TODO: optimize headers if headers is not None: for name, value in headers: @@ -1851,6 +1891,8 @@ class AppResponse: return self def end(self, message, end_connection=False): + self.app.loop.is_idle = False + try: if self.aborted: return self @@ -1879,16 +1921,19 @@ class AppResponse: return self def resume(self): + self.app.loop.is_idle = False if not self.aborted: lib.uws_res_resume(self.app.SSL, self.res) return self def write_continue(self): + self.app.loop.is_idle = False if not self.aborted: lib.uws_res_write_continue(self.app.SSL, self.res) return self def write_status(self, status_or_status_text): + self.app.loop.is_idle = False if not self.aborted: if isinstance(status_or_status_text, int): if bool( @@ -1914,6 +1959,7 @@ class AppResponse: return self def write_header(self, key, value): + self.app.loop.is_idle = False if not self.aborted: if isinstance(key, str): key_data = key.encode("utf-8") @@ -1947,6 +1993,7 @@ class AppResponse: return self def end_without_body(self, end_connection=False): + self.app.loop.is_idle = False if not self.aborted: if self._write_jar is not None: self.write_header("Set-Cookie", self._write_jar.output(header="")) @@ -1956,6 +2003,7 @@ class AppResponse: return self def write(self, message): + self.app.loop.is_idle = False if not self.aborted: if isinstance(message, str): data = message.encode("utf-8") @@ -2923,6 +2971,7 @@ class App: ) def publish(self, topic, message, opcode=OpCode.BINARY, compress=False): + self.loop.is_idle = False if isinstance(topic, str): topic_data = topic.encode("utf-8") diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index 1e64983..41779c9 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -15,6 +15,8 @@ import uuid @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response = ffi.from_handle(user_data) + data_response.app.server.loop.is_idle = False + if chunk != ffi.NULL: data_response.buffer.write(ffi.unpack(chunk, chunk_length)) if bool(is_end): @@ -29,6 +31,7 @@ def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): def wsgi_on_data_ref_abort_handler(res, user_data): data_retry = ffi.from_handle(user_data) data_retry.aborted = True + data_retry.server.loop.is_idle = False if data_retry.id is not None: data_retry.app._data_refs.pop(data_retry.id, None) @@ -41,7 +44,9 @@ def wsgi_on_writable_handler(res, offset, user_data): chunks = data_retry.chunks last_sended_offset = data_retry.last_offset - ssl = data_retry.app.server.SSL + server = data_retry.app.server + ssl = server.SSL + server.loop.is_idle = False content_length = data_retry.content_length data = chunks[0] @@ -224,6 +229,7 @@ def wsgi_corked_response_start_handler(res, user_data): @ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*)") def wsgi(ssl, response, info, user_data): app = ffi.from_handle(user_data) + app.server.loop.is_idle = False # reusing the dict is slower than cloning because we need to clear HTTP headers environ = dict(app.BASIC_ENVIRON) @@ -257,9 +263,10 @@ def wsgi(ssl, response, info, user_data): is_chunked = False content_length = -1 def write_headers(headers): - nonlocal headers_written, headers_set, status_text, content_length, is_chunked + nonlocal headers_written, headers_set, status_text, content_length, is_chunked, app if headers_written or not headers_set: return + app.server.loop.is_idle = False headers_written = True @@ -327,7 +334,8 @@ def wsgi(ssl, response, info, user_data): content_length = ffi.cast("uintmax_t", content_length) def start_response(status, headers, exc_info=None): - nonlocal headers_set, status_text + nonlocal headers_set, status_text, app + app.server.loop.is_idle = False if exc_info: try: if headers_written: @@ -342,7 +350,8 @@ def wsgi(ssl, response, info, user_data): status_text = status def write(data): - nonlocal is_chunked + nonlocal is_chunked, app + app.server.loop.is_idle = False if not headers_written: write_headers(headers_set) # will allow older frameworks only with is_chunked @@ -371,7 +380,7 @@ def wsgi(ssl, response, info, user_data): if data_response.aborted: return - + data_response.app.server.loop.is_idle = False ssl = data_response.app.server.SSL data_response.environ["CONTENT_LENGTH"] = str( data_response.buffer.getbuffer().nbytes