update uSockets and projects

pull/106/head
Ciro 2023-01-18 16:04:41 -03:00
rodzic 59619dd697
commit 291af489f1
13 zmienionych plików z 351 dodań i 189 usunięć

Wyświetl plik

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "socketify"
version = "0.0.6"
version = "0.0.7"
authors = [
{ name="Ciro Spaciari", email="ciro.spaciari@gmail.com" },
]

Wyświetl plik

@ -58,7 +58,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
setuptools.setup(
name="socketify",
version="0.0.6",
version="0.0.7",
platforms=["any"],
author="Ciro Spaciari",
author_email="ciro.spaciari@gmail.com",

Wyświetl plik

@ -1,3 +1,5 @@
import asyncio
from .socketify import (
App,
AppOptions,

Wyświetl plik

@ -4,11 +4,14 @@ from .native import lib, ffi
from .tasks import create_task, create_task_with_factory
import os
import platform
import sys
import sys
import logging
import uuid
import asyncio
is_pypy = platform.python_implementation() == "PyPy"
async def task_wrapper(task):
try:
return await task
@ -19,6 +22,7 @@ async def task_wrapper(task):
finally:
return None
EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False}
@ -30,7 +34,7 @@ def ws_message(ws, message, length, opcode, user_data):
message = message.decode("utf8")
socket_data.message(ws, message, OpCode(opcode))
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def ws_close(ws, code, message, length, user_data):
@ -79,7 +83,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
extensions = ffi.unpack(info.extensions, info.extensions_size).decode("utf8")
compress = app.ws_compression
ws = ASGIWebSocket(app.server.loop)
scope = {
"type": "websocket",
"asgi": {"version": "3.0", "spec_version": "2.3"},
@ -160,10 +164,12 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
else:
sec_web_socket_extensions_data = b""
_id = uuid.uuid4()
app.server._socket_refs[_id] = ws
def unregister():
app.server._socket_refs.pop(_id, None)
ws.unregister = unregister
lib.uws_res_upgrade(
ssl,
@ -492,10 +498,18 @@ def asgi(ssl, response, info, user_data, aborted):
return False
app._run_task(app.app(scope, receive, send))
class _ASGI:
def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=100_000, lifespan=True):
def __init__(
self,
app,
options=None,
websocket=True,
websocket_options=None,
task_factory_max_items=100_000,
lifespan=True,
):
self.server = App(options, task_factory_max_items=0)
self.SERVER_PORT = None
self.SERVER_HOST = ""
@ -510,31 +524,39 @@ class _ASGI:
if is_pypy:
if task_factory_max_items > 0:
factory = create_task_with_factory(task_factory_max_items)
def run_task(task):
factory(loop, task_wrapper(task))
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
create_task(loop, task_wrapper(task))
loop._run_once()
self._run_task = run_task
else:
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
def run_task(task):
future = loop.create_task(task_wrapper(task), name='socketify.py-request-task')
future = loop.create_task(
task_wrapper(task), name="socketify.py-request-task"
)
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
future = loop.create_task(task_wrapper(task))
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task
self.app = app
self.ws_compression = False
@ -613,16 +635,16 @@ class _ASGI:
if not self.lifespan:
self.server.listen(port_or_options, handler)
return self
scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}}
asgi_app = self
self.is_starting = True
self.is_stopped = False
self.status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan
self.status = 0 # 0 starting, 1 ok, 2 error, 3 stoping, 4 stopped, 5 stopped with error, 6 no lifespan
self.status_message = ""
self.stop_future = self.server.loop.create_future()
async def send(options):
nonlocal asgi_app
type = options["type"]
@ -660,7 +682,7 @@ class _ASGI:
# just log in console the error to call attention
logging.error("Uncaught Exception: %s" % str(error))
if asgi_app.status < 2:
asgi_app.status = 6 # no more lifespan
asgi_app.status = 6 # no more lifespan
asgi_app.server.listen(port_or_options, handler)
finally:
return None
@ -676,21 +698,23 @@ class _ASGI:
def run(self):
if not self.lifespan:
self.server.run()
self.server.run()
return self
# run app
self.server.run()
self.server.run()
# no more lifespan events
if self.status == 6:
return self
# signal stop
self.status = 3
self.stop_future.set_result({
"type": "lifespan.shutdown",
"asgi": {"version": "3.0", "spec_version": "2.3"},
})
# run until end or fail
self.stop_future.set_result(
{
"type": "lifespan.shutdown",
"asgi": {"version": "3.0", "spec_version": "2.3"},
}
)
# run until end or fail
while self.status == 3:
self.server.loop.run_once()
@ -714,8 +738,8 @@ class ASGI:
options=None,
websocket=True,
websocket_options=None,
task_factory_max_items=100_000, #default = 100k = +20mib in memory
lifespan=True
task_factory_max_items=100_000, # default = 100k = +20mib in memory
lifespan=True,
):
self.app = app
self.options = options
@ -737,7 +761,7 @@ class ASGI:
self.websocket,
self.websocket_options,
self.task_factory_max_items,
self.lifespan
self.lifespan,
)
if self.listen_options:
(port_or_options, handler) = self.listen_options

Wyświetl plik

@ -2,6 +2,7 @@ import inspect
import os
import logging
from . import App, AppOptions, AppListenOptions
help = """
Usage: python -m socketify APP [OPTIONS]
python3 -m socketify APP [OPTIONS]
@ -43,40 +44,59 @@ Example:
"""
# --reload Enable auto-reload. This options also disable --workers or -w option.
# --reload-dir PATH Set reload directories explicitly, instead of using the current working directory.
# --reload-include TEXT Set extensions to include while watching for files.
# Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default;
# these defaults can be overridden with `--reload-exclude`.
# --reload-exclude TEXT Set extensions to include while watching for files.
# --reload-delay INT Milliseconds to delay reload between file changes. [default: 1000]
# --reload Enable auto-reload. This options also disable --workers or -w option.
# --reload-dir PATH Set reload directories explicitly, instead of using the current working directory.
# --reload-include TEXT Set extensions to include while watching for files.
# Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default;
# these defaults can be overridden with `--reload-exclude`.
# --reload-exclude TEXT Set extensions to include while watching for files.
# --reload-delay INT Milliseconds to delay reload between file changes. [default: 1000]
def is_wsgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 2
return (
hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 2
)
def is_asgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
return (
hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
)
def is_ssgi(module):
return False # no spec yet
return False # no spec yet
def is_ssgi(module):
return False # no spec yet
return False # no spec yet
def is_socketify(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 1
return (
hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 1
)
def is_factory(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0
return (
hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0
)
def str_bool(text):
text = str(text).lower()
return text == "true"
def load_module(file, reload=False):
try:
[full_module, app] = file.split(':')
[full_module, app] = file.split(":")
import importlib
module =importlib.import_module(full_module)
module = importlib.import_module(full_module)
if reload:
importlib.reload(module)
app = getattr(module, app)
# if is an factory just auto call
if is_factory(module):
@ -85,6 +105,8 @@ def load_module(file, reload=False):
except Exception as error:
logging.exception(error)
return None
def execute(args):
arguments_length = len(args)
if arguments_length <= 2:
@ -93,17 +115,20 @@ def execute(args):
if arguments_length == 2 and (args[1] == "--version" or args[1] == "-v"):
import pkg_resources # part of setuptools
import platform
version = pkg_resources.require("socketify")[0].version
return print(f"Running socketify {version} with {platform.python_implementation()} {platform.python_version()} on {platform.system()}")
return print(
f"Running socketify {version} with {platform.python_implementation()} {platform.python_version()} on {platform.system()}"
)
elif arguments_length < 2:
return print(help)
file = (args[1]).lower()
file = (args[1]).lower()
module = load_module(file)
if not module:
return print(f"Cannot load module {file}")
options_list = args[2:]
options = {}
selected_option = None
@ -111,46 +136,55 @@ def execute(args):
if selected_option:
options[selected_option] = option
selected_option = None
elif option.startswith('--') or option.startswith('-'):
elif option.startswith("--") or option.startswith("-"):
if selected_option is None:
selected_option = option
else: # --factory, --reload etc
else: # --factory, --reload etc
options[selected_option] = True
else:
return print(f"Invalid option ${selected_option} see --help")
if selected_option: # --factory, --reload etc
if selected_option: # --factory, --reload etc
options[selected_option] = True
interface = (options.get("--interface", "auto")).lower()
if interface == "auto":
if is_asgi(module):
from . import ASGI as Interface
interface = "asgi"
elif is_wsgi(module):
from . import WSGI as Interface
interface = "wsgi"
elif is_ssgi(module):
from . import SSGI as Interface
interface = "ssgi"
else:
interface = "socketify"
elif interface == "asgi" or interface == "asgi3":
from . import ASGI as Interface
interface = "asgi"
# you may use ASGI in SSGI so no checks here
if is_wsgi(module):
return print("Cannot use WSGI interface as ASGI interface")
if not is_asgi(module):
return print("ASGI interface must be callable with 3 parameters async def app(scope, receive and send)")
return print(
"ASGI interface must be callable with 3 parameters async def app(scope, receive and send)"
)
elif interface == "wsgi":
from . import WSGI as Interface
# you may use WSGI in SSGI so no checks here
if is_asgi(module):
return print("Cannot use ASGI interface as WSGI interface")
if not is_wsgi(module):
return print("WSGI interface must be callable with 2 parameters def app(environ, start_response)")
return print(
"WSGI interface must be callable with 2 parameters def app(environ, start_response)"
)
elif interface == "ssgi":
# if not is_ssgi(module):
@ -161,21 +195,25 @@ def execute(args):
elif interface != "socketify":
return print(f"{interface} interface is not supported yet")
auto_reload = options.get('--reload', False)
workers = int(options.get("--workers", options.get("-w", os.environ.get('WEB_CONCURRENCY', 1))))
auto_reload = options.get("--reload", False)
workers = int(
options.get(
"--workers", options.get("-w", os.environ.get("WEB_CONCURRENCY", 1))
)
)
if workers < 1 or auto_reload:
workers = 1
port = int(options.get("--port", options.get("-p", 8000)))
host = options.get("--host", options.get("-h", "127.0.0.1"))
uds = options.get('--uds', None)
lifespan = options.get('--lifespan', "auto")
uds = options.get("--uds", None)
lifespan = options.get("--lifespan", "auto")
lifespan = False if lifespan == "off" else True
task_factory_maxitems = int(options.get("--task-factory-maxitems", 100000))
disable_listen_log = options.get("--disable-listen-log", False)
websockets = options.get("--ws", "auto")
if websockets == "none":
# disable websockets
websockets = None
@ -185,26 +223,25 @@ def execute(args):
websockets = True
elif is_wsgi(module):
# if is WSGI no websockets using auto
websockets = False
else: # if is socketify websockets must be set in app
websockets = False
websockets = False
else: # if is socketify websockets must be set in app
websockets = False
else:
#websocket dedicated module
# websocket dedicated module
ws_module = load_module(websockets)
if not ws_module:
return print(f"Cannot load websocket module {websockets}")
websockets = ws_module
websockets = ws_module
key_file_name = options.get("--ssl-keyfile", None)
key_file_name = options.get('--ssl-keyfile', None)
if key_file_name:
ssl_options = AppOptions(
key_file_name=options.get('--ssl-keyfile', None),
cert_file_name=options.get('--ssl-certfile', None),
passphrase=options.get('--ssl-keyfile-password', None),
ca_file_name=options.get('--ssl-ca-certs', None),
ssl_ciphers=options.get('--ssl-ciphers', None)
key_file_name=options.get("--ssl-keyfile", None),
cert_file_name=options.get("--ssl-certfile", None),
passphrase=options.get("--ssl-keyfile-password", None),
ca_file_name=options.get("--ssl-ca-certs", None),
ssl_ciphers=options.get("--ssl-ciphers", None),
)
else:
ssl_options = None
@ -212,24 +249,34 @@ def execute(args):
def listen_log(config):
if not disable_listen_log:
if uds:
print(f"Listening on {config.domain} {'https' if ssl_options else 'http'}://localhost now\n")
print(
f"Listening on {config.domain} {'https' if ssl_options else 'http'}://localhost now\n"
)
else:
print(f"Listening on {'https' if ssl_options else 'http'}://{config.host if config.host and len(config.host) > 1 else '127.0.0.1' }:{config.port} now\n")
print(
f"Listening on {'https' if ssl_options else 'http'}://{config.host if config.host and len(config.host) > 1 else '127.0.0.1' }:{config.port} now\n"
)
if websockets:
websocket_options = {
'compression': int(1 if options.get('--ws-per-message-deflate', False) else 0),
'max_payload_length': int(options.get('--ws-max-size', 16777216)),
'idle_timeout': int(options.get('--ws-idle-timeout', 20)),
'send_pings_automatically': str_bool(options.get('--ws-auto-ping', True)),
'reset_idle_timeout_on_send': str_bool(options.get('--ws-reset-idle-on-send', True)),
'max_lifetime': int(options.get('--ws-max-lifetime', 0)),
'max_backpressure': int(options.get('--max-backpressure', 16777216)),
'close_on_backpressure_limit': str_bool(options.get('--ws-close-on-backpressure-limit', False))
"compression": int(
1 if options.get("--ws-per-message-deflate", False) else 0
),
"max_payload_length": int(options.get("--ws-max-size", 16777216)),
"idle_timeout": int(options.get("--ws-idle-timeout", 20)),
"send_pings_automatically": str_bool(options.get("--ws-auto-ping", True)),
"reset_idle_timeout_on_send": str_bool(
options.get("--ws-reset-idle-on-send", True)
),
"max_lifetime": int(options.get("--ws-max-lifetime", 0)),
"max_backpressure": int(options.get("--max-backpressure", 16777216)),
"close_on_backpressure_limit": str_bool(
options.get("--ws-close-on-backpressure-limit", False)
),
}
else:
websocket_options = None
if interface == "socketify":
if is_asgi(websockets):
return print("Cannot mix ASGI websockets with socketify.py interface yet")
@ -238,14 +285,24 @@ def execute(args):
elif is_wsgi(module):
return print("Cannot use WSGI interface as socketify interface")
elif not is_socketify(module):
return print("socketify interface must be callable with 1 parameter def run(app: App)")
return print(
"socketify interface must be callable with 1 parameter def run(app: App)"
)
# run app with the settings desired
def run_app():
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems, lifespan)
module(fork_app) # call module factory
fork_app = App(
ssl_options,
int(options.get("--req-res-factory-maxitems", 0)),
int(options.get("--ws-factory-maxitems", 0)),
task_factory_maxitems,
lifespan,
)
module(fork_app) # call module factory
if websockets: # if socketify websockets are added using --ws in socketify interface we can set here
websockets.update(websocket_options) # set websocket options
if (
websockets
): # if socketify websockets are added using --ws in socketify interface we can set here
websockets.update(websocket_options) # set websocket options
fork_app.ws("/*", websockets)
if uds:
fork_app.listen(AppListenOptions(domain=uds), listen_log)
@ -268,11 +325,28 @@ def execute(args):
run_app()
# sigint everything to gracefull shutdown
import signal
for pid in pid_list:
os.kill(pid, signal.SIGINT)
else:
if uds:
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems, lifespan=lifespan).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers)
Interface(
module,
options=ssl_options,
websocket=websockets,
websocket_options=websocket_options,
task_factory_max_items=task_factory_maxitems,
lifespan=lifespan,
).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers)
else:
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems, lifespan=lifespan).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers)
Interface(
module,
options=ssl_options,
websocket=websockets,
websocket_options=websocket_options,
task_factory_max_items=task_factory_maxitems,
lifespan=lifespan,
).listen(AppListenOptions(port=port, host=host), listen_log).run(
workers=workers
)

Wyświetl plik

@ -9,7 +9,7 @@ mimetypes.init()
# We have an version of this using aiofile and aiofiles
# This is an sync version without any dependencies is normally much faster in CPython and PyPy3
# In production we highly recommend to use CDN like CloudFlare or/and NGINX or similar for static files
# TODO: this must be reimplemented pure C++ to avoid GIL
# TODO: this must be reimplemented pure C++ to avoid GIL
async def sendfile(res, req, filename):
# read headers before the first await
if_modified_since = req.get_header("if-modified-since")
@ -39,13 +39,13 @@ async def sendfile(res, req, filename):
# check if modified since is provided
if if_modified_since == last_modified:
return res.cork(lambda res: res.write_status(304).end_without_body())
return res.cork(lambda res: res.write_status(304).end_without_body())
# add content type
(content_type, encoding) = mimetypes.guess_type(filename, strict=True)
if content_type and encoding:
content_type = "%s; %s" % (content_type, encoding)
with open(filename, "rb") as fd:
# check range and support it
if start > 0 or not end == -1:
@ -55,8 +55,14 @@ async def sendfile(res, req, filename):
fd.seek(start)
if start > total_size or size > total_size or size < 0 or start < 0:
if content_type:
return res.cork(lambda res: res.write_header(b"Content-Type", content_type).write_status(416).end_without_body())
return res.cork(lambda res: res.write_status(416).end_without_body())
return res.cork(
lambda res: res.write_header(b"Content-Type", content_type)
.write_status(416)
.end_without_body()
)
return res.cork(
lambda res: res.write_status(416).end_without_body()
)
status = 206
else:
end = size - 1
@ -74,6 +80,7 @@ async def sendfile(res, req, filename):
res.write_header(
b"Content-Range", "bytes %d-%d/%d" % (start, end, total_size)
)
res.cork(send_headers)
pending_size = size
# keep sending until abort or done
@ -104,9 +111,9 @@ def static_route(app, route, directory):
def route_handler(res, req):
url = req.get_url()
res.grab_aborted_handler()
url = url[len(route)::]
url = url[len(route) : :]
if url.endswith("/"):
if url.startswith("/"):
if url.startswith("/"):
url = url[1:-1]
else:
url = url[:-1]
@ -123,6 +130,7 @@ def static_route(app, route, directory):
route = route[:-1]
app.get("%s/*" % route, route_handler)
def middleware(*functions):
syncs = []
asyncs = []
@ -130,13 +138,13 @@ def middleware(*functions):
# all is async after the first async
if inspect.iscoroutinefunction(function) or len(asyncs) > 0:
asyncs.append(function)
else:
else:
syncs.append(function)
if len(asyncs) == 0: # pure sync
if len(asyncs) == 0: # pure sync
return sync_middleware(*functions)
if len(syncs) == 0: # pure async
if len(syncs) == 0: # pure async
return async_middleware(*functions)
# we use Optional data=None at the end so you can use and middleware inside a middleware
def optimized_middleware_route(res, req, data=None):
# cicle to all middlewares
@ -146,6 +154,7 @@ def middleware(*functions):
# stops if returns Falsy
if not data:
return
async def wrapper(res, req, data):
# cicle to all middlewares
for function in asyncs:
@ -157,7 +166,7 @@ def middleware(*functions):
data = function(res, req, data)
# stops if returns Falsy
if not data:
break
break
return data
# in async query string, arguments and headers are only valid until the first await
@ -168,7 +177,6 @@ def middleware(*functions):
res.run_async(wrapper(res, req, data))
return optimized_middleware_route
def sync_middleware(*functions):
@ -185,6 +193,7 @@ def sync_middleware(*functions):
return middleware_route
def async_middleware(*functions):
# we use Optional data=None at the end so you can use and middleware inside a middleware
async def middleware_route(res, req, data=None):
@ -211,14 +220,14 @@ def async_middleware(*functions):
class DecoratorRouter:
def __init__(self, app, prefix: str="", *middlewares):
def __init__(self, app, prefix: str = "", *middlewares):
self.app = app
self.middlewares = middlewares
self.prefix = prefix
def get(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -227,11 +236,12 @@ class DecoratorRouter:
else:
self.app.get(path, handler)
return handler
return decorator
def post(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -239,11 +249,12 @@ class DecoratorRouter:
self.app.post(path, middleware(*middies))
else:
self.app.post(path, handler)
return decorator
def options(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -256,6 +267,7 @@ class DecoratorRouter:
def delete(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -268,6 +280,7 @@ class DecoratorRouter:
def patch(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -280,6 +293,7 @@ class DecoratorRouter:
def put(self, path: str):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -292,6 +306,7 @@ class DecoratorRouter:
def head(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -304,6 +319,7 @@ class DecoratorRouter:
def connect(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -316,6 +332,7 @@ class DecoratorRouter:
def trace(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -328,6 +345,7 @@ class DecoratorRouter:
def any(self, path):
path = f"{self.prefix}{path}"
def decorator(handler):
if len(self.middlewares) > 0:
middies = list(*self.middlewares)
@ -338,6 +356,7 @@ class DecoratorRouter:
return decorator
class MiddlewareRouter:
def __init__(self, app, *middlewares):
self.app = app

Wyświetl plik

@ -6,7 +6,9 @@ from .uv import UVLoop
import asyncio
import platform
is_pypy = platform.python_implementation() == 'PyPy'
is_pypy = platform.python_implementation() == "PyPy"
async def task_wrapper(exception_handler, loop, response, task):
try:
return await task
@ -25,13 +27,13 @@ async def task_wrapper(exception_handler, loop, response, task):
class Loop:
def __init__(self, exception_handler=None, task_factory_max_items=0):
# get the current running loop or create a new one without warnings
self.loop = asyncio._get_running_loop()
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.uv_loop = UVLoop()
if hasattr(exception_handler, "__call__"):
@ -43,8 +45,8 @@ class Loop:
self.exception_handler = None
self.started = False
if is_pypy: # PyPy async Optimizations
if task_factory_max_items > 0: # Only available in PyPy for now
if is_pypy: # PyPy async Optimizations
if task_factory_max_items > 0: # Only available in PyPy for now
self._task_factory = create_task_with_factory(task_factory_max_items)
else:
self._task_factory = create_task
@ -52,14 +54,14 @@ class Loop:
# custom task factory
def pypy_task_factory(loop, coro, context=None):
return create_task(loop, coro, context=context)
self.loop.set_task_factory(pypy_task_factory)
else:
# CPython performs worse using custom create_task, so native create_task is used
# but this also did not allow the use of create_task_with_factory :/
# native create_task do not allow to change context, callbacks, state etc
self.run_async = self._run_async_cpython
def set_timeout(self, timeout, callback, user_data):
return self.uv_loop.create_timer(timeout, 0, callback, user_data)
@ -71,7 +73,7 @@ class Loop:
if self.started:
self.uv_loop.run_once()
self.loop.call_soon(self._keep_alive)
def create_task(self, *args, **kwargs):
# this is not using optimized create_task yet
return self.loop.create_task(*args, **kwargs)
@ -84,7 +86,7 @@ class Loop:
if task is not None:
future = self.ensure_future(task)
else:
future = None
future = None
self.loop.call_soon(self._keep_alive)
self.loop.run_until_complete(future)
# clean up uvloop
@ -96,7 +98,7 @@ class Loop:
if task is not None:
future = self.ensure_future(task)
else:
future = None
future = None
self.loop.call_soon(self._keep_alive)
self.loop.run_forever()
# clean up uvloop
@ -104,13 +106,12 @@ class Loop:
return future
def run_once(self):
# run one step of asyncio
# run one step of asyncio
self.loop._stopping = True
self.loop._run_once()
# run one step of libuv
self.uv_loop.run_once()
def stop(self):
if self.started:
# Just mark as started = False and wait
@ -120,29 +121,34 @@ class Loop:
# Exposes native loop for uWS
def get_native_loop(self):
return self.uv_loop.get_native_loop()
def _run_async_pypy(self, task, response=None):
# this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler
# using an coroutine wrapper generates less overhead than using add_done_callback
# using an coroutine wrapper generates less overhead than using add_done_callback
# this is an custom task/future with less overhead
future = self._task_factory(self.loop, task_wrapper(self.exception_handler, self.loop, response, task))
future = self._task_factory(
self.loop, task_wrapper(self.exception_handler, self.loop, response, task)
)
# force asyncio run once to enable req in async functions before first await
self.loop._run_once()
return None # this future maybe already done and reused not safe to await
return None # this future maybe already done and reused not safe to await
def _run_async_cpython(self, task, response=None):
def _run_async_cpython(self, task, response=None):
# this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler
# using an coroutine wrapper generates less overhead than using add_done_callback
future = self.loop.create_task(task_wrapper(self.exception_handler, self.loop, response, task))
# using an coroutine wrapper generates less overhead than using add_done_callback
future = self.loop.create_task(
task_wrapper(self.exception_handler, self.loop, response, task)
)
# force asyncio run once to enable req in async functions before first await
self.loop._run_once()
return None # this future is safe to await but we return None for compatibility, and in the future will be the same behavior as PyPy
return None # this future is safe to await but we return None for compatibility, and in the future will be the same behavior as PyPy
def dispose(self):
if self.uv_loop:
self.uv_loop.dispose()
self.uv_loop = None
# if sys.version_info >= (3, 11)
# with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
# runner.run(main())

Wyświetl plik

@ -401,5 +401,3 @@ library_path = os.path.join(
lib = ffi.dlopen(library_path)

Wyświetl plik

@ -4,9 +4,6 @@ from http import cookies
import inspect
from io import BytesIO
import json
import mimetypes
import os
import platform
import signal
import uuid
from urllib.parse import parse_qs, quote_plus, unquote_plus
@ -20,8 +17,6 @@ from dataclasses import dataclass
from .helpers import DecoratorRouter
from typing import Union
mimetypes.init()
@ffi.callback("void(const char*, size_t, void*)")
def uws_missing_server_name(hostname, hostname_length, user_data):
@ -1772,8 +1767,8 @@ class AppResponse:
else:
data = self.app._json_serializer.dumps(message).encode("utf-8")
# ignores content_type should always be json here
self.write_header(b"Content-Type", b'application/json')
self.write_header(b"Content-Type", b"application/json")
lib.uws_res_end(
self.app.SSL, self.res, data, len(data), 1 if end_connection else 0
)
@ -2476,7 +2471,7 @@ class App:
request_response_factory_max_items=0,
websocket_factory_max_items=0,
task_factory_max_items=100_000,
lifespan=True
lifespan=True,
):
socket_options_ptr = ffi.new("struct us_socket_context_options_t *")
socket_options = socket_options_ptr[0]
@ -2560,17 +2555,17 @@ class App:
self._request_extension = None
self._response_extension = None
self._ws_extension = None
self._on_start_handler = None
self._on_start_handler = None
self._on_shutdown_handler = None
def on_start(self, method: callable):
self._on_start_handler = method
self._on_start_handler = method
return method
def on_shutdown(self, method: callable):
self._on_shutdown_handler = method
self._on_shutdown_handler = method
return method
def router(self, prefix: str = "", *middlewares):
return DecoratorRouter(self, prefix, middlewares)
@ -3128,21 +3123,22 @@ class App:
def listen(self, port_or_options=None, handler=None):
if self.lifespan:
async def task_wrapper(task):
try:
if inspect.iscoroutinefunction(task):
await task()
else:
task()
except Exception as error:
try:
self.trigger_error(error, None, None)
finally:
return None
# start lifespan
if self._on_start_handler:
self.loop.run_until_complete(task_wrapper(self._on_start_handler))
async def task_wrapper(task):
try:
if inspect.iscoroutinefunction(task):
await task()
else:
task()
except Exception as error:
try:
self.trigger_error(error, None, None)
finally:
return None
# start lifespan
if self._on_start_handler:
self.loop.run_until_complete(task_wrapper(self._on_start_handler))
# actual listen to server
self._listen_handler = handler
@ -3229,6 +3225,7 @@ class App:
signal.signal(signal.SIGINT, lambda sig, frame: self.close())
self.loop.run()
if self.lifespan:
async def task_wrapper(task):
try:
if inspect.iscoroutinefunction(task):
@ -3240,6 +3237,7 @@ class App:
self.trigger_error(error, None, None)
finally:
return None
# shutdown lifespan
if self._on_shutdown_handler:
self.loop.run_until_complete(task_wrapper(self._on_shutdown_handler))
@ -3276,7 +3274,6 @@ class App:
else:
try:
if inspect.iscoroutinefunction(self.error_handler):
print("coroutine!", error)
self.run_async(
self.error_handler(error, response, request), response
)

Wyświetl plik

@ -89,7 +89,9 @@ class RequestTask:
# status is still pending
_log_destroy_pending = True
def __init__(self, coro, loop, default_done_callback=None, no_register=False, context=None):
def __init__(
self, coro, loop, default_done_callback=None, no_register=False, context=None
):
"""Initialize the future.
The optional event_loop argument allows explicitly setting the event
@ -590,10 +592,12 @@ def create_task_with_factory(task_factory_max_items=100_000):
if len(items) == 0:
return create_task(loop, coro, default_done_callback)
task = items.pop()
def done(f):
if default_done_callback is not None:
default_done_callback(f)
default_done_callback(f)
items.append(f)
task._reuse(coro, loop, done)
return task

@ -1 +1 @@
Subproject commit 9b13a6b02886c792189252e948290d2eea9aeda9
Subproject commit 7187fc3d658d4335cdf0c79371eeb8310717b95c

Wyświetl plik

@ -7,6 +7,7 @@ def socketify_generic_handler(data):
(handler, user_data) = ffi.from_handle(data)
handler(user_data)
class UVCheck:
def __init__(self, loop, handler, user_data):
self._handler_data = ffi.new_handle((handler, user_data))
@ -85,7 +86,7 @@ class UVLoop:
def run_nowait(self):
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT)
def run(self):
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT)

Wyświetl plik

@ -5,11 +5,13 @@ from .asgi import ws_close, ws_upgrade, ws_open, ws_message
from io import BytesIO, BufferedReader
from .native import lib, ffi
import platform
is_pypy = platform.python_implementation() == "PyPy"
from .tasks import create_task, create_task_with_factory
import sys
import logging
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
@ -23,6 +25,7 @@ def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response._ptr,
)
class WSGIBody:
def __init__(self, buffer):
self.buf = buffer
@ -106,10 +109,11 @@ class WSGIBody:
ret.append(data)
data = b""
else:
line, data = data[:pos + 1], data[pos + 1:]
line, data = data[: pos + 1], data[pos + 1 :]
ret.append(line)
return ret
class WSGIDataResponse:
def __init__(self, app, environ, start_response, aborted, buffer, on_data):
self.buffer = buffer
@ -120,6 +124,7 @@ class WSGIDataResponse:
self.app = app
self.start_response = start_response
@ffi.callback("void(uws_res_t*, void*)")
def wsgi_corked_response_start_handler(res, user_data):
data_response = ffi.from_handle(user_data)
@ -158,23 +163,23 @@ def wsgi(ssl, response, info, user_data, aborted):
headers_set = None
headers_written = False
status_text = None
def write_headers(headers):
nonlocal headers_written, headers_set, status_text
if headers_written or not headers_set:
return
headers_written = True
if isinstance(status_text, str):
data = status_text.encode("utf-8")
lib.uws_res_write_status(ssl, response, data, len(data))
elif isinstance(status_text, bytes):
lib.uws_res_write_status(ssl, response, status_text, len(status_text))
for (key, value) in headers:
if isinstance(key, str):
# this is faster than using .lower()
# this is faster than using .lower()
if (
key == "content-length"
or key == "Content-Length"
@ -194,7 +199,6 @@ def wsgi(ssl, response, info, user_data, aborted):
continue # auto
key_data = key
if isinstance(value, str):
value_data = value.encode("utf-8")
elif isinstance(value, bytes):
@ -208,10 +212,11 @@ def wsgi(ssl, response, info, user_data, aborted):
ffi.cast("uint64_t", value),
)
continue
lib.uws_res_write_header(
ssl, response, key_data, len(key_data), value_data, len(value_data)
)
def start_response(status, headers, exc_info=None):
nonlocal headers_set, status_text
if exc_info:
@ -220,23 +225,23 @@ def wsgi(ssl, response, info, user_data, aborted):
# Re-raise original exception if headers sent
raise exc_info[1].with_traceback(exc_info[2])
finally:
exc_info = None # avoid dangling circular ref
exc_info = None # avoid dangling circular ref
elif headers_set:
raise AssertionError("Headers already set!")
headers_set = headers
status_text = status
def write(data):
if not headers_written:
write_headers(headers_set)
if isinstance(data, bytes):
lib.uws_res_write(ssl, response, data, len(data))
elif isinstance(data, str):
data = data.encode("utf-8")
lib.uws_res_write(ssl, response, data, len(data))
return write
# check for body
@ -249,7 +254,9 @@ def wsgi(ssl, response, info, user_data, aborted):
return
ssl = data_response.app.server.SSL
data_response.environ["CONTENT_LENGTH"] = str(data_response.buffer.getbuffer().nbytes)
data_response.environ["CONTENT_LENGTH"] = str(
data_response.buffer.getbuffer().nbytes
)
app_iter = data_response.app.wsgi(
data_response.environ, data_response.start_response
)
@ -263,7 +270,7 @@ def wsgi(ssl, response, info, user_data, aborted):
elif isinstance(data, str):
data = data.encode("utf-8")
lib.uws_res_write(ssl, response, data, len(data))
except Exception as error:
logging.exception(error)
finally:
@ -293,20 +300,31 @@ def wsgi(ssl, response, info, user_data, aborted):
data = data.encode("utf-8")
lib.uws_res_write(ssl, response, data, len(data))
except Exception as error:
logging.exception(error)
logging.exception(error)
finally:
if hasattr(app_iter, "close"):
app_iter.close()
if not headers_written:
write_headers(headers_set)
write_headers(headers_set)
lib.uws_res_end_without_body(ssl, response, 0)
def is_asgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
return (
hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
)
class _WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000):
def __init__(
self,
app,
options=None,
websocket=None,
websocket_options=None,
task_factory_max_items=100_000,
):
self.server = App(options, task_factory_max_items=0)
self.SERVER_HOST = None
self.SERVER_PORT = None
@ -320,43 +338,50 @@ class _WSGI:
self.server.SSL, self.server.app, wsgi, self._ptr
)
self.asgi_ws_info = None
if isinstance(websocket, dict): # serve websocket as socketify.py
if websocket_options:
websocket.update(websocket_options)
self.server.ws("/*", websocket)
elif is_asgi(websocket):
self.app = websocket # set ASGI app
self.app = websocket # set ASGI app
loop = self.server.loop.loop
# ASGI do not use app.run_async to not add any overhead from socketify.py WebFramework
# internally will still use custom task factory for pypy because of Loop
if is_pypy:
if task_factory_max_items > 0:
factory = create_task_with_factory(task_factory_max_items)
def run_task(task):
factory(loop, task)
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
create_task(loop, task)
loop._run_once()
self._run_task = run_task
else:
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
def run_task(task):
loop.create_task(task, name='socketify.py-request-task')
loop.create_task(task, name="socketify.py-request-task")
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
loop.create_task(task)
loop._run_once()
self._run_task = run_task
# detect ASGI to use as WebSocket as mixed protocol
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
@ -435,7 +460,7 @@ class _WSGI:
"REMOTE_HOST": "",
"CONTENT_LENGTH": "0",
"CONTENT_TYPE": "",
'wsgi.input_terminated': True
"wsgi.input_terminated": True,
}
)
self.server.listen(port_or_options, handler)
@ -454,7 +479,15 @@ class _WSGI:
# "Public" WSGI interface to allow easy forks/workers
class WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000, lifespan=False):
def __init__(
self,
app,
options=None,
websocket=None,
websocket_options=None,
task_factory_max_items=100_000,
lifespan=False,
):
self.app = app
self.options = options
self.websocket = websocket
@ -470,7 +503,11 @@ class WSGI:
def run(self, workers=1):
def run_app():
server = _WSGI(
self.app, self.options, self.websocket, self.websocket_options, self.task_factory_max_items
self.app,
self.options,
self.websocket,
self.websocket_options,
self.task_factory_max_items,
)
if self.listen_options:
(port_or_options, handler) = self.listen_options