CPU relax strategy

pull/129/head
cirospaciari 2023-06-23 10:57:00 -03:00
rodzic 99e716e657
commit a63b75e30a
6 zmienionych plików z 100 dodań i 16 usunięć

Wyświetl plik

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "socketify"
version = "0.0.20"
version = "0.0.21"
authors = [
{ name="Ciro Spaciari", email="ciro.spaciari@gmail.com" },
]

Wyświetl plik

@ -58,7 +58,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
setuptools.setup(
name="socketify",
version="0.0.20",
version="0.0.21",
platforms=["any"],
author="Ciro Spaciari",
author_email="ciro.spaciari@gmail.com",

Wyświetl plik

@ -14,6 +14,8 @@ is_pypy = platform.python_implementation() == "PyPy"
def asgi_on_abort_handler(res, user_data):
ctx = ffi.from_handle(user_data)
ctx.aborted = True
ctx.loop.is_idle = False
if ctx.abort_future is not None:
ctx.abort_future.set_result(True)
ctx.abort_future = None
@ -59,6 +61,7 @@ def ws_open(ws, user_data):
)
def ws_upgrade(ssl, response, info, socket_context, user_data):
app = ffi.from_handle(user_data)
app.server.loop.is_idle = False
headers = []
next_header = info.header_list
while next_header != ffi.NULL:
@ -117,6 +120,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data):
async def send(options):
if ws.aborted:
return False
ws.loop.is_idle = False
type = options["type"]
if type == "websocket.send":
data = options.get("bytes", None)
@ -244,6 +248,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data):
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
data_response.loop.is_idle = False
data_response.is_end = bool(is_end)
more_body = not data_response.is_end
result = {
@ -438,7 +443,8 @@ def uws_asgi_corked_403_handler(res, user_data):
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*)")
def asgi(ssl, response, info, user_data):
app = ffi.from_handle(user_data)
app.server.loop.is_idle = False
headers = []
next_header = info.header_list
while next_header != ffi.NULL:
@ -481,6 +487,8 @@ def asgi(ssl, response, info, user_data):
async def receive():
if ctx.aborted:
return {"type": "http.disconnect"}
ctx.loop.is_idle = False
data_queue = ctx.data_queue
if data_queue:
if data_queue.queue.empty():
@ -506,6 +514,8 @@ def asgi(ssl, response, info, user_data):
async def send(options):
if ctx.aborted:
return False
ctx.loop.is_idle = False
type = options["type"]
ssl = ctx.ssl
response = ctx.response
@ -684,6 +694,7 @@ class _ASGI:
async def send(options):
nonlocal asgi_app
asgi_app.server.loop.is_idle = False
type = options["type"]
asgi_app.status_message = options.get("message", "")
if type == "lifespan.startup.complete":
@ -701,6 +712,7 @@ class _ASGI:
async def receive():
nonlocal asgi_app
asgi_app.server.loop.is_idle = False
while not asgi_app.is_stopped:
if asgi_app.is_starting:
asgi_app.is_starting = False
@ -723,7 +735,7 @@ class _ASGI:
asgi_app.server.listen(port_or_options, handler)
finally:
return None
self.server.loop.is_idle = False
# start lifespan
self.server.loop.ensure_future(task_wrapper(self.app(scope, receive, send)))
self.server.run()

Wyświetl plik

@ -29,6 +29,8 @@ class Loop:
# get the current running loop or create a new one without warnings
self.loop = asyncio._get_running_loop()
self._idle_count = 0
self.is_idle = False
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
@ -73,15 +75,27 @@ class Loop:
def create_future(self):
return self.loop.create_future()
def _keep_alive(self):
if self.started:
self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)
relax = False
if not self.is_idle:
self._idle_count = 0
elif self._idle_count < 10000:
self._idle_count += 1
else:
relax = True
self.is_idle = True
if relax:
self.uv_loop.run_nowait()
self.loop.call_later(0.001, self._keep_alive)
else:
self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)
def create_task(self, *args, **kwargs):
# this is not using optimized create_task yet
return self.loop.create_task(*args, **kwargs)

Wyświetl plik

@ -41,6 +41,7 @@ def uws_missing_server_name(hostname, hostname_length, user_data):
def uws_websocket_factory_drain_handler(ws, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
@ -76,6 +77,7 @@ def uws_websocket_drain_handler_with_extension(ws, user_data):
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
app.loop.is_idle = False
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
@ -97,6 +99,7 @@ def uws_websocket_drain_handler(ws, user_data):
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
app.loop.is_idle = False
handler = handlers.drain
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws))
@ -119,6 +122,7 @@ def uws_websocket_factory_subscription_handler(
):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
@ -201,6 +205,7 @@ def uws_websocket_subscription_handler(
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
handler = handlers.subscription
@ -243,6 +248,7 @@ def uws_websocket_subscription_handler_with_extension(
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
@ -281,6 +287,7 @@ def uws_websocket_subscription_handler_with_extension(
def uws_websocket_factory_open_handler(ws, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
@ -315,6 +322,7 @@ def uws_websocket_open_handler_with_extension(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
@ -337,6 +345,7 @@ def uws_websocket_open_handler(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
handler = handlers.open
if inspect.iscoroutinefunction(handler):
@ -353,6 +362,7 @@ def uws_websocket_open_handler(ws, user_data):
def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
@ -397,6 +407,7 @@ def uws_websocket_message_handler_with_extension(
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
@ -428,6 +439,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
if message == ffi.NULL:
@ -454,6 +466,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data):
def uws_websocket_factory_pong_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
@ -493,6 +506,7 @@ def uws_websocket_pong_handler_with_extension(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
@ -519,6 +533,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
if message == ffi.NULL:
data = None
@ -540,6 +555,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data):
def uws_websocket_factory_ping_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
@ -580,6 +596,7 @@ def uws_websocket_ping_handler_with_extension(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
@ -608,6 +625,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
ws = WebSocket(ws, app)
if message == ffi.NULL:
@ -631,6 +649,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data):
def uws_websocket_factory_close_handler(ws, code, message, length, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
@ -681,6 +700,7 @@ def uws_websocket_close_handler_with_extension(ws, code, message, length, user_d
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
# pass to free data on WebSocket if needed
ws = WebSocket(ws, app)
# bind methods to websocket
@ -726,6 +746,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
# pass to free data on WebSocket if needed
ws = WebSocket(ws, app)
@ -766,6 +787,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data):
def uws_generic_factory_method_handler(res, req, user_data):
if user_data != ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._factory.get(app, res, req)
(response, request, dispose) = instances
try:
@ -800,6 +822,7 @@ def uws_generic_factory_method_handler(res, req, user_data):
def uws_websocket_factory_upgrade_handler(res, req, context, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._factory.get(app, res, req)
(response, request, dispose) = instances
try:
@ -837,6 +860,7 @@ def uws_websocket_factory_upgrade_handler(res, req, context, user_data):
def uws_websocket_upgrade_handler_with_extension(res, req, context, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
response = AppResponse(res, app)
# set default value in properties
app._response_extension.set_properties(response)
@ -864,6 +888,7 @@ def uws_websocket_upgrade_handler_with_extension(res, req, context, user_data):
def uws_websocket_upgrade_handler(res, req, context, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
app.loop.is_idle = False
response = AppResponse(res, app)
request = AppRequest(req, app)
try:
@ -908,6 +933,7 @@ def uws_req_for_each_header_handler(
def uws_generic_factory_method_handler(res, req, user_data):
if user_data != ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
app.loop.is_idle = False
instances = app._factory.get(app, res, req)
(response, request, dispose) = instances
try:
@ -943,6 +969,7 @@ def uws_generic_factory_method_handler(res, req, user_data):
def uws_generic_method_handler_with_extension(res, req, user_data):
if user_data != ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
app.loop.is_idle = False
response = AppResponse(res, app)
# set default value in properties
app._response_extension.set_properties(response)
@ -969,6 +996,7 @@ def uws_generic_method_handler_with_extension(res, req, user_data):
def uws_generic_method_handler(res, req, user_data):
if user_data != ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
app.loop.is_idle = False
response = AppResponse(res, app)
request = AppRequest(req, app)
@ -1006,6 +1034,7 @@ def uws_generic_listen_handler(listen_socket, config, user_data):
if user_data != ffi.NULL:
app = ffi.from_handle(user_data)
app.loop.is_idle = False
config.port = lib.us_socket_local_port(app.SSL, listen_socket)
if hasattr(app, "_listen_handler") and hasattr(app._listen_handler, "__call__"):
app.socket = listen_socket
@ -1041,6 +1070,7 @@ def uws_generic_aborted_handler(response, user_data):
def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data):
if user_data != ffi.NULL:
res = ffi.from_handle(user_data)
res.app.loop.is_idle = False
if chunk == ffi.NULL:
data = None
else:
@ -1053,6 +1083,7 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data):
def uws_generic_on_writable_handler(res, offset, user_data):
if user_data != ffi.NULL:
res = ffi.from_handle(user_data)
res.app.loop.is_idle = False
result = res.trigger_writable_handler(offset)
return result
return False
@ -1307,6 +1338,7 @@ class WebSocket:
return None
def send_fragment(self, message, compress=False):
self.app.loop.is_idle = False
try:
if isinstance(message, str):
data = message.encode("utf-8")
@ -1327,6 +1359,7 @@ class WebSocket:
return None
def send_last_fragment(self, message, compress=False):
self.app.loop.is_idle = False
try:
if isinstance(message, str):
data = message.encode("utf-8")
@ -1347,6 +1380,7 @@ class WebSocket:
return None
def send_first_fragment(self, message, opcode=OpCode.BINARY, compress=False):
self.app.loop.is_idle = False
try:
if isinstance(message, str):
data = message.encode("utf-8")
@ -1373,6 +1407,7 @@ class WebSocket:
return self
def send(self, message, opcode=OpCode.BINARY, compress=False, fin=True):
self.app.loop.is_idle = False
try:
if isinstance(message, str):
data = message.encode("utf-8")
@ -1399,6 +1434,7 @@ class WebSocket:
return self
def end(self, code=0, message=None):
self.app.loop.is_idle = False
try:
if not isinstance(code, int):
raise RuntimeError("code must be an int")
@ -1447,6 +1483,8 @@ class AppResponse:
self._data = None
def cork(self, callback):
self.app.loop.is_idle = False
if not self.aborted:
self.grab_aborted_handler()
self._cork_handler = callback
@ -1645,6 +1683,7 @@ class AppResponse:
return self
def try_end(self, message, total_size, end_connection=False):
self.app.loop.is_idle = False
try:
if self.aborted:
return False, True
@ -1752,7 +1791,8 @@ class AppResponse:
headers = None,
end_connection: bool = False,
):
self.app.loop.is_idle = False
# TODO: optimize headers
if headers is not None:
for name, value in headers:
@ -1851,6 +1891,8 @@ class AppResponse:
return self
def end(self, message, end_connection=False):
self.app.loop.is_idle = False
try:
if self.aborted:
return self
@ -1879,16 +1921,19 @@ class AppResponse:
return self
def resume(self):
self.app.loop.is_idle = False
if not self.aborted:
lib.uws_res_resume(self.app.SSL, self.res)
return self
def write_continue(self):
self.app.loop.is_idle = False
if not self.aborted:
lib.uws_res_write_continue(self.app.SSL, self.res)
return self
def write_status(self, status_or_status_text):
self.app.loop.is_idle = False
if not self.aborted:
if isinstance(status_or_status_text, int):
if bool(
@ -1914,6 +1959,7 @@ class AppResponse:
return self
def write_header(self, key, value):
self.app.loop.is_idle = False
if not self.aborted:
if isinstance(key, str):
key_data = key.encode("utf-8")
@ -1947,6 +1993,7 @@ class AppResponse:
return self
def end_without_body(self, end_connection=False):
self.app.loop.is_idle = False
if not self.aborted:
if self._write_jar is not None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
@ -1956,6 +2003,7 @@ class AppResponse:
return self
def write(self, message):
self.app.loop.is_idle = False
if not self.aborted:
if isinstance(message, str):
data = message.encode("utf-8")
@ -2923,6 +2971,7 @@ class App:
)
def publish(self, topic, message, opcode=OpCode.BINARY, compress=False):
self.loop.is_idle = False
if isinstance(topic, str):
topic_data = topic.encode("utf-8")

Wyświetl plik

@ -15,6 +15,8 @@ import uuid
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
data_response.app.server.loop.is_idle = False
if chunk != ffi.NULL:
data_response.buffer.write(ffi.unpack(chunk, chunk_length))
if bool(is_end):
@ -29,6 +31,7 @@ def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
def wsgi_on_data_ref_abort_handler(res, user_data):
data_retry = ffi.from_handle(user_data)
data_retry.aborted = True
data_retry.server.loop.is_idle = False
if data_retry.id is not None:
data_retry.app._data_refs.pop(data_retry.id, None)
@ -41,7 +44,9 @@ def wsgi_on_writable_handler(res, offset, user_data):
chunks = data_retry.chunks
last_sended_offset = data_retry.last_offset
ssl = data_retry.app.server.SSL
server = data_retry.app.server
ssl = server.SSL
server.loop.is_idle = False
content_length = data_retry.content_length
data = chunks[0]
@ -224,6 +229,7 @@ def wsgi_corked_response_start_handler(res, user_data):
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*)")
def wsgi(ssl, response, info, user_data):
app = ffi.from_handle(user_data)
app.server.loop.is_idle = False
# reusing the dict is slower than cloning because we need to clear HTTP headers
environ = dict(app.BASIC_ENVIRON)
@ -257,9 +263,10 @@ def wsgi(ssl, response, info, user_data):
is_chunked = False
content_length = -1
def write_headers(headers):
nonlocal headers_written, headers_set, status_text, content_length, is_chunked
nonlocal headers_written, headers_set, status_text, content_length, is_chunked, app
if headers_written or not headers_set:
return
app.server.loop.is_idle = False
headers_written = True
@ -327,7 +334,8 @@ def wsgi(ssl, response, info, user_data):
content_length = ffi.cast("uintmax_t", content_length)
def start_response(status, headers, exc_info=None):
nonlocal headers_set, status_text
nonlocal headers_set, status_text, app
app.server.loop.is_idle = False
if exc_info:
try:
if headers_written:
@ -342,7 +350,8 @@ def wsgi(ssl, response, info, user_data):
status_text = status
def write(data):
nonlocal is_chunked
nonlocal is_chunked, app
app.server.loop.is_idle = False
if not headers_written:
write_headers(headers_set)
# will allow older frameworks only with is_chunked
@ -371,7 +380,7 @@ def wsgi(ssl, response, info, user_data):
if data_response.aborted:
return
data_response.app.server.loop.is_idle = False
ssl = data_response.app.server.SSL
data_response.environ["CONTENT_LENGTH"] = str(
data_response.buffer.getbuffer().nbytes