fix wsgi, asgi and optimize

pull/39/head
Ciro 2022-12-04 16:03:36 -03:00
rodzic cdc388d671
commit 69df3bd7d6
6 zmienionych plików z 308 dodań i 132 usunięć

Wyświetl plik

@ -8,6 +8,7 @@ class Home:
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
async def on_post(self, req, resp):
# curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://localhost:8000/
raw_data = await req.stream.read()
print("data", raw_data)
resp.status = falcon.HTTP_200 # This is the default status

Wyświetl plik

@ -43,10 +43,12 @@ class SomeResource:
app = falcon.asgi.App()
app.ws_options.max_receive_queue = 20_000_000# this virtual disables queue but adds overhead
app.ws_options.enable_buffered_receiver = True # this disable queue but for now only available on cirospaciari/falcon
app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon
app.add_route("/", SomeResource())
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
# 126550

Wyświetl plik

@ -34,7 +34,7 @@ async def app(scope, receive, send):
if type == 'websocket.disconnect':
print("disconnected!", scope)
break
# echo!
await send({
'type': 'websocket.send',

Wyświetl plik

@ -1,11 +1,203 @@
from socketify import App, CompressOptions, OpCode
from queue import SimpleQueue
from .native import lib, ffi
import asyncio
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
# re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow
EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False }
@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)")
def ws_message(ws, message, length, opcode, user_data):
socket_data = ffi.from_handle(user_data)
message = None if message == ffi.NULL else ffi.unpack(message, length)
if opcode == OpCode.TEXT:
message = message.decode("utf8")
socket_data.message(ws, message, OpCode(opcode))
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def ws_close(ws, code, message, length, user_data):
socket_data = ffi.from_handle(user_data)
message = None if message == ffi.NULL else ffi.unpack(message, length)
socket_data.disconnect(code, message)
@ffi.callback("void(uws_websocket_t*, void*)")
def ws_open(ws, user_data):
socket_data = ffi.from_handle(user_data)
socket_data.open(ws)
@ffi.callback("void(int, uws_res_t*, socketify_asgi_ws_data, uws_socket_context_t* socket, void*, bool*)")
def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
app = ffi.from_handle(user_data)
headers = []
next_header = info.header_list
while next_header != ffi.NULL:
header = next_header[0]
headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size)))
next_header = ffi.cast("socketify_header*", next_header.next)
url = ffi.unpack(info.url, info.url_size)
if info.key == ffi.NULL:
key = None
else:
key = ffi.unpack(info.key, info.key_size).decode('utf8')
if info.protocol == ffi.NULL:
protocol = None
else:
protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8')
if info.extensions == ffi.NULL:
extensions = None
else:
extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8')
ws = ASGIWebSocket(app.server.loop)
scope = {
'type': 'websocket',
'asgi': {
'version': '3.0',
'spec_version': '2.3'
},
'http_version': '1.1',
'server': (app.SERVER_HOST, app.SERVER_PORT),
'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None),
'scheme': app.SERVER_WS_SCHEME,
'method': ffi.unpack(info.method, info.method_size).decode('utf8'),
'root_path': '',
'path': url.decode('utf8'),
'raw_path': url,
'query_string': ffi.unpack(info.query_string, info.query_string_size),
'headers': headers,
'subprotocols': [protocol] if protocol else [],
'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True }
}
async def send(options):
if bool(aborted[0]): return False
type = options['type']
if type == 'websocket.send':
data = options.get("bytes", None)
if ws.ws:
if data:
lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.BINARY))
else:
data = options.get('text', '').encode('utf8')
lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.TEXT))
return True
return False
if type == 'websocket.accept': # upgrade!
res_headers = options.get('headers', None)
if res_headers:
cork_data = ffi.new_handle((ssl, res_headers))
lib.uws_res_cork(ssl, response, uws_asgi_corked_ws_accept_handler, cork_data)
future = ws.accept()
upgrade_protocol = options.get('subprotocol', protocol)
if isinstance(key, str):
sec_web_socket_key_data = key.encode("utf-8")
elif isinstance(key, bytes):
sec_web_socket_key_data = key
else:
sec_web_socket_key_data = b""
if isinstance(upgrade_protocol, str):
sec_web_socket_protocol_data = upgrade_protocol.encode("utf-8")
elif isinstance(upgrade_protocol, bytes):
sec_web_socket_protocol_data = upgrade_protocol
else:
sec_web_socket_protocol_data = b""
if isinstance(extensions, str):
sec_web_socket_extensions_data = extensions.encode("utf-8")
elif isinstance(extensions, bytes):
sec_web_socket_extensions_data = extensions
else:
sec_web_socket_extensions_data = b""
lib.uws_res_upgrade(
ssl,
response,
ws._ptr,
sec_web_socket_key_data,
len(sec_web_socket_key_data),
sec_web_socket_protocol_data,
len(sec_web_socket_protocol_data),
sec_web_socket_extensions_data,
len(sec_web_socket_extensions_data),
socket_context,
)
return await future
if type == 'websocket.close': # code and reason?
if ws.ws:
lib.uws_ws_close(ssl, ws.ws)
else:
cork_data = ffi.new_handle(ssl)
lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data)
return True
if type == 'websocket.publish': # publish extension
data = options.get("bytes", None)
if data:
app.server.publish(options.get('topic'), data)
else:
app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT)
return True
if type == 'websocket.subscribe': # subscribe extension
if ws.ws:
topic = options.get('topic')
if isinstance(topic, str):
data = topic.encode("utf-8")
elif isinstance(topic, bytes):
data = topic
else:
return False
return bool(lib.uws_ws_subscribe(ssl, ws.ws, data, len(data)))
else:
cork_data = ffi.new_handle(ssl)
lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data)
return True
if type == 'websocket.unsubscribe': # unsubscribe extension
if ws.ws:
topic = options.get('topic')
if isinstance(topic, str):
data = topic.encode("utf-8")
elif isinstance(topic, bytes):
data = topic
else:
return False
return bool(lib.uws_ws_unsubscribe(ssl, ws.ws, data, len(data)))
else:
cork_data = ffi.new_handle(ssl)
lib.uws_res_cork(ssl, response, uws_asgi_corked_403_handler, cork_data)
return True
return False
app.server.run_async(app.app(scope, ws.receive, send))
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
data_response.is_end = bool(is_end)
more_body = not data_response.is_end
result = {
'type': 'http.request',
'body': b'' if chunk == ffi.NULL else ffi.unpack(chunk, chunk_length),
'more_body': more_body
}
data_response.queue.put(result, False)
data_response.next_data_future.set_result(result)
if more_body:
data_response.next_data_future = data_response.loop.create_future()
class ASGIDataQueue:
def __init__(self, loop):
self.queue = SimpleQueue()
self._ptr = ffi.new_handle(self)
self.loop = loop
self.is_end = False
self.next_data_future = loop.create_future()
class ASGIWebSocket:
def __init__(self, loop):
self.loop = loop
@ -19,6 +211,7 @@ class ASGIWebSocket:
}, False)
self._code = None
self._message = None
self._ptr = ffi.new_handle(self)
def accept(self):
self.accept_future = self.loop.create_future()
@ -93,8 +286,10 @@ class ASGIWebSocket:
def write_header(ssl, res, key, value):
if isinstance(key, str):
if key == "content-length": return #auto
key_data = key.encode("utf-8")
elif isinstance(key, bytes):
if key == b'content-length': return #auto
key_data = key
if isinstance(value, int):
@ -121,13 +316,34 @@ def uws_asgi_corked_response_start_handler(res, user_data):
write_header(ssl, res, name, value)
write_header(ssl, res, b'Server', b'socketify.py')
@ffi.callback("void(uws_res_t*, void*)")
def uws_asgi_corked_accept_handler(res, user_data):
(ssl, status, headers) = ffi.from_handle(user_data)
lib.socketify_res_write_int_status(ssl, res, int(status))
for name, value in headers:
write_header(ssl, res, name, value)
write_header(ssl, res, b'Server', b'socketify.py')
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)")
@ffi.callback("void(uws_res_t*, void*)")
def uws_asgi_corked_ws_accept_handler(res, user_data):
(ssl, headers) = ffi.from_handle(user_data)
for name, value in headers:
write_header(ssl, res, name, value)
write_header(ssl, res, b'Server', b'socketify.py')
@ffi.callback("void(uws_res_t*, void*)")
def uws_asgi_corked_403_handler(res, user_data):
ssl = ffi.from_handle(user_data)
lib.socketify_res_write_int_status(ssl, res, int(403))
lib.uws_res_end_without_body(ssl, res, 0)
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*, bool*)")
def asgi(ssl, response, info, user_data, aborted):
app = ffi.from_handle(user_data)
headers = []
next_header = info.header_list
next_header = info.header_list
while next_header != ffi.NULL:
header = next_header[0]
headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size)))
@ -150,19 +366,26 @@ def asgi(ssl, response, info, user_data, aborted):
'query_string': ffi.unpack(info.query_string, info.query_string_size),
'headers': headers
}
if bool(info.has_content):
data_queue = ASGIDataQueue(app.server.loop)
lib.uws_res_on_data(
ssl, response, asgi_on_data_handler, data_queue._ptr
)
else:
data_queue = None
async def receive():
if bool(aborted[0]):
return { 'type': 'http.disconnect'}
# if scope.get("content-length", False) or scope.get("transfer-encoding", False):
# data = await res.get_data()
# if data:
# # all at once but could get in chunks
# return {
# 'type': 'http.request',
# 'body': data.getvalue(),
# 'more_body': False
# }
# no body, just empty
if data_queue:
if data_queue.queue.empty():
if not data_queue.is_end:
#wait for next item
await data_queue.next_data_future
return await receive() #consume again because multiple receives maybe called
else:
return data_queue.queue.get(False) #consume queue
# no more body, just empty
return EMPTY_RESPONSE
async def send(options):
if bool(aborted[0]):
@ -214,116 +437,54 @@ class ASGI:
self._ptr
)
def ws_upgrade(res, req, socket_context):
info = lib.socketify_asgi_ws_request(res.SSL, req.req, res.res)
headers = []
next_header = info.header_list
while next_header != ffi.NULL:
header = next_header[0]
headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size)))
next_header = ffi.cast("socketify_header*", next_header.next)
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
native_behavior.maxPayloadLength = ffi.cast(
"unsigned int",
16 * 1024 * 1024,
)
native_behavior.idleTimeout = ffi.cast(
"unsigned short",
0,
)
native_behavior.maxBackpressure = ffi.cast(
"unsigned int",
1024 * 1024 * 1024,
)
native_behavior.compression = ffi.cast(
"uws_compress_options_t", 0
)
native_behavior.maxLifetime = ffi.cast(
"unsigned short", 0
)
native_behavior.closeOnBackpressureLimit = ffi.cast(
"int", 0
)
native_behavior.resetIdleTimeoutOnSend = ffi.cast(
"int", 0
)
native_behavior.sendPingsAutomatically = ffi.cast(
"int", 0
)
url = ffi.unpack(info.url, info.url_size)
native_behavior.upgrade = ffi.NULL # will be set first on C++
if info.key == ffi.NULL:
key = None
else:
key = ffi.unpack(info.key, info.key_size).decode('utf8')
if info.protocol == ffi.NULL:
protocol = None
else:
protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8')
if info.extensions == ffi.NULL:
extensions = None
else:
extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8')
native_behavior.open = ws_open
native_behavior.message = ws_message
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL,
self.server.app,
native_behavior,
ws_upgrade,
self._ptr
)
ws = ASGIWebSocket(self.server.loop)
scope = {
'type': 'websocket',
'asgi': {
'version': '3.0',
'spec_version': '2.3'
},
'http_version': '1.1',
'server': (self.SERVER_HOST, self.SERVER_PORT),
'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None),
'scheme': self.SERVER_WS_SCHEME,
'method': ffi.unpack(info.method, info.method_size).decode('utf8'),
'root_path': '',
'path': url.decode('utf8'),
'raw_path': url,
'query_string': ffi.unpack(info.query_string, info.query_string_size),
'headers': headers,
'subprotocols': [protocol] if protocol else [],
'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True }
}
lib.socketify_destroy_headers(info.header_list)
async def send(options):
if res.aborted: return False
type = options['type']
if type == 'websocket.send':
bytes = options.get("bytes", None)
if ws.ws:
if bytes:
ws.ws.cork_send(bytes, OpCode.BINARY)
else:
ws.ws.cork_send(options.get('text', ''), OpCode.TEXT)
return True
return False
if type == 'websocket.accept': # upgrade!
res_headers = options.get('headers', None)
def corked(res):
for header in res_headers:
res.write_header(header[0], header[1])
if res_headers:
res.cork(corked)
future = ws.accept()
upgrade_protocol = options.get('subprotocol', protocol)
res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws)
return await future
if type == 'websocket.close': # code and reason?
if ws.ws: ws.ws.close()
else: res.cork(lambda res: res.write_status(403).end_without_body())
return True
if type == 'websocket.publish': # publish extension
bytes = options.get("bytes", None)
if bytes:
self.server.publish(options.get('topic'), bytes)
else:
self.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT)
return True
if type == 'websocket.subscribe': # subscribe extension
if ws.ws: ws.ws.subscribe(options.get('topic'))
else: res.cork(lambda res: res.write_status(403).end_without_body())
return True
if type == 'websocket.unsubscribe': # unsubscribe extension
if ws.ws: ws.ws.unsubscribe(options.get('topic'))
else: res.cork(lambda res: res.write_status(403).end_without_body())
return True
return False
res.run_async(app(scope, ws.receive, send))
self.server.ws("/*", {
"compression": CompressOptions.DISABLED,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 0,
"upgrade": ws_upgrade,
"open": lambda ws: ws.get_user_data().open(ws),
"message": lambda ws, msg, opcode: ws.get_user_data().message(ws, msg, opcode),
"close": lambda ws, code, message: ws.get_user_data().disconnect(code, message)
})
def listen(self, port_or_options, handler):
def listen(self, port_or_options, handler=None):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
self.server.listen(port_or_options, handler)
@ -334,4 +495,6 @@ class ASGI:
def __del__(self):
if self.asgi_http_info:
lib.socketify_destroy_asgi_app_info(self.asgi_http_info)
lib.socketify_destroy_asgi_app_info(self.asgi_http_info)
if self.asgi_ws_info:
lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info)

Wyświetl plik

@ -319,6 +319,7 @@ typedef struct {
size_t remote_address_size;
socketify_header* header_list;
bool has_content;
} socketify_asgi_data;
typedef struct {
@ -352,7 +353,14 @@ typedef struct {
socketify_asgi_method_handler handler;
void * user_data;
} socksocketify_asgi_app_info;
typedef void (*socketify_asgi_ws_method_handler)(int ssl, uws_res_t *response, socketify_asgi_ws_data request, uws_socket_context_t* socket, void *user_data, bool* aborted);
typedef struct {
int ssl;
uws_app_t* app;
socketify_asgi_ws_method_handler handler;
uws_socket_behavior_t behavior;
void * user_data;
} socksocketify_asgi_ws_app_info;
socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res);
void socketify_destroy_headers(socketify_header* headers);
@ -362,10 +370,12 @@ socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_re
bool socketify_res_write_int_status(int ssl, uws_res_t* res, int code);
socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data);
void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app);
socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data);
void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app);
void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length);
void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection);
void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode);
"""
)

Wyświetl plik

@ -5,9 +5,6 @@ from socketify import App
from io import BytesIO
from .native import lib, ffi
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
# re formatting headers is really slow and dummy, dict ops are really slow
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
@ -48,8 +45,10 @@ def write_status(ssl, res, status_text):
def write_header(ssl, res, key, value):
if isinstance(key, str):
if key == "content-length": return #auto
key_data = key.encode("utf-8")
elif isinstance(key, bytes):
if key == b'content-length': return #auto
key_data = key
if isinstance(value, int):
@ -89,6 +88,7 @@ def wsgi(ssl, response, info, user_data, aborted):
header = next_header[0]
name = ffi.unpack(header.name, header.name_size).decode('utf8')
value = ffi.unpack(header.value, header.value_size).decode('utf8')
# this conversion should be optimized in future
environ[f"HTTP_{name.replace('-', '_').upper()}"]=value
next_header = ffi.cast("socketify_header*", next_header.next)
def start_response(status, headers):
@ -96,8 +96,8 @@ def wsgi(ssl, response, info, user_data, aborted):
for (name, value) in headers:
write_header(ssl, response, name, value)
write_header(ssl, response, b'Server', b'socketify.py')
# #check for body
if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False):
# check for body
if bool(info.has_content):
WSGI_INPUT = BytesIO()
environ['wsgi.input'] = WSGI_INPUT
def on_data(data_response, response):
@ -145,7 +145,7 @@ class WSGI:
self._ptr
)
def listen(self, port_or_options, handler):
def listen(self, port_or_options, handler=None):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
self.BASIC_ENVIRON.update({
'GATEWAY_INTERFACE': 'CGI/1.1',