kopia lustrzana https://github.com/cirospaciari/socketify.py
run lifespan in same loop
rodzic
e2cfd96e7d
commit
2f8a0ca6d8
|
@ -613,7 +613,68 @@ class _ASGI:
|
|||
self.SERVER_HOST = (
|
||||
"0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
|
||||
)
|
||||
self.server.listen(port_or_options, handler)
|
||||
if not self.lifespan:
|
||||
self.server.listen(port_or_options, handler)
|
||||
return self
|
||||
|
||||
scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}}
|
||||
|
||||
asgi_app = self
|
||||
self.is_starting = True
|
||||
self.is_stopped = False
|
||||
self.status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan
|
||||
self.status_message = ""
|
||||
self.stop_future = self.server.loop.create_future()
|
||||
|
||||
async def send(options):
|
||||
nonlocal asgi_app
|
||||
type = options["type"]
|
||||
asgi_app.status_message = options.get("message", "")
|
||||
if type == "lifespan.startup.complete":
|
||||
asgi_app.status = 1
|
||||
asgi_app.server.listen(port_or_options, handler)
|
||||
elif type == "lifespan.startup.failed":
|
||||
asgi_app.is_stopped = True
|
||||
asgi_app.status = 2
|
||||
elif type == "lifespan.shutdown.complete":
|
||||
asgi_app.is_stopped = True
|
||||
asgi_app.status = 4
|
||||
elif type == "lifespan.shutdown.failed":
|
||||
asgi_app.is_stopped = True
|
||||
asgi_app.status = 5
|
||||
|
||||
async def receive():
|
||||
nonlocal asgi_app
|
||||
while not asgi_app.is_stopped:
|
||||
if asgi_app.is_starting:
|
||||
asgi_app.is_starting = False
|
||||
return {
|
||||
"type": "lifespan.startup",
|
||||
"asgi": {"version": "3.0", "spec_version": "2.3"},
|
||||
}
|
||||
return await asgi_app.stop_future
|
||||
|
||||
async def task_wrapper(task):
|
||||
nonlocal asgi_app
|
||||
try:
|
||||
return await task
|
||||
except Exception as error:
|
||||
try:
|
||||
# just log in console the error to call attention
|
||||
logging.error("Uncaught Exception: %s" % str(error))
|
||||
if asgi_app.status < 2:
|
||||
asgi_app.status = 6 # no more lifespan
|
||||
asgi_app.server.listen(port_or_options, handler)
|
||||
finally:
|
||||
return None
|
||||
|
||||
# start lifespan
|
||||
self.server.loop.ensure_future(task_wrapper(self.app(scope, receive, send)))
|
||||
self.server.run()
|
||||
# failed to start
|
||||
if self.status == 2:
|
||||
logging.error("Startup failed: %s" % str(self.status_message))
|
||||
return self
|
||||
return self
|
||||
|
||||
def run(self):
|
||||
|
@ -621,86 +682,24 @@ class _ASGI:
|
|||
self.server.run()
|
||||
return self
|
||||
|
||||
scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}}
|
||||
|
||||
lifespan_loop = Loop(lambda loop, error, response: logging.error("Uncaught Exception: %s" % str(error)))
|
||||
is_starting = True
|
||||
is_stopped = False
|
||||
status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan
|
||||
status_message = ""
|
||||
stop_future = lifespan_loop.create_future()
|
||||
async def send(options):
|
||||
nonlocal status, status_message, is_stopped
|
||||
type = options["type"]
|
||||
status_message = options.get("message", "")
|
||||
if type == "lifespan.startup.complete":
|
||||
status = 1
|
||||
elif type == "lifespan.startup.failed":
|
||||
is_stopped = True
|
||||
status = 2
|
||||
elif type == "lifespan.shutdown.complete":
|
||||
is_stopped = True
|
||||
status = 4
|
||||
elif type == "lifespan.shutdown.failed":
|
||||
is_stopped = True
|
||||
status = 5
|
||||
|
||||
async def receive():
|
||||
nonlocal is_starting, is_stopped
|
||||
while not is_stopped:
|
||||
if is_starting:
|
||||
is_starting = False
|
||||
return {
|
||||
"type": "lifespan.startup",
|
||||
"asgi": {"version": "3.0", "spec_version": "2.3"},
|
||||
}
|
||||
return await stop_future
|
||||
|
||||
async def task_wrapper(task):
|
||||
nonlocal status
|
||||
try:
|
||||
return await task
|
||||
except Exception as error:
|
||||
try:
|
||||
# just log in console the error to call attention
|
||||
logging.error("Uncaught Exception: %s" % str(error))
|
||||
status = 6 # no more lifespan
|
||||
finally:
|
||||
return None
|
||||
|
||||
# start lifespan
|
||||
lifespan_loop.ensure_future(task_wrapper(self.app(scope, receive, send)))
|
||||
|
||||
# run until start or fail
|
||||
while status == 0:
|
||||
lifespan_loop.run_once()
|
||||
|
||||
# failed to start
|
||||
if status == 2:
|
||||
logging.error("Startup failed: %s" % str(status_message))
|
||||
return self
|
||||
|
||||
# run app
|
||||
self.server.run()
|
||||
|
||||
# no more lifespan events
|
||||
if status == 6:
|
||||
if self.status == 6:
|
||||
return self
|
||||
|
||||
# signal stop
|
||||
status = 3
|
||||
stop_future.set_result({
|
||||
self.status = 3
|
||||
self.stop_future.set_result({
|
||||
"type": "lifespan.shutdown",
|
||||
"asgi": {"version": "3.0", "spec_version": "2.3"},
|
||||
})
|
||||
|
||||
# run until end or fail
|
||||
while status == 3:
|
||||
lifespan_loop.run_once()
|
||||
while self.status == 3:
|
||||
self.server.loop.run_once()
|
||||
|
||||
# failed to stop
|
||||
if status == 5:
|
||||
logging.error("Shutdown failed: %s" % str(status_message))
|
||||
if self.status == 5:
|
||||
logging.error("Shutdown failed: %s" % str(self.status_message))
|
||||
return self
|
||||
|
||||
def __del__(self):
|
||||
|
|
|
@ -170,8 +170,7 @@ def execute(args):
|
|||
host = options.get("--host", options.get("-h", "127.0.0.1"))
|
||||
uds = options.get('--uds', None)
|
||||
lifespan = options.get('--lifespan', "auto")
|
||||
lifespan=False if lifespan == "off" or lifespan is not True else True
|
||||
|
||||
lifespan = False if lifespan == "off" else True
|
||||
task_factory_maxitems = int(options.get("--task-factory-maxitems", 100000))
|
||||
|
||||
disable_listen_log = options.get("--disable-listen-log", False)
|
||||
|
|
Ładowanie…
Reference in New Issue