diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index a59271f..7042e57 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -1,3 +1,4 @@ + from socketify import App import os import multiprocessing @@ -7,7 +8,7 @@ def run_app(): router = app.router() @router.get("/") - def home(res, req): + async def home(res, req): res.send(b"Hello, World!") app.listen( @@ -28,7 +29,7 @@ def create_fork(): # fork limiting the cpu count - 1 -for i in range(1, multiprocessing.cpu_count()): - create_fork() +# for i in range(1, multiprocessing.cpu_count()): +# create_fork() run_app() # run app on the main process too :) diff --git a/src/socketify/__init__.py b/src/socketify/__init__.py index 2304b5d..ecb974e 100644 --- a/src/socketify/__init__.py +++ b/src/socketify/__init__.py @@ -1,9 +1,9 @@ import asyncio +from .dataclasses import AppListenOptions, AppOptions + from .socketify import ( App, - AppOptions, - AppListenOptions, OpCode, SendStatus, CompressOptions, diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 9af72f3..1c49c07 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,7 +1,7 @@ -from socketify import App, OpCode, Loop +from socketify import App, OpCode from queue import SimpleQueue from .native import lib, ffi -from .tasks import create_task, create_task_with_factory +from .tasks import create_task, TaskFactory import os import platform import sys @@ -523,11 +523,10 @@ class _ASGI: # internally will still use custom task factory for pypy because of Loop if is_pypy: if task_factory_max_items > 0: - factory = create_task_with_factory(task_factory_max_items) + factory = TaskFactory(task_factory_max_items) def run_task(task): factory(loop, task_wrapper(task)) - loop._run_once() self._run_task = run_task else: @@ -535,7 +534,6 @@ class _ASGI: def run_task(task): future = create_task(loop, task_wrapper(task)) future._log_destroy_pending = False - loop._run_once() self._run_task = run_task @@ -544,7 +542,6 @@ class _ASGI: def run_task(task): future = create_task(loop, task_wrapper(task)) - future._log_destroy_pending = False self._run_task = run_task diff --git a/src/socketify/dataclasses.py b/src/socketify/dataclasses.py new file mode 100644 index 0000000..7d8bcf6 --- /dev/null +++ b/src/socketify/dataclasses.py @@ -0,0 +1,52 @@ + +from dataclasses import dataclass + +@dataclass +class AppListenOptions: + port: int = 0 + host: str = None + options: int = 0 + domain: str = None + + def __post_init__(self): + if not isinstance(self.port, int): + raise RuntimeError("port must be an int") + if not isinstance(self.host, (type(None), str)): + raise RuntimeError("host must be a str if specified") + if not isinstance(self.domain, (type(None), str)): + raise RuntimeError("domain must be a str if specified") + if not isinstance(self.options, int): + raise RuntimeError("options must be an int") + if self.domain and (self.host or self.port != 0): + raise RuntimeError( + "if domain is specified, host and port will be no effect" + ) + + +@dataclass +class AppOptions: + key_file_name: str = None + cert_file_name: str = None + passphrase: str = None + dh_params_file_name: str = None + ca_file_name: str = None + ssl_ciphers: str = None + ssl_prefer_low_memory_usage: int = 0 + + def __post_init__(self): + NoneType = type(None) + + if not isinstance(self.key_file_name, (NoneType, str)): + raise RuntimeError("key_file_name must be a str if specified") + if not isinstance(self.cert_file_name, (NoneType, str)): + raise RuntimeError("cert_file_name must be a str if specified") + if not isinstance(self.passphrase, (NoneType, str)): + raise RuntimeError("passphrase must be a str if specified") + if not isinstance(self.dh_params_file_name, (NoneType, str)): + raise RuntimeError("dh_params_file_name must be a str if specified") + if not isinstance(self.ca_file_name, (NoneType, str)): + raise RuntimeError("ca_file_name must be a str if specified") + if not isinstance(self.ssl_ciphers, (NoneType, str)): + raise RuntimeError("ssl_ciphers must be a str if specified") + if not isinstance(self.ssl_prefer_low_memory_usage, int): + raise RuntimeError("ssl_prefer_low_memory_usage must be an int") diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 1d33836..74b1b42 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -1,6 +1,6 @@ import asyncio import logging -from .tasks import create_task, create_task_with_factory +from .tasks import create_task, TaskFactory from .uv import UVLoop import asyncio @@ -8,7 +8,6 @@ import platform is_pypy = platform.python_implementation() == "PyPy" - async def task_wrapper(exception_handler, loop, response, task): try: return await task @@ -47,7 +46,7 @@ class Loop: self.started = False if is_pypy: # PyPy async Optimizations if task_factory_max_items > 0: # Only available in PyPy for now - self._task_factory = create_task_with_factory(task_factory_max_items) + self._task_factory = TaskFactory(task_factory_max_items) else: self._task_factory = create_task self.run_async = self._run_async_pypy @@ -57,10 +56,7 @@ class Loop: self.loop.set_task_factory(pypy_task_factory) else: - # CPython performs worse using custom create_task, so native create_task is used - # but this also did not allow the use of create_task_with_factory :/ - # native create_task do not allow to change context, callbacks, state etc - + # CPython performs equals or worse using TaskFactory self.run_async = self._run_async_cpython def set_timeout(self, timeout, callback, user_data): @@ -125,19 +121,16 @@ class Loop: def _run_async_pypy(self, task, response=None): # this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler # using an coroutine wrapper generates less overhead than using add_done_callback - # this is an custom task/future with less overhead + # this is an custom task/future with less overhead and that calls the first step future = self._task_factory( self.loop, task_wrapper(self.exception_handler, self.loop, response, task) ) - # this call makes pypy 10% to 20% faster in async, but will work without it - # this also makes uvloop incompatible if uvloop becomes compatible with pypy - self.loop._run_once() return None # this future maybe already done and reused not safe to await def _run_async_cpython(self, task, response=None): # this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler # using an coroutine wrapper generates less overhead than using add_done_callback - # custom task will call _step, reusing tasks in CPython is not worth + # this is an custom task/future with less overhead and that calls the first step future = create_task(self.loop, task_wrapper(self.exception_handler, self.loop, response, task)) return None # this future is safe to await but we return None for compatibility, and in the future will be the same behavior as PyPy diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 595d019..1977a7d 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -11,12 +11,10 @@ import logging from .native import ffi, lib from .loop import Loop -from .status_codes import status_codes from .helpers import static_route -from dataclasses import dataclass from .helpers import DecoratorRouter from typing import Union - +from .dataclasses import AppListenOptions @ffi.callback("void(const char*, size_t, void*)") def uws_missing_server_name(hostname, hostname_length, user_data): @@ -2549,6 +2547,7 @@ class App: task_factory_max_items=100_000, lifespan=True, ): + socket_options_ptr = ffi.new("struct us_socket_context_options_t *") socket_options = socket_options_ptr[0] self._options = options @@ -2626,7 +2625,7 @@ class App: lambda loop, context, response: self.trigger_error(context, response, None), task_factory_max_items, ) - + self.run_async = self.loop.run_async # set async loop to be the last created (is thread_local), App must be one per thread otherwise will use only the lasted loop # needs to be called before uws_create_app or otherwise will create another loop and will not receive the right one lib.uws_get_loop_with_native(self.loop.get_native_loop()) @@ -3314,8 +3313,6 @@ class App: return self - def run_async(self, task, response=None): - return self.loop.run_async(task, response) def run(self): # populate factories @@ -3409,53 +3406,3 @@ class App: self.loop.dispose() self.loop = None - -@dataclass -class AppListenOptions: - port: int = 0 - host: str = None - options: int = 0 - domain: str = None - - def __post_init__(self): - if not isinstance(self.port, int): - raise RuntimeError("port must be an int") - if not isinstance(self.host, (type(None), str)): - raise RuntimeError("host must be a str if specified") - if not isinstance(self.domain, (type(None), str)): - raise RuntimeError("domain must be a str if specified") - if not isinstance(self.options, int): - raise RuntimeError("options must be an int") - if self.domain and (self.host or self.port != 0): - raise RuntimeError( - "if domain is specified, host and port will be no effect" - ) - - -@dataclass -class AppOptions: - key_file_name: str = None - cert_file_name: str = None - passphrase: str = None - dh_params_file_name: str = None - ca_file_name: str = None - ssl_ciphers: str = None - ssl_prefer_low_memory_usage: int = 0 - - def __post_init__(self): - NoneType = type(None) - - if not isinstance(self.key_file_name, (NoneType, str)): - raise RuntimeError("key_file_name must be a str if specified") - if not isinstance(self.cert_file_name, (NoneType, str)): - raise RuntimeError("cert_file_name must be a str if specified") - if not isinstance(self.passphrase, (NoneType, str)): - raise RuntimeError("passphrase must be a str if specified") - if not isinstance(self.dh_params_file_name, (NoneType, str)): - raise RuntimeError("dh_params_file_name must be a str if specified") - if not isinstance(self.ca_file_name, (NoneType, str)): - raise RuntimeError("ca_file_name must be a str if specified") - if not isinstance(self.ssl_ciphers, (NoneType, str)): - raise RuntimeError("ssl_ciphers must be a str if specified") - if not isinstance(self.ssl_prefer_low_memory_usage, int): - raise RuntimeError("ssl_prefer_low_memory_usage must be an int") diff --git a/src/socketify/tasks.py b/src/socketify/tasks.py index e6f69fe..8ff06ac 100644 --- a/src/socketify/tasks.py +++ b/src/socketify/tasks.py @@ -582,29 +582,52 @@ class RequestTask: __iter__ = __await__ # make compatible with 'yield from'. -def create_task_with_factory(task_factory_max_items=100_000): - items = [] - for _ in range(0, task_factory_max_items): - task = RequestTask(None, None, None, True) - if task._source_traceback: - del task._source_traceback[-1] - items.append(task) +# def create_task_with_factory(task_factory_max_items=100_000): +# items = [] +# for _ in range(0, task_factory_max_items): +# task = RequestTask(None, None, None, True) +# if task._source_traceback: +# del task._source_traceback[-1] +# items.append(task) - def factory(loop, coro, default_done_callback=None): - if len(items) == 0: - return create_task(loop, coro, default_done_callback) - task = items.pop() +# def factory(loop, coro, default_done_callback=None): +# if len(items) == 0: +# return create_task(loop, coro, default_done_callback) +# task = items.pop() - def done(f): - if default_done_callback is not None: - default_done_callback(f) - items.append(f) +# def done(f): +# if default_done_callback is not None: +# default_done_callback(f) +# items.append(f) - task._reuse(coro, loop, done) +# task._reuse(coro, loop, done) +# return task + +# return factory + +async def factory_task_wrapper(task, dispose): + try: + await task + finally: + dispose() + +class TaskFactory: + def __init__(self, task_factory_max_items=100_000): + self.items = [] + for _ in range(0, task_factory_max_items): + task = RequestTask(None, None, None, True) + if task._source_traceback: + del task._source_traceback[-1] + self.items.append(task) + + def __call__(self, loop, coro): + if len(self.items) == 0: + return create_task(loop, coro) + task = self.items.pop() + + task._reuse(factory_task_wrapper(coro, lambda : self.items.append(task)), loop) return task - return factory - def create_task(loop, coro, default_done_callback=None, context=None): """Schedule a coroutine object. diff --git a/src/socketify/uv.py b/src/socketify/uv.py index 4087334..865685c 100644 --- a/src/socketify/uv.py +++ b/src/socketify/uv.py @@ -1,3 +1,4 @@ + from .native import ffi, lib @@ -90,7 +91,7 @@ class UVLoop: def run(self): if self._loop != ffi.NULL: return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT) - + def run_once(self): if self._loop != ffi.NULL: return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE) diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index c0b2605..291278a 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -7,7 +7,7 @@ from .native import lib, ffi import platform is_pypy = platform.python_implementation() == "PyPy" -from .tasks import create_task, create_task_with_factory +from .tasks import create_task, TaskFactory import sys import logging @@ -351,11 +351,10 @@ class _WSGI: # internally will still use custom task factory for pypy because of Loop if is_pypy: if task_factory_max_items > 0: - factory = create_task_with_factory(task_factory_max_items) + factory = TaskFactory(task_factory_max_items) def run_task(task): factory(loop, task) - loop._run_once() self._run_task = run_task else: @@ -363,7 +362,6 @@ class _WSGI: def run_task(task): future = create_task(loop, task) future._log_destroy_pending = False - loop._run_once() self._run_task = run_task