added ASGI lifespan support

pull/75/head
Ciro 2023-01-03 15:15:43 -03:00
rodzic a8c44eac7f
commit 2a41f6f995
6 zmienionych plików z 176 dodań i 50 usunięć

Wyświetl plik

@ -5,6 +5,7 @@ from .socketify import (
OpCode, OpCode,
SendStatus, SendStatus,
CompressOptions, CompressOptions,
Loop
) )
from .asgi import ( from .asgi import (
ASGI ASGI

Wyświetl plik

@ -1,4 +1,4 @@
from socketify import App, OpCode from socketify import App, OpCode, Loop
from queue import SimpleQueue from queue import SimpleQueue
from .native import lib, ffi from .native import lib, ffi
from .tasks import create_task, create_task_with_factory from .tasks import create_task, create_task_with_factory
@ -7,6 +7,7 @@ import platform
import sys import sys
import logging import logging
import uuid import uuid
import asyncio
is_pypy = platform.python_implementation() == "PyPy" is_pypy = platform.python_implementation() == "PyPy"
async def task_wrapper(task): async def task_wrapper(task):
try: try:
@ -613,8 +614,86 @@ class _ASGI:
return self return self
def run(self): def run(self):
# scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}} scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}}
self.server.run() # run app on the main process too :)
lifespan_loop = Loop(lambda loop, error, response: logging.error("Uncaught Exception: %s" % str(error)))
is_starting = True
is_stopped = False
status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan
status_message = ""
stop_future = lifespan_loop.create_future()
async def send(options):
nonlocal status, status_message, is_stopped
type = options["type"]
status_message = options.get("message", "")
if type == "lifespan.startup.complete":
status = 1
elif type == "lifespan.startup.failed":
is_stopped = True
status = 2
elif type == "lifespan.shutdown.complete":
is_stopped = True
status = 4
elif type == "lifespan.shutdown.failed":
is_stopped = True
status = 5
async def receive():
nonlocal is_starting, is_stopped
while not is_stopped:
if is_starting:
is_starting = False
return {
"type": "lifespan.startup",
"asgi": {"version": "3.0", "spec_version": "2.3"},
}
return await stop_future
async def task_wrapper(task):
nonlocal status
try:
return await task
except Exception as error:
try:
# just log in console the error to call attention
logging.error("Uncaught Exception: %s" % str(error))
status = 6 # no more lifespan
finally:
return None
# start lifespan
lifespan_loop.ensure_future(task_wrapper(self.app(scope, receive, send)))
# run until start or fail
while status == 0:
lifespan_loop.run_once()
# failed to start
if status == 2:
logging.error("Startup failed: %s" % str(status_message))
return self
# run app
self.server.run()
# no more lifespan events
if status == 6:
return self
# signal stop
status = 3
stop_future.set_result({
"type": "lifespan.shutdown",
"asgi": {"version": "3.0", "spec_version": "2.3"},
})
# run until end or fail
while status == 3:
lifespan_loop.run_once()
# failed to stop
if status == 5:
logging.error("Shutdown failed: %s" % str(status_message))
return self return self
def __del__(self): def __del__(self):
@ -666,7 +745,7 @@ class ASGI:
run_task() run_task()
# fork limiting the cpu count - 1 # fork limiting the cpu count - 1
for i in range(1, workers): for _ in range(1, workers):
create_fork() create_fork()
run_task() # run app on the main process too :) run_task() # run app on the main process too :)

Wyświetl plik

@ -79,6 +79,18 @@ class Loop:
def ensure_future(self, task): def ensure_future(self, task):
return asyncio.ensure_future(task, loop=self.loop) return asyncio.ensure_future(task, loop=self.loop)
def run_until_complete(self, task=None):
self.started = True
if task is not None:
future = self.ensure_future(task)
else:
future = None
self.loop.call_soon(self._keep_alive)
self.loop.run_until_complete()
# clean up uvloop
self.uv_loop.stop()
return future
def run(self, task=None): def run(self, task=None):
self.started = True self.started = True
if task is not None: if task is not None:

Wyświetl plik

@ -598,7 +598,7 @@ def uws_generic_listen_handler(listen_socket, config, user_data):
else AppListenOptions( else AppListenOptions(
port=int(config.port), port=int(config.port),
host=None host=None
if config.host == ffi.NULL if config.host == ffi.NULL or listen_socket == ffi.NULL
else ffi.string(config.host).decode("utf8"), else ffi.string(config.host).decode("utf8"),
options=int(config.options), options=int(config.options),
) )

Wyświetl plik

@ -153,7 +153,7 @@ class RequestTask:
def __del__(self): def __del__(self):
if self._state == _PENDING and self._log_destroy_pending: if self._state == _PENDING and self._log_destroy_pending and self._loop:
context = { context = {
"task": self, "task": self,
"message": "Task was destroyed but it is pending!", "message": "Task was destroyed but it is pending!",

Wyświetl plik

@ -1,55 +1,89 @@
# https://github.com/Tinche/aiofiles from socketify.template import *
# https://github.com/uNetworking/uWebSockets/issues/1426
# import os.path # https://github.com/chtd/psycopg2cffi/
# https://github.com/tlocke/pg8000
# https://www.psycopg.org/docs/advanced.html#asynchronous-support (works in cffi version too)
# https://github.com/sass/libsass-python
# DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data); # @memo() # generate an static string after first execution aka skipping re-rendering when props are unchanged
# DLL_EXPORT typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data); # def title(message):
# return h1(message, classes="title-light")
# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length,_listen_domain_handler handler, void *user_data); # @memo(maxsize=128)
# DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t servere_length, int options, uws_listen_domain_handler handler, void *user_data); def htemplate(message, left_message, right_message):
# DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length);
# DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); return (
h1(message),
span(
children=(
span(left_message, classes=("text-light", "align-left")),
span(right_message, classes=("text-light", "align-right")),
),
),
)
from socketify import App, AppOptions, OpCode, CompressOptions # <!DOCTYPE html>
import asyncio # <html lang="en">
# <head>
# <meta charset="UTF-8">
# <meta http-equiv="X-UA-Compatible" content="IE=edge">
# <meta name="viewport" content="width=device-width, initial-scale=1.0">
# <title>Document</title>
# </head>
# <body>
# </body>
# </html>
def html5():
return (
doctype(),
html(lang="en", children=(
head(children=(
# meta(charset="UTF-8")
# meta(http_equiv="X-UA-Compatible",content="IE=edge")
# meta(name="vieport",content="width=device-width, initial-scale=1.0")
title("Document")
)),
body()
))
)
# print(render_tostring(html5()))
# from mako.template import Template
def ws_open(ws): # template = Template(
print("A WebSocket got connected!") # "<h1>${message}</h1><span><span classes=\"text-light align-left\">${left_message}</span><span classes=\"text-light align-right\">${right_message}</span></span>"
ws.send("Hello World!", OpCode.TEXT) # )
# from jinja2 import Environment, BaseLoader
# rtemplate = Environment(loader=BaseLoader()).from_string("<h1>{{ message }}</h1><span><span classes=\"text-light align-left\">{{ left_message }}</span><span classes=\"text-light align-right\">{{ right_message }}</span></span>")
def ws_message(ws, message, opcode): # print(
print(message, opcode) # render_tostring(htemplate(
# Ok is false if backpressure was built up, wait for drain # message="Hello, World!",
ok = ws.send(message, opcode) # left_message="Text in Left",
# right_message="Text in Right",
# ))
# )
# print(
# render_tostring(htemplate(
# message="Hello, World!",
# left_message="Text in Left",
# right_message="Text in Right",
# ))
# )
# for i in range(1_000_000):
# render_tostring(htemplate(message="Hello, World!", left_message="Text in Left", right_message="Text in Right"))
# template.render(message="Hello, World!", left_message="Text in Left", right_message="Text in Right")
# rtemplate.render(message="Hello, World!", left_message="Text in Left", right_message="Text in Right")
async def ws_upgrade(res, req, socket_context): # print(
key = req.get_header("sec-websocket-key") # render(
protocol = req.get_header("sec-websocket-protocol") # html(
extensions = req.get_header("sec-websocket-extensions") # message="Hello, World!",
await asyncio.sleep(2) # left_message="Text in Left",
res.upgrade(key, protocol, extensions, socket_context) # right_message="Text in Right",
# )
# )
app = App() # )
app.ws(
"/*",
{
"compression": CompressOptions.SHARED_COMPRESSOR,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 12,
"open": ws_open,
"message": ws_message,
"upgrade": ws_upgrade,
},
)
app.any("/", lambda res, req: res.end("Nothing to see here!"))
app.listen(
3000,
lambda config: print("Listening on port http://localhost:%d now\n" % (config.port)),
)
app.run()