diff --git a/bench/asgi_wsgi/falcon-mixed.py b/bench/asgi_wsgi/falcon-mixed.py new file mode 100644 index 0000000..cfe3d4b --- /dev/null +++ b/bench/asgi_wsgi/falcon-mixed.py @@ -0,0 +1,36 @@ +import falcon +import falcon.asgi + +class Home: + def on_get(self, req, resp): + resp.status = falcon.HTTP_200 # This is the default status + resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override + resp.text = "Hello, World!" + +class WebSocket: + async def on_get(self, req, resp): + resp.status = falcon.HTTP_200 # This is the default status + resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override + resp.text = "Connect via ws protocol!" + + async def on_websocket(self, req, ws): + try: + await ws.accept() + while True: + payload = await ws.receive_text() + if payload: + await ws.send_text(payload) + + except falcon.WebSocketDisconnected: + print("Disconnected!") + + + +# falcon WSGI APP +app = falcon.App() +home = Home() +app.add_route("/", home) + +# ASGI WebSockets Falcon APP +ws = falcon.asgi.App() +ws.add_route("/", WebSocket()) \ No newline at end of file diff --git a/bench/asgi_wsgi/falcon-ws-echo.py b/bench/asgi_wsgi/falcon-ws-echo.py index dbb701d..f744c41 100644 --- a/bench/asgi_wsgi/falcon-ws-echo.py +++ b/bench/asgi_wsgi/falcon-ws-echo.py @@ -22,6 +22,8 @@ class SomeResource: app = falcon.asgi.App() +app.ws_options.max_receive_queue = 20_000_000 # this virtual disables queue but adds overhead +app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon app.add_route("/", SomeResource()) # python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker # pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker diff --git a/docs/cli.md b/docs/cli.md index 50d6d72..e9f365f 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -24,7 +24,7 @@ Options: --ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True] --ws-idle-timeout INT WebSocket idle timeout [default: 20] --ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True] - --ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True] + --ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: False] --ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0] --ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216] --ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False] @@ -39,7 +39,7 @@ Options: --ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1] --req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0] --ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0] - + --task-factory-maxitems INT Pre allocated instances of Task objects for socketify, ASGI interface [default: 100000] Example: python3 -m socketify main:app -w 8 -p 8181 @@ -59,10 +59,19 @@ def run(app: App): WebSockets can be in the same or another module, you can still use .ws("/*) to serve Websockets ```bash - python3 -m socketify hello_world_cli:run --ws hello_world_cli:websocket --port 8080 --workers 2 + python3 -m socketify hello_world_cli:run --ws hello_world_cli:ws --port 8080 --workers 2 ``` + +Socketify.py hello world + websockets: + ```python - websocket = { +from socketify import App, OpCode +# App will be created by the cli with all things you want configured +def run(app: App): + # add your routes here + app.get("/", lambda res, req: res.end("Hello World!")) + + ws = { "open": lambda ws: ws.send("Hello World!", OpCode.TEXT), "message": lambda ws, message, opcode: ws.send(message, opcode), "close": lambda ws, code, message: print("WebSocket closed"), @@ -71,13 +80,76 @@ def run(app: App): When running ASGI websocket will be served by default, but you can disabled it ```bash - python3 -m socketify falcon_asgi:app --ws none --port 8080 --workers 2 + python3 -m socketify falcon_asgi:app --ws none --port 8080 --workers 2 ``` - When running WSGI or ASGI you can still use socketify.py or ASGI websockets in the same server, mixing all available methods +When running WSGI or ASGI you can still use socketify.py or ASGI websockets in the same server, mixing all available methods You can use WSGI to more throughput in HTTP and use ASGI for websockets for example or you can use ASGI/WSGI for HTTP to keep compatibility and just re-write the websockets to use socketify interface with pub/sub and all features ```bash - python3 -m socketify falcon_wsgi:app --ws falcon:ws none --port 8080 --workers 2 + python3 -m socketify falcon_wsgi:app --ws falcon_wsgi:ws --port 8080 --workers 2 ``` - + +Falcon WSGI + socketify websocket code sample + ```python +import falcon +from socketify import OpCode + +class Home: + def on_get(self, req, resp): + resp.status = falcon.HTTP_200 # This is the default status + resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override + resp.text = "Hello, World!" + +# falcon APP +app = falcon.App() +home = Home() +app.add_route("/", home) + +# socketify websocket app +ws = { + "open": lambda ws: ws.send("Hello World!", OpCode.TEXT), + "message": lambda ws, message, opcode: ws.send(message, opcode), + "close": lambda ws, code, message: print("WebSocket closed"), +} +``` + +Mixing ASGI websockets + WSGI HTTP + + ```bash + python3 -m socketify main:app --ws main:ws --port 8080 --workers 2 + ``` + +```python +import falcon +import falcon.asgi + +class Home: + def on_get(self, req, resp): + resp.status = falcon.HTTP_200 # This is the default status + resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override + resp.text = "Hello, World!" + +class WebSocket: + async def on_websocket(self, req, ws): + try: + await ws.accept() + while True: + payload = await ws.receive_text() + if payload: + await ws.send_text(payload) + + except falcon.WebSocketDisconnected: + print("Disconnected!") + + + +# falcon WSGI APP +app = falcon.App() +home = Home() +app.add_route("/", home) + +# ASGI WebSockets Falcon APP +ws = falcon.asgi.App() +ws.add_route("/", WebSocket()) +``` ### Next [API Reference](api.md) \ No newline at end of file diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 4ab4ef2..ecb4789 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,12 +1,22 @@ -from socketify import App, CompressOptions, OpCode +from socketify import App, OpCode from queue import SimpleQueue from .native import lib, ffi from .tasks import create_task, create_task_with_factory import os import platform import sys - +import logging +import uuid is_pypy = platform.python_implementation() == "PyPy" +async def task_wrapper(task): + try: + return await task + except Exception as error: + try: + # just log in console the error to call attention + logging.error("Uncaught Exception: %s" % str(error)) + finally: + return None EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False} @@ -19,7 +29,7 @@ def ws_message(ws, message, length, opcode, user_data): message = message.decode("utf8") socket_data.message(ws, message, OpCode(opcode)) - + @ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") def ws_close(ws, code, message, length, user_data): @@ -68,6 +78,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): extensions = ffi.unpack(info.extensions, info.extensions_size).decode("utf8") compress = app.ws_compression ws = ASGIWebSocket(app.server.loop) + scope = { "type": "websocket", "asgi": {"version": "3.0", "spec_version": "2.3"}, @@ -107,12 +118,12 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): len(data), int(OpCode.BINARY), int(compress), - 0, + 1, ) else: data = options.get("text", "").encode("utf8") lib.socketify_ws_cork_send_with_options( - ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0 + ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 1 ) return True return False @@ -147,7 +158,12 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): sec_web_socket_extensions_data = extensions else: sec_web_socket_extensions_data = b"" - + _id = uuid.uuid4() + + app.server._socket_refs[_id] = ws + def unregister(): + app.server._socket_refs.pop(_id, None) + ws.unregister = unregister lib.uws_res_upgrade( ssl, response, @@ -249,6 +265,7 @@ class ASGIWebSocket: self._code = None self._message = None self._ptr = ffi.new_handle(self) + self.unregister = None def accept(self): self.accept_future = self.loop.create_future() @@ -287,6 +304,8 @@ class ASGIWebSocket: future.set_result( {"type": "websocket.disconnect", "code": code, "message": message} ) + if self.unregister is not None: + self.unregister() def message(self, ws, value, opcode): self.ws = ws @@ -478,7 +497,7 @@ def asgi(ssl, response, info, user_data, aborted): class _ASGI: - def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=0): + def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=100_000): self.server = App(options) self.SERVER_PORT = None self.SERVER_HOST = "" @@ -493,24 +512,26 @@ class _ASGI: factory = create_task_with_factory(task_factory_max_items) def run_task(task): - factory(loop, task) + factory(loop, task_wrapper(task)) loop._run_once() self._run_task = run_task else: def run_task(task): - create_task(loop, task) + create_task(loop, task_wrapper(task)) loop._run_once() self._run_task = run_task else: if sys.version_info >= (3, 8): # name fixed to avoid dynamic name def run_task(task): - loop.create_task(task, name='socketify.py-request-task') + future = loop.create_task(task_wrapper(task), name='socketify.py-request-task') + future._log_destroy_pending = False loop._run_once() self._run_task = run_task else: def run_task(task): - loop.create_task(task) + future = loop.create_task(task_wrapper(task)) + future._log_destroy_pending = False loop._run_once() self._run_task = run_task diff --git a/src/socketify/cli.py b/src/socketify/cli.py index 289abc8..3558fe9 100644 --- a/src/socketify/cli.py +++ b/src/socketify/cli.py @@ -20,7 +20,7 @@ Options: --ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True] --ws-idle-timeout INT WebSocket idle timeout [default: 20] --ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True] - --ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True] + --ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: False] --ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0] --ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216] --ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False] @@ -35,6 +35,7 @@ Options: --ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1] --req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0] --ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0] + --task-factory-maxitems INT Pre allocated instances of Task objects for socketify, ASGI interface [default: 100000] Example: python3 -m socketify main:app -w 8 -p 8181 @@ -64,7 +65,7 @@ def is_factory(module): return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0 def str_bool(text): - text = text.lower() + text = str(text).lower() return text == "true" def load_module(file, reload=False): @@ -166,7 +167,7 @@ def execute(args): port = int(options.get("--port", options.get("-p", 8000))) host = options.get("--host", options.get("-h", "127.0.0.1")) uds = options.get('--uds', None) - + task_factory_maxitems = int(options.get("--task-factory-maxitems", 100000)) disable_listen_log = options.get("--disable-listen-log", False) websockets = options.get("--ws", "auto") @@ -213,7 +214,7 @@ def execute(args): if websockets: websocket_options = { - 'compression': int(1 if options.get('--ws-per-message-deflate', True) else 0), + 'compression': int(1 if options.get('--ws-per-message-deflate', False) else 0), 'max_payload_length': int(options.get('--ws-max-size', 16777216)), 'idle_timeout': int(options.get('--ws-idle-timeout', 20)), 'send_pings_automatically': str_bool(options.get('--ws-auto-ping', True)), @@ -236,7 +237,7 @@ def execute(args): return print("socketify interface must be callable with 1 parameter def run(app: App)") # run app with the settings desired def run_app(): - fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0))) + fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems) module(fork_app) # call module factory if websockets: # if socketify websockets are added using --ws in socketify interface we can set here @@ -268,6 +269,6 @@ def execute(args): else: if uds: - Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers) + Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers) else: - Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers) + Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers) diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 76d771c..2a1af45 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -346,7 +346,7 @@ def uws_websocket_factory_close_handler(ws, code, message, length, user_data): if inspect.iscoroutinefunction(handler): async def wrapper(app, instances, handler, ws, data, code, dispose): try: - await handler(ws, code, data) + return await handler(ws, code, data) finally: key = ws.get_user_data_uuid() if key is not None: @@ -389,7 +389,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): if inspect.iscoroutinefunction(handler): async def wrapper(app, handler, ws, data, code, dispose): try: - await handler(ws, code, data) + return await handler(ws, code, data) finally: key = ws.get_user_data_uuid() if key is not None: @@ -473,7 +473,7 @@ def uws_websocket_factory_upgrade_handler(res, req, context, user_data): def uws_websocket_upgrade_handler(res, req, context, user_data): if user_data != ffi.NULL: handlers, app = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL, app._template) + response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs) request = AppRequest(req) try: handler = handlers.upgrade @@ -548,7 +548,7 @@ def uws_generic_factory_method_handler(res, req, user_data): def uws_generic_method_handler(res, req, user_data): if user_data != ffi.NULL: (handler, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, app.SSL, app._template) + response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs) request = AppRequest(req) try: @@ -1039,13 +1039,13 @@ class RequestResponseFactory: def __init__(self, app, max_size): self.factory_queue = [] for _ in range(0, max_size): - response = AppResponse(None, app.loop, app.SSL, app._template) + response = AppResponse(None, app.loop, app.SSL, app._template, app._socket_refs) request = AppRequest(None) self.factory_queue.append((response, request, True)) def get(self, app, res, req): if len(self.factory_queue) == 0: - response = AppResponse(res, app.loop, app.SSL, app._template) + response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs) request = AppRequest(req) return response, request, False @@ -1310,9 +1310,10 @@ class AppRequest: class AppResponse: - def __init__(self, response, loop, ssl, render=None): + def __init__(self, response, loop, ssl, render, socket_refs): self.res = response self.SSL = ssl + self._socket_refs = socket_refs self.aborted = False self.loop = loop self._aborted_handler = None @@ -1764,7 +1765,7 @@ class AppResponse: _id = uuid.uuid4() user_data_ptr = ffi.new_handle((user_data, _id)) # keep alive data - SocketRefs[_id] = user_data_ptr + self._socket_refs[_id] = user_data_ptr lib.uws_res_upgrade( self.SSL, diff --git a/src/socketify/uv.py b/src/socketify/uv.py index ada95b5..69248c7 100644 --- a/src/socketify/uv.py +++ b/src/socketify/uv.py @@ -83,13 +83,17 @@ class UVLoop: self._loop = ffi.NULL def run_nowait(self): - return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT) + if self._loop != ffi.NULL: + return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT) def run(self): - return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT) + if self._loop != ffi.NULL: + return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT) def run_once(self): - return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE) + if self._loop != ffi.NULL: + return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE) def stop(self): - lib.socketify_loop_stop(self._loop) + if self._loop != ffi.NULL: + lib.socketify_loop_stop(self._loop) diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index f7b9090..1f047ad 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -4,7 +4,10 @@ from socketify import App from .asgi import ws_close, ws_upgrade, ws_open, ws_message from io import BytesIO from .native import lib, ffi - +import platform +is_pypy = platform.python_implementation() == "PyPy" +from .tasks import create_task, create_task_with_factory +import sys @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): @@ -124,7 +127,7 @@ def wsgi(ssl, response, info, user_data, aborted): return ssl = data_response.app.server.SSL - app_iter = data_response.app.app( + app_iter = data_response.app.wsgi( data_response.environ, data_response.start_response ) try: @@ -148,7 +151,7 @@ def wsgi(ssl, response, info, user_data, aborted): lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr) else: environ["wsgi.input"] = None - app_iter = app.app(environ, start_response) + app_iter = app.wsgi(environ, start_response) try: for data in app_iter: if isinstance(data, bytes): @@ -161,14 +164,16 @@ def wsgi(ssl, response, info, user_data, aborted): app_iter.close() lib.uws_res_end_without_body(ssl, response, 0) +def is_asgi(module): + return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3 class _WSGI: - def __init__(self, app, options=None, websocket=None, websocket_options=None): + def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000): self.server = App(options) self.SERVER_HOST = None self.SERVER_PORT = None self.SERVER_WS_SCHEME = "wss" if self.server.options else "ws" - self.app = app + self.wsgi = app self.BASIC_ENVIRON = dict(os.environ) self.ws_compression = False @@ -177,12 +182,43 @@ class _WSGI: self.server.SSL, self.server.app, wsgi, self._ptr ) self.asgi_ws_info = None + if isinstance(websocket, dict): # serve websocket as socketify.py if websocket_options: websocket.update(websocket_options) self.server.ws("/*", websocket) - elif inspect.iscoroutine(websocket): + elif is_asgi(websocket): + self.app = websocket # set ASGI app + loop = self.server.loop.loop + # ASGI do not use app.run_async to not add any overhead from socketify.py WebFramework + # internally will still use custom task factory for pypy because of Loop + if is_pypy: + if task_factory_max_items > 0: + factory = create_task_with_factory(task_factory_max_items) + + def run_task(task): + factory(loop, task) + loop._run_once() + self._run_task = run_task + else: + def run_task(task): + create_task(loop, task) + loop._run_once() + self._run_task = run_task + + else: + if sys.version_info >= (3, 8): # name fixed to avoid dynamic name + def run_task(task): + loop.create_task(task, name='socketify.py-request-task') + loop._run_once() + self._run_task = run_task + else: + def run_task(task): + loop.create_task(task) + loop._run_once() + self._run_task = run_task + # detect ASGI to use as WebSocket as mixed protocol native_options = ffi.new("uws_socket_behavior_t *") native_behavior = native_options[0] @@ -228,6 +264,7 @@ class _WSGI: native_behavior.ping = ffi.NULL native_behavior.pong = ffi.NULL native_behavior.close = ws_close + self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr @@ -276,12 +313,13 @@ class _WSGI: # "Public" WSGI interface to allow easy forks/workers class WSGI: - def __init__(self, app, options=None, websocket=None, websocket_options=None): + def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000): self.app = app self.options = options self.websocket = websocket self.websocket_options = websocket_options self.listen_options = None + self.task_factory_max_items = task_factory_max_items def listen(self, port_or_options, handler=None): self.listen_options = (port_or_options, handler) @@ -290,7 +328,7 @@ class WSGI: def run(self, workers=1): def run_app(): server = _WSGI( - self.app, self.options, self.websocket, self.websocket_options + self.app, self.options, self.websocket, self.websocket_options, self.task_factory_max_items ) if self.listen_options: (port_or_options, handler) = self.listen_options