preliminar support for Quart, fix ASGI receive behavior, fix custom Task behavior

pull/106/head
Ciro 2023-02-16 14:17:58 -03:00
rodzic f562556844
commit 2ae9059e05
6 zmienionych plików z 58 dodań i 21 usunięć

Wyświetl plik

@ -1,6 +1,5 @@
from socketify import ASGI
async def app(scope, receive, send):
assert scope['type'] == 'http'

Wyświetl plik

@ -0,0 +1,12 @@
#!/usr/bin/env python3
from quart import Quart
app = Quart(__name__)
@app.get("/")
async def plaintext():
return "Hello, World!", {"Content-Type": "text/plain"}
# Quart perform really baddly for sure needs more optimizations, but socketify ASGI + PyPy performs better than uvicorn+httptools+gunicorn

Wyświetl plik

@ -1,16 +1,16 @@
from socketify import App
import os
import multiprocessing
import asyncio
def run_app():
app = App(request_response_factory_max_items=200_000)
router = app.router()
@router.get("/")
async def home(res, req):
def home(res, req):
res.send(b"Hello, World!")
app.listen(
8000,
lambda config: print(
@ -29,7 +29,7 @@ def create_fork():
# fork limiting the cpu count - 1
# for i in range(1, multiprocessing.cpu_count()):
# create_fork()
for i in range(1, multiprocessing.cpu_count()):
create_fork()
run_app() # run app on the main process too :)

Wyświetl plik

@ -25,7 +25,6 @@ async def task_wrapper(task):
EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False}
@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)")
def ws_message(ws, message, length, opcode, user_data):
socket_data = ffi.from_handle(user_data)
@ -427,7 +426,7 @@ def asgi(ssl, response, info, user_data, aborted):
"http_version": "1.1",
"server": (app.SERVER_HOST, app.SERVER_PORT),
"client": (
ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"),
None if info.remote_address == ffi.NULL else ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"),
None,
),
"scheme": app.SERVER_SCHEME,
@ -444,7 +443,9 @@ def asgi(ssl, response, info, user_data, aborted):
else:
data_queue = None
sended_empty = False
async def receive():
nonlocal sended_empty
if bool(aborted[0]):
return {"type": "http.disconnect"}
if data_queue:
@ -458,8 +459,15 @@ def asgi(ssl, response, info, user_data, aborted):
else:
return data_queue.queue.get(False) # consume queue
# no more body, just empty
return EMPTY_RESPONSE
# no more body, just EMPTY RESPONSE
if not sended_empty:
sended_empty = True
return EMPTY_RESPONSE
# already sended empty body so wait for aborted request
while not bool(aborted[0]):
await asyncio.sleep(0.01) #10ms is good enought
return {"type": "http.disconnect"}
async def send(options):
if bool(aborted[0]):

Wyświetl plik

@ -50,12 +50,22 @@ class Loop:
else:
self._task_factory = create_task
self.run_async = self._run_async_pypy
# custom task factory
# TODO: check if any framework breaks without current_task(loop) support
# custom task factory for other tasks
def pypy_task_factory(loop, coro, context=None):
return create_task(loop, coro, context=context)
self.loop.set_task_factory(pypy_task_factory)
else:
# TODO: check if any framework breaks without current_task(loop) support
# custom task factory for other tasks
def cpython_task_factory(loop, coro, context=None):
return create_task(loop, coro, context=context)
self.loop.set_task_factory(cpython_task_factory)
# CPython performs equals or worse using TaskFactory
self.run_async = self._run_async_cpython

Wyświetl plik

@ -5,8 +5,9 @@ from asyncio import (
exceptions,
futures,
_register_task,
_enter_task,
_leave_task,
# _enter_task,
# current_task,
# _leave_task,
_unregister_task,
)
import contextvars
@ -36,7 +37,9 @@ class RequestTask:
Differences:
- This class is only used by socketify.py loop.run_async
- This class do not support current_task
- This class executes the first step like node.js Promise
- This class is not thread-safe.
@ -115,9 +118,12 @@ class RequestTask:
self._log_destroy_pending = False
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
# self._loop.call_soon(self.__step, context=self._context)
self.__step()
_register_task(self)
# if current_task():
# self._loop.call_soon(self.__step, context=self._context)
# else:
self.__step()
def _reuse(self, coro, loop, default_done_callback=None):
"""Reuse an future that is not pending anymore."""
@ -148,9 +154,12 @@ class RequestTask:
self._fut_waiter = None
self._coro = coro
# self._loop.call_soon(self.__step, context=self._context)
self.__step()
_register_task(self)
# if current_task():
# self._loop.call_soon(self.__step, context=self._context)
# else:
self.__step()
def __repr__(self):
return base_tasks._task_repr(self)
@ -489,8 +498,7 @@ class RequestTask:
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
# _enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
@ -560,7 +568,7 @@ class RequestTask:
new_exc = RuntimeError(f"Task got bad yield: {result!r}")
self._loop.call_soon(self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
# _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):