diff --git a/.gitignore b/.gitignore index 49b5fa5..49f8fdd 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ __pycache__ *.o node_modules/ yarn.lock -.vscode \ No newline at end of file +.vscode +/venv \ No newline at end of file diff --git a/bench/asgi_wsgi/falcon-wsgi.py b/bench/asgi_wsgi/falcon-wsgi.py index 7ea618d..840f396 100644 --- a/bench/asgi_wsgi/falcon-wsgi.py +++ b/bench/asgi_wsgi/falcon-wsgi.py @@ -8,11 +8,10 @@ class Home: resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override resp.text = "Hello, World!" def on_post(self, req, resp): - raw_data = req.stream.getvalue() - print("data", raw_data) + raw_data = req.stream.read() resp.status = falcon.HTTP_200 # This is the default status resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override - resp.text = raw_data + resp.text = 'Ok' @@ -23,4 +22,4 @@ home = Home() app.add_route("/", home) if __name__ == "__main__": - WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=8) \ No newline at end of file + WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=1) \ No newline at end of file diff --git a/bench/asgi_wsgi/raw-wsgi.py b/bench/asgi_wsgi/raw-wsgi.py index 6763cb3..7e50067 100644 --- a/bench/asgi_wsgi/raw-wsgi.py +++ b/bench/asgi_wsgi/raw-wsgi.py @@ -1,8 +1,42 @@ -from socketify import WSGI +from io import BytesIO +payload = None +with open("xml.zip", "rb") as file: + payload = file.read() + + +stream = BytesIO() +stream.write(payload) + +chunk_size = 64 * 1024 +content_length = len(payload) + +def app_chunked(environ, start_response): + start_response('200 OK', [('Content-Type', 'application/zip'), ('Transfer-Encoding', 'chunked')]) + + sended = 0 + while content_length > sended: + end = sended + chunk_size + yield payload[sended:end] + sended = end + + def app(environ, start_response): - start_response('200 OK', [('Content-Type', 'text/plain')]) - yield b'Hello, World!\n' + start_response('200 OK', [('Content-Type', 'application/zip'), ('Content-Length', str(content_length))]) + + sended = 0 + while content_length > sended: + end = sended + chunk_size + yield payload[sended:end] + sended = end + +def app_hello(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Length', '13')]) + + yield b'Hello, World!' if __name__ == "__main__": - WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(8) + from socketify import WSGI + WSGI(app_hello).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(1) + # import fastwsgi + # fastwsgi.run(wsgi_app=app_hello, host='127.0.0.1', port=8000) diff --git a/src/socketify/helpers.py b/src/socketify/helpers.py index 585d53c..eff0a8f 100644 --- a/src/socketify/helpers.py +++ b/src/socketify/helpers.py @@ -222,7 +222,7 @@ def async_middleware(*functions): class DecoratorRouter: def __init__(self, app, prefix: str = "", *middlewares): self.app = app - self.middlewares = list(*middlewares) + self.middlewares = list(middlewares) self.prefix = prefix def get(self, path): diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index 61eed97..1701f37 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -10,7 +10,7 @@ is_pypy = platform.python_implementation() == "PyPy" from .tasks import create_task, TaskFactory import sys import logging - +import uuid @ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)") def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): @@ -25,6 +25,68 @@ def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data): data_response._ptr, ) +@ffi.callback("void(uws_res_t*, void*)") +def wsgi_on_data_ref_abort_handler(res, user_data): + data_retry = ffi.from_handle(user_data) + data_retry.aborted = True + if data_retry.id is not None: + data_retry.app._data_refs.pop(data_retry.id, None) + + +@ffi.callback("bool(uws_res_t*, uintmax_t, void*)") +def wsgi_on_writable_handler(res, offset, user_data): + data_retry = ffi.from_handle(user_data) + if data_retry.aborted: + return False + + + chunks = data_retry.chunks + last_sended_offset = data_retry.last_offset + ssl = data_retry.app.server.SSL + content_length = data_retry.content_length + + data = chunks[0] + last_offset = int(lib.uws_res_get_write_offset(ssl, res)) + if last_sended_offset != last_offset: + offset = last_offset - last_sended_offset + data = data[offset:] + if len(data) == 0: + chunks.pop(0) + + if len(chunks) == 0: + logging.error(AssertionError("Content-Length do not match sended content")) + lib.uws_res_close( + ssl, + res + ) + if data_retry.id is not None: + data_retry.app._data_refs.pop(data_retry.id, None) + + return False + data = chunks[0] + + result = lib.uws_res_try_end( + ssl, + res, + data, + len(data), + ffi.cast("uintmax_t", content_length), + 0, + ) + if bool(result.ok): + chunks.pop(0) + + if not bool(result.has_responded) and len(chunks) == 0: + logging.error(AssertionError("Content-Length do not match sended content")) + lib.uws_res_close( + ssl, + res + ) + if bool(result.has_responded) and data_retry.id is not None: + data_retry.app._data_refs.pop(data_retry.id, None) + + return False + class WSGIBody: def __init__(self, buffer): @@ -115,14 +177,25 @@ class WSGIBody: class WSGIDataResponse: - def __init__(self, app, environ, start_response, aborted, buffer, on_data): + def __init__(self, app, environ, start_response, buffer, on_data): self.buffer = buffer - self.aborted = aborted self._ptr = ffi.new_handle(self) self.on_data = on_data self.environ = environ self.app = app self.start_response = start_response + self.id = None + self.aborted = False + +class WSGIRetryDataSend: + def __init__(self, app, chunks, content_length, last_offset): + self.chunks = chunks + self._ptr = ffi.new_handle(self) + self.app = app + self.content_length = content_length + self.last_offset = last_offset + self.id = None + self.aborted = False @ffi.callback("void(uws_res_t*, void*)") @@ -163,9 +236,10 @@ def wsgi(ssl, response, info, user_data, aborted): headers_set = None headers_written = False status_text = None - + is_chunked = False + content_length = -1 def write_headers(headers): - nonlocal headers_written, headers_set, status_text + nonlocal headers_written, headers_set, status_text, content_length, is_chunked if headers_written or not headers_set: return @@ -183,20 +257,33 @@ def wsgi(ssl, response, info, user_data, aborted): if ( key == "content-length" or key == "Content-Length" - or key == "Transfer-Encoding" + ): + content_length = int(value) + continue # auto generated by try_end + if ( + key == "Transfer-Encoding" or key == "transfer-encoding" ): - continue # auto + is_chunked = str(value) == "chunked" + if is_chunked: + continue + key_data = key.encode("utf-8") elif isinstance(key, bytes): # this is faster than using .lower() if ( key == b"content-length" or key == b"Content-Length" - or key == b"Transfer-Encoding" + ): + content_length = int(value) + continue # auto + if ( + key == b"Transfer-Encoding" or key == b"transfer-encoding" ): - continue # auto + is_chunked = str(value) == "chunked" + if is_chunked: + continue key_data = key if isinstance(value, str): @@ -216,6 +303,9 @@ def wsgi(ssl, response, info, user_data, aborted): lib.uws_res_write_header( ssl, response, key_data, len(key_data), value_data, len(value_data) ) + # no content-length + if content_length < 0: + is_chunked = True def start_response(status, headers, exc_info=None): nonlocal headers_set, status_text @@ -233,9 +323,12 @@ def wsgi(ssl, response, info, user_data, aborted): status_text = status def write(data): + nonlocal is_chunked if not headers_written: write_headers(headers_set) - + # will allow older frameworks only with is_chunked + is_chunked = True + if isinstance(data, bytes): lib.uws_res_write(ssl, response, data, len(data)) elif isinstance(data, str): @@ -244,13 +337,17 @@ def wsgi(ssl, response, info, user_data, aborted): return write + failed_chunks = None + content_length = ffi.cast("uintmax_t", content_length) + last_offset = -1 + data_retry = None # check for body if bool(info.has_content): WSGI_INPUT = BytesIO() environ["wsgi.input"] = WSGIBody(WSGI_INPUT) - def on_data(data_response, response): - if bool(data_response.aborted[0]): + nonlocal failed_chunks, last_offset, data_retry + if data_response.aborted: return ssl = data_response.app.server.SSL @@ -262,14 +359,41 @@ def wsgi(ssl, response, info, user_data, aborted): ) try: for data in app_iter: - if data and not headers_written: - write_headers(headers_set) - - if isinstance(data, bytes): - lib.uws_res_write(ssl, response, data, len(data)) - elif isinstance(data, str): - data = data.encode("utf-8") - lib.uws_res_write(ssl, response, data, len(data)) + if data: + if not headers_written: + write_headers(headers_set) + if is_chunked: + if isinstance(data, bytes): + lib.uws_res_write(ssl, response, data, len(data)) + elif isinstance(data, str): + data = data.encode("utf-8") + lib.uws_res_write(ssl, response, data, len(data)) + else: + if isinstance(data, str): + data = data.encode("utf-8") + if failed_chunks: + failed_chunks.append(data) + else: + last_offset = int(lib.uws_res_get_write_offset(ssl, response)) + result = lib.uws_res_try_end( + ssl, + response, + data, + len(data), + content_length, + 0, + ) + # this should be very very rare for HTTP + if not bool(result.ok): + last_offset = int(lib.uws_res_get_write_offset(ssl, response)) + failed_chunks = [] + # just mark the chunks + failed_chunks.append(data) + # add on writable handler + data_retry = WSGIRetryDataSend( + app, failed_chunks, content_length, last_offset + ) + break except Exception as error: logging.exception(error) @@ -278,27 +402,72 @@ def wsgi(ssl, response, info, user_data, aborted): app_iter.close() if not headers_written: - write_headers(headers_set) - lib.uws_res_end_without_body(ssl, response, 0) + write_headers(headers_set) + if is_chunked: + lib.uws_res_end_without_body(ssl, response, 0) + elif data_retry is not None: + _id = uuid.uuid4() + app._data_refs[_id] = data_retry + lib.uws_res_on_aborted(ssl, response, wsgi_on_data_ref_abort_handler, data_retry._ptr) + lib.uws_res_on_writable(ssl, response, wsgi_on_writable_handler, data_retry._ptr) + elif result is None or (not bool(result.has_responded) and bool(result.ok)): # not reachs Content-Length + logging.error(AssertionError("Content-Length do not match sended content")) + lib.uws_res_close( + ssl, + response + ) + data_response = WSGIDataResponse( - app, environ, start_response, aborted, WSGI_INPUT, on_data + app, environ, start_response, WSGI_INPUT, on_data ) - + _id = uuid.uuid4() + data_response.id = _id + app._data_refs[_id] = data_response + lib.uws_res_on_aborted(ssl, response, wsgi_on_data_ref_abort_handler, data_response._ptr) lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr) else: environ["wsgi.input"] = None app_iter = app.wsgi(environ, start_response) + result = None try: for data in app_iter: - if data and not headers_written: - write_headers(headers_set) + if data: + if not headers_written: + write_headers(headers_set) + if is_chunked: + if isinstance(data, bytes): + lib.uws_res_write(ssl, response, data, len(data)) + elif isinstance(data, str): + data = data.encode("utf-8") + lib.uws_res_write(ssl, response, data, len(data)) + else: + if isinstance(data, str): + data = data.encode("utf-8") + if failed_chunks: # if failed once, will fail again later + failed_chunks.append(data) + else: + last_offset = int(lib.uws_res_get_write_offset(ssl, response)) + result = lib.uws_res_try_end( + ssl, + response, + data, + len(data), + content_length, + 0, + ) + # this should be very very rare fot HTTP + if not bool(result.ok): + failed_chunks = [] + # just mark the chunks + failed_chunks.append(data) + # add on writable handler + data_retry = WSGIRetryDataSend( + app, failed_chunks, content_length, last_offset + ) + break + - if isinstance(data, bytes): - lib.uws_res_write(ssl, response, data, len(data)) - elif isinstance(data, str): - data = data.encode("utf-8") - lib.uws_res_write(ssl, response, data, len(data)) except Exception as error: logging.exception(error) finally: @@ -306,8 +475,22 @@ def wsgi(ssl, response, info, user_data, aborted): app_iter.close() if not headers_written: - write_headers(headers_set) - lib.uws_res_end_without_body(ssl, response, 0) + write_headers(headers_set) + if is_chunked: + lib.uws_res_end_without_body(ssl, response, 0) + elif data_retry is not None: + _id = uuid.uuid4() + data_retry.id = _id + app._data_refs[_id] = data_retry + lib.uws_res_on_aborted(ssl, response, wsgi_on_data_ref_abort_handler, data_retry._ptr) + lib.uws_res_on_writable(ssl, response, wsgi_on_writable_handler, data_retry._ptr) + elif result is None or (not bool(result.has_responded) and bool(result.ok)): # not reachs Content-Length + logging.error(AssertionError("Content-Length do not match sended content")) + lib.uws_res_close( + ssl, + response + ) + def is_asgi(module): @@ -332,7 +515,7 @@ class _WSGI: self.wsgi = app self.BASIC_ENVIRON = dict(os.environ) self.ws_compression = False - + self._data_refs = {} self._ptr = ffi.new_handle(self) self.asgi_http_info = lib.socketify_add_asgi_http_handler( self.server.SSL, self.server.app, wsgi, self._ptr