From 3872860684deca4cff6e15489fe119049fa1e998 Mon Sep 17 00:00:00 2001 From: Ciro Date: Mon, 7 Nov 2022 14:35:06 -0300 Subject: [PATCH] add async upgrade --- examples/upgrade_async.py | 31 ++++++++++ src/socketify/__init__.py | 2 +- src/socketify/socketify.py | 122 +++++++++++++++++++++++-------------- src/tests.py | 75 +++++------------------ 4 files changed, 125 insertions(+), 105 deletions(-) create mode 100644 examples/upgrade_async.py diff --git a/examples/upgrade_async.py b/examples/upgrade_async.py new file mode 100644 index 0000000..6d5af44 --- /dev/null +++ b/examples/upgrade_async.py @@ -0,0 +1,31 @@ +from socketify import App, AppOptions, OpCode, CompressOptions +import asyncio + +def ws_open(ws): + print('A WebSocket got connected!') + ws.send("Hello World!", OpCode.TEXT) + +def ws_message(ws, message, opcode): + print(message, opcode) + #Ok is false if backpressure was built up, wait for drain + ok = ws.send(message, opcode) + +async def ws_upgrade(res, req, socket_context): + key = req.get_header("sec-websocket-key") + protocol = req.get_header("sec-websocket-protocol") + extensions = req.get_header("sec-websocket-extensions") + await asyncio.sleep(2) + res.upgrade(key, protocol, extensions, socket_context) + +app = App() +app.ws("/*", { + 'compression': CompressOptions.SHARED_COMPRESSOR, + 'max_payload_length': 16 * 1024 * 1024, + 'idle_timeout': 12, + 'open': ws_open, + 'message': ws_message, + 'upgrade': ws_upgrade +}) +app.any("/", lambda res,req: res.end("Nothing to see here!'")) +app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port))) +app.run() \ No newline at end of file diff --git a/src/socketify/__init__.py b/src/socketify/__init__.py index 31b9daa..280b85b 100644 --- a/src/socketify/__init__.py +++ b/src/socketify/__init__.py @@ -1,2 +1,2 @@ -from .socketify import App, AppOptions, AppListenOptions, OpCode, SendStatus +from .socketify import App, AppOptions, AppListenOptions, OpCode, SendStatus, CompressOptions from .helpers import sendfile \ No newline at end of file diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index a3db01e..d07f788 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -255,11 +255,13 @@ library_path = os.path.join(os.path.dirname(__file__), "libsocketify_%s_%s.%s" % lib = ffi.dlopen(library_path) -@ffi.callback("void(const char *, size_t, void *)") + + +@ffi.callback("void(const char*, size_t, void*)") def uws_missing_server_name(hostname, hostname_length, user_data): if not user_data == ffi.NULL: - app = ffi.from_handle(user_data) try: + app = ffi.from_handle(user_data) if hostname == ffi.NULL: data = None else: @@ -274,12 +276,12 @@ def uws_missing_server_name(hostname, hostname_length, user_data): except Exception as err: print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention -@ffi.callback("void(uws_websocket_t *, void *)") +@ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_drain_handler(ws, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) try: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) handler = handlers.drain if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() @@ -289,12 +291,13 @@ def uws_websocket_drain_handler(ws, user_data): except Exception as err: print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention -@ffi.callback("void(uws_websocket_t *, void *)") +@ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_open_handler(ws, user_data): - if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + + if not user_data == ffi.NULL: try: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) handler = handlers.open if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() @@ -304,13 +307,13 @@ def uws_websocket_open_handler(ws, user_data): except Exception as err: print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention -@ffi.callback("void(uws_websocket_t*, const char *,size_t, uws_opcode_t, void*)") +@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") def uws_websocket_message_handler(ws, message, length, opcode, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) - try: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + if message == ffi.NULL: data = None else: @@ -332,10 +335,9 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data): @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_pong_handler(ws, message, length, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) - try: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) if message == ffi.NULL: data = None else: @@ -354,10 +356,10 @@ def uws_websocket_pong_handler(ws, message, length, user_data): @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_ping_handler(ws, message,length, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) - try: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + if message == ffi.NULL: data = None else: @@ -377,11 +379,11 @@ def uws_websocket_ping_handler(ws, message,length, user_data): @ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") def uws_websocket_close_handler(ws, code, message, length, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - #pass to free data on WebSocket if needed - ws = WebSocket(ws, app.SSL, app.loop, True) - try: + (handlers, app) = ffi.from_handle(user_data) + #pass to free data on WebSocket if needed + ws = WebSocket(ws, app.SSL, app.loop, True) + if message == ffi.NULL: data = None else: @@ -404,10 +406,10 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): @ffi.callback("void(uws_res_t*, uws_req_t*, uws_socket_context_t*, void*)") def uws_websocket_upgrade_handler(res, req, context, user_data): if not user_data == ffi.NULL: - (handlers, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL) - request = AppRequest(req) try: + (handlers, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, app.SSL) + request = AppRequest(req) handler = handlers.upgrade if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() @@ -419,22 +421,21 @@ def uws_websocket_upgrade_handler(res, req, context, user_data): print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention -@ffi.callback("void(const char *topic, size_t length, void *user_data)") +@ffi.callback("void(const char*, size_t, void*)") def uws_req_for_each_topic_handler(topic, topic_size, user_data): if not user_data == ffi.NULL: - ws = ffi.from_handle(user_data) try: + ws = ffi.from_handle(user_data) header_name = ffi.unpack(topic, topic_size).decode("utf-8") ws.trigger_for_each_topic_handler(header_name, header_value) except Exception: #invalid utf-8 return -@ffi.callback("void(const char *, size_t, const char *, size_t, void *)") +@ffi.callback("void(const char*, size_t, const char*, size_t, void*)") def uws_req_for_each_header_handler(header_name, header_name_size, header_value, header_value_size, user_data): if not user_data == ffi.NULL: - req = ffi.from_handle(user_data) try: - + req = ffi.from_handle(user_data) header_name = ffi.unpack(header_name, header_name_size).decode("utf-8") header_value = ffi.unpack(header_value, header_value_size).decode("utf-8") @@ -443,13 +444,13 @@ def uws_req_for_each_header_handler(header_name, header_name_size, header_value, return -@ffi.callback("void(uws_res_t *, uws_req_t *, void *)") +@ffi.callback("void(uws_res_t*, uws_req_t*, void*)") def uws_generic_method_handler(res, req, user_data): if not user_data == ffi.NULL: - (handler, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL) - request = AppRequest(req) try: + (handler, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, app.SSL) + request = AppRequest(req) if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() response.run_async(handler(response, request)) @@ -460,7 +461,7 @@ def uws_generic_method_handler(res, req, user_data): app.trigger_error(err, response, request) -@ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)") +@ffi.callback("void(struct us_listen_socket_t*, uws_app_listen_config_t, void*)") def uws_generic_listen_handler(listen_socket, config, user_data): if listen_socket == ffi.NULL: raise RuntimeError("Failed to listen on port %d" % int(config.port)) @@ -472,7 +473,7 @@ def uws_generic_listen_handler(listen_socket, config, user_data): app.socket = listen_socket app._listen_handler(None if config == ffi.NULL else AppListenOptions(port=int(config.port),host=None if config.host == ffi.NULL else ffi.string(config.host).decode("utf-8"), options=int(config.options))) -@ffi.callback("void(uws_res_t *, void*)") +@ffi.callback("void(uws_res_t*, void*)") def uws_generic_aborted_handler(response, user_data): if not user_data == ffi.NULL: try: @@ -480,7 +481,7 @@ def uws_generic_aborted_handler(response, user_data): res.trigger_aborted() except: pass -@ffi.callback("void(uws_res_t *, const char *, size_t, bool, void*)") +@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data): if not user_data == ffi.NULL: res = ffi.from_handle(user_data) @@ -491,7 +492,7 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data): res.trigger_data_handler(data, bool(is_end)) -@ffi.callback("bool(uws_res_t *, uintmax_t, void*)") +@ffi.callback("bool(uws_res_t*, uintmax_t, void*)") def uws_generic_on_writable_handler(res, offset, user_data): if not user_data == ffi.NULL: res = ffi.from_handle(user_data) @@ -500,7 +501,7 @@ def uws_generic_on_writable_handler(res, offset, user_data): return False -@ffi.callback("void(uws_res_t *, void*)") +@ffi.callback("void(uws_res_t*, void*)") def uws_generic_cork_handler(res, user_data): if not user_data == ffi.NULL: response = ffi.from_handle(user_data) @@ -524,6 +525,38 @@ def uws_ws_cork_handler(user_data): print("Error on cork handler %s" % str(err)) +# Compressor mode is 8 lowest bits where HIGH4(windowBits), LOW4(memLevel). +# Decompressor mode is 8 highest bits LOW4(windowBits). +# If compressor or decompressor bits are 1, then they are shared. +# If everything is just simply 0, then everything is disabled. +class CompressOptions(IntEnum): + #Disabled, shared, shared are "special" values + DISABLED = lib.DISABLED + SHARED_COMPRESSOR = lib.SHARED_COMPRESSOR + SHARED_DECOMPRESSOR = lib.SHARED_DECOMPRESSOR + #Highest 4 bits describe decompressor + DEDICATED_DECOMPRESSOR_32KB = lib.DEDICATED_DECOMPRESSOR_32KB + DEDICATED_DECOMPRESSOR_16KB = lib.DEDICATED_DECOMPRESSOR_16KB + DEDICATED_DECOMPRESSOR_8KB = lib.DEDICATED_DECOMPRESSOR_8KB + DEDICATED_DECOMPRESSOR_4KB = lib.DEDICATED_DECOMPRESSOR_4KB + DEDICATED_DECOMPRESSOR_2KB = lib.DEDICATED_DECOMPRESSOR_2KB + DEDICATED_DECOMPRESSOR_1KB = lib.DEDICATED_DECOMPRESSOR_1KB + DEDICATED_DECOMPRESSOR_512B = lib.DEDICATED_DECOMPRESSOR_512B + #Same as 32kb + DEDICATED_DECOMPRESSOR = lib.DEDICATED_DECOMPRESSOR, + + #Lowest 8 bit describe compressor + DEDICATED_COMPRESSOR_3KB = lib.DEDICATED_COMPRESSOR_3KB + DEDICATED_COMPRESSOR_4KB = lib.DEDICATED_COMPRESSOR_4KB + DEDICATED_COMPRESSOR_8KB = lib.DEDICATED_COMPRESSOR_8KB + DEDICATED_COMPRESSOR_16KB = lib.DEDICATED_COMPRESSOR_16KB + DEDICATED_COMPRESSOR_32KB = lib.DEDICATED_COMPRESSOR_32KB + DEDICATED_COMPRESSOR_64KB = lib.DEDICATED_COMPRESSOR_64KB + DEDICATED_COMPRESSOR_128KB = lib.DEDICATED_COMPRESSOR_128KB + DEDICATED_COMPRESSOR_256KB = lib.DEDICATED_COMPRESSOR_256KB + #Same as 256kb + DEDICATED_COMPRESSOR = lib.DEDICATED_COMPRESSOR + class OpCode(IntEnum): CONTINUATION = 0 @@ -1275,27 +1308,26 @@ class AppResponse: if self.aborted: return False - if isinstance(sec_web_socket_key, str): sec_web_socket_key_data = sec_web_socket_key.encode('utf-8') elif isinstance(sec_web_socket_key, bytes): sec_web_socket_key_data = sec_web_socket_key else: - raise RuntimeError("sec_web_socket_key need to be an String or Bytes") + sec_web_socket_key_data = b'' if isinstance(sec_web_socket_protocol, str): sec_web_socket_protocol_data = sec_web_socket_protocol.encode('utf-8') elif isinstance(sec_web_socket_protocol, bytes): sec_web_socket_protocol_data = sec_web_socket_protocol else: - raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes") + sec_web_socket_protocol_data = b'' if isinstance(sec_web_socket_extensions, str): sec_web_socket_extensions_data = sec_web_socket_extensions.encode('utf-8') elif isinstance(sec_web_socket_extensions, bytes): sec_web_socket_extensions_data = sec_web_socket_extensions else: - raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes") + sec_web_socket_extensions_data = b'' user_data_ptr = ffi.NULL if not user_data is None: @@ -1305,7 +1337,7 @@ class AppResponse: SocketRefs[_id] = user_data_ptr lib.uws_res_upgrade(self.SSL, self.res, user_data_ptr, sec_web_socket_key_data, len(sec_web_socket_key_data),sec_web_socket_protocol_data, len(sec_web_socket_protocol_data),sec_web_socket_extensions_data, len(sec_web_socket_extensions_data), socket_context) - + return True def on_writable(self, handler): if not self.aborted: diff --git a/src/tests.py b/src/tests.py index 8baf79c..5eaecc4 100644 --- a/src/tests.py +++ b/src/tests.py @@ -3,85 +3,42 @@ # import os.path - # DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data); - # DLL_EXPORT typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data); - -# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, uws_listen_domain_handler handler, void *user_data); - # DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, int options, uws_listen_domain_handler handler, void *user_data); - # DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length); - - # DLL_EXPORT void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); - # DLL_EXPORT void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); - # DLL_EXPORT void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options); - # DLL_EXPORT void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); - # DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); +# DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data); + +# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length,_listen_domain_handler handler, void *user_data); +# DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t servere_length, int options, uws_listen_domain_handler handler, void *user_data); +# DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length); +# DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); - -# from socketify import App -from socketify import App, AppOptions, OpCode - -# import os -import multiprocessing +from socketify import App, AppOptions, OpCode, CompressOptions import asyncio -import time -import mimetypes -mimetypes.init() - - -async def home(res, req): - res.end("Home") def ws_open(ws): - print("Upgrated!") - print(ws.send("Xablau!", OpCode.TEXT)) + print('A WebSocket got connected!') + ws.send("Hello World!", OpCode.TEXT) def ws_message(ws, message, opcode): print(message, opcode) + #Ok is false if backpressure was built up, wait for drain + ok = ws.send(message, opcode) async def ws_upgrade(res, req, socket_context): key = req.get_header("sec-websocket-key") protocol = req.get_header("sec-websocket-protocol") extensions = req.get_header("sec-websocket-extensions") await asyncio.sleep(2) - print("request upgrade!") res.upgrade(key, protocol, extensions, socket_context) app = App() app.ws("/*", { + 'compression': CompressOptions.SHARED_COMPRESSOR, + 'max_payload_length': 16 * 1024 * 1024, + 'idle_timeout': 12, 'open': ws_open, 'message': ws_message, 'upgrade': ws_upgrade }) -app.get("/", home) +app.any("/", lambda res,req: res.end("Nothing to see here!'")) app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port))) -app.run() - -# def create_fork(): -# n = os.fork() -# # n greater than 0 means parent process -# if not n > 0: -# run_app() - -#openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -passout pass:1234 -keyout ./misc/key.pem -out ./misc/cert.pem -# # fork limiting the cpu count - 1 -# for i in range(1, multiprocessing.cpu_count()): -# create_fork() - -# run_app() # run app on the main process too :) -# from datetime import datetime -# raw = "_ga=GA1.1.1871393672.1649875681; affclick=null; __udf_j=d31b9af0d332fec181c1a893320322c0cb33ce95d7bdbd21a4cc4ee66d6d8c23817686b4ba59dd0e015cb95e8196157c" - -# jar = Cookies(None) -# jar.set("session_id", "123132", { -# "path": "/", -# "domain": "*.test.com", -# "httponly": True, -# "expires": datetime.now() -# }) -# print(jar.output()) -# jar = cookies.SimpleCookie(raw) -# print(jar["_gaasasd"]) -# print(split_header_words(raw)) - -#git submodule sync \ No newline at end of file +app.run() \ No newline at end of file