diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index bb7e55a..0b0e4a0 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -4,7 +4,7 @@ import multiprocessing def run_app(): - app = App() + app = App(None, 200_000) app.get("/", lambda res, req: res.end("Hello, World!")) app.listen( 8000, @@ -27,4 +27,5 @@ def create_fork(): for i in range(1, multiprocessing.cpu_count()): create_fork() + run_app() # run app on the main process too :) diff --git a/bench/websockets/socketify_server.py b/bench/websockets/socketify_server.py index 70d0374..f52d420 100644 --- a/bench/websockets/socketify_server.py +++ b/bench/websockets/socketify_server.py @@ -27,7 +27,7 @@ def ws_close(ws, close, message): remaining_clients = remaining_clients + 1 -app = App() +app = App(websocket_factory_max_itens=1_500_000) app.ws( "/*", { diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 4ccc625..c6466d4 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -16,6 +16,7 @@ from urllib.parse import parse_qs, quote_plus, unquote_plus from .loop import Loop from .status_codes import status_codes from .helpers import static_route +from queue import SimpleQueue mimetypes.init() @@ -300,6 +301,32 @@ def uws_missing_server_name(hostname, hostname_length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, void*)") +def uws_websocket_factory_drain_handler(ws, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + try: + handler = handlers.drain + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws)) + if dispose: + def when_finished(_): + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws) + if dispose: + app._ws_factory.dispose(instances) + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_drain_handler(ws, user_data): if not user_data == ffi.NULL: @@ -317,6 +344,32 @@ def uws_websocket_drain_handler(ws, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, void*)") +def uws_websocket_factory_open_handler(ws, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + try: + handler = handlers.open + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws)) + if dispose: + def when_finished(_): + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws) + if dispose: + app._ws_factory.dispose(instances) + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_open_handler(ws, user_data): @@ -335,6 +388,41 @@ def uws_websocket_open_handler(ws, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") +def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + opcode = OpCode(opcode) + if opcode == OpCode.TEXT: + data = data.decode("utf-8") + + handler = handlers.message + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws, data, opcode)) + if dispose: + def when_finished(_): + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws, data, opcode) + if dispose: + app._ws_factory.dispose(instances) + + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") def uws_websocket_message_handler(ws, message, length, opcode, user_data): if not user_data == ffi.NULL: @@ -362,6 +450,37 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_factory_pong_handler(ws, message, length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.pong + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws, data)) + if dispose: + def when_finished(_): + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws, data) + if dispose: + app._ws_factory.dispose(instances) + + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_pong_handler(ws, message, length, user_data): if not user_data == ffi.NULL: @@ -378,13 +497,45 @@ def uws_websocket_pong_handler(ws, message, length, user_data): app.run_async(handler(ws, data)) else: handler(ws, data) - except Exception as err: print( "Uncaught Exception: %s" % str(err) ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_factory_ping_handler(ws, message, length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.ping + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws, data)) + if dispose: + def when_finished(_): + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws, data) + if dispose: + app._ws_factory.dispose(instances) + + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_ping_handler(ws, message, length, user_data): if not user_data == ffi.NULL: @@ -409,6 +560,50 @@ def uws_websocket_ping_handler(ws, message, length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") +def uws_websocket_factory_close_handler(ws, code, message, length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + (ws, dispose) = instances + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.close + + if handler is None: + if dispose: + app._ws_factory.dispose(instances) + return + + if inspect.iscoroutinefunction(handler): + future = app.run_async(handler(ws, int(code), data)) + + def when_finished(_): + key = ws.get_user_data_uuid() + if not key is None: + SocketRefs.pop(key, None) + if dispose: + app._ws_factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(ws, int(code), data) + key = ws.get_user_data_uuid() + if not key is None: + SocketRefs.pop(key, None) + if dispose: + app._ws_factory.dispose(instances) + + except Exception as err: + print( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") def uws_websocket_close_handler(ws, code, message, length, user_data): if not user_data == ffi.NULL: @@ -448,13 +643,65 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_res_t*, uws_req_t*, void*)") +def uws_generic_factory_method_handler(res, req, user_data): + if not user_data == ffi.NULL: + (handler, app) = ffi.from_handle(user_data) + instances = app._factory.get(app, res, req) + (response, request, dispose) = instances + try: + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + future = response.run_async(handler(response, request)) + if dispose: + def when_finished(_): + app._factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(response, request) + if dispose: + app._factory.dispose(instances) + + except Exception as err: + response.grab_aborted_handler() + app.trigger_error(err, response, request) + if dispose: + app._factory.dispose(instances) + +@ffi.callback("void(uws_res_t*, uws_req_t*, uws_socket_context_t*, void*)") +def uws_websocket_factory_upgrade_handler(res, req, context, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + instances = app._factory.get(app, res, req) + (response, request, dispose) = instances + try: + handler = handlers.upgrade + + if inspect.iscoroutinefunction(handler): + future = response.run_async(handler(response, request, context)) + if dispose: + def when_finished(_): + app._factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(response, request, context) + if dispose: + app._factory.dispose(instances) + except Exception as err: + response.grab_aborted_handler() + app.trigger_error(err, response, request) + if dispose: + app._factory.dispose(instances) + @ffi.callback("void(uws_res_t*, uws_req_t*, uws_socket_context_t*, void*)") def uws_websocket_upgrade_handler(res, req, context, user_data): if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, app.SSL, app._template) + request = AppRequest(req) try: - (handlers, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL, app._template) - request = AppRequest(req) handler = handlers.upgrade if inspect.iscoroutinefunction(handler): response.run_async(handler(response, request, context)) @@ -462,9 +709,8 @@ def uws_websocket_upgrade_handler(res, req, context, user_data): handler(response, request, context) except Exception as err: - print( - "Uncaught Exception: %s" % str(err) - ) # just log in console the error to call attention + response.grab_aborted_handler() + app.trigger_error(err, response, request) @ffi.callback("void(const char*, size_t, void*)") @@ -493,13 +739,40 @@ def uws_req_for_each_header_handler( return +@ffi.callback("void(uws_res_t*, uws_req_t*, void*)") +def uws_generic_factory_method_handler(res, req, user_data): + if not user_data == ffi.NULL: + (handler, app) = ffi.from_handle(user_data) + instances = app._factory.get(app, res, req) + (response, request, dispose) = instances + try: + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + future = response.run_async(handler(response, request)) + if dispose: + def when_finished(_): + app._factory.dispose(instances) + + future.add_done_callback(when_finished) + else: + handler(response, request) + if dispose: + app._factory.dispose(instances) + + except Exception as err: + response.grab_aborted_handler() + app.trigger_error(err, response, request) + if dispose: + app._factory.dispose(instances) + @ffi.callback("void(uws_res_t*, uws_req_t*, void*)") def uws_generic_method_handler(res, req, user_data): if not user_data == ffi.NULL: + (handler, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, app.SSL, app._template) + request = AppRequest(req) + try: - (handler, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL, app._template) - request = AppRequest(req) if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() response.run_async(handler(response, request)) @@ -940,6 +1213,83 @@ class WSBehaviorHandlers: self.close = None +class WebSocketFactory: + def __init__(self, app, max_size): + self.factory_queue = [] + for _ in range(0, max_size): + websocket = WebSocket(None, app.SSL, app.loop) + self.factory_queue.append((websocket, True)) + def get(self, app, ws): + if len(self.factory_queue) == 0: + response = WebSocket(ws, app.SSL, app.loop) + return (response, False) + + instances = self.factory_queue.pop() + (websocket, _) = instances + websocket.ws = ws + return instances + + def dispose(self, instances): + (websocket, _) = instances + #dispose ws + websocket.ws = None + websocket._cork_handler = None + websocket._for_each_topic_handler = None + websocket.socket_data_id = None + websocket.socket_data = None + websocket.got_socket_data = False + self.factory_queue.append(instances) + +class RequestResponseFactory: + def __init__(self, app, max_size): + self.factory_queue = [] + for _ in range(0, max_size): + response = AppResponse(None, app.loop, app.SSL, app._template) + request = AppRequest(None) + self.factory_queue.append((response, request, True)) + + def get(self, app, res, req): + if len(self.factory_queue) == 0: + response = AppResponse(res, app.loop, app.SSL, app._template) + request = AppRequest(req) + return (response, request, False) + + instances = self.factory_queue.pop() + (response, request, _) = instances + response.res = res + response._render = app._template + request.req = req + return instances + + def dispose(self, instances): + (res, req, _) = instances + #dispose res + res.res = None + res.aborted = False + res._aborted_handler = None + res._writable_handler = None + res._data_handler = None + res._grabed_abort_handler_once = False + res._write_jar = None + res._cork_handler = None + res._lastChunkOffset = 0 + res._chunkFuture = None + res._dataFuture = None + res._data = None + res._render = None + #dispose req + req.req = None + req.read_jar = None + req.jar_parsed = False + req._for_each_header_handler = None + req._headers = None + req._params = None + req._query = None + req._url = None + req._full_url = None + req._method = None + self.factory_queue.append(instances) + class AppRequest: def __init__(self, request): self.req = request @@ -1495,12 +1845,12 @@ class AppResponse: raise RuntimeError( '"%d" Is not an valid Status Code' % status_or_status_text ) - elif isinstance(status_text, str): - data = status_text.encode("utf-8") - elif isinstance(status_text, bytes): - data = status_text + elif isinstance(status_or_status_text, str): + data = status_or_status_text.encode("utf-8") + elif isinstance(status_or_status_text, bytes): + data = status_or_status_text else: - data = json.dumps(status_text).encode("utf-8") + data = json.dumps(status_or_status_text).encode("utf-8") lib.uws_res_write_status(self.SSL, self.res, data, len(data)) return self @@ -1649,7 +1999,7 @@ class AppResponse: class App: - def __init__(self, options=None): + def __init__(self, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): socket_options_ptr = ffi.new("struct us_socket_context_options_t *") socket_options = socket_options_ptr[0] self.options = options @@ -1698,6 +2048,7 @@ class App: else: self.is_ssl = False self.SSL = ffi.cast("int", 0) + self.loop = Loop( lambda loop, context, response: self.trigger_error(context, response, None) @@ -1715,6 +2066,16 @@ class App: self.error_handler = None self._missing_server_handler = None + if request_response_factory_max_itens and request_response_factory_max_itens >= 1: + self._factory = RequestResponseFactory(self, request_response_factory_max_itens) + else: + self._factory = None + + if websocket_factory_max_itens and websocket_factory_max_itens >= 1: + self._ws_factory = WebSocketFactory(self, websocket_factory_max_itens) + else: + self._ws_factory = None + def template(self, template_engine): self._template = template_engine @@ -1729,7 +2090,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1741,7 +2102,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1753,7 +2114,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1765,7 +2126,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1777,7 +2138,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1789,7 +2150,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1801,7 +2162,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1813,7 +2174,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1825,7 +2186,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -1837,7 +2198,7 @@ class App: self.SSL, self.app, path.encode("utf-8"), - uws_generic_method_handler, + uws_generic_factory_method_handler if self._factory else uws_generic_method_handler, user_data, ) return self @@ -2038,43 +2399,43 @@ class App: handlers = WSBehaviorHandlers() if upgrade_handler: handlers.upgrade = upgrade_handler - native_behavior.upgrade = uws_websocket_upgrade_handler + native_behavior.upgrade = uws_websocket_factory_upgrade_handler if self._factory else uws_websocket_upgrade_handler else: native_behavior.upgrade = ffi.NULL if open_handler: handlers.open = open_handler - native_behavior.open = uws_websocket_open_handler + native_behavior.open = uws_websocket_factory_open_handler if self._ws_factory else uws_websocket_open_handler else: native_behavior.open = ffi.NULL if message_handler: handlers.message = message_handler - native_behavior.message = uws_websocket_message_handler + native_behavior.message = uws_websocket_factory_message_handler if self._ws_factory else uws_websocket_message_handler else: native_behavior.message = ffi.NULL if drain_handler: handlers.drain = drain_handler - native_behavior.drain = uws_websocket_drain_handler + native_behavior.drain = uws_websocket_factory_drain_handler if self._ws_factory else uws_websocket_drain_handler else: native_behavior.drain = ffi.NULL if ping_handler: handlers.ping = ping_handler - native_behavior.ping = uws_websocket_ping_handler + native_behavior.ping = uws_websocket_factory_ping_handler if self._ws_factory else uws_websocket_ping_handler else: native_behavior.ping = ffi.NULL if pong_handler: handlers.pong = pong_handler - native_behavior.pong = uws_websocket_pong_handler + native_behavior.pong = uws_websocket_factory_pong_handler if self._ws_factory else uws_websocket_pong_handler else: native_behavior.pong = ffi.NULL if close_handler: handlers.close = close_handler - native_behavior.close = uws_websocket_close_handler + native_behavior.close = uws_websocket_factory_close_handler if self._ws_factory else uws_websocket_close_handler else: # always keep an close native_behavior.close = uws_websocket_close_handler @@ -2218,7 +2579,7 @@ class AppOptions: dh_params_file_name=None, ca_file_name=None, ssl_ciphers=None, - ssl_prefer_low_memory_usage=0, + ssl_prefer_low_memory_usage=0 ): if key_file_name != None and not isinstance(key_file_name, str): raise RuntimeError("key_file_name must be an String or None")