on_start/on_shutdown lifespan events #58

pull/84/head
Ciro 2023-01-08 18:49:16 -03:00
rodzic 99f54d900d
commit cb6c8334a1
6 zmienionych plików z 98 dodań i 63 usunięć

Wyświetl plik

@ -2,7 +2,10 @@
```python
class App:
def __init__(self, options=None):
def __init__(self, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0, task_factory_max_items=100_000, lifespan=True):
def on_start(self, method: callable):
def on_shutdown(self, method: callable):
def router(self, prefix: str="", *middlewares):
def register(self, extension):
def template(self, template_engine):

Wyświetl plik

@ -233,4 +233,37 @@ If you need to access the raw pointer of `libuv` you can use `app.get_native_han
HttpRequest object being stack-allocated and only valid in one single callback invocation so only valid in the first "segment" before the first await.
If you just want to preserve headers, url, method, cookies and query string you can use `req.preserve()` to copy all data and keep it in the request object, but will be some performance penalty.
# Lifespan / Lifecycle events
You can use socketify start and shutdown events to create/clean thread pools, connections pools, etc when the application starts or shutdown itself.
If any exception occurs in the start event the application will continue and start normally,
if you want to fail a start you need to catch the exception and use `sys.exit(1)` to shut down prematurely.
Both `app.on_start` and `app.on_shutdown` can be sync or async.
```python
from socketify import App
def run(app: App)
@app.on_start
async def on_start():
print("wait...")
await asyncio.sleep(1)
print("start!")
@app.on_shutdown
async def on_shutdown():
print("wait...")
await asyncio.sleep(1)
print("shutdown!")
router = app.router()
@router.get("/")
def home(res, req):
res.send("Hello, World!")
```
### Next [Upload and Post](upload-post.md)

Wyświetl plik

@ -241,8 +241,7 @@ def execute(args):
return print("socketify interface must be callable with 1 parameter def run(app: App)")
# run app with the settings desired
def run_app():
# Add lifespan when lifespan hooks are implemented
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems)
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems, lifespan)
module(fork_app) # call module factory
if websockets: # if socketify websockets are added using --ws in socketify interface we can set here

Wyświetl plik

@ -86,7 +86,7 @@ class Loop:
else:
future = None
self.loop.call_soon(self._keep_alive)
self.loop.run_until_complete()
self.loop.run_until_complete(future)
# clean up uvloop
self.uv_loop.stop()
return future

Wyświetl plik

@ -2476,11 +2476,13 @@ class App:
request_response_factory_max_items=0,
websocket_factory_max_items=0,
task_factory_max_items=100_000,
lifespan=True
):
socket_options_ptr = ffi.new("struct us_socket_context_options_t *")
socket_options = socket_options_ptr[0]
self.options = options
self._template = None
self.lifespan = lifespan
# keep socket data alive for CFFI
self._socket_refs = {}
if options is not None:
@ -2558,7 +2560,16 @@ class App:
self._request_extension = None
self._response_extension = None
self._ws_extension = None
self._on_start_handler = None
def on_start(self, method: callable):
self._on_start_handler = method
return method
def on_shutdown(self, method: callable):
self._on_shutdown_handler = method
return method
def router(self, prefix: str = "", *middlewares):
return DecoratorRouter(self, prefix, middlewares)
@ -3115,6 +3126,24 @@ class App:
return self
def listen(self, port_or_options=None, handler=None):
if self.lifespan:
async def task_wrapper(task):
try:
if inspect.iscoroutinefunction(task):
await task()
else:
task()
except Exception as error:
try:
self.trigger_error(error, None, None)
finally:
return None
# start lifespan
if self._on_start_handler:
self.loop.run_until_complete(task_wrapper(self._on_start_handler))
# actual listen to server
self._listen_handler = handler
if port_or_options is None:
lib.uws_app_listen(
@ -3198,6 +3227,22 @@ class App:
signal.signal(signal.SIGINT, lambda sig, frame: self.close())
self.loop.run()
if self.lifespan:
async def task_wrapper(task):
try:
if inspect.iscoroutinefunction(task):
await task()
else:
task()
except Exception as error:
try:
self.trigger_error(error, None, None)
finally:
return None
# shutdown lifespan
if self._on_shutdown_handler:
self.loop.run_until_complete(task_wrapper(self._on_shutdown_handler))
return self
def close(self):

Wyświetl plik

@ -1,31 +1,19 @@
from socketify import App, CompressOptions, OpCode
from socketify import App
import asyncio
app = App(lifespan=False)
router = app.router()
app = App()
@app.on_start
async def on_start():
print("wait...")
await asyncio.sleep(1)
print("start!")
def extension(request, response, ws):
@request.method
async def get_user(self):
token = self.get_header("token")
self.token = token
return { "name": "Test" } if token else { "name", "Anonymous" }
@request.method
async def get_cart(self):
return [{ "quantity": 10, "name": "T-Shirt" }]
request.property("token", "testing")
# extensions must be registered before routes
app.register(extension)
def auth_middleware(res, req, data):
token = req.get_query("token")
print("token?", token)
req.token = token
return { "name": "Test" } if token else { "name", "Anonymous" }
router = app.router("")
@app.on_shutdown
async def on_shutdown():
print("wait...")
await asyncio.sleep(1)
print("shutdown!")
@router.get("/")
def home(res, req, data=None):
@ -39,42 +27,9 @@ def home(res, req, data=None):
res.send({"Hello": "World!"}, headers=(("X-Rate-Limit-Remaining", "10"), (b'Another-Headers', b'Value')))
def ws_open(ws):
print("A WebSocket got connected!")
ws.send("Hello World!", OpCode.TEXT)
def ws_message(ws, message, opcode):
print(message, opcode)
# Ok is false if backpressure was built up, wait for drain
ok = ws.send(message, opcode)
app.ws(
"/*",
{
"compression": CompressOptions.SHARED_COMPRESSOR,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 12,
"open": ws_open,
"message": ws_message,
"drain": lambda ws: print(
"WebSocket backpressure: %s", ws.get_buffered_amount()
),
"close": lambda ws, code, message: print("WebSocket closed"),
},
)
app.listen(
3000,
lambda config: print("Listening on port http://localhost:%d now\n" % config.port),
)
app.run()
# uws_websocket_drain_handler
# uws_websocket_subscription_handler
# uws_websocket_open_handler
# uws_websocket_message_handler
# uws_websocket_pong_handler
# uws_websocket_ping_handler
# uws_websocket_close_handler