diff --git a/README.md b/README.md index 72f84a0..896be5e 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ - Middlewares - Templates Support (examples with [`Mako`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_mako.py) and [`Jinja2`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_jinja2.py)) - ASGI Server with pub/sub extension for Falcon +- WSGI Server ## :mag_right: Upcoming Features - In-Memory Cache Tools @@ -45,7 +46,6 @@ - Full asyncio integration with libuv - SSGI Server spec and support - RSGI Server support -- WSGI Server compatibility - Full Http3 support - [`HPy`](https://hpyproject.org/) integration to better support [`CPython`](https://github.com/python/cpython), [`PyPy`](https://www.pypy.org/) and [`GraalPython`](https://github.com/oracle/graalpython) - Hot Reloading diff --git a/SSGI.md b/SSGI.md new file mode 100644 index 0000000..a79b5b7 --- /dev/null +++ b/SSGI.md @@ -0,0 +1,125 @@ +# First Ideas for SSGI + +```python +from typing import Union, Callable, Awaitable, Optional + +class SSGIHttpResponse: + aborted: bool = False, # detect if the connection was aborted + extensions: Optional[dict] = None # extensions for http + + # if payload is None, request ends without body + # if has_more is True, data is written but connection will not end + def send(self, payload: Union[str, bytes, bytearray, memoryview, None], has_more: Optional[bool] = False): + pass + + # send chunk of data, can be used to perform with less backpressure than using send + # total_size is the sum of all lenghts in bytes of all chunks to be sended + # connection will end when total_size is met + # returns tuple(bool, bool) first bool represents if the chunk is succefully sended, the second if the connection has ended + def send_chunk(self, chunk: Union[str, bytes, bytearray, memoryview], total_size: int = False) -> Awaitable: + pass + + # send status code + def send_status(self, status_code: Optional[int] = 200): + pass + + # send headers to the http response + def send_headers(self, headers: iter(tuple(str, str))): + pass + + # ensure async call for the handler, passing any arguments to it + def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + pass + + # get an all data + # returns an BytesIO() or None if no payload is availabl + def get_data(self) -> Awaitable: + pass + + # get an chunk of data (chunk size is decided by the Server implementation) + # returns the BytesIO or None if no more chunks are sent + def get_chunk(self) -> Awaitable: + pass + + # on aborted event, calle when the connection abort + def on_aborted(self, handler: Union[Awaitable, Callable], *arguments): + pass + +class SSGIWebSocket: + status: int = 0 # 0 pending upgrade, 1 rejected, 2 closed, 3 accepted + extensions: Optional[dict] = None # extensions for websocket + + # accept the connection upgrade + # can pass the protocol to accept if None is informed will use sec-websocket-protocol header if available + def accept(self, protocol: str = None) -> Awaitable: + pass + + # reject the connection upgrade, you can send status_code, payload and headers if you want, all optional + def reject(self, status_code: Optional[int] = 403, payload: Union[bytes, bytearray, memoryview, None] = None, headers: Optional[iter(tuple(str, str))] = None) -> Awaitable: + pass + + # if returns an future, this can be awaited or not + def send(self, payload: Union[bytes, bytearray, memoryview]): + pass + + # close connection + def close(self, code: Optional[int] = 1000): + pass + + # ensure async call for the handler, passing any arguments to it + def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + pass + + # on receive event, called when the socket disconnect + # passes ws: SSGIWebSocket, msg: Union[str, bytes, bytearray, memoryview], *arguments + def on_receive(self, handler: Union[Awaitable, Callable], *arguments): + pass + + # on close event, called when the socket disconnect + # passes ws: SSGIWebSocket, code: int and reason: Optional[str] = None, *arguments + def on_close(self, handler: Union[Awaitable, Callable], *arguments): + pass + + + +# only accepts sync +def wsgi(environ, start_response): + pass +# only accepts async +async def asgi(scope, receive, send): + pass +# async with less overhead +async def rsgi(scope, proto): + pass +# async and sync can be used +def ssgi(type: str, server_address: str, remote_address: str, method: str, path: str, query_string: str, get_header: Callable[[Optional[str]=None], [Union[str, iter(tuple(str, str))]], res: Union[SSGIHttpResponse, SSGIWebSocket]): + # this is called once every HTTP request, or when an websocket connection wants to upgrade + # type can be http or websocket + + # server_address contains {ipv4|ipv6}:{port} being :{port} optional + # remote_address contains {ipv4|ipv6}:{port} being :{port} optional + + # here routers can work without call any header, this can improve performance because headers are not allocated in + # if passed get_header() without arguments or None, must return all headers in an dict + # all headers must be lowercase + # headers will only be preserved until the end of this call or if res.run_async is called + # headers are not preserved after websocket accept or reject + # if this function is an coroutine, data will be preserved, run_async is automatic + pass + +# SSGI do not require that SSGI it self to be implemented, allowing other interfaces to be supported by the Server and Framework as will +class SSGIFramework: + def get_supported(self, supported_interfaces: dict) -> dict: + # supported_interfaces { "asgi": "2.3", "wsgi": "2.0", "ssgi": "1.0", "rsgi": "1.0" } + # you can use this to check what interface is available + + # returns http and websocket interface supported by the Web Framework + # you can use multiple interfaces one for http and other for websockets with SSGI if the Web Framework and Server supports it + # if None is passed, Server will not serve the protocol + # tuple(interface_name, interface_handler) + return { + "http": ( "ssgi", ssgi), #or "asgi", "rsgi", "wsgi", + "websockets": ("ssgi", ssgi) #or "asgi", "rsgi" + } + +``` \ No newline at end of file diff --git a/bench/asgi_wsgi/falcon-asgi.py b/bench/asgi_wsgi/falcon-asgi.py index 11d6e27..5f82afd 100644 --- a/bench/asgi_wsgi/falcon-asgi.py +++ b/bench/asgi_wsgi/falcon-asgi.py @@ -24,5 +24,3 @@ app.add_route("/", home) if __name__ == "__main__": ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() - -#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker \ No newline at end of file diff --git a/bench/asgi_wsgi/falcon-ws-pubsub.py b/bench/asgi_wsgi/falcon-ws-pubsub.py index e6a4a3e..a2d316c 100644 --- a/bench/asgi_wsgi/falcon-ws-pubsub.py +++ b/bench/asgi_wsgi/falcon-ws-pubsub.py @@ -31,8 +31,9 @@ class SomeResource: remaining_clients = remaining_clients + 1 print("remaining_clients", remaining_clients) - app = falcon.asgi.App() +app.ws_options.max_receive_queue = 20_000_000 # this virtual disables queue but adds overhead +app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon app.add_route("/", SomeResource()) if __name__ == "__main__": diff --git a/bench/asgi_wsgi/falcon-ws.py b/bench/asgi_wsgi/falcon-ws.py index 22104cb..5a204b0 100644 --- a/bench/asgi_wsgi/falcon-ws.py +++ b/bench/asgi_wsgi/falcon-ws.py @@ -25,23 +25,25 @@ class SomeResource: try: await ws.accept() clients.add(ws) - remaining_clients = remaining_clients - 1 + remaining_clients -= 1 + print("remaining_clients", remaining_clients) if remaining_clients == 0: await broadcast("ready") - else: - print("remaining_clients", remaining_clients) while True: payload = await ws.receive_text() - await broadcast(payload) + if payload: + await broadcast(payload) except falcon.WebSocketDisconnected: clients.remove(ws) - remaining_clients = remaining_clients + 1 + remaining_clients += 1 print("remaining_clients", remaining_clients) app = falcon.asgi.App() +app.ws_options.max_receive_queue = 20_000_000# this virtual disables queue but adds overhead +app.ws_options.enable_buffered_receiver = True # this disable queue but for now only available on cirospaciari/falcon app.add_route("/", SomeResource()) # python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker # pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker diff --git a/bench/asgi_wsgi/falcon-wsgi.py b/bench/asgi_wsgi/falcon-wsgi.py index 3abbae8..e4a886f 100644 --- a/bench/asgi_wsgi/falcon-wsgi.py +++ b/bench/asgi_wsgi/falcon-wsgi.py @@ -24,5 +24,4 @@ app.add_route("/", home) if __name__ == "__main__": WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() - -#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker \ No newline at end of file + \ No newline at end of file diff --git a/bench/asgi_wsgi/flask-wsgi.py b/bench/asgi_wsgi/flask-wsgi.py new file mode 100644 index 0000000..00c6f8d --- /dev/null +++ b/bench/asgi_wsgi/flask-wsgi.py @@ -0,0 +1,12 @@ +from flask import Flask +from socketify import WSGI + +app = Flask(__name__) + +@app.route('/') +def index(): + return 'Hello, World!' + +def run_app(): + WSGI(app, request_response_factory_max_itens=200_000).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + diff --git a/bench/asgi_wsgi/raw-asgi.py b/bench/asgi_wsgi/raw-asgi.py index e714e82..5cdf117 100644 --- a/bench/asgi_wsgi/raw-asgi.py +++ b/bench/asgi_wsgi/raw-asgi.py @@ -21,5 +21,3 @@ async def app(scope, receive, send): if __name__ == "__main__": ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() - -# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker \ No newline at end of file diff --git a/bench/asgi_wsgi/raw-ws-bench-pubsub.py b/bench/asgi_wsgi/raw-ws-bench-pubsub.py index 6ae7c89..1f1cdaa 100644 --- a/bench/asgi_wsgi/raw-ws-bench-pubsub.py +++ b/bench/asgi_wsgi/raw-ws-bench-pubsub.py @@ -47,9 +47,10 @@ async def app(scope, receive, send): else: print("remaining_clients", remaining_clients) - scope = await receive() # get data while True: + scope = await receive() + type = scope['type'] # disconnected! if type == 'websocket.disconnect': @@ -57,13 +58,14 @@ async def app(scope, receive, send): print("remaining_clients", remaining_clients) break + await send({ 'type': 'websocket.publish', 'topic': "all", 'text': scope.get('text', '') }) - scope = await receive() + @@ -71,4 +73,4 @@ async def app(scope, receive, send): if __name__ == "__main__": ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() -# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker +# python3 -m gunicorn raw-ws-bench-pubsub:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker diff --git a/bench/asgi_wsgi/raw-ws-bench.py b/bench/asgi_wsgi/raw-ws-bench.py index bdb060d..39bb277 100644 --- a/bench/asgi_wsgi/raw-ws-bench.py +++ b/bench/asgi_wsgi/raw-ws-bench.py @@ -44,12 +44,12 @@ async def app(scope, receive, send): if remaining_clients == 0: await broadcast("ready") - else: - print("remaining_clients", remaining_clients) - scope = await receive() + # get data while True: + scope = await receive() + type = scope['type'] # disconnected! if type == 'websocket.disconnect': @@ -58,7 +58,7 @@ async def app(scope, receive, send): break await broadcast(scope.get('text', '')) - scope = await receive() + @@ -66,4 +66,5 @@ async def app(scope, receive, send): if __name__ == "__main__": ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + # python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker diff --git a/bench/asgi_wsgi/raw-wsgi.py b/bench/asgi_wsgi/raw-wsgi.py index e020b7f..2391eee 100644 --- a/bench/asgi_wsgi/raw-wsgi.py +++ b/bench/asgi_wsgi/raw-wsgi.py @@ -6,4 +6,3 @@ def app(environ, start_response): if __name__ == "__main__": WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() - diff --git a/bench/falcon_plaintext.py b/bench/falcon_plaintext.py index c163756..062c43b 100644 --- a/bench/falcon_plaintext.py +++ b/bench/falcon_plaintext.py @@ -1,7 +1,7 @@ from wsgiref.simple_server import make_server import falcon - +# check ./asgi_wsgi/falcon-ws-pubsub.py for pub/sub extension class Home: def on_get(self, req, resp): diff --git a/bench/robyn_plaintext.py b/bench/robyn_plaintext.py index d1f86b2..92479ed 100644 --- a/bench/robyn_plaintext.py +++ b/bench/robyn_plaintext.py @@ -11,4 +11,4 @@ def h(request): app.start(port=8000) # python3 ./robyn_plaintext.py --processes 4 --log-level CRITICAL -# pypy3 did not compile +# pypy3 did not compile \ No newline at end of file diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index a57514e..a7db6ab 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -1,11 +1,12 @@ from socketify import App import os import multiprocessing - - def run_app(): app = App(request_response_factory_max_itens=200_000) - app.get("/", lambda res, req: res.end("Hello, World!")) + def home(res, req): + res.end("Hello, World!") + + app.get("/", home) app.listen( 8000, lambda config: print( @@ -26,5 +27,5 @@ def create_fork(): # fork limiting the cpu count - 1 for i in range(1, multiprocessing.cpu_count()): create_fork() - + run_app() # run app on the main process too :) diff --git a/bench/ssgi/raw-http.py b/bench/ssgi/raw-http.py new file mode 100644 index 0000000..7b93308 --- /dev/null +++ b/bench/ssgi/raw-http.py @@ -0,0 +1,23 @@ +from socketify import SSGI + +class Application: + def get_supported(self, supported_interfaces): + + def ssgi(type, method, path, query_string, get_header, res): + # if type == "http": + res.send(b'Hello, World!') + # else: + # res.reject() # reject websocket connections + + return { + "http": ("ssgi" if supported_interfaces.get("ssgi", None) else None, ssgi), + # "websocket": ("ssgi" if supported_interfaces.get("ssgi", None) else None, ssgi) + } + + +app = Application() + +if __name__ == "__main__": + SSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + +# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker \ No newline at end of file diff --git a/bench/uvicorn_plaintext.py b/bench/uvicorn_plaintext.py index 57a165c..696f671 100644 --- a/bench/uvicorn_plaintext.py +++ b/bench/uvicorn_plaintext.py @@ -18,5 +18,5 @@ async def app(scope, receive, send): ) -# python3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker -# pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker +# python3 -m gunicorn uvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker +# pypy3 -m gunicorn uvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker diff --git a/src/socketify/__init__.py b/src/socketify/__init__.py index 725117c..f56f334 100644 --- a/src/socketify/__init__.py +++ b/src/socketify/__init__.py @@ -12,4 +12,7 @@ from .asgi import ( from .wsgi import ( WSGI ) +from .ssgi import ( + SSGI +) from .helpers import sendfile, middleware, MiddlewareRouter diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index b68eaae..b932d38 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,6 +1,7 @@ from socketify import App, CompressOptions, OpCode -import asyncio from queue import SimpleQueue +from .native import lib, ffi + # Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way # re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False } @@ -10,11 +11,14 @@ class ASGIWebSocket: self.loop = loop self.accept_future = None self.ws = None + self._disconnected = False self.receive_queue = SimpleQueue() self.miss_receive_queue = SimpleQueue() self.miss_receive_queue.put({ 'type': 'websocket.connect' }, False) + self._code = None + self._message = None def accept(self): self.accept_future = self.loop.create_future() @@ -30,13 +34,23 @@ class ASGIWebSocket: if not self.miss_receive_queue.empty(): future.set_result(self.miss_receive_queue.get(False)) return future - + if self._disconnected: + future.set_result({ + 'type': 'websocket.disconnect', + 'code': self._code, + 'message': self._message + }) + return future + self.receive_queue.put(future, False) return future def disconnect(self, code, message): self.ws = None + self._disconnected = True + self._code = code + self._message = message if not self.receive_queue.empty(): future = self.receive_queue.get(False) future.set_result({ @@ -44,18 +58,12 @@ class ASGIWebSocket: 'code': code, 'message': message }) - else: - self.miss_receive_queue.put({ - 'type': 'websocket.disconnect', - 'code': code, - 'message': message - }, False) def message(self, ws, value, opcode): self.ws = ws - if self.receive_queue.empty(): + if self.receive_queue.empty(): if opcode == OpCode.TEXT: self.miss_receive_queue.put({ 'type': 'websocket.receive', @@ -82,6 +90,111 @@ class ASGIWebSocket: }) + +def write_header(ssl, res, key, value): + if isinstance(key, str): + key_data = key.encode("utf-8") + elif isinstance(key, bytes): + key_data = key + + if isinstance(value, int): + lib.uws_res_write_header_int( + ssl, + res, + key_data, + len(key_data), + ffi.cast("uint64_t", value), + ) + elif isinstance(value, str): + value_data = value.encode("utf-8") + elif isinstance(value, bytes): + value_data = value + lib.uws_res_write_header( + ssl, res, key_data, len(key_data), value_data, len(value_data) + ) + +@ffi.callback("void(uws_res_t*, void*)") +def uws_asgi_corked_response_start_handler(res, user_data): + (ssl, status, headers) = ffi.from_handle(user_data) + lib.socketify_res_write_int_status(ssl, res, status) + for name, value in headers: + write_header(ssl, res, name, value) + + +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") +def asgi(ssl, response, info, user_data, aborted): + app = ffi.from_handle(user_data) + + headers = [] + next_header = info.header_list + while next_header != ffi.NULL: + header = next_header[0] + headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) + next_header = ffi.cast("socketify_header*", next_header.next) + url = ffi.unpack(info.url, info.url_size) + scope = { + 'type': 'http', + 'asgi': { + 'version': '3.0', + 'spec_version': '2.3' + }, + 'http_version': '1.1', + 'server': (app.SERVER_HOST, app.SERVER_PORT), + 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), + 'scheme': app.SERVER_SCHEME, + 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), + 'root_path': '', + 'path': url.decode('utf8'), + 'raw_path': url, + 'query_string': ffi.unpack(info.query_string, info.query_string_size), + 'headers': headers + } + async def receive(): + if bool(aborted[0]): + return { 'type': 'http.disconnect'} + # if scope.get("content-length", False) or scope.get("transfer-encoding", False): + # data = await res.get_data() + # if data: + # # all at once but could get in chunks + # return { + # 'type': 'http.request', + # 'body': data.getvalue(), + # 'more_body': False + # } + # no body, just empty + return EMPTY_RESPONSE + async def send(options): + if bool(aborted[0]): + return False + type = options['type'] + if type == 'http.response.start': + #can also be more native optimized to do it in one GIL call + #try socketify_res_write_int_status_with_headers and create and socketify_res_cork_write_int_status_with_headers + status_code = options.get('status', 200) + headers = options.get('headers', []) + cork_data = ffi.new_handle((ssl, status_code, headers)) + lib.uws_res_cork(ssl, response, uws_asgi_corked_response_start_handler, cork_data) + return True + + if type == 'http.response.body': + + #native optimized end/send + message = options.get('body', b'') + + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + data = b'' + if options.get('more_body', False): + lib.socketify_res_cork_write(ssl, response, data, len(data)) + else: + lib.socketify_res_cork_end(ssl, response, data, len(data), 0) + return True + return False + + app.server.loop.run_async(app.app(scope, receive, send)) class ASGI: def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) @@ -91,80 +204,40 @@ class ASGI: self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' self.app = app - - def asgi(res, req): - PATH_INFO = req.get_url() - FULL_PATH_INFO = req.get_full_url() - headers = [] - req.for_each_header(lambda name, value: headers.append((name.encode('utf8'), value.encode('utf8')))) - - scope = { - 'type': 'http', - 'asgi': { - 'version': '3.0', - 'spec_version': '2.0' - }, - 'http_version': '1.1', - 'server': (self.SERVER_HOST, self.SERVER_PORT), - 'client': (res.get_remote_address(), None), - 'scheme': self.SERVER_SCHEME, - 'method': req.get_method(), - 'root_path': '', - 'path': PATH_INFO, - 'raw_path': PATH_INFO.encode('utf8'), - 'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'), - 'headers': headers - } - - async def receive(): - if res.aborted: - return { 'type': 'http.disconnect'} - - if scope.get("content-length", False) or scope.get("transfer-encoding", False): - data = await res.get_data() - if data: - # all at once but could get in chunks - return { - 'type': 'http.request', - 'body': data.getvalue(), - 'more_body': False - } - # no body, just empty - return EMPTY_RESPONSE - - async def send(options): - if res.aborted: return False - type = options['type'] - if type == 'http.response.start': - res.write_status(options.get('status', 200)) - for header in options.get('headers', []): - res.write_header(header[0], header[1]) - return True - - if type == 'http.response.body': - if options.get('more_body', False): - res.write(options.get('body', "")) - else: - res.cork_end(options.get('body', "")) - return True - return False - #grab handler - res.grab_aborted_handler() - asyncio.ensure_future(app(scope, receive, send)) - self.server.any("/*", asgi) + # optimized in native + self._ptr = ffi.new_handle(self) + self.asgi_http_info = lib.socketify_add_asgi_http_handler( + self.server.SSL, + self.server.app, + asgi, + self._ptr + ) def ws_upgrade(res, req, socket_context): - PATH_INFO = req.get_url() - FULL_PATH_INFO = req.get_full_url() + info = lib.socketify_asgi_ws_request(res.SSL, req.req, res.res) + headers = [] - def filtered_headers(name, value): - if name != "sec-websocket-protocol": - headers.append((name.encode('utf8'), value.encode('utf8'))) + next_header = info.header_list + while next_header != ffi.NULL: + header = next_header[0] + headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) + next_header = ffi.cast("socketify_header*", next_header.next) - req.for_each_header(filtered_headers) - key = req.get_header("sec-websocket-key") - protocol = req.get_header("sec-websocket-protocol") - extensions = req.get_header("sec-websocket-extensions") + url = ffi.unpack(info.url, info.url_size) + + if info.key == ffi.NULL: + key = None + else: + key = ffi.unpack(info.key, info.key_size).decode('utf8') + + if info.protocol == ffi.NULL: + protocol = None + else: + protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8') + if info.extensions == ffi.NULL: + extensions = None + else: + extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8') ws = ASGIWebSocket(self.server.loop) @@ -172,30 +245,29 @@ class ASGI: 'type': 'websocket', 'asgi': { 'version': '3.0', - 'spec_version': '2.0' + 'spec_version': '2.3' }, 'http_version': '1.1', 'server': (self.SERVER_HOST, self.SERVER_PORT), - 'client': (res.get_remote_address(), None), + 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), 'scheme': self.SERVER_WS_SCHEME, - 'method': req.get_method(), + 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), 'root_path': '', - 'path': PATH_INFO, - 'raw_path': PATH_INFO.encode('utf8'), - 'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'), + 'path': url.decode('utf8'), + 'raw_path': url, + 'query_string': ffi.unpack(info.query_string, info.query_string_size), 'headers': headers, 'subprotocols': [protocol] if protocol else [], 'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True } } - server = self.server + lib.socketify_destroy_headers(info.header_list) async def send(options): - nonlocal ws, res, server - if res.aborted: return False type = options['type'] if type == 'websocket.send': bytes = options.get("bytes", None) - if ws.ws: + + if ws.ws: if bytes: ws.ws.cork_send(bytes, OpCode.BINARY) else: @@ -204,8 +276,13 @@ class ASGI: return False if type == 'websocket.accept': # upgrade! - for header in options.get('headers', []): - res.write_header(header[0], header[1]) + res_headers = options.get('headers', None) + def corked(res): + for header in res_headers: + res.write_header(header[0], header[1]) + if res_headers: + res.cork(corked) + future = ws.accept() upgrade_protocol = options.get('subprotocol', protocol) res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws) @@ -213,29 +290,26 @@ class ASGI: if type == 'websocket.close': # code and reason? if ws.ws: ws.ws.close() - else: res.write_status(403).end_without_body() + else: res.cork(lambda res: res.write_status(403).end_without_body()) return True if type == 'websocket.publish': # publish extension bytes = options.get("bytes", None) if bytes: - server.publish(options.get('topic'), bytes) + self.server.publish(options.get('topic'), bytes) else: - server.publish(options.get('topic'), options.get('text'), OpCode.TEXT) + self.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT) return True if type == 'websocket.subscribe': # subscribe extension if ws.ws: ws.ws.subscribe(options.get('topic')) - else: res.write_status(403).end_without_body() + else: res.cork(lambda res: res.write_status(403).end_without_body()) return True if type == 'websocket.unsubscribe': # unsubscribe extension if ws.ws: ws.ws.unsubscribe(options.get('topic')) - else: res.write_status(403).end_without_body() + else: res.cork(lambda res: res.write_status(403).end_without_body()) return True return False - - #grab handler - res.grab_aborted_handler() - asyncio.ensure_future(app(scope, ws.receive, send)) + res.run_async(app(scope, ws.receive, send)) self.server.ws("/*", { @@ -255,4 +329,8 @@ class ASGI: return self def run(self): self.server.run() - return self \ No newline at end of file + return self + + def __del__(self): + if self.asgi_http_info: + lib.socketify_destroy_asgi_app_info(self.asgi_http_info) \ No newline at end of file diff --git a/src/socketify/libsocketify_linux_amd64.so b/src/socketify/libsocketify_linux_amd64.so index 75d4989..329d734 100755 Binary files a/src/socketify/libsocketify_linux_amd64.so and b/src/socketify/libsocketify_linux_amd64.so differ diff --git a/src/socketify/loop.py b/src/socketify/loop.py index f177a8f..f379e72 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -25,7 +25,7 @@ def future_handler(future, loop, exception_handler, response): class Loop: def __init__(self, exception_handler=None): - self.loop = asyncio.new_event_loop() + self.loop = asyncio.get_event_loop() self.uv_loop = UVLoop() if hasattr(exception_handler, "__call__"): @@ -36,9 +36,7 @@ class Loop: else: self.exception_handler = None - asyncio.set_event_loop(self.loop) self.started = False - self.last_defer = False def set_timeout(self, timeout, callback, user_data): return self.uv_loop.create_timer(timeout, 0, callback, user_data) @@ -46,14 +44,14 @@ class Loop: def create_future(self): return self.loop.create_future() - def keep_alive(self): + def _keep_alive(self): if self.started: self.uv_loop.run_once() - self.loop.call_soon(self.keep_alive) + self.loop.call_soon(self._keep_alive) def run(self): self.started = True - self.loop.call_soon(self.keep_alive) + self.loop.call_soon(self._keep_alive) self.loop.run_forever() # clean up uvloop self.uv_loop.stop() @@ -74,6 +72,7 @@ class Loop: # Exposes native loop for uWS def get_native_loop(self): return self.uv_loop.get_native_loop() + def run_async(self, task, response=None): # with run_once diff --git a/src/socketify/native.py b/src/socketify/native.py new file mode 100644 index 0000000..f5921bb --- /dev/null +++ b/src/socketify/native.py @@ -0,0 +1,386 @@ +import cffi +import platform +import os + +ffi = cffi.FFI() +ffi.cdef( + """ + +struct us_socket_context_options_t { + const char *key_file_name; + const char *cert_file_name; + const char *passphrase; + const char *dh_params_file_name; + const char *ca_file_name; + const char *ssl_ciphers; + int ssl_prefer_low_memory_usage; +}; + + +struct us_socket_context_t { + struct us_loop_t *loop; + unsigned short timestamp; + struct us_socket_t *head; + struct us_socket_t *iterator; + struct us_socket_context_t *prev, *next; + struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length); + struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length); + struct us_socket_t *(*on_writable)(struct us_socket_t *); + struct us_socket_t *(*on_close)(struct us_socket_t *, int code, void *reason); + struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *); + struct us_socket_t *(*on_end)(struct us_socket_t *); + struct us_socket_t *(*on_connect_error)(struct us_socket_t *, int code); + int (*is_low_prio)(struct us_socket_t *); +}; + +struct us_poll_t { + struct { + signed int fd : 28; + unsigned int poll_type : 4; + } state; +}; + + +struct us_socket_t { + struct us_poll_t p; + struct us_socket_context_t *context; + struct us_socket_t *prev, *next; + unsigned short timeout : 14; + unsigned short low_prio_state : 2; +}; + +struct us_listen_socket_t { + struct us_socket_t s; + unsigned int socket_ext_size; +}; +void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls); +int us_socket_local_port(int ssl, struct us_listen_socket_t *ls); +struct us_loop_t *uws_get_loop(); +struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop); +typedef enum +{ + _COMPRESSOR_MASK = 0x00FF, + _DECOMPRESSOR_MASK = 0x0F00, + DISABLED = 0, + SHARED_COMPRESSOR = 1, + SHARED_DECOMPRESSOR = 1 << 8, + DEDICATED_DECOMPRESSOR_32KB = 15 << 8, + DEDICATED_DECOMPRESSOR_16KB = 14 << 8, + DEDICATED_DECOMPRESSOR_8KB = 13 << 8, + DEDICATED_DECOMPRESSOR_4KB = 12 << 8, + DEDICATED_DECOMPRESSOR_2KB = 11 << 8, + DEDICATED_DECOMPRESSOR_1KB = 10 << 8, + DEDICATED_DECOMPRESSOR_512B = 9 << 8, + DEDICATED_DECOMPRESSOR = 15 << 8, + DEDICATED_COMPRESSOR_3KB = 9 << 4 | 1, + DEDICATED_COMPRESSOR_4KB = 9 << 4 | 2, + DEDICATED_COMPRESSOR_8KB = 10 << 4 | 3, + DEDICATED_COMPRESSOR_16KB = 11 << 4 | 4, + DEDICATED_COMPRESSOR_32KB = 12 << 4 | 5, + DEDICATED_COMPRESSOR_64KB = 13 << 4 | 6, + DEDICATED_COMPRESSOR_128KB = 14 << 4 | 7, + DEDICATED_COMPRESSOR_256KB = 15 << 4 | 8, + DEDICATED_COMPRESSOR = 15 << 4 | 8 +} uws_compress_options_t; + +typedef enum +{ + CONTINUATION = 0, + TEXT = 1, + BINARY = 2, + CLOSE = 8, + PING = 9, + PONG = 10 +} uws_opcode_t; + +typedef enum +{ + BACKPRESSURE, + SUCCESS, + DROPPED +} uws_sendstatus_t; + +typedef struct +{ + int port; + const char *host; + int options; +} uws_app_listen_config_t; + +struct uws_app_s; +struct uws_req_s; +struct uws_res_s; +struct uws_websocket_s; +struct uws_header_iterator_s; +typedef struct uws_app_s uws_app_t; +typedef struct uws_req_s uws_req_t; +typedef struct uws_res_s uws_res_t; +typedef struct uws_socket_context_s uws_socket_context_t; +typedef struct uws_websocket_s uws_websocket_t; + +typedef void (*uws_websocket_handler)(uws_websocket_t *ws, void* user_data); +typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void* user_data); +typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data); +typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data); +typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data); +typedef struct +{ + uws_compress_options_t compression; + unsigned int maxPayloadLength; + unsigned short idleTimeout; + unsigned int maxBackpressure; + bool closeOnBackpressureLimit; + bool resetIdleTimeoutOnSend; + bool sendPingsAutomatically; + unsigned short maxLifetime; + uws_websocket_upgrade_handler upgrade; + uws_websocket_handler open; + uws_websocket_message_handler message; + uws_websocket_handler drain; + uws_websocket_ping_pong_handler ping; + uws_websocket_ping_pong_handler pong; + uws_websocket_close_handler close; +} uws_socket_behavior_t; + +typedef struct { + bool ok; + bool has_responded; +} uws_try_end_result_t; + +typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data); +typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data); +typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data); +typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data); +typedef void (*uws_get_headers_server_handler)(const char *header_name, size_t header_name_size, const char *header_value, size_t header_value_size, void *user_data); + + +uws_app_t *uws_create_app(int ssl, struct us_socket_context_options_t options); +void uws_app_destroy(int ssl, uws_app_t *app); +void uws_app_get(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_post(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_options(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_patch(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_put(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_head(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_connect(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_trace(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); +void uws_app_any(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); + +void uws_app_run(int ssl, uws_app_t *); + +void uws_app_listen(int ssl, uws_app_t *app, int port, uws_listen_handler handler, void *user_data); +void uws_app_listen_with_config(int ssl, uws_app_t *app, uws_app_listen_config_t config, uws_listen_handler handler, void *user_data); +bool uws_constructor_failed(int ssl, uws_app_t *app); +unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic, size_t topic_length); +bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); +void *uws_get_native_handle(int ssl, uws_app_t *app); +void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); +void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); +void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options); +void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); +void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); + + +void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection); +void uws_res_pause(int ssl, uws_res_t *res); +void uws_res_resume(int ssl, uws_res_t *res); +void uws_res_write_continue(int ssl, uws_res_t *res); +void uws_res_write_status(int ssl, uws_res_t *res, const char *status, size_t length); +void uws_res_write_header(int ssl, uws_res_t *res, const char *key, size_t key_length, const char *value, size_t value_length); +void uws_res_override_write_offset(int ssl, uws_res_t *res, uintmax_t offset); + +void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, size_t key_length, uint64_t value); +void uws_res_end_without_body(int ssl, uws_res_t *res, bool close_connection); +bool uws_res_write(int ssl, uws_res_t *res, const char *data, size_t length); +uintmax_t uws_res_get_write_offset(int ssl, uws_res_t *res); +void *uws_res_get_native_handle(int ssl, uws_res_t *res); +bool uws_res_has_responded(int ssl, uws_res_t *res); +void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data); +void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data); +void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data); +void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws); +uws_try_end_result_t uws_res_try_end(int ssl, uws_res_t *res, const char *data, size_t length, uintmax_t total_size, bool close_connection); +void uws_res_cork(int ssl, uws_res_t *res,void(*callback)(uws_res_t *res, void* user_data) ,void* user_data); +size_t uws_res_get_remote_address(int ssl, uws_res_t *res, const char **dest); +size_t uws_res_get_remote_address_as_text(int ssl, uws_res_t *res, const char **dest); +size_t uws_res_get_proxied_remote_address(int ssl, uws_res_t *res, const char **dest); +size_t uws_res_get_proxied_remote_address_as_text(int ssl, uws_res_t *res, const char **dest); + +bool uws_req_is_ancient(uws_req_t *res); +bool uws_req_get_yield(uws_req_t *res); +void uws_req_set_field(uws_req_t *res, bool yield); +size_t uws_req_get_url(uws_req_t *res, const char **dest); +size_t uws_req_get_method(uws_req_t *res, const char **dest); +size_t uws_req_get_case_sensitive_method(uws_req_t *res, const char **dest); + +size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header, size_t lower_case_header_length, const char **dest); +size_t uws_req_get_query(uws_req_t *res, const char *key, size_t key_length, const char **dest); +size_t uws_req_get_parameter(uws_req_t *res, unsigned short index, const char **dest); +size_t uws_req_get_full_url(uws_req_t *res, const char **dest); +void uws_req_for_each_header(uws_req_t *res, uws_get_headers_server_handler handler, void *user_data); + +void uws_ws(int ssl, uws_app_t *app, const char *pattern, uws_socket_behavior_t behavior, void* user_data); +void *uws_ws_get_user_data(int ssl, uws_websocket_t *ws); +void uws_ws_close(int ssl, uws_websocket_t *ws); +uws_sendstatus_t uws_ws_send(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode); +uws_sendstatus_t uws_ws_send_with_options(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress, bool fin); +uws_sendstatus_t uws_ws_send_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +uws_sendstatus_t uws_ws_send_first_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +uws_sendstatus_t uws_ws_send_first_fragment_with_opcode(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress); +uws_sendstatus_t uws_ws_send_last_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +void uws_ws_end(int ssl, uws_websocket_t *ws, int code, const char *message, size_t length); +void uws_ws_cork(int ssl, uws_websocket_t *ws, void (*handler)(void *user_data), void *user_data); + +bool uws_ws_subscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length); +bool uws_ws_unsubscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length); +bool uws_ws_is_subscribed(int ssl, uws_websocket_t *ws, const char *topic, size_t length); +void uws_ws_iterate_topics(int ssl, uws_websocket_t *ws, void (*callback)(const char *topic, size_t length, void *user_data), void *user_data); +bool uws_ws_publish(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length); +bool uws_ws_publish_with_options(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); +int uws_ws_get_buffered_amount(int ssl, uws_websocket_t *ws); +size_t uws_ws_get_remote_address(int ssl, uws_websocket_t *ws, const char **dest); +size_t uws_ws_get_remote_address_as_text(int ssl, uws_websocket_t *ws, const char **dest); + + + +typedef void (*socketify_prepare_handler)(void* user_data); +typedef void (*socketify_timer_handler)(void* user_data); +typedef void (*socketify_async_handler)(void* user_data); + +typedef enum { + SOCKETIFY_RUN_DEFAULT = 0, + SOCKETIFY_RUN_ONCE, + SOCKETIFY_RUN_NOWAIT +} socketify_run_mode; + +typedef struct { + void* uv_prepare_ptr; + socketify_prepare_handler on_prepare_handler; + void* on_prepare_data; + void* uv_loop; +} socketify_loop; + +typedef struct{ + void* uv_timer_ptr; + socketify_timer_handler handler; + void* user_data; +} socketify_timer; + +typedef struct{ + void* uv_async_ptr; + socketify_async_handler handler; + void* user_data; +} socketify_async; + +socketify_loop * socketify_create_loop(); +bool socketify_constructor_failed(socketify_loop* loop); +bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data); +bool socketify_prepare_unbind(socketify_loop* loop); +void socketify_destroy_loop(socketify_loop* loop); +void* socketify_get_native_loop(socketify_loop* loop); + +int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode); +void socketify_loop_stop(socketify_loop* loop); + +socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void* user_data); +void socketify_timer_destroy(socketify_timer* timer); +bool socketify_async_call(socketify_loop* loop, socketify_async_handler handler, void* user_data); +void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat); + + +socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data); +void socketify_check_destroy(socketify_timer* timer); + +typedef struct { + + const char* name; + const char* value; + + size_t name_size; + size_t value_size; + + void* next; +} socketify_header; + + +typedef struct { + + const char* full_url; + const char* url; + const char* query_string; + const char* method; + const char* remote_address; + + size_t full_url_size; + size_t url_size; + size_t query_string_size; + size_t method_size; + size_t remote_address_size; + + socketify_header* header_list; +} socketify_asgi_data; + +typedef struct { + + const char* full_url; + const char* url; + const char* query_string; + const char* method; + const char* remote_address; + + size_t full_url_size; + size_t url_size; + size_t query_string_size; + size_t method_size; + size_t remote_address_size; + + const char* protocol; + const char* key; + const char* extensions; + size_t protocol_size; + size_t key_size; + size_t extensions_size; + + socketify_header* header_list; +} socketify_asgi_ws_data; + +typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data, bool* aborted); +typedef struct { + int ssl; + uws_app_t* app; + socketify_asgi_method_handler handler; + void * user_data; +} socksocketify_asgi_app_info; + + +socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res); +void socketify_destroy_headers(socketify_header* headers); +bool socketify_res_write_int_status_with_headers(int ssl, uws_res_t* res, int code, socketify_header* headers); +void socketify_res_write_headers(int ssl, uws_res_t* res, socketify_header* headers); +socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_res_t *res); + +socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data); +void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app); + +void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length); +void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection); + +""" +) + +library_extension = "dll" if platform.system().lower() == "windows" else "so" +library_path = os.path.join( + os.path.dirname(__file__), + "libsocketify_%s_%s.%s" + % ( + platform.system().lower(), + "arm64" if "arm" in platform.processor().lower() else "amd64", + library_extension, + ), +) + + +lib = ffi.dlopen(library_path) + + diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index af9b033..5733e73 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -13,6 +13,7 @@ import uuid from urllib.parse import parse_qs, quote_plus, unquote_plus import logging +from .native import ffi, lib from .loop import Loop from .status_codes import status_codes from .helpers import static_route @@ -22,261 +23,6 @@ mimetypes.init() is_python = platform.python_implementation() == "CPython" -ffi = cffi.FFI() -ffi.cdef( - """ - -struct us_socket_context_options_t { - const char *key_file_name; - const char *cert_file_name; - const char *passphrase; - const char *dh_params_file_name; - const char *ca_file_name; - const char *ssl_ciphers; - int ssl_prefer_low_memory_usage; -}; - - -struct us_socket_context_t { - struct us_loop_t *loop; - unsigned short timestamp; - struct us_socket_t *head; - struct us_socket_t *iterator; - struct us_socket_context_t *prev, *next; - struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length); - struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length); - struct us_socket_t *(*on_writable)(struct us_socket_t *); - struct us_socket_t *(*on_close)(struct us_socket_t *, int code, void *reason); - struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *); - struct us_socket_t *(*on_end)(struct us_socket_t *); - struct us_socket_t *(*on_connect_error)(struct us_socket_t *, int code); - int (*is_low_prio)(struct us_socket_t *); -}; - -struct us_poll_t { - struct { - signed int fd : 28; - unsigned int poll_type : 4; - } state; -}; - - -struct us_socket_t { - struct us_poll_t p; - struct us_socket_context_t *context; - struct us_socket_t *prev, *next; - unsigned short timeout : 14; - unsigned short low_prio_state : 2; -}; - -struct us_listen_socket_t { - struct us_socket_t s; - unsigned int socket_ext_size; -}; -void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls); -int us_socket_local_port(int ssl, struct us_listen_socket_t *ls); -struct us_loop_t *uws_get_loop(); -struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop); -typedef enum -{ - _COMPRESSOR_MASK = 0x00FF, - _DECOMPRESSOR_MASK = 0x0F00, - DISABLED = 0, - SHARED_COMPRESSOR = 1, - SHARED_DECOMPRESSOR = 1 << 8, - DEDICATED_DECOMPRESSOR_32KB = 15 << 8, - DEDICATED_DECOMPRESSOR_16KB = 14 << 8, - DEDICATED_DECOMPRESSOR_8KB = 13 << 8, - DEDICATED_DECOMPRESSOR_4KB = 12 << 8, - DEDICATED_DECOMPRESSOR_2KB = 11 << 8, - DEDICATED_DECOMPRESSOR_1KB = 10 << 8, - DEDICATED_DECOMPRESSOR_512B = 9 << 8, - DEDICATED_DECOMPRESSOR = 15 << 8, - DEDICATED_COMPRESSOR_3KB = 9 << 4 | 1, - DEDICATED_COMPRESSOR_4KB = 9 << 4 | 2, - DEDICATED_COMPRESSOR_8KB = 10 << 4 | 3, - DEDICATED_COMPRESSOR_16KB = 11 << 4 | 4, - DEDICATED_COMPRESSOR_32KB = 12 << 4 | 5, - DEDICATED_COMPRESSOR_64KB = 13 << 4 | 6, - DEDICATED_COMPRESSOR_128KB = 14 << 4 | 7, - DEDICATED_COMPRESSOR_256KB = 15 << 4 | 8, - DEDICATED_COMPRESSOR = 15 << 4 | 8 -} uws_compress_options_t; - -typedef enum -{ - CONTINUATION = 0, - TEXT = 1, - BINARY = 2, - CLOSE = 8, - PING = 9, - PONG = 10 -} uws_opcode_t; - -typedef enum -{ - BACKPRESSURE, - SUCCESS, - DROPPED -} uws_sendstatus_t; - -typedef struct -{ - int port; - const char *host; - int options; -} uws_app_listen_config_t; - -struct uws_app_s; -struct uws_req_s; -struct uws_res_s; -struct uws_websocket_s; -struct uws_header_iterator_s; -typedef struct uws_app_s uws_app_t; -typedef struct uws_req_s uws_req_t; -typedef struct uws_res_s uws_res_t; -typedef struct uws_socket_context_s uws_socket_context_t; -typedef struct uws_websocket_s uws_websocket_t; - -typedef void (*uws_websocket_handler)(uws_websocket_t *ws, void* user_data); -typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void* user_data); -typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data); -typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data); -typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data); -typedef struct -{ - uws_compress_options_t compression; - unsigned int maxPayloadLength; - unsigned short idleTimeout; - unsigned int maxBackpressure; - bool closeOnBackpressureLimit; - bool resetIdleTimeoutOnSend; - bool sendPingsAutomatically; - unsigned short maxLifetime; - uws_websocket_upgrade_handler upgrade; - uws_websocket_handler open; - uws_websocket_message_handler message; - uws_websocket_handler drain; - uws_websocket_ping_pong_handler ping; - uws_websocket_ping_pong_handler pong; - uws_websocket_close_handler close; -} uws_socket_behavior_t; - -typedef struct { - bool ok; - bool has_responded; -} uws_try_end_result_t; - -typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data); -typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data); -typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data); -typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data); -typedef void (*uws_get_headers_server_handler)(const char *header_name, size_t header_name_size, const char *header_value, size_t header_value_size, void *user_data); - - -uws_app_t *uws_create_app(int ssl, struct us_socket_context_options_t options); -void uws_app_destroy(int ssl, uws_app_t *app); -void uws_app_get(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_post(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_options(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_patch(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_put(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_head(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_connect(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_trace(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); -void uws_app_any(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data); - -void uws_app_run(int ssl, uws_app_t *); - -void uws_app_listen(int ssl, uws_app_t *app, int port, uws_listen_handler handler, void *user_data); -void uws_app_listen_with_config(int ssl, uws_app_t *app, uws_app_listen_config_t config, uws_listen_handler handler, void *user_data); -bool uws_constructor_failed(int ssl, uws_app_t *app); -unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic, size_t topic_length); -bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); -void *uws_get_native_handle(int ssl, uws_app_t *app); -void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); -void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); -void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options); -void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); -void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); - - -void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection); -void uws_res_pause(int ssl, uws_res_t *res); -void uws_res_resume(int ssl, uws_res_t *res); -void uws_res_write_continue(int ssl, uws_res_t *res); -void uws_res_write_status(int ssl, uws_res_t *res, const char *status, size_t length); -void uws_res_write_header(int ssl, uws_res_t *res, const char *key, size_t key_length, const char *value, size_t value_length); -void uws_res_override_write_offset(int ssl, uws_res_t *res, uintmax_t offset); - -void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, size_t key_length, uint64_t value); -void uws_res_end_without_body(int ssl, uws_res_t *res, bool close_connection); -bool uws_res_write(int ssl, uws_res_t *res, const char *data, size_t length); -uintmax_t uws_res_get_write_offset(int ssl, uws_res_t *res); -void *uws_res_get_native_handle(int ssl, uws_res_t *res); -bool uws_res_has_responded(int ssl, uws_res_t *res); -void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data); -void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data); -void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data); -void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws); -uws_try_end_result_t uws_res_try_end(int ssl, uws_res_t *res, const char *data, size_t length, uintmax_t total_size, bool close_connection); -void uws_res_cork(int ssl, uws_res_t *res,void(*callback)(uws_res_t *res, void* user_data) ,void* user_data); -size_t uws_res_get_remote_address(int ssl, uws_res_t *res, const char **dest); -size_t uws_res_get_remote_address_as_text(int ssl, uws_res_t *res, const char **dest); -size_t uws_res_get_proxied_remote_address(int ssl, uws_res_t *res, const char **dest); -size_t uws_res_get_proxied_remote_address_as_text(int ssl, uws_res_t *res, const char **dest); - -bool uws_req_is_ancient(uws_req_t *res); -bool uws_req_get_yield(uws_req_t *res); -void uws_req_set_field(uws_req_t *res, bool yield); -size_t uws_req_get_url(uws_req_t *res, const char **dest); -size_t uws_req_get_method(uws_req_t *res, const char **dest); -size_t uws_req_get_case_sensitive_method(uws_req_t *res, const char **dest); - -size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header, size_t lower_case_header_length, const char **dest); -size_t uws_req_get_query(uws_req_t *res, const char *key, size_t key_length, const char **dest); -size_t uws_req_get_parameter(uws_req_t *res, unsigned short index, const char **dest); -size_t uws_req_get_full_url(uws_req_t *res, const char **dest); -void uws_req_for_each_header(uws_req_t *res, uws_get_headers_server_handler handler, void *user_data); - -void uws_ws(int ssl, uws_app_t *app, const char *pattern, uws_socket_behavior_t behavior, void* user_data); -void *uws_ws_get_user_data(int ssl, uws_websocket_t *ws); -void uws_ws_close(int ssl, uws_websocket_t *ws); -uws_sendstatus_t uws_ws_send(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode); -uws_sendstatus_t uws_ws_send_with_options(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress, bool fin); -uws_sendstatus_t uws_ws_send_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); -uws_sendstatus_t uws_ws_send_first_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); -uws_sendstatus_t uws_ws_send_first_fragment_with_opcode(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress); -uws_sendstatus_t uws_ws_send_last_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); -void uws_ws_end(int ssl, uws_websocket_t *ws, int code, const char *message, size_t length); -void uws_ws_cork(int ssl, uws_websocket_t *ws, void (*handler)(void *user_data), void *user_data); - -bool uws_ws_subscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length); -bool uws_ws_unsubscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length); -bool uws_ws_is_subscribed(int ssl, uws_websocket_t *ws, const char *topic, size_t length); -void uws_ws_iterate_topics(int ssl, uws_websocket_t *ws, void (*callback)(const char *topic, size_t length, void *user_data), void *user_data); -bool uws_ws_publish(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length); -bool uws_ws_publish_with_options(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); -int uws_ws_get_buffered_amount(int ssl, uws_websocket_t *ws); -size_t uws_ws_get_remote_address(int ssl, uws_websocket_t *ws, const char **dest); -size_t uws_ws_get_remote_address_as_text(int ssl, uws_websocket_t *ws, const char **dest); -""" -) - -library_extension = "dll" if platform.system().lower() == "windows" else "so" -library_path = os.path.join( - os.path.dirname(__file__), - "libsocketify_%s_%s.%s" - % ( - platform.system().lower(), - "arm64" if "arm" in platform.processor().lower() else "amd64", - library_extension, - ), -) - - -lib = ffi.dlopen(library_path) @ffi.callback("void(const char*, size_t, void*)") @@ -1838,12 +1584,12 @@ class AppResponse: def write_status(self, status_or_status_text): if not self.aborted: if isinstance(status_or_status_text, int): - try: - data = status_codes[status_or_status_text] - except: # invalid status - raise RuntimeError( - '"%d" Is not an valid Status Code' % status_or_status_text - ) + if bool(lib.socketify_res_write_int_status(self.SSL, self.res, status_or_status_text)): + return self + raise RuntimeError( + '"%d" Is not an valid Status Code' % status_or_status_text + ) + elif isinstance(status_or_status_text, str): data = status_or_status_text.encode("utf-8") elif isinstance(status_or_status_text, bytes): diff --git a/src/socketify/ssgi.py b/src/socketify/ssgi.py new file mode 100644 index 0000000..128a55c --- /dev/null +++ b/src/socketify/ssgi.py @@ -0,0 +1,292 @@ +from socketify import App, CompressOptions, OpCode +from typing import Union, Callable, Awaitable, Optional +import inspect +from queue import SimpleQueue + + +class SSGIHttpResponse: + extensions: Optional[dict] = None # extensions for http + + def __init__(self, res, req, extensions = None): + self.res = res + self.req = req + self._need_cork = False + self._received_queue = None + self._miss_receive_queue = None + self.extensions = extensions + # if payload is None, request ends without body + # if has_more is True, data is written but connection will not end + def send(self, payload: Union[str, bytes, bytearray, memoryview, None], has_more: Optional[bool] = False): + if has_more: + self.res.write(payload) + else: + self.res.end(payload) + + # send chunk of data, can be used to perform with less backpressure than using send + # total_size is the sum of all lenghts in bytes of all chunks to be sended + # connection will end when total_size is met + # returns tuple(bool, bool) first bool represents if the chunk is succefully sended, the second if the connection has ended + def send_chunk(self, chunk: Union[bytes, bytearray, memoryview], total_size: int) -> Awaitable: + return self.res.send_chunk(chunk, total_size) + + # send status code + def send_status(self, status_code: Optional[Union[int, str]] = '200 OK'): + self.res.write_status(status_code) + + # send headers to the http response + def send_headers(self, headers): + for name, value in headers: + self.res.write_header(name, value) + + # ensure async call for the handler, passing any arguments to it + def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + self.req.get_headers() # preserve headers + return self.res.run_async(handler(*arguments)) + + # get an all data + # returns an BytesIO() or None if no payload is available + def get_data(self) -> Awaitable: + if self.res.get_header("content-length", False) or self.res.get_header("transfer-encoding", False): + return self.res.get_data() + + #return empty result + future = self.res.loop.create_future() + future.set_result(None) + return future + + # get an chunk of data (chunk size is decided by the Server implementation) + # returns the bytes or None if no more chunks are sent + def get_chunk(self) -> Awaitable: + if not self._received_queue: + self._miss_receive_queue = SimpleQueue() + self._received_queue = SimpleQueue() + def on_data(res, chunk, is_end): + if not self._received_queue.empty(): + future = self._received_queue.get(False) + future.set_result(chunk) + if not self._received_queue.empty() and is_end and chunk: + future = self._received_queue.get(False) + future.set_result(None) + return + else: + self._miss_receive_queue.put(chunk, False) + + if is_end and chunk: + self._miss_receive_queue.put(None, False) + + future = self.res.loop.create_future() + self._received_queue.put(future, False) + self.res.on_data(on_data) + return future + else: + future = self.res.loop.create_future() + if not self._miss_receive_queue.empty(): + future.set_result(self._miss_receive_queue.get(False)) + return future + self._received_queue.put(future, False) + return future + + # on aborted event, calle when the connection abort + def on_aborted(self, handler: Union[Awaitable, Callable], *arguments): + def on_aborted(res): + res.aborted = True + if inspect.iscoroutinefunction(handler): + res.run_async(handler(*arguments)) + else: + handler(*arguments) + + self.res.on_aborted(on_aborted) + +class SSGIWebSocket: + status: int = 0 # 0 pending upgrade, 1 rejected, 2 closed, 3 accepted + extensions: Optional[dict] = None # extensions for websocket + def __init__(self, res, req, socket_context, ws, extensions = None): + self.res = res + self.req = req + self.status = 0 + self.extensions = extensions + self._socket_context = socket_context + self._key = self.req.get_header("sec-websocket-key") + self._protocol = self.req.get_header("sec-websocket-protocol") + self._extensions = self.req.get_header("sec-websocket-extensions") + self._close_handler = None + self._receive_handler = None + self._need_cork = False + self._accept_future = None + + # accept the connection upgrade + # can pass the protocol to accept if None is informed will use sec-websocket-protocol header if available + def accept(self, protocol: str = None) -> Awaitable: + if self.status == 0: + self._accept_future = self.res.loop.create_future() + upgrade_protocol = protocol if protocol else self._protocol + + self.res.upgrade(self._key, upgrade_protocol if upgrade_protocol else "", self._extensions, self._socket_context, self) + return self._accept_future + future = self.res.loop.create_future() + future.set_result(False) + return future + + # reject the connection upgrade, you can send status_code, payload and headers if you want, all optional + def reject(self, status_code: Optional[int] = 403, payload = None, headers = None) -> Awaitable: + + future = self.res.loop.create_future() + if self.status < 1: + self.status = 1 + if headers: + for name, value in headers: + self.res.write_header(name, value) + if not payload: + self.res.write_status(status_code).end_without_body() + else: + self.res.write_status(status_code).cork_end(payload) + future.set_result(True) + else: + future.set_result(False) + return future + + # if returns an future, this can be awaited or not + def send(self, payload: Union[bytes, bytearray, memoryview]): + if self.status == 3: + if self._need_cork: + self.ws.cork_send(payload) + else: + self.ws.send(payload) + + # close connection + def close(self, code: Optional[int] = 1000): + if self.status == 3: + self.ws.close() + return True + return False + # ensure async call for the handler, passing any arguments to it + def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + self.req.get_headers() + self._need_cork = True + return self.res.run_async(handler(*arguments)) + + # on receive event, called when the socket disconnect + # passes ws: SSGIWebSocket, msg: Union[str, bytes, bytearray, memoryview, None], *arguments + def on_receive(self, handler: Union[Awaitable, Callable], *arguments): + def on_receive_handler(ws, message, opcode): + if inspect.iscoroutinefunction(handler): + ws.res.run_async(handler(ws, message, *arguments)) + else: + handler(ws, message, *arguments) + self._receive_handler = on_receive_handler + + # on close event, called when the socket disconnect + # passes ws: SSGIWebSocket, code: int and reason: Optional[str] = None, *arguments + def on_close(self, handler: Union[Awaitable, Callable], *arguments): + def on_close_handler(ws, code, message): + if inspect.iscoroutinefunction(handler): + ws.res.run_async(handler(ws, code, message, *arguments)) + else: + handler(ws, code, message, *arguments) + + self._close_handler = on_close_handler + +class SSGI: + def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): + self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) + self.SERVER_PORT = None + self.SERVER_HOST = '' + self.SERVER_SCHEME = 'https' if self.server.options else 'http' + self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' + self.SERVER_ADDRESS = '' + + self.app = app + support = app.get_supported({ "ssgi": "1.0" }) + http, middleware = support.get('http', (None, None)) + websocket, ws_middleware = support.get('websocket', (None, None)) + + def ssgi(res, req): + + response = SSGIHttpResponse(res, req) + PATH_INFO = req.get_url() + # FULL_PATH_INFO = req.get_full_url() + METHOD = req.get_method() + QUERY_STRING = "" #FULL_PATH_INFO[len(PATH_INFO):] + + # REMOTE_ADDRESS = res.get_remote_address() + def get_header(name = None): + if name: + return req.get_header(name) + else: + return req.get_headers() + # self.SERVER_SCHEME, self.SERVER_ADDRESS, + # self.SERVER_SCHEME, self.SERVER_ADDRESS, + if inspect.iscoroutinefunction(middleware): + req.get_headers() # preserve + res.run_async(middleware('http', METHOD, PATH_INFO, QUERY_STRING, get_header, response)) + else: + middleware('http', METHOD, PATH_INFO, QUERY_STRING, get_header, response) + # if not response._responded: + # res.grab_aborted_handler() + + if http == "ssgi" and middleware: + self.server.any("/*", ssgi) + + def ws_upgrade(res, req, socket_context): + response = SSGIWebSocket(res, req, socket_context, None) + PATH_INFO = req.get_url() + FULL_PATH_INFO = req.get_full_url() + METHOD = req.get_method() + + REMOTE_ADDRESS = req.get_remote_address() + def get_header(name = None): + if name: + return req.get_header(name) + else: + return req.get_headers() + if inspect.iscoroutinefunction(ws_middleware): + req.get_headers() # preserve + res.run_async(ws_middleware('websocket', self.SERVER_SCHEME, self.SERVER_ADDRESS, REMOTE_ADDRESS, METHOD, PATH_INFO, FULL_PATH_INFO[len(PATH_INFO):], get_header, response)) + return + else: + ws_middleware('websocket', self.SERVER_WS_SCHEME, self.SERVER_HOST, REMOTE_ADDRESS, METHOD, PATH_INFO, FULL_PATH_INFO[len(PATH_INFO):], get_header, response) + # not accepted (async?) + if response.status == 0 and not response._accept_future: + res.grab_aborted_handler() + + if websocket == "ssgi" and ws_middleware: + def ws_open(ws): + res = ws.get_user_data() + res.ws = ws + res.status = 3 + res._accept_future.set_result(True) + + def ws_message(ws, message, op): + res = ws.get_user_data() + if res._receive_handler: + res._receive_handler(res, message, op) + + def ws_close(ws, code, message): + res = ws.get_user_data() + if res._close_handler: + res._close_handler(res, code, message) + + + self.server.ws("/*", { + "compression": CompressOptions.DISABLED, + "max_payload_length": 16 * 1024 * 1024, + "idle_timeout": 0, + "upgrade": ws_upgrade, + "open": ws_open, + "message": ws_message, + "close": ws_close + }) + + def listen(self, port_or_options, handler): + self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port + self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host + if self.SERVER_PORT: + self.SERVER_ADDRESS = f"{self.SERVER_HOST}:{self.SERVER_PORT}" + else: + self.SERVER_ADDRESS = self.SERVER_HOST + + self.server.listen(port_or_options, handler) + return self + def run(self): + self.server.run() + return self \ No newline at end of file diff --git a/src/socketify/uv.py b/src/socketify/uv.py index 054f520..18bc6cf 100644 --- a/src/socketify/uv.py +++ b/src/socketify/uv.py @@ -1,73 +1,4 @@ -import cffi -import os -import platform - -ffi = cffi.FFI() -ffi.cdef( - """ - - -typedef void (*socketify_prepare_handler)(void* user_data); -typedef void (*socketify_timer_handler)(void* user_data); -typedef void (*socketify_async_handler)(void* user_data); - -typedef enum { - SOCKETIFY_RUN_DEFAULT = 0, - SOCKETIFY_RUN_ONCE, - SOCKETIFY_RUN_NOWAIT -} socketify_run_mode; - -typedef struct { - void* uv_prepare_ptr; - socketify_prepare_handler on_prepare_handler; - void* on_prepare_data; - void* uv_loop; -} socketify_loop; - -typedef struct{ - void* uv_timer_ptr; - socketify_timer_handler handler; - void* user_data; -} socketify_timer; - -typedef struct{ - void* uv_async_ptr; - socketify_async_handler handler; - void* user_data; -} socketify_async; - -socketify_loop * socketify_create_loop(); -bool socketify_constructor_failed(socketify_loop* loop); -bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data); -bool socketify_prepare_unbind(socketify_loop* loop); -void socketify_destroy_loop(socketify_loop* loop); -void* socketify_get_native_loop(socketify_loop* loop); - -int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode); -void socketify_loop_stop(socketify_loop* loop); - -socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void* user_data); -void socketify_timer_destroy(socketify_timer* timer); -bool socketify_async_call(socketify_loop* loop, socketify_async_handler handler, void* user_data); -void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat); - - -socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data); -void socketify_check_destroy(socketify_timer* timer); -""" -) -library_extension = "dll" if platform.system().lower() == "windows" else "so" -library_path = os.path.join( - os.path.dirname(__file__), - "libsocketify_%s_%s.%s" - % ( - platform.system().lower(), - "arm64" if "arm" in platform.processor().lower() else "amd64", - library_extension, - ), -) - -lib = ffi.dlopen(library_path) +from .native import ffi, lib @ffi.callback("void(void *)") diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index 6e1c215..545271b 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -3,66 +3,146 @@ import os from socketify import App from io import BytesIO +from .native import lib, ffi + # Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way # re formatting headers is really slow and dummy, dict ops are really slow +@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") +def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): + data_response = ffi.from_handle(user_data) + if chunk != ffi.NULL: + data_response.buffer.write(ffi.unpack(chunk, chunk_length)) + if bool(is_end): + lib.uws_res_cork(data_response.app.server.SSL, res, wsgi_corked_response_start_handler, data_response._ptr) + + +class WSGIDataResponse: + def __init__(self, app, environ, start_response, aborted, buffer, on_data): + self.buffer = buffer + self.aborted = aborted + self._ptr = ffi.new_handle(self) + self.on_data = on_data + self.environ = environ + self.app = app + self.start_response = start_response + +def write(ssl, res, message): + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + data = b'' + lib.uws_res_write(ssl, res, data, len(data)) + +def write_status(ssl, res, status_text): + if isinstance(status_text, str): + data = status_text.encode("utf-8") + elif isinstance(status_text, bytes): + data = status_text + else: + return False + lib.uws_res_write_status(ssl, res, data, len(data)) + return True + +def write_header(ssl, res, key, value): + if isinstance(key, str): + key_data = key.encode("utf-8") + elif isinstance(key, bytes): + key_data = key + + if isinstance(value, int): + lib.uws_res_write_header_int( + ssl, + res, + key_data, + len(key_data), + ffi.cast("uint64_t", value), + ) + elif isinstance(value, str): + value_data = value.encode("utf-8") + elif isinstance(value, bytes): + value_data = value + lib.uws_res_write_header( + ssl, res, key_data, len(key_data), value_data, len(value_data) + ) + +@ffi.callback("void(uws_res_t*, void*)") +def wsgi_corked_response_start_handler(res, user_data): + data_response = ffi.from_handle(user_data) + data_response.on_data(data_response, res) + +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") +def wsgi(ssl, response, info, user_data, aborted): + app = ffi.from_handle(user_data) + + environ = dict(app.BASIC_ENVIRON) + + environ['REQUEST_METHOD'] = ffi.unpack(info.method, info.method_size).decode('utf8') + environ['PATH_INFO'] = ffi.unpack(info.url, info.url_size).decode('utf8') + environ['QUERY_STRING'] = ffi.unpack(info.query_string, info.query_string_size).decode('utf8') + environ['REMOTE_ADDR'] = ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8') + + next_header = info.header_list + while next_header != ffi.NULL: + header = next_header[0] + name = ffi.unpack(header.name, header.name_size).decode('utf8') + value = ffi.unpack(header.value, header.value_size).decode('utf8') + environ[f"HTTP_{name.replace('-', '_').upper()}"]=value + next_header = ffi.cast("socketify_header*", next_header.next) + def start_response(status, headers): + write_status(ssl, response, status) + for (name, value) in headers: + write_header(ssl, response, name, value) + # #check for body + if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False): + WSGI_INPUT = BytesIO() + environ['wsgi.input'] = WSGI_INPUT + def on_data(data_response, response): + if bool(data_response.aborted[0]): + return + + ssl = data_response.app.server.SSL + app_iter = data_response.app.app(data_response.environ, data_response.start_response) + try: + for data in app_iter: + write(ssl, response, data) + finally: + if hasattr(app_iter, 'close'): + app_iter.close() + lib.uws_res_end_without_body(ssl, response, 0) + + data_response = WSGIDataResponse(app, environ, start_response, aborted, WSGI_INPUT, on_data) + + lib.uws_res_on_data( + ssl, response, wsgi_on_data_handler, data_response._ptr + ) + else: + environ['wsgi.input'] = None + app_iter = app.app(environ, start_response) + try: + for data in app_iter: + write(ssl, response, data) + finally: + if hasattr(app_iter, 'close'): + app_iter.close() + lib.uws_res_end_without_body(ssl, response, 0) + class WSGI: def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) self.SERVER_PORT = None self.app = app self.BASIC_ENVIRON = dict(os.environ) - def wsgi(res, req): - # create environ - PATH_INFO = req.get_url() - FULL_PATH_INFO = req.get_full_url() - environ = dict(self.BASIC_ENVIRON) - environ['REQUEST_METHOD'] = req.get_method() - environ['PATH_INFO'] = PATH_INFO - environ['QUERY_STRING'] = FULL_PATH_INFO[len(PATH_INFO):] - environ['REMOTE_ADDR'] = res.get_remote_address() - - - def start_response(status, headers): - res.write_status(status) - for (name, value) in headers: - res.write_header(name, value) - - - def set_http(name, value): - environ[f"HTTP_{name.replace('-', '_').upper()}"]=value - req.for_each_header(set_http) - - #check for body - if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False): - WSGI_INPUT = BytesIO() - environ['wsgi.input'] = WSGI_INPUT - def on_data(res, chunk, is_end): - if chunk: - WSGI_INPUT.write(chunk) - if is_end: - app_iter = self.app(environ, start_response) - try: - for data in app_iter: - res.write(data) - finally: - if hasattr(app_iter, 'close'): - app_iter.close() - res.end_without_body() - res.on_data(on_data) - else: - environ['wsgi.input'] = None - app_iter = self.app(environ, start_response) - try: - for data in app_iter: - res.write(data) - finally: - if hasattr(app_iter, 'close'): - app_iter.close() - res.end_without_body() - - self.server.any("/*", wsgi) + self._ptr = ffi.new_handle(self) + self.asgi_http_info = lib.socketify_add_asgi_http_handler( + self.server.SSL, + self.server.app, + wsgi, + self._ptr + ) def listen(self, port_or_options, handler): self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port