From 2f8a0ca6d85133e5045e8b67ae7b74dace1b9a6f Mon Sep 17 00:00:00 2001 From: Ciro Date: Fri, 6 Jan 2023 12:10:21 -0300 Subject: [PATCH] run lifespan in same loop --- src/socketify/asgi.py | 139 +++++++++++++++++++++--------------------- src/socketify/cli.py | 3 +- 2 files changed, 70 insertions(+), 72 deletions(-) diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 3ed9449..2c70a04 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -613,7 +613,68 @@ class _ASGI: self.SERVER_HOST = ( "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host ) - self.server.listen(port_or_options, handler) + if not self.lifespan: + self.server.listen(port_or_options, handler) + return self + + scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}} + + asgi_app = self + self.is_starting = True + self.is_stopped = False + self.status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan + self.status_message = "" + self.stop_future = self.server.loop.create_future() + + async def send(options): + nonlocal asgi_app + type = options["type"] + asgi_app.status_message = options.get("message", "") + if type == "lifespan.startup.complete": + asgi_app.status = 1 + asgi_app.server.listen(port_or_options, handler) + elif type == "lifespan.startup.failed": + asgi_app.is_stopped = True + asgi_app.status = 2 + elif type == "lifespan.shutdown.complete": + asgi_app.is_stopped = True + asgi_app.status = 4 + elif type == "lifespan.shutdown.failed": + asgi_app.is_stopped = True + asgi_app.status = 5 + + async def receive(): + nonlocal asgi_app + while not asgi_app.is_stopped: + if asgi_app.is_starting: + asgi_app.is_starting = False + return { + "type": "lifespan.startup", + "asgi": {"version": "3.0", "spec_version": "2.3"}, + } + return await asgi_app.stop_future + + async def task_wrapper(task): + nonlocal asgi_app + try: + return await task + except Exception as error: + try: + # just log in console the error to call attention + logging.error("Uncaught Exception: %s" % str(error)) + if asgi_app.status < 2: + asgi_app.status = 6 # no more lifespan + asgi_app.server.listen(port_or_options, handler) + finally: + return None + + # start lifespan + self.server.loop.ensure_future(task_wrapper(self.app(scope, receive, send))) + self.server.run() + # failed to start + if self.status == 2: + logging.error("Startup failed: %s" % str(self.status_message)) + return self return self def run(self): @@ -621,86 +682,24 @@ class _ASGI: self.server.run() return self - scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}} - - lifespan_loop = Loop(lambda loop, error, response: logging.error("Uncaught Exception: %s" % str(error))) - is_starting = True - is_stopped = False - status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan - status_message = "" - stop_future = lifespan_loop.create_future() - async def send(options): - nonlocal status, status_message, is_stopped - type = options["type"] - status_message = options.get("message", "") - if type == "lifespan.startup.complete": - status = 1 - elif type == "lifespan.startup.failed": - is_stopped = True - status = 2 - elif type == "lifespan.shutdown.complete": - is_stopped = True - status = 4 - elif type == "lifespan.shutdown.failed": - is_stopped = True - status = 5 - - async def receive(): - nonlocal is_starting, is_stopped - while not is_stopped: - if is_starting: - is_starting = False - return { - "type": "lifespan.startup", - "asgi": {"version": "3.0", "spec_version": "2.3"}, - } - return await stop_future - - async def task_wrapper(task): - nonlocal status - try: - return await task - except Exception as error: - try: - # just log in console the error to call attention - logging.error("Uncaught Exception: %s" % str(error)) - status = 6 # no more lifespan - finally: - return None - - # start lifespan - lifespan_loop.ensure_future(task_wrapper(self.app(scope, receive, send))) - - # run until start or fail - while status == 0: - lifespan_loop.run_once() - - # failed to start - if status == 2: - logging.error("Startup failed: %s" % str(status_message)) - return self - # run app self.server.run() - # no more lifespan events - if status == 6: + if self.status == 6: return self - # signal stop - status = 3 - stop_future.set_result({ + self.status = 3 + self.stop_future.set_result({ "type": "lifespan.shutdown", "asgi": {"version": "3.0", "spec_version": "2.3"}, }) - # run until end or fail - while status == 3: - lifespan_loop.run_once() + while self.status == 3: + self.server.loop.run_once() # failed to stop - if status == 5: - logging.error("Shutdown failed: %s" % str(status_message)) + if self.status == 5: + logging.error("Shutdown failed: %s" % str(self.status_message)) return self def __del__(self): diff --git a/src/socketify/cli.py b/src/socketify/cli.py index b780673..6cecc96 100644 --- a/src/socketify/cli.py +++ b/src/socketify/cli.py @@ -170,8 +170,7 @@ def execute(args): host = options.get("--host", options.get("-h", "127.0.0.1")) uds = options.get('--uds', None) lifespan = options.get('--lifespan', "auto") - lifespan=False if lifespan == "off" or lifespan is not True else True - + lifespan = False if lifespan == "off" else True task_factory_maxitems = int(options.get("--task-factory-maxitems", 100000)) disable_listen_log = options.get("--disable-listen-log", False)