more 10% to 20% in PyPy async

pull/106/head
Ciro 2023-02-08 20:54:14 -03:00
rodzic f9a76df2f1
commit a447904ec7
9 zmienionych plików z 114 dodań i 102 usunięć

Wyświetl plik

@ -1,3 +1,4 @@
from socketify import App
import os
import multiprocessing
@ -7,7 +8,7 @@ def run_app():
router = app.router()
@router.get("/")
def home(res, req):
async def home(res, req):
res.send(b"Hello, World!")
app.listen(
@ -28,7 +29,7 @@ def create_fork():
# fork limiting the cpu count - 1
for i in range(1, multiprocessing.cpu_count()):
create_fork()
# for i in range(1, multiprocessing.cpu_count()):
# create_fork()
run_app() # run app on the main process too :)

Wyświetl plik

@ -1,9 +1,9 @@
import asyncio
from .dataclasses import AppListenOptions, AppOptions
from .socketify import (
App,
AppOptions,
AppListenOptions,
OpCode,
SendStatus,
CompressOptions,

Wyświetl plik

@ -1,7 +1,7 @@
from socketify import App, OpCode, Loop
from socketify import App, OpCode
from queue import SimpleQueue
from .native import lib, ffi
from .tasks import create_task, create_task_with_factory
from .tasks import create_task, TaskFactory
import os
import platform
import sys
@ -523,11 +523,10 @@ class _ASGI:
# internally will still use custom task factory for pypy because of Loop
if is_pypy:
if task_factory_max_items > 0:
factory = create_task_with_factory(task_factory_max_items)
factory = TaskFactory(task_factory_max_items)
def run_task(task):
factory(loop, task_wrapper(task))
loop._run_once()
self._run_task = run_task
else:
@ -535,7 +534,6 @@ class _ASGI:
def run_task(task):
future = create_task(loop, task_wrapper(task))
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task
@ -544,7 +542,6 @@ class _ASGI:
def run_task(task):
future = create_task(loop, task_wrapper(task))
future._log_destroy_pending = False
self._run_task = run_task

Wyświetl plik

@ -0,0 +1,52 @@
from dataclasses import dataclass
@dataclass
class AppListenOptions:
port: int = 0
host: str = None
options: int = 0
domain: str = None
def __post_init__(self):
if not isinstance(self.port, int):
raise RuntimeError("port must be an int")
if not isinstance(self.host, (type(None), str)):
raise RuntimeError("host must be a str if specified")
if not isinstance(self.domain, (type(None), str)):
raise RuntimeError("domain must be a str if specified")
if not isinstance(self.options, int):
raise RuntimeError("options must be an int")
if self.domain and (self.host or self.port != 0):
raise RuntimeError(
"if domain is specified, host and port will be no effect"
)
@dataclass
class AppOptions:
key_file_name: str = None
cert_file_name: str = None
passphrase: str = None
dh_params_file_name: str = None
ca_file_name: str = None
ssl_ciphers: str = None
ssl_prefer_low_memory_usage: int = 0
def __post_init__(self):
NoneType = type(None)
if not isinstance(self.key_file_name, (NoneType, str)):
raise RuntimeError("key_file_name must be a str if specified")
if not isinstance(self.cert_file_name, (NoneType, str)):
raise RuntimeError("cert_file_name must be a str if specified")
if not isinstance(self.passphrase, (NoneType, str)):
raise RuntimeError("passphrase must be a str if specified")
if not isinstance(self.dh_params_file_name, (NoneType, str)):
raise RuntimeError("dh_params_file_name must be a str if specified")
if not isinstance(self.ca_file_name, (NoneType, str)):
raise RuntimeError("ca_file_name must be a str if specified")
if not isinstance(self.ssl_ciphers, (NoneType, str)):
raise RuntimeError("ssl_ciphers must be a str if specified")
if not isinstance(self.ssl_prefer_low_memory_usage, int):
raise RuntimeError("ssl_prefer_low_memory_usage must be an int")

Wyświetl plik

@ -1,6 +1,6 @@
import asyncio
import logging
from .tasks import create_task, create_task_with_factory
from .tasks import create_task, TaskFactory
from .uv import UVLoop
import asyncio
@ -8,7 +8,6 @@ import platform
is_pypy = platform.python_implementation() == "PyPy"
async def task_wrapper(exception_handler, loop, response, task):
try:
return await task
@ -47,7 +46,7 @@ class Loop:
self.started = False
if is_pypy: # PyPy async Optimizations
if task_factory_max_items > 0: # Only available in PyPy for now
self._task_factory = create_task_with_factory(task_factory_max_items)
self._task_factory = TaskFactory(task_factory_max_items)
else:
self._task_factory = create_task
self.run_async = self._run_async_pypy
@ -57,10 +56,7 @@ class Loop:
self.loop.set_task_factory(pypy_task_factory)
else:
# CPython performs worse using custom create_task, so native create_task is used
# but this also did not allow the use of create_task_with_factory :/
# native create_task do not allow to change context, callbacks, state etc
# CPython performs equals or worse using TaskFactory
self.run_async = self._run_async_cpython
def set_timeout(self, timeout, callback, user_data):
@ -125,19 +121,16 @@ class Loop:
def _run_async_pypy(self, task, response=None):
# this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler
# using an coroutine wrapper generates less overhead than using add_done_callback
# this is an custom task/future with less overhead
# this is an custom task/future with less overhead and that calls the first step
future = self._task_factory(
self.loop, task_wrapper(self.exception_handler, self.loop, response, task)
)
# this call makes pypy 10% to 20% faster in async, but will work without it
# this also makes uvloop incompatible if uvloop becomes compatible with pypy
self.loop._run_once()
return None # this future maybe already done and reused not safe to await
def _run_async_cpython(self, task, response=None):
# this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler
# using an coroutine wrapper generates less overhead than using add_done_callback
# custom task will call _step, reusing tasks in CPython is not worth
# this is an custom task/future with less overhead and that calls the first step
future = create_task(self.loop, task_wrapper(self.exception_handler, self.loop, response, task))
return None # this future is safe to await but we return None for compatibility, and in the future will be the same behavior as PyPy

Wyświetl plik

@ -11,12 +11,10 @@ import logging
from .native import ffi, lib
from .loop import Loop
from .status_codes import status_codes
from .helpers import static_route
from dataclasses import dataclass
from .helpers import DecoratorRouter
from typing import Union
from .dataclasses import AppListenOptions
@ffi.callback("void(const char*, size_t, void*)")
def uws_missing_server_name(hostname, hostname_length, user_data):
@ -2549,6 +2547,7 @@ class App:
task_factory_max_items=100_000,
lifespan=True,
):
socket_options_ptr = ffi.new("struct us_socket_context_options_t *")
socket_options = socket_options_ptr[0]
self._options = options
@ -2626,7 +2625,7 @@ class App:
lambda loop, context, response: self.trigger_error(context, response, None),
task_factory_max_items,
)
self.run_async = self.loop.run_async
# set async loop to be the last created (is thread_local), App must be one per thread otherwise will use only the lasted loop
# needs to be called before uws_create_app or otherwise will create another loop and will not receive the right one
lib.uws_get_loop_with_native(self.loop.get_native_loop())
@ -3314,8 +3313,6 @@ class App:
return self
def run_async(self, task, response=None):
return self.loop.run_async(task, response)
def run(self):
# populate factories
@ -3409,53 +3406,3 @@ class App:
self.loop.dispose()
self.loop = None
@dataclass
class AppListenOptions:
port: int = 0
host: str = None
options: int = 0
domain: str = None
def __post_init__(self):
if not isinstance(self.port, int):
raise RuntimeError("port must be an int")
if not isinstance(self.host, (type(None), str)):
raise RuntimeError("host must be a str if specified")
if not isinstance(self.domain, (type(None), str)):
raise RuntimeError("domain must be a str if specified")
if not isinstance(self.options, int):
raise RuntimeError("options must be an int")
if self.domain and (self.host or self.port != 0):
raise RuntimeError(
"if domain is specified, host and port will be no effect"
)
@dataclass
class AppOptions:
key_file_name: str = None
cert_file_name: str = None
passphrase: str = None
dh_params_file_name: str = None
ca_file_name: str = None
ssl_ciphers: str = None
ssl_prefer_low_memory_usage: int = 0
def __post_init__(self):
NoneType = type(None)
if not isinstance(self.key_file_name, (NoneType, str)):
raise RuntimeError("key_file_name must be a str if specified")
if not isinstance(self.cert_file_name, (NoneType, str)):
raise RuntimeError("cert_file_name must be a str if specified")
if not isinstance(self.passphrase, (NoneType, str)):
raise RuntimeError("passphrase must be a str if specified")
if not isinstance(self.dh_params_file_name, (NoneType, str)):
raise RuntimeError("dh_params_file_name must be a str if specified")
if not isinstance(self.ca_file_name, (NoneType, str)):
raise RuntimeError("ca_file_name must be a str if specified")
if not isinstance(self.ssl_ciphers, (NoneType, str)):
raise RuntimeError("ssl_ciphers must be a str if specified")
if not isinstance(self.ssl_prefer_low_memory_usage, int):
raise RuntimeError("ssl_prefer_low_memory_usage must be an int")

Wyświetl plik

@ -582,29 +582,52 @@ class RequestTask:
__iter__ = __await__ # make compatible with 'yield from'.
def create_task_with_factory(task_factory_max_items=100_000):
items = []
for _ in range(0, task_factory_max_items):
task = RequestTask(None, None, None, True)
if task._source_traceback:
del task._source_traceback[-1]
items.append(task)
# def create_task_with_factory(task_factory_max_items=100_000):
# items = []
# for _ in range(0, task_factory_max_items):
# task = RequestTask(None, None, None, True)
# if task._source_traceback:
# del task._source_traceback[-1]
# items.append(task)
def factory(loop, coro, default_done_callback=None):
if len(items) == 0:
return create_task(loop, coro, default_done_callback)
task = items.pop()
# def factory(loop, coro, default_done_callback=None):
# if len(items) == 0:
# return create_task(loop, coro, default_done_callback)
# task = items.pop()
def done(f):
if default_done_callback is not None:
default_done_callback(f)
items.append(f)
# def done(f):
# if default_done_callback is not None:
# default_done_callback(f)
# items.append(f)
task._reuse(coro, loop, done)
# task._reuse(coro, loop, done)
# return task
# return factory
async def factory_task_wrapper(task, dispose):
try:
await task
finally:
dispose()
class TaskFactory:
def __init__(self, task_factory_max_items=100_000):
self.items = []
for _ in range(0, task_factory_max_items):
task = RequestTask(None, None, None, True)
if task._source_traceback:
del task._source_traceback[-1]
self.items.append(task)
def __call__(self, loop, coro):
if len(self.items) == 0:
return create_task(loop, coro)
task = self.items.pop()
task._reuse(factory_task_wrapper(coro, lambda : self.items.append(task)), loop)
return task
return factory
def create_task(loop, coro, default_done_callback=None, context=None):
"""Schedule a coroutine object.

Wyświetl plik

@ -1,3 +1,4 @@
from .native import ffi, lib
@ -90,7 +91,7 @@ class UVLoop:
def run(self):
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT)
def run_once(self):
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE)

Wyświetl plik

@ -7,7 +7,7 @@ from .native import lib, ffi
import platform
is_pypy = platform.python_implementation() == "PyPy"
from .tasks import create_task, create_task_with_factory
from .tasks import create_task, TaskFactory
import sys
import logging
@ -351,11 +351,10 @@ class _WSGI:
# internally will still use custom task factory for pypy because of Loop
if is_pypy:
if task_factory_max_items > 0:
factory = create_task_with_factory(task_factory_max_items)
factory = TaskFactory(task_factory_max_items)
def run_task(task):
factory(loop, task)
loop._run_once()
self._run_task = run_task
else:
@ -363,7 +362,6 @@ class _WSGI:
def run_task(task):
future = create_task(loop, task)
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task