diff --git a/SSGI.md b/SSGI.md index 0f21b05..a4dfed9 100644 --- a/SSGI.md +++ b/SSGI.md @@ -28,7 +28,7 @@ class SSGIHttpResponse: pass # ensure async call for the handler, passing any arguments to it - def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + def run_async(self, handler: Awaitable, *arguments): pass # get an all data @@ -67,7 +67,7 @@ class SSGIWebSocket: pass # ensure async call for the handler, passing any arguments to it - def run_async(self, handler: Awaitable, *arguments) -> Awaitable: + def run_async(self, handler: Awaitable, *arguments): pass # on receive event, called when the socket disconnect diff --git a/bench/asgi_wsgi/falcon-wsgi.py b/bench/asgi_wsgi/falcon-wsgi.py index 3d7429e..7ea618d 100644 --- a/bench/asgi_wsgi/falcon-wsgi.py +++ b/bench/asgi_wsgi/falcon-wsgi.py @@ -23,4 +23,4 @@ home = Home() app.add_route("/", home) if __name__ == "__main__": - WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=2) \ No newline at end of file + WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=8) \ No newline at end of file diff --git a/bench/asgi_wsgi/raw-asgi.py b/bench/asgi_wsgi/raw-asgi.py index 5cdf117..ae8355c 100644 --- a/bench/asgi_wsgi/raw-asgi.py +++ b/bench/asgi_wsgi/raw-asgi.py @@ -20,4 +20,4 @@ async def app(scope, receive, send): if __name__ == "__main__": - ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(8) diff --git a/bench/asgi_wsgi/raw-wsgi.py b/bench/asgi_wsgi/raw-wsgi.py index 2391eee..6763cb3 100644 --- a/bench/asgi_wsgi/raw-wsgi.py +++ b/bench/asgi_wsgi/raw-wsgi.py @@ -5,4 +5,4 @@ def app(environ, start_response): yield b'Hello, World!\n' if __name__ == "__main__": - WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(8) diff --git a/bench/socketify_plaintext.py b/bench/socketify_plaintext.py index a7db6ab..44ce35d 100644 --- a/bench/socketify_plaintext.py +++ b/bench/socketify_plaintext.py @@ -1,9 +1,10 @@ from socketify import App import os import multiprocessing +import asyncio def run_app(): - app = App(request_response_factory_max_itens=200_000) - def home(res, req): + app = App(request_response_factory_max_items=200_000) + async def home(res, req): res.end("Hello, World!") app.get("/", home) diff --git a/docs/api.md b/docs/api.md index 0e945d0..714f820 100644 --- a/docs/api.md +++ b/docs/api.md @@ -99,6 +99,25 @@ class AppRequest: def is_ancient(self): def __del__(self): ``` +## AppListenOptions +```python +class AppListenOptions: + port: int = 0 + host: str = None + options: int = 0 + domain: str = None +``` +## AppOptions +```python +class AppOptions: + key_file_name: str = None, + cert_file_name: str = None, + passphrase: str = None, + dh_params_file_name: str = None, + ca_file_name: str = None, + ssl_ciphers: str = None, + ssl_prefer_low_memory_usage: int = 0 +``` ## WebSockets ```python @@ -179,6 +198,8 @@ class SendStatus(IntEnum): SUCCESS = 1 DROPPED = 2 ``` + + ## Helpers ```python async def sendfile(res, req, filename) diff --git a/docs/corking.md b/docs/corking.md index e6a87e1..4f8d6ed 100644 --- a/docs/corking.md +++ b/docs/corking.md @@ -1,5 +1,5 @@ ## Corking -It is very important to understand the corking mechanism, as that is responsible for efficiently formatting, packing and sending data. Without corking your app will still work reliably, but can perform very bad and use excessive networking. In some cases the performance can be dreadful without proper corking. +It is very important to understand the corking mechanism, as that is responsible for efficiently formatting, packing and sending data. Corking is packing many sends into one single syscall/SSL block, without corking your app will still work reliably, but can perform very bad and use excessive networking. In some cases the performance can be dreadful without proper corking. That's why your sockets will be corked by default in most simple cases, including all of the examples provided. However there are cases where default corking cannot happen automatically. diff --git a/docs/ssl.md b/docs/ssl.md index d1a859e..408709a 100644 --- a/docs/ssl.md +++ b/docs/ssl.md @@ -1,3 +1,34 @@ -Support is already there, docs Coming soon... -### Next [API Reference](api.md) \ No newline at end of file +# SSL / HTTPS + +Is really easy to pass and key_file_name, cert_file_name and passphrase to get SSL working, you can also pass ssl_ciphers, ca_file_name to it. + +```python +from socketify import App, AppOptions + +app = App( + AppOptions( + key_file_name="./misc/key.pem", + cert_file_name="./misc/cert.pem", + passphrase="1234", + ) +) +app.get("/", lambda res, req: res.end("Hello World socketify from Python!")) +app.listen( + 3000, + lambda config: print("Listening on port https://localhost:%d now\n" % config.port), +) +app.run() +``` + +```python +class AppOptions: + key_file_name: str = None, + cert_file_name: str = None, + passphrase: str = None, + dh_params_file_name: str = None, + ca_file_name: str = None, + ssl_ciphers: str = None, + ssl_prefer_low_memory_usage: int = 0 +``` +### Next [CLI Reference](cli.md) \ No newline at end of file diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 1dc2e3b..4ab4ef2 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,9 +1,15 @@ from socketify import App, CompressOptions, OpCode from queue import SimpleQueue from .native import lib, ffi +from .tasks import create_task, create_task_with_factory import os +import platform +import sys + +is_pypy = platform.python_implementation() == "PyPy" + +EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False} -EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False } @ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") def ws_message(ws, message, length, opcode, user_data): @@ -14,25 +20,35 @@ def ws_message(ws, message, length, opcode, user_data): socket_data.message(ws, message, OpCode(opcode)) + @ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") def ws_close(ws, code, message, length, user_data): socket_data = ffi.from_handle(user_data) message = None if message == ffi.NULL else ffi.unpack(message, length) socket_data.disconnect(code, message) - + + @ffi.callback("void(uws_websocket_t*, void*)") def ws_open(ws, user_data): socket_data = ffi.from_handle(user_data) socket_data.open(ws) -@ffi.callback("void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*, bool*)") -def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): + +@ffi.callback( + "void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*, bool*)" +) +def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): app = ffi.from_handle(user_data) headers = [] - next_header = info.header_list + next_header = info.header_list while next_header != ffi.NULL: header = next_header[0] - headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) + headers.append( + ( + ffi.unpack(header.name, header.name_size), + ffi.unpack(header.value, header.value_size), + ) + ) next_header = ffi.cast("socketify_header*", next_header.next) url = ffi.unpack(info.url, info.url_size) @@ -40,58 +56,76 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): if info.key == ffi.NULL: key = None else: - key = ffi.unpack(info.key, info.key_size).decode('utf8') - + key = ffi.unpack(info.key, info.key_size).decode("utf8") + if info.protocol == ffi.NULL: protocol = None else: - protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8') + protocol = ffi.unpack(info.protocol, info.protocol_size).decode("utf8") if info.extensions == ffi.NULL: extensions = None else: - extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8') + extensions = ffi.unpack(info.extensions, info.extensions_size).decode("utf8") compress = app.ws_compression ws = ASGIWebSocket(app.server.loop) scope = { - 'type': 'websocket', - 'asgi': { - 'version': '3.0', - 'spec_version': '2.3' - }, - 'http_version': '1.1', - 'server': (app.SERVER_HOST, app.SERVER_PORT), - 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), - 'scheme': app.SERVER_WS_SCHEME, - 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), - 'root_path': '', - 'path': url.decode('utf8'), - 'raw_path': url, - 'query_string': ffi.unpack(info.query_string, info.query_string_size), - 'headers': headers, - 'subprotocols': [protocol] if protocol else [], - 'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True } + "type": "websocket", + "asgi": {"version": "3.0", "spec_version": "2.3"}, + "http_version": "1.1", + "server": (app.SERVER_HOST, app.SERVER_PORT), + "client": ( + ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"), + None, + ), + "scheme": app.SERVER_WS_SCHEME, + "method": ffi.unpack(info.method, info.method_size).decode("utf8"), + "root_path": "", + "path": url.decode("utf8"), + "raw_path": url, + "query_string": ffi.unpack(info.query_string, info.query_string_size), + "headers": headers, + "subprotocols": [protocol] if protocol else [], + "extensions": { + "websocket.publish": True, + "websocket.subscribe": True, + "websocket.unsubscribe": True, + }, } + async def send(options): - if bool(aborted[0]): return False - type = options['type'] - if type == 'websocket.send': - data = options.get("bytes", None) - if ws.ws: + if bool(aborted[0]): + return False + type = options["type"] + if type == "websocket.send": + data = options.get("bytes", None) + if ws.ws: if data: - lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.BINARY), int(compress), 0) + lib.socketify_ws_cork_send_with_options( + ssl, + ws.ws, + data, + len(data), + int(OpCode.BINARY), + int(compress), + 0, + ) else: - data = options.get('text', '').encode('utf8') - lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0) + data = options.get("text", "").encode("utf8") + lib.socketify_ws_cork_send_with_options( + ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0 + ) return True - return False - if type == 'websocket.accept': # upgrade! - res_headers = options.get('headers', None) + return False + if type == "websocket.accept": # upgrade! + res_headers = options.get("headers", None) if res_headers: cork_data = ffi.new_handle((ssl, res_headers)) - lib.uws_res_cork(ssl, response, uws_asgi_corked_ws_accept_handler, cork_data) + lib.uws_res_cork( + ssl, response, uws_asgi_corked_ws_accept_handler, cork_data + ) future = ws.accept() - upgrade_protocol = options.get('subprotocol', protocol) + upgrade_protocol = options.get("subprotocol", protocol) if isinstance(key, str): sec_web_socket_key_data = key.encode("utf-8") @@ -113,7 +147,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): sec_web_socket_extensions_data = extensions else: sec_web_socket_extensions_data = b"" - + lib.uws_res_upgrade( ssl, response, @@ -127,23 +161,25 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): socket_context, ) return await future - if type == 'websocket.close': # code and reason? - if ws.ws: + if type == "websocket.close": # code and reason? + if ws.ws: lib.uws_ws_close(ssl, ws.ws) - else: + else: cork_data = ffi.new_handle(ssl) lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) return True - if type == 'websocket.publish': # publish extension + if type == "websocket.publish": # publish extension data = options.get("bytes", None) - if data: - app.server.publish(options.get('topic'), data, OpCode.BINARY, compress) + if data: + app.server.publish(options.get("topic"), data, OpCode.BINARY, compress) else: - app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT, compress) + app.server.publish( + options.get("topic"), options.get("text", ""), OpCode.TEXT, compress + ) return True - if type == 'websocket.subscribe': # subscribe extension - if ws.ws: - topic = options.get('topic') + if type == "websocket.subscribe": # subscribe extension + if ws.ws: + topic = options.get("topic") if isinstance(topic, str): data = topic.encode("utf-8") elif isinstance(topic, bytes): @@ -152,13 +188,13 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): return False return bool(lib.uws_ws_subscribe(ssl, ws.ws, data, len(data))) - else: + else: cork_data = ffi.new_handle(ssl) lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) return True - if type == 'websocket.unsubscribe': # unsubscribe extension - if ws.ws: - topic = options.get('topic') + if type == "websocket.unsubscribe": # unsubscribe extension + if ws.ws: + topic = options.get("topic") if isinstance(topic, str): data = topic.encode("utf-8") elif isinstance(topic, bytes): @@ -167,12 +203,14 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): return False return bool(lib.uws_ws_unsubscribe(ssl, ws.ws, data, len(data))) - else: + else: cork_data = ffi.new_handle(ssl) lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) return True return False - app.server.run_async(app.app(scope, ws.receive, send)) + + app._run_task(app.app(scope, ws.receive, send)) + @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): @@ -180,15 +218,15 @@ def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response.is_end = bool(is_end) more_body = not data_response.is_end result = { - 'type': 'http.request', - 'body': b'' if chunk == ffi.NULL else ffi.unpack(chunk, chunk_length), - 'more_body': more_body + "type": "http.request", + "body": b"" if chunk == ffi.NULL else ffi.unpack(chunk, chunk_length), + "more_body": more_body, } data_response.queue.put(result, False) data_response.next_data_future.set_result(result) if more_body: data_response.next_data_future = data_response.loop.create_future() - + class ASGIDataQueue: def __init__(self, loop): @@ -197,7 +235,7 @@ class ASGIDataQueue: self.loop = loop self.is_end = False self.next_data_future = loop.create_future() - + class ASGIWebSocket: def __init__(self, loop): @@ -207,9 +245,7 @@ class ASGIWebSocket: self._disconnected = False self.receive_queue = SimpleQueue() self.miss_receive_queue = SimpleQueue() - self.miss_receive_queue.put({ - 'type': 'websocket.connect' - }, False) + self.miss_receive_queue.put({"type": "websocket.connect"}, False) self._code = None self._message = None self._ptr = ffi.new_handle(self) @@ -222,24 +258,25 @@ class ASGIWebSocket: self.ws = ws if not self.accept_future.done(): self.accept_future.set_result(True) - + def receive(self): future = self.loop.create_future() if not self.miss_receive_queue.empty(): future.set_result(self.miss_receive_queue.get(False)) return future if self._disconnected: - future.set_result({ - 'type': 'websocket.disconnect', - 'code': self._code, - 'message': self._message - }) + future.set_result( + { + "type": "websocket.disconnect", + "code": self._code, + "message": self._message, + } + ) return future self.receive_queue.put(future, False) return future - def disconnect(self, code, message): self.ws = None self._disconnected = True @@ -247,55 +284,57 @@ class ASGIWebSocket: self._message = message if not self.receive_queue.empty(): future = self.receive_queue.get(False) - future.set_result({ - 'type': 'websocket.disconnect', - 'code': code, - 'message': message - }) - - + future.set_result( + {"type": "websocket.disconnect", "code": code, "message": message} + ) def message(self, ws, value, opcode): self.ws = ws if self.receive_queue.empty(): if opcode == OpCode.TEXT: - self.miss_receive_queue.put({ - 'type': 'websocket.receive', - 'text': value - }, False) - elif opcode == OpCode.BINARY: - self.miss_receive_queue.put({ - 'type': 'websocket.receive', - 'bytes': value - }, False) + self.miss_receive_queue.put( + {"type": "websocket.receive", "text": value}, False + ) + elif opcode == OpCode.BINARY: + self.miss_receive_queue.put( + {"type": "websocket.receive", "bytes": value}, False + ) return True - future = self.receive_queue.get(False) if opcode == OpCode.TEXT: - future.set_result({ - 'type': 'websocket.receive', - 'text': value - }) - elif opcode == OpCode.BINARY: - future.set_result({ - 'type': 'websocket.receive', - 'bytes': value - }) - - + future.set_result({"type": "websocket.receive", "text": value}) + elif opcode == OpCode.BINARY: + future.set_result({"type": "websocket.receive", "bytes": value}) + def write_header(ssl, res, key, value): - if isinstance(key, str): - if key.lower() == "content-length": return #auto - if key.lower() == "transfer-encoding": return #auto - key_data = key.encode("utf-8") - elif isinstance(key, bytes): - if key.lower() == b'content-length': return #auto - if key.lower() == b'transfer-encoding': return #auto + if isinstance(key, bytes): + # this is faster than using .lower() + if ( + key == b"content-length" + or key == b"Content-Length" + or key == b"Transfer-Encoding" + or key == b"transfer-encoding" + ): + return # auto key_data = key + elif isinstance(key, str): + # this is faster than using .lower() + if ( + key == "content-length" + or key == "Content-Length" + or key == "Transfer-Encoding" + or key == "transfer-encoding" + ): + return # auto + key_data = key.encode("utf-8") - if isinstance(value, int): + if isinstance(value, bytes): + value_data = value + elif isinstance(value, str): + value_data = value.encode("utf-8") + elif isinstance(value, int): lib.uws_res_write_header_int( ssl, res, @@ -303,188 +342,232 @@ def write_header(ssl, res, key, value): len(key_data), ffi.cast("uint64_t", value), ) - elif isinstance(value, str): - value_data = value.encode("utf-8") - elif isinstance(value, bytes): - value_data = value lib.uws_res_write_header( ssl, res, key_data, len(key_data), value_data, len(value_data) ) + @ffi.callback("void(uws_res_t*, void*)") def uws_asgi_corked_response_start_handler(res, user_data): (ssl, status, headers) = ffi.from_handle(user_data) - lib.socketify_res_write_int_status(ssl, res, int(status)) + if status != 200: + lib.socketify_res_write_int_status(ssl, res, int(status)) for name, value in headers: write_header(ssl, res, name, value) - write_header(ssl, res, b'Server', b'socketify.py') + write_header(ssl, res, b"Server", b"socketify.py") + @ffi.callback("void(uws_res_t*, void*)") def uws_asgi_corked_accept_handler(res, user_data): (ssl, status, headers) = ffi.from_handle(user_data) - lib.socketify_res_write_int_status(ssl, res, int(status)) + if status != 200: + lib.socketify_res_write_int_status(ssl, res, int(status)) for name, value in headers: write_header(ssl, res, name, value) - write_header(ssl, res, b'Server', b'socketify.py') + write_header(ssl, res, b"Server", b"socketify.py") + @ffi.callback("void(uws_res_t*, void*)") def uws_asgi_corked_ws_accept_handler(res, user_data): (ssl, headers) = ffi.from_handle(user_data) for name, value in headers: write_header(ssl, res, name, value) - write_header(ssl, res, b'Server', b'socketify.py') + write_header(ssl, res, b"Server", b"socketify.py") + @ffi.callback("void(uws_res_t*, void*)") def uws_asgi_corked_403_handler(res, user_data): ssl = ffi.from_handle(user_data) lib.socketify_res_write_int_status(ssl, res, int(403)) lib.uws_res_end_without_body(ssl, res, 0) - + @ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*, bool*)") -def asgi(ssl, response, info, user_data, aborted): +def asgi(ssl, response, info, user_data, aborted): app = ffi.from_handle(user_data) headers = [] next_header = info.header_list while next_header != ffi.NULL: header = next_header[0] - headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) + headers.append( + ( + ffi.unpack(header.name, header.name_size), + ffi.unpack(header.value, header.value_size), + ) + ) next_header = ffi.cast("socketify_header*", next_header.next) url = ffi.unpack(info.url, info.url_size) scope = { - 'type': 'http', - 'asgi': { - 'version': '3.0', - 'spec_version': '2.3' - }, - 'http_version': '1.1', - 'server': (app.SERVER_HOST, app.SERVER_PORT), - 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), - 'scheme': app.SERVER_SCHEME, - 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), - 'root_path': '', - 'path': url.decode('utf8'), - 'raw_path': url, - 'query_string': ffi.unpack(info.query_string, info.query_string_size), - 'headers': headers + "type": "http", + "asgi": {"version": "3.0", "spec_version": "2.3"}, + "http_version": "1.1", + "server": (app.SERVER_HOST, app.SERVER_PORT), + "client": ( + ffi.unpack(info.remote_address, info.remote_address_size).decode("utf8"), + None, + ), + "scheme": app.SERVER_SCHEME, + "method": ffi.unpack(info.method, info.method_size).decode("utf8"), + "root_path": "", + "path": url.decode("utf8"), + "raw_path": url, + "query_string": ffi.unpack(info.query_string, info.query_string_size), + "headers": headers, } if bool(info.has_content): - data_queue = ASGIDataQueue(app.server.loop) - lib.uws_res_on_data( - ssl, response, asgi_on_data_handler, data_queue._ptr - ) + data_queue = ASGIDataQueue(app.server.loop) + lib.uws_res_on_data(ssl, response, asgi_on_data_handler, data_queue._ptr) else: data_queue = None + async def receive(): - if bool(aborted[0]): - return { 'type': 'http.disconnect'} + if bool(aborted[0]): + return {"type": "http.disconnect"} if data_queue: - if data_queue.queue.empty(): + if data_queue.queue.empty(): if not data_queue.is_end: - #wait for next item + # wait for next item await data_queue.next_data_future - return await receive() #consume again because multiple receives maybe called + return ( + await receive() + ) # consume again because multiple receives maybe called else: - return data_queue.queue.get(False) #consume queue + return data_queue.queue.get(False) # consume queue # no more body, just empty return EMPTY_RESPONSE + async def send(options): - if bool(aborted[0]): + if bool(aborted[0]): return False - type = options['type'] - if type == 'http.response.start': - #can also be more native optimized to do it in one GIL call - #try socketify_res_write_int_status_with_headers and create and socketify_res_cork_write_int_status_with_headers - status_code = options.get('status', 200) - headers = options.get('headers', []) + type = options["type"] + if type == "http.response.start": + # can also be more native optimized to do it in one GIL call + # try socketify_res_write_int_status_with_headers and create and socketify_res_cork_write_int_status_with_headers + status_code = options.get("status", 200) + headers = options.get("headers", []) cork_data = ffi.new_handle((ssl, status_code, headers)) - lib.uws_res_cork(ssl, response, uws_asgi_corked_response_start_handler, cork_data) + lib.uws_res_cork( + ssl, response, uws_asgi_corked_response_start_handler, cork_data + ) return True - if type == 'http.response.body': + if type == "http.response.body": - #native optimized end/send - message = options.get('body', b'') - - if isinstance(message, str): - data = message.encode("utf-8") - elif isinstance(message, bytes): - data = message + # native optimized end/send + message = options.get("body", b"") + + if options.get("more_body", False): + if isinstance(message, bytes): + lib.socketify_res_cork_write(ssl, response, message, len(message)) + elif isinstance(message, str): + data = message.encode("utf-8") + lib.socketify_res_cork_write(ssl, response, data, len(data)) else: - data = b'' - if options.get('more_body', False): - lib.socketify_res_cork_write(ssl, response, data, len(data)) - else: - lib.socketify_res_cork_end(ssl, response, data, len(data), 0) + if isinstance(message, bytes): + lib.socketify_res_cork_end(ssl, response, message, len(message), 0) + elif isinstance(message, str): + data = message.encode("utf-8") + lib.socketify_res_cork_end(ssl, response, data, len(data), 0) + return True return False - - app.server.loop.run_async(app.app(scope, receive, send)) + + app._run_task(app.app(scope, receive, send)) + + class _ASGI: - def __init__(self, app, options=None, websocket=True, websocket_options=None): - self.server = App(options) + def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=0): + self.server = App(options) self.SERVER_PORT = None - self.SERVER_HOST = '' - self.SERVER_SCHEME = 'https' if self.server.options else 'http' - self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' + self.SERVER_HOST = "" + self.SERVER_SCHEME = "https" if self.server.options else "http" + self.SERVER_WS_SCHEME = "wss" if self.server.options else "ws" + self.task_factory_max_items = task_factory_max_items + loop = self.server.loop.loop + # ASGI do not use app.run_async to not add any overhead from socketify.py WebFramework + # internally will still use custom task factory for pypy because of Loop + if is_pypy: + if task_factory_max_items > 0: + factory = create_task_with_factory(task_factory_max_items) + + def run_task(task): + factory(loop, task) + loop._run_once() + self._run_task = run_task + else: + def run_task(task): + create_task(loop, task) + loop._run_once() + self._run_task = run_task + + else: + if sys.version_info >= (3, 8): # name fixed to avoid dynamic name + def run_task(task): + loop.create_task(task, name='socketify.py-request-task') + loop._run_once() + self._run_task = run_task + else: + def run_task(task): + loop.create_task(task) + loop._run_once() + self._run_task = run_task + self.app = app self.ws_compression = False # optimized in native self._ptr = ffi.new_handle(self) self.asgi_http_info = lib.socketify_add_asgi_http_handler( - self.server.SSL, - self.server.app, - asgi, - self._ptr + self.server.SSL, self.server.app, asgi, self._ptr ) self.asgi_ws_info = None - if isinstance(websocket, dict): #serve websocket as socketify.py + if isinstance(websocket, dict): # serve websocket as socketify.py if websocket_options: websocket.update(websocket_options) self.server.ws("/*", websocket) - elif websocket: #serve websocket as ASGI - + elif websocket: # serve websocket as ASGI + native_options = ffi.new("uws_socket_behavior_t *") native_behavior = native_options[0] if not websocket_options: websocket_options = {} - self.ws_compression = bool(websocket_options.get('compression', False)) - + self.ws_compression = bool(websocket_options.get("compression", False)) + native_behavior.maxPayloadLength = ffi.cast( "unsigned int", - int(websocket_options.get('max_payload_length', 16777216)), + int(websocket_options.get("max_payload_length", 16777216)), ) native_behavior.idleTimeout = ffi.cast( "unsigned short", - int(websocket_options.get('idle_timeout', 20)), + int(websocket_options.get("idle_timeout", 20)), ) native_behavior.maxBackpressure = ffi.cast( "unsigned int", - int(websocket_options.get('max_backpressure', 16777216)), + int(websocket_options.get("max_backpressure", 16777216)), ) native_behavior.compression = ffi.cast( "uws_compress_options_t", int(self.ws_compression) ) native_behavior.maxLifetime = ffi.cast( - "unsigned short", int(websocket_options.get('max_lifetime', 0)) + "unsigned short", int(websocket_options.get("max_lifetime", 0)) ) native_behavior.closeOnBackpressureLimit = ffi.cast( - "int", int(websocket_options.get('close_on_backpressure_limit', 0)), + "int", + int(websocket_options.get("close_on_backpressure_limit", 0)), ) native_behavior.resetIdleTimeoutOnSend = ffi.cast( - "int", bool(websocket_options.get('reset_idle_timeout_on_send', True)) + "int", bool(websocket_options.get("reset_idle_timeout_on_send", True)) ) native_behavior.sendPingsAutomatically = ffi.cast( - "int", bool(websocket_options.get('send_pings_automatically', True)) + "int", bool(websocket_options.get("send_pings_automatically", True)) ) - native_behavior.upgrade = ffi.NULL # will be set first on C++ + native_behavior.upgrade = ffi.NULL # will be set first on C++ native_behavior.open = ws_open native_behavior.message = ws_message @@ -493,19 +576,23 @@ class _ASGI: native_behavior.close = ws_close self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( - self.server.SSL, - self.server.app, - native_behavior, - ws_upgrade, - self._ptr + self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr ) def listen(self, port_or_options, handler=None): - self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port - self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host + self.SERVER_PORT = ( + port_or_options + if isinstance(port_or_options, int) + else port_or_options.port + ) + self.SERVER_HOST = ( + "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host + ) self.server.listen(port_or_options, handler) return self + def run(self): + # scope = {"type": "lifespan", "asgi": {"version": "3.0", "spec_version": "2.3"}} self.server.run() # run app on the main process too :) return self @@ -515,38 +602,51 @@ class _ASGI: if self.asgi_ws_info: lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) + # "Public" ASGI interface to allow easy forks/workers class ASGI: - def __init__(self, app, options=None, websocket=True, websocket_options=None): + def __init__( + self, + app, + options=None, + websocket=True, + websocket_options=None, + task_factory_max_items=100_000, #default = 100k = +20mib in memory + ): self.app = app self.options = options self.websocket = websocket self.websocket_options = websocket_options self.listen_options = None - + self.task_factory_max_items = task_factory_max_items + def listen(self, port_or_options, handler=None): self.listen_options = (port_or_options, handler) return self def run(self, workers=1): - def run_app(): - server = _ASGI(self.app, self.options, self.websocket, self.websocket_options) + def run_task(): + server = _ASGI( + self.app, + self.options, + self.websocket, + self.websocket_options, + self.task_factory_max_items, + ) if self.listen_options: (port_or_options, handler) = self.listen_options server.listen(port_or_options, handler) server.run() - def create_fork(): n = os.fork() # n greater than 0 means parent process if not n > 0: - run_app() - + run_task() # fork limiting the cpu count - 1 for i in range(1, workers): create_fork() - run_app() # run app on the main process too :) - return self \ No newline at end of file + run_task() # run app on the main process too :) + return self diff --git a/src/socketify/cli.py b/src/socketify/cli.py index 57d2605..d524b18 100644 --- a/src/socketify/cli.py +++ b/src/socketify/cli.py @@ -12,6 +12,8 @@ Options: --port or -p INTEGER Bind socket to this port. [default: 8000] --workers or -w INTEGER Number of worker processes. Defaults to the WEB_CONCURRENCY environment variable if available, or 1 + + --uds TEXT Bind to a UNIX domain socket, this options disables --host or -h and --port or -p. --ws [auto|none|module:ws] If informed module:ws will auto detect to use socketify.py or ASGI websockets interface and disabled if informed none [default: auto] --ws-max-size INTEGER WebSocket max size message in bytes [default: 16777216] @@ -33,21 +35,19 @@ Options: --ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1] --req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0] --ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0] - - --uds TEXT Bind to a UNIX domain socket, this options disables --host or -h and --port or -p. - --reload Enable auto-reload. This options also disable --workers or -w option. - --reload-dir PATH Set reload directories explicitly, instead of using the current working directory. - --reload-include TEXT Set extensions to include while watching for files. - Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default; - these defaults can be overridden with `--reload-exclude`. - --reload-exclude TEXT Set extensions to include while watching for files. - --reload-delay INT Milliseconds to delay reload between file changes. [default: 1000] Example: python3 -m socketify main:app -w 8 -p 8181 """ + # --reload Enable auto-reload. This options also disable --workers or -w option. + # --reload-dir PATH Set reload directories explicitly, instead of using the current working directory. + # --reload-include TEXT Set extensions to include while watching for files. + # Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default; + # these defaults can be overridden with `--reload-exclude`. + # --reload-exclude TEXT Set extensions to include while watching for files. + # --reload-delay INT Milliseconds to delay reload between file changes. [default: 1000] def is_wsgi(module): return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 2 def is_asgi(module): @@ -108,9 +108,14 @@ def execute(args): if selected_option: options[selected_option] = option selected_option = None + elif option.startswith('--'): + if selected_option is None: + selected_option = option + else: # --factory, --reload etc + options[selected_option] = True else: - selected_option = option - if selected_option: # --factory + return print(f"Invalid option ${selected_option} see --help") + if selected_option: # --factory, --reload etc options[selected_option] = True interface = (options.get("--interface", "auto")).lower() @@ -262,28 +267,7 @@ def execute(args): os.kill(pid, signal.SIGINT) else: - # def on_change(): - # auto_reload - - def create_app(): - #Generic WSGI, ASGI, SSGI Interface - if uds: - app = Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(domain=uds), listen_log) - else: - app = Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log) - return app - - - if auto_reload: - force_reload = False - app = None - - while auto_reload: - app = create_app() - app.run() - if not force_reload: - auto_reload = False - + if uds: + Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers) else: - app = create_app() - app.run(workers=workers) + Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers) diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 9eb32f6..d459e1f 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -1,14 +1,15 @@ import asyncio import logging -import threading +from .tasks import create_task, create_task_with_factory from .uv import UVLoop import asyncio +import platform -def future_handler(future, loop, exception_handler, response): +is_pypy = platform.python_implementation() == 'PyPy' +async def task_wrapper(exception_handler, loop, response, task): try: - future.result() - return None + return await task except Exception as error: if hasattr(exception_handler, "__call__"): exception_handler(loop, error, response) @@ -20,12 +21,17 @@ def future_handler(future, loop, exception_handler, response): response.write_status(500).end("Internal Error") finally: return None - return None class Loop: - def __init__(self, exception_handler=None): - self.loop = asyncio.get_event_loop() + def __init__(self, exception_handler=None, task_factory_max_items=0): + + # get the current running loop or create a new one without warnings + self.loop = asyncio._get_running_loop() + if self.loop is None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.uv_loop = UVLoop() if hasattr(exception_handler, "__call__"): @@ -37,6 +43,23 @@ class Loop: self.exception_handler = None self.started = False + if is_pypy: # PyPy async Optimizations + if task_factory_max_items > 0: # Only available in PyPy for now + self._task_factory = create_task_with_factory(task_factory_max_items) + else: + self._task_factory = create_task + self.run_async = self._run_async_pypy + # custom task factory + def pypy_task_factory(loop, coro, context=None): + return create_task(loop, coro, context=context) + self.loop.set_task_factory(pypy_task_factory) + else: + # CPython performs worse using custom create_task, so native create_task is used + # but this also did not allow the use of create_task_with_factory :/ + # native create_task do not allow to change context, callbacks, state etc + + self.run_async = self._run_async_cpython + def set_timeout(self, timeout, callback, user_data): return self.uv_loop.create_timer(timeout, 0, callback, user_data) @@ -49,12 +72,24 @@ class Loop: self.uv_loop.run_once() self.loop.call_soon(self._keep_alive) - def run(self): + def create_task(self, *args, **kwargs): + # this is not using optimized create_task yet + return self.loop.create_task(*args, **kwargs) + + def ensure_future(self, task): + return asyncio.ensure_future(task, loop=self.loop) + + def run(self, task=None): self.started = True + if task is not None: + future = self.ensure_future(task) + else: + future = None self.loop.call_soon(self._keep_alive) self.loop.run_forever() # clean up uvloop self.uv_loop.stop() + return future def run_once(self): # run one step of asyncio @@ -75,19 +110,22 @@ class Loop: return self.uv_loop.get_native_loop() - def run_async(self, task, response=None): - # with run_once - future = asyncio.ensure_future(task, loop=self.loop) - - # with threads - future.add_done_callback( - lambda f: future_handler(f, self.loop, self.exception_handler, response) - ) + def _run_async_pypy(self, task, response=None): + # this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler + # using an coroutine wrapper generates less overhead than using add_done_callback + # this is an custom task/future with less overhead + future = self._task_factory(self.loop, task_wrapper(self.exception_handler, self.loop, response, task)) # force asyncio run once to enable req in async functions before first await self.loop._run_once() + return None # this future maybe already done and reused not safe to await - return future - + def _run_async_cpython(self, task, response=None): + # this garanties error 500 in case of uncaught exceptions, and can trigger the custom error handler + # using an coroutine wrapper generates less overhead than using add_done_callback + future = self.loop.create_task(task_wrapper(self.exception_handler, self.loop, response, task)) + # force asyncio run once to enable req in async functions before first await + self.loop._run_once() + return None # this future is safe to await but we return None for compatibility, and in the future will be the same behavior as PyPy def dispose(self): if self.uv_loop: self.uv_loop.dispose() diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index f4fd8ca..76d771c 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -52,12 +52,17 @@ def uws_websocket_factory_drain_handler(ws, user_data): try: handler = handlers.drain if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws)) - if dispose: - def when_finished(_): - app._ws_factory.dispose(instances) - future.add_done_callback(when_finished) + if dispose: + async def wrapper(app, instances, handler, ws): + try: + await handler(ws) + finally: + app._ws_factory.dispose(instances) + + app.run_async(wrapper(app, instances, handler, ws)) + else: + app.run_async(handler(ws)) else: handler(ws) if dispose: @@ -95,12 +100,16 @@ def uws_websocket_factory_open_handler(ws, user_data): try: handler = handlers.open if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws)) if dispose: - def when_finished(_): - app._ws_factory.dispose(instances) + async def wrapper(app, instances, handler, ws): + try: + await handler(ws) + finally: + app._ws_factory.dispose(instances) - future.add_done_callback(when_finished) + app.run_async(wrapper(app, instances, handler, ws)) + else: + app.run_async(handler(ws)) else: handler(ws) if dispose: @@ -147,12 +156,16 @@ def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data handler = handlers.message if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws, data, opcode)) if dispose: - def when_finished(_): - app._ws_factory.dispose(instances) + async def wrapper(app, instances, handler, ws, data): + try: + await handler(ws, data) + finally: + app._ws_factory.dispose(instances) - future.add_done_callback(when_finished) + app.run_async(wrapper(app, instances, handler, ws, data)) + else: + app.run_async(handler(ws, data)) else: handler(ws, data, opcode) if dispose: @@ -206,12 +219,16 @@ def uws_websocket_factory_pong_handler(ws, message, length, user_data): handler = handlers.pong if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws, data)) if dispose: - def when_finished(_): - app._ws_factory.dispose(instances) + async def wrapper(app, instances, handler, ws, data): + try: + await handler(ws, data) + finally: + app._ws_factory.dispose(instances) - future.add_done_callback(when_finished) + app.run_async(wrapper(app, instances, handler, ws, data)) + else: + app.run_async(handler(ws, data)) else: handler(ws, data) if dispose: @@ -260,12 +277,16 @@ def uws_websocket_factory_ping_handler(ws, message, length, user_data): handler = handlers.ping if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws, data)) if dispose: - def when_finished(_): - app._ws_factory.dispose(instances) + async def wrapper(app, instances, handler, ws, data): + try: + await handler(ws, data) + finally: + app._ws_factory.dispose(instances) - future.add_done_callback(when_finished) + app.run_async(wrapper(app, instances, handler, ws, data)) + else: + app.run_async(handler(ws, data)) else: handler(ws, data) if dispose: @@ -323,21 +344,22 @@ def uws_websocket_factory_close_handler(ws, code, message, length, user_data): return if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws, int(code), data)) + async def wrapper(app, instances, handler, ws, data, code, dispose): + try: + await handler(ws, code, data) + finally: + key = ws.get_user_data_uuid() + if key is not None: + app._socket_refs.pop(key, None) + if dispose: + app._ws_factory.dispose(instances) - def when_finished(_): - key = ws.get_user_data_uuid() - if key is not None: - SocketRefs.pop(key, None) - if dispose: - app._ws_factory.dispose(instances) - - future.add_done_callback(when_finished) + app.run_async(wrapper(app, instances, handler, ws, data, int(code), dispose)) else: handler(ws, int(code), data) key = ws.get_user_data_uuid() if key is not None: - SocketRefs.pop(key, None) + app._socket_refs.pop(key, None) if dispose: app._ws_factory.dispose(instances) @@ -365,19 +387,20 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): return if inspect.iscoroutinefunction(handler): - future = app.run_async(handler(ws, int(code), data)) - - def when_finished(_): - key = ws.get_user_data_uuid() - if key is not None: - SocketRefs.pop(key, None) - - future.add_done_callback(when_finished) + async def wrapper(app, handler, ws, data, code, dispose): + try: + await handler(ws, code, data) + finally: + key = ws.get_user_data_uuid() + if key is not None: + app._socket_refs.pop(key, None) + + app.run_async(wrapper(app, handler, ws, data, int(code))) else: handler(ws, int(code), data) key = ws.get_user_data_uuid() if key is not None: - SocketRefs.pop(key, None) + app._socket_refs.pop(key, None) except Exception as err: logging.error( @@ -394,12 +417,16 @@ def uws_generic_factory_method_handler(res, req, user_data): try: if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() - future = response.run_async(handler(response, request)) if dispose: - def when_finished(_): - app._factory.dispose(instances) + async def wrapper(app, instances, handler, response, request): + try: + await handler(response, request) + finally: + app._factory.dispose(instances) - future.add_done_callback(when_finished) + response.run_async(wrapper(app, instances, handler, response, request)) + else: + response.run_async(handler(response, request)) else: handler(response, request) if dispose: @@ -421,12 +448,17 @@ def uws_websocket_factory_upgrade_handler(res, req, context, user_data): handler = handlers.upgrade if inspect.iscoroutinefunction(handler): - future = response.run_async(handler(response, request, context)) + response.grab_aborted_handler() if dispose: - def when_finished(_): - app._factory.dispose(instances) + async def wrapper(app, instances, handler, response, request, context): + try: + await handler(response, request, context) + finally: + app._factadd_done_callbackory.dispose(instances) - future.add_done_callback(when_finished) + response.run_async(wrapper(app, instances, handler, response, request, context)) + else: + response.run_async(handler(response, request, context)) else: handler(response, request, context) if dispose: @@ -490,12 +522,17 @@ def uws_generic_factory_method_handler(res, req, user_data): try: if inspect.iscoroutinefunction(handler): response.grab_aborted_handler() - future = response.run_async(handler(response, request)) + response.grab_aborted_handler() if dispose: - def when_finished(_): - app._factory.dispose(instances) + async def wrapper(app, instances, handler, response, request): + try: + await handler(response, request) + finally: + app._factory.dispose(instances) - future.add_done_callback(when_finished) + response.run_async(wrapper(app, instances, handler, response, request)) + else: + response.run_async(handler(response, request)) else: handler(response, request) if dispose: @@ -671,10 +708,6 @@ class SendStatus(IntEnum): DROPPED = 2 -# dict to keep socket data alive until closed if needed -SocketRefs = {} - - class WebSocket: def __init__(self, websocket, ssl, loop): self.ws = websocket @@ -1150,6 +1183,9 @@ class AppRequest: return self._headers def get_header(self, lower_case_header): + if self._headers is not None: + return self._headers.get(lower_case_header, None) + if isinstance(lower_case_header, str): data = lower_case_header.encode("utf-8") elif isinstance(lower_case_header, bytes): @@ -1763,11 +1799,13 @@ class AppResponse: class App: - def __init__(self, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0): + def __init__(self, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0, task_factory_max_items=100_000): socket_options_ptr = ffi.new("struct us_socket_context_options_t *") socket_options = socket_options_ptr[0] self.options = options self._template = None + # keep socket data alive for CFFI + self._socket_refs = {} if options is not None: self.is_ssl = True self.SSL = ffi.cast("int", 1) @@ -1810,7 +1848,8 @@ class App: self.loop = Loop( - lambda loop, context, response: self.trigger_error(context, response, None) + lambda loop, context, response: self.trigger_error(context, response, None), + task_factory_max_items ) # set async loop to be the last created (is thread_local), App must be one per thread otherwise will use only the lasted loop diff --git a/src/socketify/ssgi.py b/src/socketify/ssgi.py index 693d0b8..3787d4b 100644 --- a/src/socketify/ssgi.py +++ b/src/socketify/ssgi.py @@ -187,8 +187,8 @@ class SSGIWebSocket: self._close_handler = on_close_handler class SSGI: - def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0): - self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens) + def __init__(self, app, options=None, request_response_factory_max_items=0, websocket_factory_max_itens=0): + self.server = App(options, request_response_factory_max_items, websocket_factory_max_itens) self.SERVER_PORT = None self.SERVER_HOST = '' self.SERVER_SCHEME = 'https' if self.server.options else 'http' diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index e6bc7d4..f7b9090 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -5,14 +5,20 @@ from .asgi import ws_close, ws_upgrade, ws_open, ws_message from io import BytesIO from .native import lib, ffi + @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response = ffi.from_handle(user_data) if chunk != ffi.NULL: data_response.buffer.write(ffi.unpack(chunk, chunk_length)) if bool(is_end): - lib.uws_res_cork(data_response.app.server.SSL, res, wsgi_corked_response_start_handler, data_response._ptr) - + lib.uws_res_cork( + data_response.app.server.SSL, + res, + wsgi_corked_response_start_handler, + data_response._ptr, + ) + class WSGIDataResponse: def __init__(self, app, environ, start_response, aborted, buffer, on_data): @@ -24,138 +30,157 @@ class WSGIDataResponse: self.app = app self.start_response = start_response -def write(ssl, res, message): - if isinstance(message, str): - data = message.encode("utf-8") - elif isinstance(message, bytes): - data = message - else: - data = b'' - lib.uws_res_write(ssl, res, data, len(data)) - -def write_status(ssl, res, status_text): - if isinstance(status_text, str): - data = status_text.encode("utf-8") - elif isinstance(status_text, bytes): - data = status_text - else: - return False - lib.uws_res_write_status(ssl, res, data, len(data)) - return True - -def write_header(ssl, res, key, value): - if isinstance(key, str): - if key.lower() == "content-length": return #auto - if key.lower() == "transfer-encoding": return #auto - key_data = key.encode("utf-8") - elif isinstance(key, bytes): - if key.lower() == b'content-length': return #auto - if key.lower() == b'transfer-encoding': return #auto - key_data = key - - if isinstance(value, int): - lib.uws_res_write_header_int( - ssl, - res, - key_data, - len(key_data), - ffi.cast("uint64_t", value), - ) - elif isinstance(value, str): - value_data = value.encode("utf-8") - elif isinstance(value, bytes): - value_data = value - lib.uws_res_write_header( - ssl, res, key_data, len(key_data), value_data, len(value_data) - ) - @ffi.callback("void(uws_res_t*, void*)") def wsgi_corked_response_start_handler(res, user_data): data_response = ffi.from_handle(user_data) data_response.on_data(data_response, res) -@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") -def wsgi(ssl, response, info, user_data, aborted): - app = ffi.from_handle(user_data) +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") +def wsgi(ssl, response, info, user_data, aborted): + app = ffi.from_handle(user_data) + # reusing the dict is slower than cloning because we need to clear HTTP headers environ = dict(app.BASIC_ENVIRON) - environ['REQUEST_METHOD'] = ffi.unpack(info.method, info.method_size).decode('utf8') - environ['PATH_INFO'] = ffi.unpack(info.url, info.url_size).decode('utf8') - environ['QUERY_STRING'] = ffi.unpack(info.query_string, info.query_string_size).decode('utf8') - environ['REMOTE_ADDR'] = ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8') + environ["REQUEST_METHOD"] = ffi.unpack(info.method, info.method_size).decode("utf8") + environ["PATH_INFO"] = ffi.unpack(info.url, info.url_size).decode("utf8") + environ["QUERY_STRING"] = ffi.unpack( + info.query_string, info.query_string_size + ).decode("utf8") + if info.remote_address != ffi.NULL: + environ["REMOTE_ADDR"] = ffi.unpack( + info.remote_address, info.remote_address_size + ).decode("utf8") + else: + environ["REMOTE_ADDR"] = "127.0.0.1" - next_header = info.header_list + next_header = info.header_list while next_header != ffi.NULL: header = next_header[0] - name = ffi.unpack(header.name, header.name_size).decode('utf8') - value = ffi.unpack(header.value, header.value_size).decode('utf8') + name = ffi.unpack(header.name, header.name_size).decode("utf8") + value = ffi.unpack(header.value, header.value_size).decode("utf8") # this conversion should be optimized in future - environ[f"HTTP_{name.replace('-', '_').upper()}"]=value + environ[f"HTTP_{name.replace('-', '_').upper()}"] = value next_header = ffi.cast("socketify_header*", next_header.next) + def start_response(status, headers): - write_status(ssl, response, status) - for (name, value) in headers: - write_header(ssl, response, name, value) - write_header(ssl, response, b'Server', b'socketify.py') + if isinstance(status, str): + data = status.encode("utf-8") + lib.uws_res_write_status(ssl, response, data, len(data)) + elif isinstance(status, bytes): + lib.uws_res_write_status(ssl, response, status, len(status)) + + for (key, value) in headers: + if isinstance(key, str): + # this is faster than using .lower() + if ( + key == "content-length" + or key == "Content-Length" + or key == "Transfer-Encoding" + or key == "transfer-encoding" + ): + continue # auto + key_data = key.encode("utf-8") + elif isinstance(key, bytes): + # this is faster than using .lower() + if ( + key == b"content-length" + or key == b"Content-Length" + or key == b"Transfer-Encoding" + or key == b"transfer-encoding" + ): + continue # auto + key_data = key + + + if isinstance(value, str): + value_data = value.encode("utf-8") + elif isinstance(value, bytes): + value_data = value + elif isinstance(value, int): + lib.uws_res_write_header_int( + ssl, + response, + key_data, + len(key_data), + ffi.cast("uint64_t", value), + ) + continue + + lib.uws_res_write_header( + ssl, response, key_data, len(key_data), value_data, len(value_data) + ) + lib.uws_res_write_header( + ssl, response, b'Server', 6, b'socketify.py', 12 + ) + # check for body - if bool(info.has_content): + if bool(info.has_content): WSGI_INPUT = BytesIO() - environ['wsgi.input'] = WSGI_INPUT + environ["wsgi.input"] = WSGI_INPUT + def on_data(data_response, response): if bool(data_response.aborted[0]): return ssl = data_response.app.server.SSL - app_iter = data_response.app.app(data_response.environ, data_response.start_response) + app_iter = data_response.app.app( + data_response.environ, data_response.start_response + ) try: for data in app_iter: - write(ssl, response, data) + if isinstance(data, bytes): + lib.uws_res_write(ssl, response, data, len(data)) + elif isinstance(data, str): + data = data.encode("utf-8") + lib.uws_res_write(ssl, response, data, len(data)) + + finally: - if hasattr(app_iter, 'close'): + if hasattr(app_iter, "close"): app_iter.close() lib.uws_res_end_without_body(ssl, response, 0) - - data_response = WSGIDataResponse(app, environ, start_response, aborted, WSGI_INPUT, on_data) - lib.uws_res_on_data( - ssl, response, wsgi_on_data_handler, data_response._ptr + data_response = WSGIDataResponse( + app, environ, start_response, aborted, WSGI_INPUT, on_data ) + + lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr) else: - environ['wsgi.input'] = None + environ["wsgi.input"] = None app_iter = app.app(environ, start_response) try: for data in app_iter: - write(ssl, response, data) + if isinstance(data, bytes): + lib.uws_res_write(ssl, response, data, len(data)) + elif isinstance(data, str): + data = data.encode("utf-8") + lib.uws_res_write(ssl, response, data, len(data)) finally: - if hasattr(app_iter, 'close'): + if hasattr(app_iter, "close"): app_iter.close() lib.uws_res_end_without_body(ssl, response, 0) class _WSGI: - def __init__(self, app, options=None, websocket=None, websocket_options=None): - self.server = App(options) + self.server = App(options) self.SERVER_HOST = None self.SERVER_PORT = None - self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws' + self.SERVER_WS_SCHEME = "wss" if self.server.options else "ws" self.app = app self.BASIC_ENVIRON = dict(os.environ) self.ws_compression = False self._ptr = ffi.new_handle(self) self.asgi_http_info = lib.socketify_add_asgi_http_handler( - self.server.SSL, - self.server.app, - wsgi, - self._ptr + self.server.SSL, self.server.app, wsgi, self._ptr ) self.asgi_ws_info = None - if isinstance(websocket, dict): #serve websocket as socketify.py + if isinstance(websocket, dict): # serve websocket as socketify.py if websocket_options: websocket.update(websocket_options) - + self.server.ws("/*", websocket) elif inspect.iscoroutine(websocket): # detect ASGI to use as WebSocket as mixed protocol @@ -165,37 +190,38 @@ class _WSGI: if not websocket_options: websocket_options = {} - self.ws_compression = websocket_options.get('compression', False) - + self.ws_compression = websocket_options.get("compression", False) + native_behavior.maxPayloadLength = ffi.cast( "unsigned int", - int(websocket_options.get('max_payload_length', 16777216)), + int(websocket_options.get("max_payload_length", 16777216)), ) native_behavior.idleTimeout = ffi.cast( "unsigned short", - int(websocket_options.get('idle_timeout', 20)), + int(websocket_options.get("idle_timeout", 20)), ) native_behavior.maxBackpressure = ffi.cast( "unsigned int", - int(websocket_options.get('max_backpressure', 16777216)), + int(websocket_options.get("max_backpressure", 16777216)), ) native_behavior.compression = ffi.cast( "uws_compress_options_t", int(self.ws_compression) ) native_behavior.maxLifetime = ffi.cast( - "unsigned short", int(websocket_options.get('max_lifetime', 0)) + "unsigned short", int(websocket_options.get("max_lifetime", 0)) ) native_behavior.closeOnBackpressureLimit = ffi.cast( - "int", int(websocket_options.get('close_on_backpressure_limit', 0)), + "int", + int(websocket_options.get("close_on_backpressure_limit", 0)), ) native_behavior.resetIdleTimeoutOnSend = ffi.cast( - "int", bool(websocket_options.get('reset_idle_timeout_on_send', True)) + "int", bool(websocket_options.get("reset_idle_timeout_on_send", True)) ) native_behavior.sendPingsAutomatically = ffi.cast( - "int", bool(websocket_options.get('send_pings_automatically', True)) + "int", bool(websocket_options.get("send_pings_automatically", True)) ) - native_behavior.upgrade = ffi.NULL # will be set first on C++ + native_behavior.upgrade = ffi.NULL # will be set first on C++ native_behavior.open = ws_open native_behavior.message = ws_message @@ -204,35 +230,39 @@ class _WSGI: native_behavior.close = ws_close self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( - self.server.SSL, - self.server.app, - native_behavior, - ws_upgrade, - self._ptr + self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr ) - def listen(self, port_or_options, handler=None): - self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port - self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host - self.BASIC_ENVIRON.update({ - 'GATEWAY_INTERFACE': 'CGI/1.1', - 'SERVER_PORT': str(self.SERVER_PORT), - 'SERVER_SOFTWARE': 'WSGIServer/0.2', - 'wsgi.input': None, - 'wsgi.errors': None, - 'wsgi.version': (1, 0), - 'wsgi.run_once': False, - 'wsgi.url_scheme': 'https' if self.server.options else 'http', - 'wsgi.multithread': False, - 'wsgi.multiprocess': False, - 'wsgi.file_wrapper': None, # No file wrapper support for now - 'SCRIPT_NAME': '', - 'SERVER_PROTOCOL': 'HTTP/1.1', - 'REMOTE_HOST': '', - }) + self.SERVER_PORT = ( + port_or_options + if isinstance(port_or_options, int) + else port_or_options.port + ) + self.SERVER_HOST = ( + "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host + ) + self.BASIC_ENVIRON.update( + { + "GATEWAY_INTERFACE": "CGI/1.1", + "SERVER_PORT": str(self.SERVER_PORT), + "SERVER_SOFTWARE": "WSGIServer/0.2", + "wsgi.input": None, + "wsgi.errors": None, + "wsgi.version": (1, 0), + "wsgi.run_once": False, + "wsgi.url_scheme": "https" if self.server.options else "http", + "wsgi.multithread": False, + "wsgi.multiprocess": False, + "wsgi.file_wrapper": None, # No file wrapper support for now + "SCRIPT_NAME": "", + "SERVER_PROTOCOL": "HTTP/1.1", + "REMOTE_HOST": "", + } + ) self.server.listen(port_or_options, handler) return self + def run(self): self.server.run() return self @@ -243,6 +273,7 @@ class _WSGI: if self.asgi_ws_info: lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) + # "Public" WSGI interface to allow easy forks/workers class WSGI: def __init__(self, app, options=None, websocket=None, websocket_options=None): @@ -251,31 +282,30 @@ class WSGI: self.websocket = websocket self.websocket_options = websocket_options self.listen_options = None - - + def listen(self, port_or_options, handler=None): self.listen_options = (port_or_options, handler) return self def run(self, workers=1): def run_app(): - server = _WSGI(self.app, self.options, self.websocket, self.websocket_options) + server = _WSGI( + self.app, self.options, self.websocket, self.websocket_options + ) if self.listen_options: (port_or_options, handler) = self.listen_options server.listen(port_or_options, handler) server.run() - def create_fork(): n = os.fork() # n greater than 0 means parent process if not n > 0: run_app() - # fork limiting the cpu count - 1 for i in range(1, workers): create_fork() run_app() # run app on the main process too :) - return self \ No newline at end of file + return self