diff --git a/bench/asgi_wsgi/raw-asgi.py b/bench/asgi_wsgi/raw-asgi.py index 43c19f8..bb7ad95 100644 --- a/bench/asgi_wsgi/raw-asgi.py +++ b/bench/asgi_wsgi/raw-asgi.py @@ -1,6 +1,5 @@ from socketify import ASGI - async def app(scope, receive, send): assert scope['type'] == 'http' diff --git a/bench/quart_plaintext.py b/bench/quart_plaintext.py new file mode 100644 index 0000000..38f9763 --- /dev/null +++ b/bench/quart_plaintext.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +from quart import Quart + + +app = Quart(__name__) + +@app.get("/") +async def plaintext(): + return "Hello, World!", {"Content-Type": "text/plain"} + +# Quart perform really baddly for sure needs more optimizations, but socketify ASGI + PyPy performs better than uvicorn+httptools+gunicorn \ No newline at end of file diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index 7042e57..be15f43 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -1,16 +1,16 @@ - from socketify import App import os import multiprocessing import asyncio + def run_app(): app = App(request_response_factory_max_items=200_000) router = app.router() @router.get("/") - async def home(res, req): + def home(res, req): res.send(b"Hello, World!") - + app.listen( 8000, lambda config: print( @@ -29,7 +29,7 @@ def create_fork(): # fork limiting the cpu count - 1 -# for i in range(1, multiprocessing.cpu_count()): -# create_fork() +for i in range(1, multiprocessing.cpu_count()): + create_fork() run_app() # run app on the main process too :) diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 1c49c07..234bedb 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -25,7 +25,6 @@ async def task_wrapper(task): EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False} - @ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") def ws_message(ws, message, length, opcode, user_data): socket_data = ffi.from_handle(user_data) @@ -427,7 +426,7 @@ def asgi(ssl, response, info, user_data, aborted): "http_version": "1.1", "server": (app.SERVER_HOST, app.SERVER_PORT), "client": ( - ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"), + None if info.remote_address == ffi.NULL else ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"), None, ), "scheme": app.SERVER_SCHEME, @@ -444,7 +443,9 @@ def asgi(ssl, response, info, user_data, aborted): else: data_queue = None + sended_empty = False async def receive(): + nonlocal sended_empty if bool(aborted[0]): return {"type": "http.disconnect"} if data_queue: @@ -458,8 +459,15 @@ def asgi(ssl, response, info, user_data, aborted): else: return data_queue.queue.get(False) # consume queue - # no more body, just empty - return EMPTY_RESPONSE + # no more body, just EMPTY RESPONSE + if not sended_empty: + sended_empty = True + return EMPTY_RESPONSE + + # already sended empty body so wait for aborted request + while not bool(aborted[0]): + await asyncio.sleep(0.01) #10ms is good enought + return {"type": "http.disconnect"} async def send(options): if bool(aborted[0]): diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 74b1b42..9473b81 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -50,12 +50,22 @@ class Loop: else: self._task_factory = create_task self.run_async = self._run_async_pypy - # custom task factory + + # TODO: check if any framework breaks without current_task(loop) support + # custom task factory for other tasks def pypy_task_factory(loop, coro, context=None): return create_task(loop, coro, context=context) self.loop.set_task_factory(pypy_task_factory) else: + + # TODO: check if any framework breaks without current_task(loop) support + # custom task factory for other tasks + def cpython_task_factory(loop, coro, context=None): + return create_task(loop, coro, context=context) + + self.loop.set_task_factory(cpython_task_factory) + # CPython performs equals or worse using TaskFactory self.run_async = self._run_async_cpython diff --git a/src/socketify/tasks.py b/src/socketify/tasks.py index 8ff06ac..b56c52d 100644 --- a/src/socketify/tasks.py +++ b/src/socketify/tasks.py @@ -5,8 +5,9 @@ from asyncio import ( exceptions, futures, _register_task, - _enter_task, - _leave_task, + # _enter_task, + # current_task, + # _leave_task, _unregister_task, ) import contextvars @@ -36,7 +37,9 @@ class RequestTask: Differences: - - This class is only used by socketify.py loop.run_async + - This class do not support current_task + + - This class executes the first step like node.js Promise - This class is not thread-safe. @@ -115,9 +118,12 @@ class RequestTask: self._log_destroy_pending = False if self._loop.get_debug(): self._source_traceback = format_helpers.extract_stack(sys._getframe(1)) - # self._loop.call_soon(self.__step, context=self._context) - self.__step() _register_task(self) + # if current_task(): + # self._loop.call_soon(self.__step, context=self._context) + # else: + self.__step() + def _reuse(self, coro, loop, default_done_callback=None): """Reuse an future that is not pending anymore.""" @@ -148,9 +154,12 @@ class RequestTask: self._fut_waiter = None self._coro = coro - # self._loop.call_soon(self.__step, context=self._context) - self.__step() _register_task(self) + # if current_task(): + # self._loop.call_soon(self.__step, context=self._context) + # else: + self.__step() + def __repr__(self): return base_tasks._task_repr(self) @@ -489,8 +498,7 @@ class RequestTask: self._must_cancel = False coro = self._coro self._fut_waiter = None - - _enter_task(self._loop, self) + # _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: @@ -560,7 +568,7 @@ class RequestTask: new_exc = RuntimeError(f"Task got bad yield: {result!r}") self._loop.call_soon(self.__step, new_exc, context=self._context) finally: - _leave_task(self._loop, self) + # _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future):