diff --git a/bench/asgi_wsgi/falcon-asgi.py b/bench/asgi_wsgi/falcon-asgi.py index dc4d133..b41ae44 100644 --- a/bench/asgi_wsgi/falcon-asgi.py +++ b/bench/asgi_wsgi/falcon-asgi.py @@ -24,4 +24,4 @@ home = Home() app.add_route("/", home) if __name__ == "__main__": - ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=8) diff --git a/bench/asgi_wsgi/falcon-wsgi.py b/bench/asgi_wsgi/falcon-wsgi.py index e4a886f..3d7429e 100644 --- a/bench/asgi_wsgi/falcon-wsgi.py +++ b/bench/asgi_wsgi/falcon-wsgi.py @@ -23,5 +23,4 @@ home = Home() app.add_route("/", home) if __name__ == "__main__": - WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() - \ No newline at end of file + WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=2) \ No newline at end of file diff --git a/examples/hello_world_cli.py b/examples/hello_world_cli.py new file mode 100644 index 0000000..8710378 --- /dev/null +++ b/examples/hello_world_cli.py @@ -0,0 +1,12 @@ +from socketify import App + +# App will be created by the cli with all things you want configured +def run(app: App): + # add your routes here + app.get("/", lambda res, req: res.end("Hello World!")) + +# python -m socketify hello_world_cli:run --port 8080 --workers 2 +# python3 -m socketify hello_world_cli:run --port 8080 --workers 2 +# pypy3 -m socketify hello_world_cli:run --port 8080 --workers 2 + +# see options in with: python3 -m socketify --help \ No newline at end of file diff --git a/examples/hello_world_cli_ws.py b/examples/hello_world_cli_ws.py new file mode 100644 index 0000000..43f6145 --- /dev/null +++ b/examples/hello_world_cli_ws.py @@ -0,0 +1,18 @@ +from socketify import App, OpCode + +def run(app: App): + # add your routes here + app.get("/", lambda res, req: res.end("Hello World!")) + + +# cli will use this configuration for serving in "/*" route, you can still use .ws("/*", config) if you want but --ws* options will not have effect +websocket = { + "open": lambda ws: ws.send("Hello World!", OpCode.TEXT), + "message": lambda ws, message, opcode: ws.send(message, opcode), + "close": lambda ws, code, message: print("WebSocket closed"), +} +# python -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket --port 8080 --workers 2 +# python3 -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket--port 8080 --workers 2 +# pypy3 -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket--port 8080 --workers 2 + +# see options in with: python3 -m socketify --help \ No newline at end of file diff --git a/src/socketify/__main__.py b/src/socketify/__main__.py new file mode 100644 index 0000000..ac29a06 --- /dev/null +++ b/src/socketify/__main__.py @@ -0,0 +1,3 @@ +import sys +from .cli import execute +execute(sys.argv) \ No newline at end of file diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 8b2c8dc..1dc2e3b 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,7 +1,7 @@ from socketify import App, CompressOptions, OpCode from queue import SimpleQueue from .native import lib, ffi -import asyncio +import os EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False } @@ -50,6 +50,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): extensions = None else: extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8') + compress = app.ws_compression ws = ASGIWebSocket(app.server.loop) scope = { 'type': 'websocket', @@ -77,10 +78,10 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): data = options.get("bytes", None) if ws.ws: if data: - lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.BINARY)) + lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.BINARY), int(compress), 0) else: data = options.get('text', '').encode('utf8') - lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.TEXT)) + lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0) return True return False if type == 'websocket.accept': # upgrade! @@ -136,9 +137,9 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): if type == 'websocket.publish': # publish extension data = options.get("bytes", None) if data: - app.server.publish(options.get('topic'), data) + app.server.publish(options.get('topic'), data, OpCode.BINARY, compress) else: - app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT) + app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT, compress) return True if type == 'websocket.subscribe': # subscribe extension if ws.ws: @@ -421,15 +422,16 @@ def asgi(ssl, response, info, user_data, aborted): return False app.server.loop.run_async(app.app(scope, receive, send)) -class ASGI: - def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): - self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) +class _ASGI: + def __init__(self, app, options=None, websocket=True, websocket_options=None): + self.server = App(options) self.SERVER_PORT = None self.SERVER_HOST = '' self.SERVER_SCHEME = 'https' if self.server.options else 'http' self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' self.app = app + self.ws_compression = False # optimized in native self._ptr = ffi.new_handle(self) self.asgi_http_info = lib.socketify_add_asgi_http_handler( @@ -438,53 +440,65 @@ class ASGI: asgi, self._ptr ) + self.asgi_ws_info = None + if isinstance(websocket, dict): #serve websocket as socketify.py + if websocket_options: + websocket.update(websocket_options) - native_options = ffi.new("uws_socket_behavior_t *") - native_behavior = native_options[0] - - native_behavior.maxPayloadLength = ffi.cast( - "unsigned int", - 16 * 1024 * 1024, - ) - native_behavior.idleTimeout = ffi.cast( - "unsigned short", - 0, - ) - native_behavior.maxBackpressure = ffi.cast( - "unsigned int", - 1024 * 1024 * 1024, - ) - native_behavior.compression = ffi.cast( - "uws_compress_options_t", 0 - ) - native_behavior.maxLifetime = ffi.cast( - "unsigned short", 0 - ) - native_behavior.closeOnBackpressureLimit = ffi.cast( - "int", 0 - ) - native_behavior.resetIdleTimeoutOnSend = ffi.cast( - "int", 0 - ) - native_behavior.sendPingsAutomatically = ffi.cast( - "int", 0 - ) + self.server.ws("/*", websocket) + elif websocket: #serve websocket as ASGI + + native_options = ffi.new("uws_socket_behavior_t *") + native_behavior = native_options[0] - native_behavior.upgrade = ffi.NULL # will be set first on C++ + if not websocket_options: + websocket_options = {} - native_behavior.open = ws_open - native_behavior.message = ws_message - native_behavior.ping = ffi.NULL - native_behavior.pong = ffi.NULL - native_behavior.close = ws_close - - self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( - self.server.SSL, - self.server.app, - native_behavior, - ws_upgrade, - self._ptr - ) + self.ws_compression = bool(websocket_options.get('compression', False)) + + native_behavior.maxPayloadLength = ffi.cast( + "unsigned int", + int(websocket_options.get('max_payload_length', 16777216)), + ) + native_behavior.idleTimeout = ffi.cast( + "unsigned short", + int(websocket_options.get('idle_timeout', 20)), + ) + native_behavior.maxBackpressure = ffi.cast( + "unsigned int", + int(websocket_options.get('max_backpressure', 16777216)), + ) + native_behavior.compression = ffi.cast( + "uws_compress_options_t", int(self.ws_compression) + ) + native_behavior.maxLifetime = ffi.cast( + "unsigned short", int(websocket_options.get('max_lifetime', 0)) + ) + native_behavior.closeOnBackpressureLimit = ffi.cast( + "int", int(websocket_options.get('close_on_backpressure_limit', 0)), + ) + native_behavior.resetIdleTimeoutOnSend = ffi.cast( + "int", bool(websocket_options.get('reset_idle_timeout_on_send', True)) + ) + native_behavior.sendPingsAutomatically = ffi.cast( + "int", bool(websocket_options.get('send_pings_automatically', True)) + ) + + native_behavior.upgrade = ffi.NULL # will be set first on C++ + + native_behavior.open = ws_open + native_behavior.message = ws_message + native_behavior.ping = ffi.NULL + native_behavior.pong = ffi.NULL + native_behavior.close = ws_close + + self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( + self.server.SSL, + self.server.app, + native_behavior, + ws_upgrade, + self._ptr + ) def listen(self, port_or_options, handler=None): self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port @@ -492,11 +506,47 @@ class ASGI: self.server.listen(port_or_options, handler) return self def run(self): - self.server.run() + self.server.run() # run app on the main process too :) return self def __del__(self): if self.asgi_http_info: lib.socketify_destroy_asgi_app_info(self.asgi_http_info) if self.asgi_ws_info: - lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) \ No newline at end of file + lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) + +# "Public" ASGI interface to allow easy forks/workers +class ASGI: + def __init__(self, app, options=None, websocket=True, websocket_options=None): + self.app = app + self.options = options + self.websocket = websocket + self.websocket_options = websocket_options + self.listen_options = None + + def listen(self, port_or_options, handler=None): + self.listen_options = (port_or_options, handler) + return self + + def run(self, workers=1): + def run_app(): + server = _ASGI(self.app, self.options, self.websocket, self.websocket_options) + if self.listen_options: + (port_or_options, handler) = self.listen_options + server.listen(port_or_options, handler) + server.run() + + + def create_fork(): + n = os.fork() + # n greater than 0 means parent process + if not n > 0: + run_app() + + + # fork limiting the cpu count - 1 + for i in range(1, workers): + create_fork() + + run_app() # run app on the main process too :) + return self \ No newline at end of file diff --git a/src/socketify/cli.py b/src/socketify/cli.py new file mode 100644 index 0000000..b28d493 --- /dev/null +++ b/src/socketify/cli.py @@ -0,0 +1,255 @@ +import inspect +import os +from . import App, AppOptions, AppListenOptions +help = """ +Usage: python -m socketify APP [OPTIONS] + python3 -m socketify APP [OPTIONS] + pypy3 -m socketify APP [OPTIONS] + +Options: + --help Show this Help + --host or -h TEXT Bind socket to this host. [default:127.0.0.1] + --port or -p INTEGER Bind socket to this port. [default: 8000] + --workers or -w INTEGER Number of worker processes. Defaults to the WEB_CONCURRENCY + environment variable if available, or 1 + --ws [auto|none|module:ws] If informed module:ws will auto detect to use socketify.py or ASGI websockets + interface and disabled if informed none [default: auto] + --ws-max-size INTEGER WebSocket max size message in bytes [default: 16777216] + --ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True] + --ws-idle-timeout INT WebSocket idle timeout [default: 20] + --ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True] + --ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True] + --ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0] + --ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216] + --ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False] + --lifespan [auto|on|off] Lifespan implementation. [default: auto] + --interface [auto|asgi|asgi3|wsgi|ssgi|socketify] Select ASGI (same as ASGI3), ASGI3, WSGI or SSGI as the application interface. [default: auto] + --disable-listen-log BOOLEAN Disable log when start listenning [default: False] + --version or -v Display the socketify.py version and exit. + --ssl-keyfile TEXT SSL key file + --ssl-certfile TEXT SSL certificate file + --ssl-keyfile-password TEXT SSL keyfile password + --ssl-ca-certs TEXT CA certificates file + --ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1] + --req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0] + --ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0] + + --uds TEXT Bind to a UNIX domain socket. + --reload Enable auto-reload. This options also disable --workers or -w option. + --reload-dir PATH Set reload directories explicitly, instead of using the current working directory. + --reload-include TEXT Set extensions to include while watching for files. + Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default; + these defaults can be overridden with `--reload-exclude`. + --reload-exclude TEXT Set extensions to include while watching for files. + --reload-delay INT Milliseconds to delay reload between file changes. [default: 1000] + +Example: + python3 -m socketify main:app -w 8 -p 8181 + +""" + +def is_wsgi(module): + return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 2 +def is_asgi(module): + return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3 +def is_ssgi(module): + return False # no spec yet +def is_ssgi(module): + return False # no spec yet + +def is_socketify(module): + return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 1 + +def is_factory(module): + return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0 + +def str_bool(text): + text = text.lower() + return text == "true" + +def load_module(file, reload=False): + try: + [full_module, app] = file.split(':') + import importlib + module =importlib.import_module(full_module) + if reload: + importlib.reload(module) + + app = getattr(module, app) + # if is an factory just auto call + if is_factory(module): + app = app() + return app + except: + return None +def execute(args): + arguments_length = len(args) + if arguments_length <= 2: + if arguments_length == 2 and (args[1] == "--help"): + return print(help) + if arguments_length == 2 and (args[1] == "--version" or args[1] == "-v"): + import pkg_resources # part of setuptools + import platform + version = pkg_resources.require("socketify")[0].version + return print(f"Running socketify {version} with {platform.python_implementation()} {platform.python_version()} on {platform.system()}") + elif arguments_length < 2: + return print(help) + + file = (args[1]).lower() + module = load_module(file) + + if not module: + return print(f"Cannot load module {file}") + + options_list = args[2:] + options = {} + selected_option = None + for option in options_list: + if selected_option: + options[selected_option] = option + selected_option = None + else: + selected_option = option + if selected_option: # --factory + options[selected_option] = True + + interface = (options.get("--interface", "auto")).lower() + + if interface == "auto": + if is_asgi(module): + from . import ASGI as Interface + interface = "asgi" + elif is_wsgi(module): + from . import WSGI as Interface + interface = "wsgi" + elif is_ssgi(module): + from . import SSGI as Interface + interface = "ssgi" + else: + interface = "socketify" + + elif interface == "asgi" or interface == "asgi3": + from . import ASGI as Interface + interface = "asgi" + # you may use ASGI in SSGI so no checks here + if is_wsgi(module): + return print("Cannot use WSGI interface as ASGI interface") + if not is_asgi(module): + return print("ASGI interface must be callable with 3 parameters async def app(scope, receive and send)") + elif interface == "wsgi": + from . import WSGI as Interface + # you may use WSGI in SSGI so no checks here + if is_asgi(module): + return print("Cannot use ASGI interface as WSGI interface") + if not is_wsgi(module): + return print("WSGI interface must be callable with 2 parameters def app(environ, start_response)") + + elif interface == "ssgi": + # if not is_ssgi(module): + # return print("SSGI is in development yet but is comming soon") + # from . import SSGI as Interface + # interface = "ssgi" + return print("SSGI is in development yet but is comming soon") + elif interface != "socketify": + return print(f"{interface} interface is not supported yet") + + workers = int(options.get("--workers", options.get("-w", os.environ.get('WEB_CONCURRENCY', 1)))) + if workers < 1: + workers = 1 + + port = int(options.get("--port", options.get("-p", 8000))) + host = options.get("--host", options.get("-h", "127.0.0.1")) + disable_listen_log = options.get("--disable-listen-log", False) + websockets = options.get("--ws", "auto") + + if websockets == "none": + # disable websockets + websockets = None + elif websockets == "auto": + # if is ASGI serve websocket by default + if is_asgi(module): + websockets = True + elif is_wsgi(module): + # if is WSGI no websockets using auto + websockets = False + else: # if is socketify websockets must be set in app + websockets = False + else: + #websocket dedicated module + ws_module = load_module(websockets) + if not ws_module: + return print(f"Cannot load websocket module {websockets}") + websockets = ws_module + + + key_file_name = options.get('--ssl-keyfile', None) + + if key_file_name: + ssl_options = AppOptions( + key_file_name=options.get('--ssl-keyfile', None), + cert_file_name=options.get('--ssl-certfile', None), + passphrase=options.get('--ssl-keyfile-password', None), + ca_file_name=options.get('--ssl-ca-certs', None), + ssl_ciphers=options.get('--ssl-ciphers', None) + ) + else: + ssl_options = None + + def listen_log(config): + if not disable_listen_log: + print(f"Listening on {'https' if ssl_options else 'http'}://{config.host if config.host and len(config.host) > 1 else '127.0.0.1' }:{config.port} now\n") + + if websockets: + websocket_options = { + 'compression': int(1 if options.get('--ws-per-message-deflate', True) else 0), + 'max_payload_length': int(options.get('--ws-max-size', 16777216)), + 'idle_timeout': int(options.get('--ws-idle-timeout', 20)), + 'send_pings_automatically': str_bool(options.get('--ws-auto-ping', True)), + 'reset_idle_timeout_on_send': str_bool(options.get('--ws-reset-idle-on-send', True)), + 'max_lifetime': int(options.get('--ws-max-lifetime', 0)), + 'max_backpressure': int(options.get('--max-backpressure', 16777216)), + 'close_on_backpressure_limit': str_bool(options.get('--ws-close-on-backpressure-limit', False)) + } + else: + websocket_options = None + + if interface == "socketify": + if is_asgi(websockets): + return print("Cannot mix ASGI websockets with socketify.py interface yet") + if is_asgi(module): + return print("Cannot use ASGI interface as socketify interface") + elif is_wsgi(module): + return print("Cannot use WSGI interface as socketify interface") + elif not is_socketify(module): + return print("socketify interface must be callable with 1 parameter def run(app: App)") + # run app with the settings desired + def run_app(): + fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0))) + module(fork_app) # call module factory + + if websockets: # if socketify websockets are added using --ws in socketify interface we can set here + websockets.update(websocket_options) # set websocket options + fork_app.ws("/*", websockets) + fork_app.listen(AppListenOptions(port=port, host=host), listen_log) + fork_app.run() + + # now we can start all over again + def create_fork(_): + n = os.fork() + # n greater than 0 means parent process + if not n > 0: + run_app() + return n + + # run in all forks + pid_list = list(map(create_fork, range(1, workers))) + + # run in this process + run_app() + # sigint everything to gracefull shutdown + import signal + for pid in pid_list: + os.kill(pid, signal.SIGINT) + else: + #Generic WSGI, ASGI, SSGI Interface + Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers) \ No newline at end of file diff --git a/src/socketify/libsocketify_linux_amd64.so b/src/socketify/libsocketify_linux_amd64.so index 3226cf5..150f19a 100755 Binary files a/src/socketify/libsocketify_linux_amd64.so and b/src/socketify/libsocketify_linux_amd64.so differ diff --git a/src/socketify/loop.py b/src/socketify/loop.py index f379e72..9eb32f6 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -65,9 +65,10 @@ class Loop: def stop(self): - # Just mark as started = False and wait - self.started = False - self.loop.stop() + if self.started: + # Just mark as started = False and wait + self.started = False + self.loop.stop() # Exposes native loop for uWS def get_native_loop(self): @@ -87,6 +88,10 @@ class Loop: return future + def dispose(self): + if self.uv_loop: + self.uv_loop.dispose() + self.uv_loop = None # if sys.version_info >= (3, 11) # with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: diff --git a/src/socketify/native.py b/src/socketify/native.py index 8c4bcf0..8703895 100644 --- a/src/socketify/native.py +++ b/src/socketify/native.py @@ -376,6 +376,9 @@ void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app); void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length); void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection); void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode); + + +void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool close_connection); """ ) diff --git a/src/socketify/native/src/libsocketify.cpp b/src/socketify/native/src/libsocketify.cpp index 86136ab..201da73 100644 --- a/src/socketify/native/src/libsocketify.cpp +++ b/src/socketify/native/src/libsocketify.cpp @@ -329,6 +329,23 @@ void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size } +void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool fin){ + if (ssl) + { + uWS::WebSocket *uws = (uWS::WebSocket *)ws; + uws->cork([&](){ + uws->send(std::string_view(data, length), (uWS::OpCode)(unsigned char) opcode, compress, fin); + }); + } + else + { + uWS::WebSocket *uws = (uWS::WebSocket *)ws; + uws->cork([&](){ + uws->send(std::string_view(data, length), (uWS::OpCode)(unsigned char) opcode, compress, fin); + }); + } +} + socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data){ socksocketify_asgi_ws_app_info* info = (socksocketify_asgi_ws_app_info*)malloc(sizeof(socksocketify_asgi_ws_app_info)); diff --git a/src/socketify/native/src/libsocketify.h b/src/socketify/native/src/libsocketify.h index a766cf7..e5f0b75 100644 --- a/src/socketify/native/src/libsocketify.h +++ b/src/socketify/native/src/libsocketify.h @@ -137,6 +137,7 @@ DLL_EXPORT void socketify_res_cork_end(int ssl, uws_res_t *response, const char* DLL_EXPORT socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data); DLL_EXPORT void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app); DLL_EXPORT void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode); +DLL_EXPORT void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool fin); #endif #ifdef __cplusplus } diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index d2ffece..7fdab4f 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -540,12 +540,13 @@ def uws_generic_listen_handler(listen_socket, config, user_data): app.socket = listen_socket app._listen_handler( None + if config == ffi.NULL else AppListenOptions( port=int(config.port), host=None if config.host == ffi.NULL - else ffi.string(config.host).decode("utf-8"), + else ffi.string(config.host).decode("utf8"), options=int(config.options), ) ) @@ -2261,7 +2262,8 @@ class App: if hasattr(self, "socket"): if not self.socket == ffi.NULL: lib.us_listen_socket_close(self.SSL, self.socket) - self.loop.stop() + self.socket = ffi.NULL + self.loop.stop() return self def set_error_handler(self, handler): @@ -2297,9 +2299,24 @@ class App: finally: pass + def dispose(self): + if self.app: #only destroy if exists + self.close() + lib.uws_app_destroy(self.SSL, self.app) + self.app = None + + if self.loop: + self.loop.dispose() + self.loop = None + def __del__(self): if self.app: #only destroy if exists + self.close() lib.uws_app_destroy(self.SSL, self.app) + if self.loop: + self.loop.dispose() + self.loop = None + class AppListenOptions: @@ -2348,3 +2365,4 @@ class AppOptions: self.ca_file_name = ca_file_name self.ssl_ciphers = ssl_ciphers self.ssl_prefer_low_memory_usage = ssl_prefer_low_memory_usage + diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index 8fef69b..5773238 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit 8fef69b3a3903b771839ad6ea4a0c9bd3c009a9e +Subproject commit 5773238f673348d560e769b10a424c6dbe598730 diff --git a/src/socketify/uv.py b/src/socketify/uv.py index 18bc6cf..ada95b5 100644 --- a/src/socketify/uv.py +++ b/src/socketify/uv.py @@ -70,9 +70,17 @@ class UVLoop: def get_native_loop(self): return lib.socketify_get_native_loop(self._loop) + def dispose(self): + if self._loop != ffi.NULL: + lib.socketify_destroy_loop(self._loop) + self._handler_data = None + self._loop = ffi.NULL + def __del__(self): - lib.socketify_destroy_loop(self._loop) - self._handler_data = None + if self._loop != ffi.NULL: + lib.socketify_destroy_loop(self._loop) + self._handler_data = None + self._loop = ffi.NULL def run_nowait(self): return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT) diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index f737679..e6bc7d4 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -1,7 +1,7 @@ - - import os +import inspect from socketify import App +from .asgi import ws_close, ws_upgrade, ws_open, ws_message from io import BytesIO from .native import lib, ffi @@ -132,12 +132,17 @@ def wsgi(ssl, response, info, user_data, aborted): app_iter.close() lib.uws_res_end_without_body(ssl, response, 0) -class WSGI: - def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): - self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) + +class _WSGI: + + def __init__(self, app, options=None, websocket=None, websocket_options=None): + self.server = App(options) + self.SERVER_HOST = None self.SERVER_PORT = None + self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' self.app = app self.BASIC_ENVIRON = dict(os.environ) + self.ws_compression = False self._ptr = ffi.new_handle(self) self.asgi_http_info = lib.socketify_add_asgi_http_handler( @@ -146,9 +151,70 @@ class WSGI: wsgi, self._ptr ) + self.asgi_ws_info = None + if isinstance(websocket, dict): #serve websocket as socketify.py + if websocket_options: + websocket.update(websocket_options) + + self.server.ws("/*", websocket) + elif inspect.iscoroutine(websocket): + # detect ASGI to use as WebSocket as mixed protocol + native_options = ffi.new("uws_socket_behavior_t *") + native_behavior = native_options[0] + + if not websocket_options: + websocket_options = {} + + self.ws_compression = websocket_options.get('compression', False) + + native_behavior.maxPayloadLength = ffi.cast( + "unsigned int", + int(websocket_options.get('max_payload_length', 16777216)), + ) + native_behavior.idleTimeout = ffi.cast( + "unsigned short", + int(websocket_options.get('idle_timeout', 20)), + ) + native_behavior.maxBackpressure = ffi.cast( + "unsigned int", + int(websocket_options.get('max_backpressure', 16777216)), + ) + native_behavior.compression = ffi.cast( + "uws_compress_options_t", int(self.ws_compression) + ) + native_behavior.maxLifetime = ffi.cast( + "unsigned short", int(websocket_options.get('max_lifetime', 0)) + ) + native_behavior.closeOnBackpressureLimit = ffi.cast( + "int", int(websocket_options.get('close_on_backpressure_limit', 0)), + ) + native_behavior.resetIdleTimeoutOnSend = ffi.cast( + "int", bool(websocket_options.get('reset_idle_timeout_on_send', True)) + ) + native_behavior.sendPingsAutomatically = ffi.cast( + "int", bool(websocket_options.get('send_pings_automatically', True)) + ) + + native_behavior.upgrade = ffi.NULL # will be set first on C++ + + native_behavior.open = ws_open + native_behavior.message = ws_message + native_behavior.ping = ffi.NULL + native_behavior.pong = ffi.NULL + native_behavior.close = ws_close + + self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( + self.server.SSL, + self.server.app, + native_behavior, + ws_upgrade, + self._ptr + ) + def listen(self, port_or_options, handler=None): self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port + self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host self.BASIC_ENVIRON.update({ 'GATEWAY_INTERFACE': 'CGI/1.1', 'SERVER_PORT': str(self.SERVER_PORT), @@ -170,3 +236,46 @@ class WSGI: def run(self): self.server.run() return self + + def __del__(self): + if self.asgi_http_info: + lib.socketify_destroy_asgi_app_info(self.asgi_http_info) + if self.asgi_ws_info: + lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) + +# "Public" WSGI interface to allow easy forks/workers +class WSGI: + def __init__(self, app, options=None, websocket=None, websocket_options=None): + self.app = app + self.options = options + self.websocket = websocket + self.websocket_options = websocket_options + self.listen_options = None + + + def listen(self, port_or_options, handler=None): + self.listen_options = (port_or_options, handler) + return self + + def run(self, workers=1): + def run_app(): + server = _WSGI(self.app, self.options, self.websocket, self.websocket_options) + if self.listen_options: + (port_or_options, handler) = self.listen_options + server.listen(port_or_options, handler) + server.run() + + + def create_fork(): + n = os.fork() + # n greater than 0 means parent process + if not n > 0: + run_app() + + + # fork limiting the cpu count - 1 + for i in range(1, workers): + create_fork() + + run_app() # run app on the main process too :) + return self \ No newline at end of file