diff --git a/bench/asgi_wsgi/falcon-asgi.py b/bench/asgi_wsgi/falcon-asgi.py index 5f82afd..dc4d133 100644 --- a/bench/asgi_wsgi/falcon-asgi.py +++ b/bench/asgi_wsgi/falcon-asgi.py @@ -8,6 +8,7 @@ class Home: resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override resp.text = "Hello, World!" async def on_post(self, req, resp): + # curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://localhost:8000/ raw_data = await req.stream.read() print("data", raw_data) resp.status = falcon.HTTP_200 # This is the default status diff --git a/bench/asgi_wsgi/falcon-ws.py b/bench/asgi_wsgi/falcon-ws.py index 5a204b0..839be60 100644 --- a/bench/asgi_wsgi/falcon-ws.py +++ b/bench/asgi_wsgi/falcon-ws.py @@ -43,10 +43,12 @@ class SomeResource: app = falcon.asgi.App() app.ws_options.max_receive_queue = 20_000_000# this virtual disables queue but adds overhead -app.ws_options.enable_buffered_receiver = True # this disable queue but for now only available on cirospaciari/falcon +app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon app.add_route("/", SomeResource()) # python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker # pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker if __name__ == "__main__": ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run() + +# 126550 \ No newline at end of file diff --git a/bench/asgi_wsgi/raw-ws.py b/bench/asgi_wsgi/raw-ws.py index 4ba527c..ad103d4 100644 --- a/bench/asgi_wsgi/raw-ws.py +++ b/bench/asgi_wsgi/raw-ws.py @@ -34,7 +34,7 @@ async def app(scope, receive, send): if type == 'websocket.disconnect': print("disconnected!", scope) break - + # echo! await send({ 'type': 'websocket.send', diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 3116669..8c458c0 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -1,11 +1,203 @@ from socketify import App, CompressOptions, OpCode from queue import SimpleQueue from .native import lib, ffi +import asyncio -# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way -# re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False } +@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") +def ws_message(ws, message, length, opcode, user_data): + socket_data = ffi.from_handle(user_data) + message = None if message == ffi.NULL else ffi.unpack(message, length) + if opcode == OpCode.TEXT: + message = message.decode("utf8") + + socket_data.message(ws, message, OpCode(opcode)) + +@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") +def ws_close(ws, code, message, length, user_data): + socket_data = ffi.from_handle(user_data) + message = None if message == ffi.NULL else ffi.unpack(message, length) + socket_data.disconnect(code, message) + +@ffi.callback("void(uws_websocket_t*, void*)") +def ws_open(ws, user_data): + socket_data = ffi.from_handle(user_data) + socket_data.open(ws) + +@ffi.callback("void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*, bool*)") +def ws_upgrade(ssl, response, info, socket_context, user_data, aborted): + app = ffi.from_handle(user_data) + headers = [] + next_header = info.header_list + while next_header != ffi.NULL: + header = next_header[0] + headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) + next_header = ffi.cast("socketify_header*", next_header.next) + + url = ffi.unpack(info.url, info.url_size) + + if info.key == ffi.NULL: + key = None + else: + key = ffi.unpack(info.key, info.key_size).decode('utf8') + + if info.protocol == ffi.NULL: + protocol = None + else: + protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8') + if info.extensions == ffi.NULL: + extensions = None + else: + extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8') + ws = ASGIWebSocket(app.server.loop) + scope = { + 'type': 'websocket', + 'asgi': { + 'version': '3.0', + 'spec_version': '2.3' + }, + 'http_version': '1.1', + 'server': (app.SERVER_HOST, app.SERVER_PORT), + 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), + 'scheme': app.SERVER_WS_SCHEME, + 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), + 'root_path': '', + 'path': url.decode('utf8'), + 'raw_path': url, + 'query_string': ffi.unpack(info.query_string, info.query_string_size), + 'headers': headers, + 'subprotocols': [protocol] if protocol else [], + 'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True } + } + async def send(options): + if bool(aborted[0]): return False + type = options['type'] + if type == 'websocket.send': + data = options.get("bytes", None) + if ws.ws: + if data: + lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.BINARY)) + else: + data = options.get('text', '').encode('utf8') + lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.TEXT)) + return True + return False + if type == 'websocket.accept': # upgrade! + res_headers = options.get('headers', None) + if res_headers: + cork_data = ffi.new_handle((ssl, res_headers)) + lib.uws_res_cork(ssl, response, uws_asgi_corked_ws_accept_handler, cork_data) + + future = ws.accept() + upgrade_protocol = options.get('subprotocol', protocol) + + if isinstance(key, str): + sec_web_socket_key_data = key.encode("utf-8") + elif isinstance(key, bytes): + sec_web_socket_key_data = key + else: + sec_web_socket_key_data = b"" + + if isinstance(upgrade_protocol, str): + sec_web_socket_protocol_data = upgrade_protocol.encode("utf-8") + elif isinstance(upgrade_protocol, bytes): + sec_web_socket_protocol_data = upgrade_protocol + else: + sec_web_socket_protocol_data = b"" + + if isinstance(extensions, str): + sec_web_socket_extensions_data = extensions.encode("utf-8") + elif isinstance(extensions, bytes): + sec_web_socket_extensions_data = extensions + else: + sec_web_socket_extensions_data = b"" + + lib.uws_res_upgrade( + ssl, + response, + ws._ptr, + sec_web_socket_key_data, + len(sec_web_socket_key_data), + sec_web_socket_protocol_data, + len(sec_web_socket_protocol_data), + sec_web_socket_extensions_data, + len(sec_web_socket_extensions_data), + socket_context, + ) + return await future + if type == 'websocket.close': # code and reason? + if ws.ws: + lib.uws_ws_close(ssl, ws.ws) + else: + cork_data = ffi.new_handle(ssl) + lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) + return True + if type == 'websocket.publish': # publish extension + data = options.get("bytes", None) + if data: + app.server.publish(options.get('topic'), data) + else: + app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT) + return True + if type == 'websocket.subscribe': # subscribe extension + if ws.ws: + topic = options.get('topic') + if isinstance(topic, str): + data = topic.encode("utf-8") + elif isinstance(topic, bytes): + data = topic + else: + return False + + return bool(lib.uws_ws_subscribe(ssl, ws.ws, data, len(data))) + else: + cork_data = ffi.new_handle(ssl) + lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) + return True + if type == 'websocket.unsubscribe': # unsubscribe extension + if ws.ws: + topic = options.get('topic') + if isinstance(topic, str): + data = topic.encode("utf-8") + elif isinstance(topic, bytes): + data = topic + else: + return False + + return bool(lib.uws_ws_unsubscribe(ssl, ws.ws, data, len(data))) + else: + cork_data = ffi.new_handle(ssl) + lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data) + return True + return False + app.server.run_async(app.app(scope, ws.receive, send)) + +@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") +def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): + data_response = ffi.from_handle(user_data) + data_response.is_end = bool(is_end) + more_body = not data_response.is_end + result = { + 'type': 'http.request', + 'body': b'' if chunk == ffi.NULL else ffi.unpack(chunk, chunk_length), + 'more_body': more_body + } + data_response.queue.put(result, False) + data_response.next_data_future.set_result(result) + if more_body: + data_response.next_data_future = data_response.loop.create_future() + + +class ASGIDataQueue: + def __init__(self, loop): + self.queue = SimpleQueue() + self._ptr = ffi.new_handle(self) + self.loop = loop + self.is_end = False + self.next_data_future = loop.create_future() + + class ASGIWebSocket: def __init__(self, loop): self.loop = loop @@ -19,6 +211,7 @@ class ASGIWebSocket: }, False) self._code = None self._message = None + self._ptr = ffi.new_handle(self) def accept(self): self.accept_future = self.loop.create_future() @@ -93,8 +286,10 @@ class ASGIWebSocket: def write_header(ssl, res, key, value): if isinstance(key, str): + if key == "content-length": return #auto key_data = key.encode("utf-8") elif isinstance(key, bytes): + if key == b'content-length': return #auto key_data = key if isinstance(value, int): @@ -121,13 +316,34 @@ def uws_asgi_corked_response_start_handler(res, user_data): write_header(ssl, res, name, value) write_header(ssl, res, b'Server', b'socketify.py') +@ffi.callback("void(uws_res_t*, void*)") +def uws_asgi_corked_accept_handler(res, user_data): + (ssl, status, headers) = ffi.from_handle(user_data) + lib.socketify_res_write_int_status(ssl, res, int(status)) + for name, value in headers: + write_header(ssl, res, name, value) + write_header(ssl, res, b'Server', b'socketify.py') -@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)") +@ffi.callback("void(uws_res_t*, void*)") +def uws_asgi_corked_ws_accept_handler(res, user_data): + (ssl, headers) = ffi.from_handle(user_data) + for name, value in headers: + write_header(ssl, res, name, value) + write_header(ssl, res, b'Server', b'socketify.py') + +@ffi.callback("void(uws_res_t*, void*)") +def uws_asgi_corked_403_handler(res, user_data): + ssl = ffi.from_handle(user_data) + lib.socketify_res_write_int_status(ssl, res, int(403)) + lib.uws_res_end_without_body(ssl, res, 0) + + +@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*, bool*)") def asgi(ssl, response, info, user_data, aborted): app = ffi.from_handle(user_data) headers = [] - next_header = info.header_list + next_header = info.header_list while next_header != ffi.NULL: header = next_header[0] headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) @@ -150,19 +366,26 @@ def asgi(ssl, response, info, user_data, aborted): 'query_string': ffi.unpack(info.query_string, info.query_string_size), 'headers': headers } + if bool(info.has_content): + data_queue = ASGIDataQueue(app.server.loop) + lib.uws_res_on_data( + ssl, response, asgi_on_data_handler, data_queue._ptr + ) + else: + data_queue = None async def receive(): if bool(aborted[0]): return { 'type': 'http.disconnect'} - # if scope.get("content-length", False) or scope.get("transfer-encoding", False): - # data = await res.get_data() - # if data: - # # all at once but could get in chunks - # return { - # 'type': 'http.request', - # 'body': data.getvalue(), - # 'more_body': False - # } - # no body, just empty + if data_queue: + if data_queue.queue.empty(): + if not data_queue.is_end: + #wait for next item + await data_queue.next_data_future + return await receive() #consume again because multiple receives maybe called + else: + return data_queue.queue.get(False) #consume queue + + # no more body, just empty return EMPTY_RESPONSE async def send(options): if bool(aborted[0]): @@ -214,116 +437,54 @@ class ASGI: self._ptr ) - def ws_upgrade(res, req, socket_context): - info = lib.socketify_asgi_ws_request(res.SSL, req.req, res.res) - - headers = [] - next_header = info.header_list - while next_header != ffi.NULL: - header = next_header[0] - headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size))) - next_header = ffi.cast("socketify_header*", next_header.next) + native_options = ffi.new("uws_socket_behavior_t *") + native_behavior = native_options[0] + + native_behavior.maxPayloadLength = ffi.cast( + "unsigned int", + 16 * 1024 * 1024, + ) + native_behavior.idleTimeout = ffi.cast( + "unsigned short", + 0, + ) + native_behavior.maxBackpressure = ffi.cast( + "unsigned int", + 1024 * 1024 * 1024, + ) + native_behavior.compression = ffi.cast( + "uws_compress_options_t", 0 + ) + native_behavior.maxLifetime = ffi.cast( + "unsigned short", 0 + ) + native_behavior.closeOnBackpressureLimit = ffi.cast( + "int", 0 + ) + native_behavior.resetIdleTimeoutOnSend = ffi.cast( + "int", 0 + ) + native_behavior.sendPingsAutomatically = ffi.cast( + "int", 0 + ) - url = ffi.unpack(info.url, info.url_size) + native_behavior.upgrade = ffi.NULL # will be set first on C++ - if info.key == ffi.NULL: - key = None - else: - key = ffi.unpack(info.key, info.key_size).decode('utf8') - - if info.protocol == ffi.NULL: - protocol = None - else: - protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8') - if info.extensions == ffi.NULL: - extensions = None - else: - extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8') + native_behavior.open = ws_open + native_behavior.message = ws_message + native_behavior.ping = ffi.NULL + native_behavior.pong = ffi.NULL + native_behavior.close = ws_close + + self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( + self.server.SSL, + self.server.app, + native_behavior, + ws_upgrade, + self._ptr + ) - ws = ASGIWebSocket(self.server.loop) - - scope = { - 'type': 'websocket', - 'asgi': { - 'version': '3.0', - 'spec_version': '2.3' - }, - 'http_version': '1.1', - 'server': (self.SERVER_HOST, self.SERVER_PORT), - 'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None), - 'scheme': self.SERVER_WS_SCHEME, - 'method': ffi.unpack(info.method, info.method_size).decode('utf8'), - 'root_path': '', - 'path': url.decode('utf8'), - 'raw_path': url, - 'query_string': ffi.unpack(info.query_string, info.query_string_size), - 'headers': headers, - 'subprotocols': [protocol] if protocol else [], - 'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True } - } - lib.socketify_destroy_headers(info.header_list) - async def send(options): - if res.aborted: return False - type = options['type'] - if type == 'websocket.send': - bytes = options.get("bytes", None) - - if ws.ws: - if bytes: - ws.ws.cork_send(bytes, OpCode.BINARY) - else: - ws.ws.cork_send(options.get('text', ''), OpCode.TEXT) - return True - return False - - if type == 'websocket.accept': # upgrade! - res_headers = options.get('headers', None) - def corked(res): - for header in res_headers: - res.write_header(header[0], header[1]) - if res_headers: - res.cork(corked) - - future = ws.accept() - upgrade_protocol = options.get('subprotocol', protocol) - res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws) - return await future - - if type == 'websocket.close': # code and reason? - if ws.ws: ws.ws.close() - else: res.cork(lambda res: res.write_status(403).end_without_body()) - return True - if type == 'websocket.publish': # publish extension - bytes = options.get("bytes", None) - if bytes: - self.server.publish(options.get('topic'), bytes) - else: - self.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT) - return True - if type == 'websocket.subscribe': # subscribe extension - if ws.ws: ws.ws.subscribe(options.get('topic')) - else: res.cork(lambda res: res.write_status(403).end_without_body()) - return True - if type == 'websocket.unsubscribe': # unsubscribe extension - if ws.ws: ws.ws.unsubscribe(options.get('topic')) - else: res.cork(lambda res: res.write_status(403).end_without_body()) - return True - return False - - res.run_async(app(scope, ws.receive, send)) - - - self.server.ws("/*", { - "compression": CompressOptions.DISABLED, - "max_payload_length": 16 * 1024 * 1024, - "idle_timeout": 0, - "upgrade": ws_upgrade, - "open": lambda ws: ws.get_user_data().open(ws), - "message": lambda ws, msg, opcode: ws.get_user_data().message(ws, msg, opcode), - "close": lambda ws, code, message: ws.get_user_data().disconnect(code, message) - }) - - def listen(self, port_or_options, handler): + def listen(self, port_or_options, handler=None): self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host self.server.listen(port_or_options, handler) @@ -334,4 +495,6 @@ class ASGI: def __del__(self): if self.asgi_http_info: - lib.socketify_destroy_asgi_app_info(self.asgi_http_info) \ No newline at end of file + lib.socketify_destroy_asgi_app_info(self.asgi_http_info) + if self.asgi_ws_info: + lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info) \ No newline at end of file diff --git a/src/socketify/native.py b/src/socketify/native.py index b22c998..8c4bcf0 100644 --- a/src/socketify/native.py +++ b/src/socketify/native.py @@ -319,6 +319,7 @@ typedef struct { size_t remote_address_size; socketify_header* header_list; + bool has_content; } socketify_asgi_data; typedef struct { @@ -352,7 +353,14 @@ typedef struct { socketify_asgi_method_handler handler; void * user_data; } socksocketify_asgi_app_info; - +typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t* socket, void *user_data, bool* aborted); +typedef struct { + int ssl; + uws_app_t* app; + socketify_asgi_ws_method_handler handler; + uws_socket_behavior_t behavior; + void * user_data; +} socksocketify_asgi_ws_app_info; socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res); void socketify_destroy_headers(socketify_header* headers); @@ -362,10 +370,12 @@ socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_re bool socketify_res_write_int_status(int ssl, uws_res_t* res, int code); socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data); void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app); +socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data); +void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app); void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length); void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection); - +void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode); """ ) diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index 9ec3cc4..12a3fb1 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -5,9 +5,6 @@ from socketify import App from io import BytesIO from .native import lib, ffi -# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way -# re formatting headers is really slow and dummy, dict ops are really slow - @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response = ffi.from_handle(user_data) @@ -48,8 +45,10 @@ def write_status(ssl, res, status_text): def write_header(ssl, res, key, value): if isinstance(key, str): + if key == "content-length": return #auto key_data = key.encode("utf-8") elif isinstance(key, bytes): + if key == b'content-length': return #auto key_data = key if isinstance(value, int): @@ -89,6 +88,7 @@ def wsgi(ssl, response, info, user_data, aborted): header = next_header[0] name = ffi.unpack(header.name, header.name_size).decode('utf8') value = ffi.unpack(header.value, header.value_size).decode('utf8') + # this conversion should be optimized in future environ[f"HTTP_{name.replace('-', '_').upper()}"]=value next_header = ffi.cast("socketify_header*", next_header.next) def start_response(status, headers): @@ -96,8 +96,8 @@ def wsgi(ssl, response, info, user_data, aborted): for (name, value) in headers: write_header(ssl, response, name, value) write_header(ssl, response, b'Server', b'socketify.py') - # #check for body - if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False): + # check for body + if bool(info.has_content): WSGI_INPUT = BytesIO() environ['wsgi.input'] = WSGI_INPUT def on_data(data_response, response): @@ -145,7 +145,7 @@ class WSGI: self._ptr ) - def listen(self, port_or_options, handler): + def listen(self, port_or_options, handler=None): self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port self.BASIC_ENVIRON.update({ 'GATEWAY_INTERFACE': 'CGI/1.1',