From cb6c8334a16719be2a6e63fa95e4ce3071103914 Mon Sep 17 00:00:00 2001 From: Ciro Date: Sun, 8 Jan 2023 18:49:16 -0300 Subject: [PATCH] on_start/on_shutdown lifespan events #58 --- docs/api.md | 5 ++- docs/basics.md | 33 +++++++++++++++++ src/socketify/cli.py | 3 +- src/socketify/loop.py | 2 +- src/socketify/socketify.py | 45 +++++++++++++++++++++++ src/tests.py | 73 ++++++++------------------------------ 6 files changed, 98 insertions(+), 63 deletions(-) diff --git a/docs/api.md b/docs/api.md index 294c4b2..7122835 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2,7 +2,10 @@ ```python class App: - def __init__(self, options=None): + def __init__(self, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0, task_factory_max_items=100_000, lifespan=True): + + def on_start(self, method: callable): + def on_shutdown(self, method: callable): def router(self, prefix: str="", *middlewares): def register(self, extension): def template(self, template_engine): diff --git a/docs/basics.md b/docs/basics.md index 00de1d8..1c2d692 100644 --- a/docs/basics.md +++ b/docs/basics.md @@ -233,4 +233,37 @@ If you need to access the raw pointer of `libuv` you can use `app.get_native_han HttpRequest object being stack-allocated and only valid in one single callback invocation so only valid in the first "segment" before the first await. If you just want to preserve headers, url, method, cookies and query string you can use `req.preserve()` to copy all data and keep it in the request object, but will be some performance penalty. + +# Lifespan / Lifecycle events +You can use socketify start and shutdown events to create/clean thread pools, connections pools, etc when the application starts or shutdown itself. + +If any exception occurs in the start event the application will continue and start normally, +if you want to fail a start you need to catch the exception and use `sys.exit(1)` to shut down prematurely. + +Both `app.on_start` and `app.on_shutdown` can be sync or async. + + +```python +from socketify import App + +def run(app: App) + @app.on_start + async def on_start(): + print("wait...") + await asyncio.sleep(1) + print("start!") + + @app.on_shutdown + async def on_shutdown(): + print("wait...") + await asyncio.sleep(1) + print("shutdown!") + + router = app.router() + + @router.get("/") + def home(res, req): + res.send("Hello, World!") + +``` ### Next [Upload and Post](upload-post.md) \ No newline at end of file diff --git a/src/socketify/cli.py b/src/socketify/cli.py index 6cecc96..45ced5e 100644 --- a/src/socketify/cli.py +++ b/src/socketify/cli.py @@ -241,8 +241,7 @@ def execute(args): return print("socketify interface must be callable with 1 parameter def run(app: App)") # run app with the settings desired def run_app(): - # Add lifespan when lifespan hooks are implemented - fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems) + fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems, lifespan) module(fork_app) # call module factory if websockets: # if socketify websockets are added using --ws in socketify interface we can set here diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 5dc782d..fc7e405 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -86,7 +86,7 @@ class Loop: else: future = None self.loop.call_soon(self._keep_alive) - self.loop.run_until_complete() + self.loop.run_until_complete(future) # clean up uvloop self.uv_loop.stop() return future diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 0874971..46bc521 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -2476,11 +2476,13 @@ class App: request_response_factory_max_items=0, websocket_factory_max_items=0, task_factory_max_items=100_000, + lifespan=True ): socket_options_ptr = ffi.new("struct us_socket_context_options_t *") socket_options = socket_options_ptr[0] self.options = options self._template = None + self.lifespan = lifespan # keep socket data alive for CFFI self._socket_refs = {} if options is not None: @@ -2558,7 +2560,16 @@ class App: self._request_extension = None self._response_extension = None self._ws_extension = None + self._on_start_handler = None + def on_start(self, method: callable): + self._on_start_handler = method + return method + + def on_shutdown(self, method: callable): + self._on_shutdown_handler = method + return method + def router(self, prefix: str = "", *middlewares): return DecoratorRouter(self, prefix, middlewares) @@ -3115,6 +3126,24 @@ class App: return self def listen(self, port_or_options=None, handler=None): + if self.lifespan: + async def task_wrapper(task): + try: + if inspect.iscoroutinefunction(task): + await task() + else: + task() + except Exception as error: + try: + self.trigger_error(error, None, None) + finally: + return None + + # start lifespan + if self._on_start_handler: + self.loop.run_until_complete(task_wrapper(self._on_start_handler)) + + # actual listen to server self._listen_handler = handler if port_or_options is None: lib.uws_app_listen( @@ -3198,6 +3227,22 @@ class App: signal.signal(signal.SIGINT, lambda sig, frame: self.close()) self.loop.run() + if self.lifespan: + async def task_wrapper(task): + try: + if inspect.iscoroutinefunction(task): + await task() + else: + task() + except Exception as error: + try: + self.trigger_error(error, None, None) + finally: + return None + # shutdown lifespan + if self._on_shutdown_handler: + self.loop.run_until_complete(task_wrapper(self._on_shutdown_handler)) + return self def close(self): diff --git a/src/tests.py b/src/tests.py index 2f8e12c..79bd5f7 100644 --- a/src/tests.py +++ b/src/tests.py @@ -1,31 +1,19 @@ -from socketify import App, CompressOptions, OpCode +from socketify import App +import asyncio +app = App(lifespan=False) +router = app.router() -app = App() +@app.on_start +async def on_start(): + print("wait...") + await asyncio.sleep(1) + print("start!") -def extension(request, response, ws): - - @request.method - async def get_user(self): - token = self.get_header("token") - self.token = token - return { "name": "Test" } if token else { "name", "Anonymous" } - - @request.method - async def get_cart(self): - return [{ "quantity": 10, "name": "T-Shirt" }] - - request.property("token", "testing") - -# extensions must be registered before routes -app.register(extension) - -def auth_middleware(res, req, data): - token = req.get_query("token") - print("token?", token) - req.token = token - return { "name": "Test" } if token else { "name", "Anonymous" } - -router = app.router("") +@app.on_shutdown +async def on_shutdown(): + print("wait...") + await asyncio.sleep(1) + print("shutdown!") @router.get("/") def home(res, req, data=None): @@ -39,42 +27,9 @@ def home(res, req, data=None): res.send({"Hello": "World!"}, headers=(("X-Rate-Limit-Remaining", "10"), (b'Another-Headers', b'Value'))) -def ws_open(ws): - print("A WebSocket got connected!") - ws.send("Hello World!", OpCode.TEXT) - - -def ws_message(ws, message, opcode): - print(message, opcode) - # Ok is false if backpressure was built up, wait for drain - ok = ws.send(message, opcode) - - -app.ws( - "/*", - { - "compression": CompressOptions.SHARED_COMPRESSOR, - "max_payload_length": 16 * 1024 * 1024, - "idle_timeout": 12, - "open": ws_open, - "message": ws_message, - "drain": lambda ws: print( - "WebSocket backpressure: %s", ws.get_buffered_amount() - ), - "close": lambda ws, code, message: print("WebSocket closed"), - }, -) app.listen( 3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port), ) app.run() - -# uws_websocket_drain_handler -# uws_websocket_subscription_handler -# uws_websocket_open_handler -# uws_websocket_message_handler -# uws_websocket_pong_handler -# uws_websocket_ping_handler -# uws_websocket_close_handler \ No newline at end of file