socketify.py/src/socketify/wsgi.py

533 wiersze
17 KiB
Python
Czysty Zwykły widok Historia

2022-12-01 00:42:07 +00:00
import os
2022-12-07 12:38:42 +00:00
import inspect
2022-12-01 00:42:07 +00:00
from socketify import App
2022-12-07 12:38:42 +00:00
from .asgi import ws_close, ws_upgrade, ws_open, ws_message
2023-01-04 12:11:30 +00:00
from io import BytesIO, BufferedReader
2022-12-04 11:59:12 +00:00
from .native import lib, ffi
2022-12-18 18:24:45 +00:00
import platform
2023-01-18 19:04:41 +00:00
2022-12-18 18:24:45 +00:00
# True when running on PyPy; selects the custom task factory path for ASGI websockets.
is_pypy = "PyPy" == platform.python_implementation()
2023-02-08 23:54:14 +00:00
from .tasks import create_task, TaskFactory
2022-12-18 18:24:45 +00:00
import sys
2023-01-07 12:34:41 +00:00
import logging
2023-01-18 19:04:41 +00:00
2022-12-04 11:59:12 +00:00
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
    """Native on-data callback: buffer a body chunk, dispatch once the body is complete.

    ``user_data`` is the handle of a ``WSGIDataResponse``. Every non-NULL chunk
    is appended to its ``buffer``; on the final chunk the response is corked
    and ``wsgi_corked_response_start_handler`` runs the WSGI app.
    """
    pending = ffi.from_handle(user_data)
    if chunk != ffi.NULL:
        pending.buffer.write(ffi.unpack(chunk, chunk_length))
    if not bool(is_end):
        # more chunks are still on the way
        return
    lib.uws_res_cork(
        pending.app.server.SSL,
        res,
        wsgi_corked_response_start_handler,
        pending._ptr,
    )
2023-01-18 19:04:41 +00:00
2023-01-04 12:11:30 +00:00
class WSGIBody:
    """Readable ``wsgi.input`` stream over a request body held in a ``BytesIO``.

    The native on-data callback appends body chunks to *buffer* (leaving the
    stream position at the end), and the WSGI app only runs after the final
    chunk. Reads therefore keep their own offset into that single buffer
    instead of copying the whole remainder into a fresh ``BytesIO`` on every
    call. That copy made reading a large body in small chunks quadratic.

    :param buffer: ``BytesIO`` that holds (or will hold) the request body.
    """

    def __init__(self, buffer):
        self.buf = buffer
        # Offset of the next unread byte inside ``buf``.
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        ret = self.readline()
        if not ret:
            raise StopIteration()
        return ret

    next = __next__

    def getsize(self, size):
        """Normalise a size argument; ``None`` or a negative value means "no limit".

        :raises TypeError: if *size* is not an ``int``.
        """
        if size is None:
            return sys.maxsize
        elif not isinstance(size, int):
            raise TypeError("size must be an integral type")
        elif size < 0:
            return sys.maxsize
        return size

    def _consume(self, reader, size):
        """Call ``reader(size)`` at the unread offset and advance past what it returned.

        The buffer's own position is restored afterwards so that anything still
        appended to it lands at the end rather than overwriting unread data.
        """
        buf = self.buf
        write_position = buf.tell()
        buf.seek(self._offset)
        try:
            data = reader(size)
            self._offset = buf.tell()
        finally:
            buf.seek(write_position)
        return data

    def read(self, size=None):
        """Return up to *size* unread bytes (all remaining bytes when unlimited)."""
        size = self.getsize(size)
        if size == 0:
            return b""
        return self._consume(self.buf.read, size)

    def readline(self, size=None):
        """Return the next line including its newline, truncated to *size* bytes."""
        size = self.getsize(size)
        if size == 0:
            return b""
        return self._consume(self.buf.readline, size)

    def readlines(self, size=None):
        """Return all remaining lines, each keeping its trailing newline byte.

        *size* is accepted for file-API compatibility and ignored, which
        PEP 3333 explicitly allows for the ``readlines`` hint.
        """
        return list(iter(self.readline, b""))
2022-12-04 11:59:12 +00:00
2023-01-18 19:04:41 +00:00
2022-12-04 11:59:12 +00:00
class WSGIDataResponse:
    """State for a request whose body is still being received.

    The instance travels through native code as the opaque handle ``_ptr``.
    ``wsgi_on_data_handler`` recovers it to append chunks to ``buffer``, and
    ``wsgi_corked_response_start_handler`` recovers it to call ``on_data``.
    """

    def __init__(self, app, environ, start_response, aborted, buffer, on_data):
        self.app = app
        self.environ = environ
        self.start_response = start_response
        # native bool* from the request callback; inspected before running the app
        self.aborted = aborted
        self.buffer = buffer
        self.on_data = on_data
        self._ptr = ffi.new_handle(self)
2023-01-18 19:04:41 +00:00
2022-12-04 11:59:12 +00:00
@ffi.callback("void(uws_res_t*, void*)")
def wsgi_corked_response_start_handler(res, user_data):
    """Cork callback: hand the native response to the pending request's ``on_data``."""
    pending = ffi.from_handle(user_data)
    callback = pending.on_data
    callback(pending, res)
2022-12-04 11:59:12 +00:00
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)")
def wsgi(ssl, response, info, user_data, aborted):
    """Native HTTP handler: translate one request into a WSGI application call.

    Registered by ``_WSGI`` through ``socketify_add_asgi_http_handler``.
    ``user_data`` is the handle of that ``_WSGI`` instance, and ``info`` carries
    the method, URL, query string, remote address and header list.
    ``aborted`` is a native ``bool*`` that is checked before a buffered
    request is dispatched.

    A request without a body runs the app immediately. For a request with a
    body, ``wsgi_on_data_handler`` buffers the chunks and the app runs once
    the final chunk has arrived. In both cases the response is always ended,
    including when the application raises.
    """
    app = ffi.from_handle(user_data)
    # reusing the dict is slower than cloning because we need to clear HTTP headers
    environ = dict(app.BASIC_ENVIRON)
    environ["REQUEST_METHOD"] = ffi.unpack(info.method, info.method_size).decode("utf8")
    # NOTE(review): PEP 3333 expects PATH_INFO percent-decoded and as latin-1
    # "native strings"; this passes the raw URL decoded as UTF-8 - confirm apps cope.
    environ["PATH_INFO"] = ffi.unpack(info.url, info.url_size).decode("utf8")
    environ["QUERY_STRING"] = ffi.unpack(
        info.query_string, info.query_string_size
    ).decode("utf8")
    if info.remote_address != ffi.NULL:
        environ["REMOTE_ADDR"] = ffi.unpack(
            info.remote_address, info.remote_address_size
        ).decode("utf8")
    else:
        environ["REMOTE_ADDR"] = "127.0.0.1"

    # Collect this request's headers separately so that repeated headers are
    # folded together without mixing in HTTP_* values inherited from os.environ.
    request_headers = {}
    next_header = info.header_list
    while next_header != ffi.NULL:
        header = next_header[0]
        name = ffi.unpack(header.name, header.name_size).decode("utf8")
        value = ffi.unpack(header.value, header.value_size).decode("utf8")
        # this conversion should be optimized in future
        key = f"HTTP_{name.replace('-', '_').upper()}"
        previous = request_headers.get(key)
        if previous is None:
            request_headers[key] = value
        else:
            # CGI convention: combine repeats instead of letting the last one win.
            separator = "; " if key == "HTTP_COOKIE" else ","
            request_headers[key] = f"{previous}{separator}{value}"
        next_header = ffi.cast("socketify_header*", next_header.next)
    environ.update(request_headers)
    environ["CONTENT_TYPE"] = environ.get("HTTP_CONTENT_TYPE", "")

    headers_set = None  # header list from start_response; None until it is called
    headers_written = False
    status_text = None

    def write_headers(headers):
        """Send the status line and *headers* once, after start_response was called."""
        nonlocal headers_written
        # `is None` (not falsiness) so an empty header list still sends its status.
        if headers_written or headers_set is None:
            return
        headers_written = True

        if isinstance(status_text, str):
            data = status_text.encode("utf-8")
            lib.uws_res_write_status(ssl, response, data, len(data))
        elif isinstance(status_text, bytes):
            lib.uws_res_write_status(ssl, response, status_text, len(status_text))
        for key, value in headers:
            key_data = key if isinstance(key, bytes) else str(key).encode("utf-8")
            # uWS generates these itself; compare case-insensitively so e.g.
            # "CONTENT-LENGTH" is not sent a second time.
            if key_data.lower() in {b"content-length", b"transfer-encoding"}:
                continue  # auto
            if isinstance(value, str):
                value_data = value.encode("utf-8")
            elif isinstance(value, bytes):
                value_data = value
            elif isinstance(value, int):
                lib.uws_res_write_header_int(
                    ssl,
                    response,
                    key_data,
                    len(key_data),
                    ffi.cast("uint64_t", value),
                )
                continue
            else:
                # Non-standard value type: send its text form rather than
                # reusing a previous header's value.
                value_data = str(value).encode("utf-8")
            lib.uws_res_write_header(
                ssl, response, key_data, len(key_data), value_data, len(value_data)
            )

    def write_body(data):
        """Send one body chunk; str is UTF-8 encoded, unsupported types are ignored."""
        if isinstance(data, str):
            data = data.encode("utf-8")
        elif isinstance(data, (bytearray, memoryview)):
            data = bytes(data)
        if isinstance(data, bytes):
            lib.uws_res_write(ssl, response, data, len(data))

    def start_response(status, headers, exc_info=None):
        """PEP 3333 ``start_response``; returns the legacy ``write`` callable."""
        nonlocal headers_set, status_text
        if exc_info:
            try:
                if headers_written:
                    # Re-raise original exception if headers sent
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None  # avoid dangling circular ref
        elif headers_set is not None:
            raise AssertionError("Headers already set!")
        headers_set = headers
        status_text = status
        return write

    def write(data):
        """Legacy WSGI ``write`` callable: flush headers if needed, then send *data*."""
        if not headers_written:
            write_headers(headers_set)
        write_body(data)

    def run_app(request_environ, start):
        """Call the WSGI app, stream its output and always end the response."""
        nonlocal headers_set, status_text
        app_iter = None
        try:
            # Inside the try: an exception here must not leave the response open.
            app_iter = app.wsgi(request_environ, start)
            for data in app_iter:
                if not data:
                    # nothing to send; never emit an empty chunk ahead of headers
                    continue
                if not headers_written:
                    write_headers(headers_set)
                write_body(data)
        except Exception as error:
            logging.exception(error)
            if not headers_written:
                # Nothing has been sent yet: report the failure instead of an empty 200.
                status_text = "500 Internal Server Error"
                headers_set = []
        finally:
            try:
                if hasattr(app_iter, "close"):
                    app_iter.close()
            except Exception as error:
                logging.exception(error)
            if not headers_written:
                write_headers(headers_set)
            lib.uws_res_end_without_body(ssl, response, 0)

    # check for body
    if bool(info.has_content):
        WSGI_INPUT = BytesIO()
        environ["wsgi.input"] = WSGIBody(WSGI_INPUT)

        def on_data(data_response, res):
            # `res` is the native response on_data was registered on (== `response`).
            if bool(data_response.aborted[0]):
                return
            # Release the exported view right away (PyPy does not free it eagerly).
            with data_response.buffer.getbuffer() as view:
                data_response.environ["CONTENT_LENGTH"] = str(view.nbytes)
            run_app(data_response.environ, data_response.start_response)

        # NOTE(review): only native code holds this object (via its own handle),
        # so it is kept alive by a reference cycle - confirm the cycle GC cannot
        # collect it before the body finishes arriving.
        data_response = WSGIDataResponse(
            app, environ, start_response, aborted, WSGI_INPUT, on_data
        )
        lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr)
    else:
        # PEP 3333 requires wsgi.input to always be a readable stream.
        environ["wsgi.input"] = WSGIBody(BytesIO())
        run_app(environ, start_response)
2023-01-18 19:04:41 +00:00
2022-12-18 18:24:45 +00:00
def is_asgi(module):
    """Return True when *module* looks like an ASGI app: a callable taking exactly 3 parameters.

    ``inspect.signature`` raises ``TypeError``/``ValueError`` when a callable's
    signature cannot be determined (e.g. some builtins or a mis-bound
    ``functools.partial``). Such objects are reported as "not ASGI" instead of
    crashing server construction.
    """
    if not callable(module):
        return False
    try:
        parameters = inspect.signature(module).parameters
    except (TypeError, ValueError):
        return False
    return len(parameters) == 3
2022-12-07 12:38:42 +00:00
class _WSGI:
    """Single-process WSGI server built on a socketify ``App``.

    Registers the native ``wsgi`` callback for HTTP. It optionally also serves
    WebSockets, either from a socketify-style behavior ``dict`` or from an ASGI
    app (detected with ``is_asgi``) through the ASGI ws callbacks.
    """

    def __init__(
        self,
        app,
        options=None,
        websocket=None,
        websocket_options=None,
        task_factory_max_items=100_000,
    ):
        # Set first so __del__ is safe even if App() or a registration below raises.
        self.asgi_http_info = None
        self.asgi_ws_info = None
        self.server = App(options, task_factory_max_items=0)
        self.SERVER_HOST = None
        self.SERVER_PORT = None
        self.SERVER_WS_SCHEME = "wss" if self.server.options else "ws"
        self.wsgi = app
        self.BASIC_ENVIRON = dict(os.environ)
        self.ws_compression = False

        self._ptr = ffi.new_handle(self)
        self.asgi_http_info = lib.socketify_add_asgi_http_handler(
            self.server.SSL, self.server.app, wsgi, self._ptr
        )
        if isinstance(websocket, dict):  # serve websocket as socketify.py
            if websocket_options:
                # Merge into a copy: the caller's dict must not be mutated
                # (WSGI.run builds one server per worker from the same dict).
                websocket = {**websocket, **websocket_options}
            self.server.ws("/*", websocket)
        elif is_asgi(websocket):
            self.app = websocket  # set ASGI app
            self._run_task = self._make_run_task(task_factory_max_items)
            self.asgi_ws_info = self._register_asgi_ws(websocket_options)

    def _make_run_task(self, task_factory_max_items):
        """Return the callable the ASGI ws callbacks use to schedule a coroutine."""
        loop = self.server.loop.loop
        # ASGI do not use app.run_async to not add any overhead from socketify.py WebFramework
        # internally will still use custom task factory for pypy because of Loop
        if is_pypy and task_factory_max_items > 0:
            factory = TaskFactory(task_factory_max_items)

            def run_task(task):
                factory(loop, task)

        else:

            def run_task(task):
                future = create_task(loop, task)
                future._log_destroy_pending = False

        return run_task

    def _register_asgi_ws(self, websocket_options):
        """Register the ASGI ws callbacks natively; return the info pointer freed in __del__."""
        # detect ASGI to use as WebSocket as mixed protocol
        native_options = ffi.new("uws_socket_behavior_t *")
        native_behavior = native_options[0]
        if not websocket_options:
            websocket_options = {}
        self.ws_compression = websocket_options.get("compression", False)

        native_behavior.maxPayloadLength = ffi.cast(
            "unsigned int",
            int(websocket_options.get("max_payload_length", 16777216)),
        )
        native_behavior.idleTimeout = ffi.cast(
            "unsigned short",
            int(websocket_options.get("idle_timeout", 20)),
        )
        native_behavior.maxBackpressure = ffi.cast(
            "unsigned int",
            int(websocket_options.get("max_backpressure", 16777216)),
        )
        native_behavior.compression = ffi.cast(
            "uws_compress_options_t", int(self.ws_compression)
        )
        native_behavior.maxLifetime = ffi.cast(
            "unsigned short", int(websocket_options.get("max_lifetime", 0))
        )
        native_behavior.closeOnBackpressureLimit = ffi.cast(
            "int",
            int(websocket_options.get("close_on_backpressure_limit", 0)),
        )
        native_behavior.resetIdleTimeoutOnSend = ffi.cast(
            "int", bool(websocket_options.get("reset_idle_timeout_on_send", True))
        )
        native_behavior.sendPingsAutomatically = ffi.cast(
            "int", bool(websocket_options.get("send_pings_automatically", True))
        )
        native_behavior.upgrade = ffi.NULL  # will be set first on C++
        native_behavior.open = ws_open
        native_behavior.message = ws_message
        native_behavior.ping = ffi.NULL
        native_behavior.pong = ffi.NULL
        native_behavior.close = ws_close
        native_behavior.subscription = ffi.NULL

        # native_options stays alive (local) for the duration of this call.
        return lib.socketify_add_asgi_ws_handler(
            self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr
        )

    def listen(self, port_or_options, handler=None):
        """Record host/port, finish the base WSGI environ, and start listening."""
        self.SERVER_PORT = (
            port_or_options
            if isinstance(port_or_options, int)
            else port_or_options.port
        )
        self.SERVER_HOST = (
            "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
        )
        self.BASIC_ENVIRON.update(
            {
                "GATEWAY_INTERFACE": "CGI/1.1",
                # SERVER_NAME is a required WSGI key (used when no Host header is sent)
                "SERVER_NAME": str(self.SERVER_HOST or "0.0.0.0"),
                "SERVER_PORT": str(self.SERVER_PORT),
                "SERVER_SOFTWARE": "WSGIServer/0.2",
                "wsgi.input": None,
                "wsgi.errors": sys.stderr,
                "wsgi.version": (1, 0),
                "wsgi.run_once": False,
                "wsgi.url_scheme": "https" if self.server.options else "http",
                "wsgi.multithread": False,
                "wsgi.multiprocess": False,
                "wsgi.file_wrapper": None,  # No file wrapper support for now
                "SCRIPT_NAME": "",
                "SERVER_PROTOCOL": "HTTP/1.1",
                "REMOTE_HOST": "",
                "CONTENT_LENGTH": "0",
                "CONTENT_TYPE": "",
                "wsgi.input_terminated": True,
            }
        )
        self.server.listen(port_or_options, handler)
        return self

    def run(self):
        """Run the underlying socketify server loop."""
        self.server.run()
        return self

    def __del__(self):
        # getattr: the object may be partially constructed if __init__ raised early.
        http_info = getattr(self, "asgi_http_info", None)
        if http_info:
            lib.socketify_destroy_asgi_app_info(http_info)
        ws_info = getattr(self, "asgi_ws_info", None)
        if ws_info:
            lib.socketify_destroy_asgi_ws_app_info(ws_info)
2022-12-07 12:38:42 +00:00
# "Public" WSGI interface to allow easy forks/workers
class WSGI:
    """Public WSGI server facade that can fork worker processes.

    The constructor only records configuration. Each process builds its own
    ``_WSGI`` server inside :meth:`run`.
    """

    def __init__(
        self,
        app,
        options=None,
        websocket=None,
        websocket_options=None,
        task_factory_max_items=100_000,
        lifespan=False,
    ):
        self.app = app
        self.options = options
        self.websocket = websocket
        self.websocket_options = websocket_options
        self.listen_options = None
        self.task_factory_max_items = task_factory_max_items
        # lifespan is not supported in WSGI; accepted for API parity and ignored

    def listen(self, port_or_options, handler=None):
        """Store listen arguments; every process started by run() applies them."""
        self.listen_options = (port_or_options, handler)
        return self

    def run(self, workers=1):
        """Serve with *workers* processes: ``workers - 1`` forked children plus this one.

        A forked child serves until its server stops and then returns ``self``.
        When this (parent) process's server stops, it sends SIGINT to every
        child it forked.
        """

        def run_task():
            server = _WSGI(
                self.app,
                self.options,
                self.websocket,
                self.websocket_options,
                self.task_factory_max_items,
            )
            if self.listen_options:
                (port_or_options, handler) = self.listen_options
                server.listen(port_or_options, handler)
            server.run()

        pid_list = []
        # fork workers - 1 children; this process acts as the last worker
        for _ in range(1, workers):
            pid = os.fork()
            if pid == 0:
                # Child: serve, then return. Falling through would start a
                # second server in this child and SIGINT the sibling PIDs it
                # inherited in pid_list.
                run_task()
                return self
            pid_list.append(pid)

        run_task()  # run app on the main process too :)

        # sigint everything to gracefully shut down
        import signal

        for pid in pid_list:
            os.kill(pid, signal.SIGINT)
        return self