added some preliminar asyncio integration, more options for write, end and status

pull/39/head
Ciro 2022-05-31 14:56:37 -03:00
rodzic e1b2943bbc
commit aab4427efc
7 zmienionych plików z 254 dodań i 73 usunięć

Wyświetl plik

@ -0,0 +1,28 @@
import asyncio
import threading
import time
class Loop:
def __init__(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop_thread = None
def start(self):
self.loop_thread = threading.Thread(target=lambda loop: loop.run_forever(), args=(self.loop,), daemon=True)
self.loop_thread.start()
def stop(self):
#stop loop
self.loop.call_soon_threadsafe(self.loop.stop)
#wait loop thread to stops
self.loop_thread.join()
# Find all running tasks in main thread:
pending = asyncio.all_tasks(self.loop)
# Run loop until tasks done
self.loop.run_until_complete(asyncio.gather(*pending))
def run_async(self, task):
asyncio.run_coroutine_threadsafe(task, self.loop)
return True

Wyświetl plik

@ -1,5 +1,10 @@
import cffi
import os
from .loop import Loop
from .status_codes import status_codes
import json
import inspect
ffi = cffi.FFI()
ffi.cdef("""
@ -207,18 +212,27 @@ lib = ffi.dlopen(library_path)
@ffi.callback("void(uws_res_t *, uws_req_t *, void *)")
def uws_generic_method_handler(res, req, user_data):
if not user_data == ffi.NULL:
handler = ffi.from_handle(user_data)
response = AppResponse(res, False)
(handler, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, False)
request = AppRequest(req)
handler(response, request)
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
app.run_async(handler(response, request))
else:
handler(response, request)
@ffi.callback("void(uws_res_t *, uws_req_t *, void *)")
def uws_generic_ssl_method_handler(res, req, user_data):
if not user_data == ffi.NULL:
handler = ffi.from_handle(user_data)
response = AppResponse(res, True)
(handler, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, True)
request = AppRequest(req)
handler(response, request)
task = handler(response, request)
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
app.run_async(handler(response, request))
else:
handler(response, request)
@ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)")
def uws_generic_listen_handler(listen_socket, config, user_data):
@ -283,16 +297,31 @@ class AppRequest:
return bool(lib.uws_req_is_ancient(self.req))
class AppResponse:
def __init__(self, response, is_ssl):
def __init__(self, response, loop, is_ssl):
self.res = response
self.SSL = ffi.cast("int", 1 if is_ssl else 0)
self.aborted = False
self._ptr = ffi.new_handle(self)
lib.uws_res_on_aborted(self.SSL, response, uws_generic_abord_handler, self._ptr)
self._ptr = ffi.NULL
self.loop = loop
def run_async(self, task):
self.grab_aborted_handler()
return self.loop.run_async(task)
def grab_aborted_handler(self):
#only needed if is async
if self._ptr == ffi.NULL:
self._ptr = ffi.new_handle(self)
lib.uws_res_on_aborted(self.SSL, self.res, uws_generic_abord_handler, self._ptr)
def end(self, message, end_connection=False):
if not self.aborted:
data = message.encode("utf-8")
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
else:
data = json.dumps(message).encode("utf-8")
lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0)
return self
@ -311,9 +340,20 @@ class AppResponse:
lib.uws_res_write_continue(self.SSL, self.res)
return self
def write_status(self, status_text):
def write_status(self, status_or_status_text):
if not self.aborted:
data = status_text.encode("utf-8")
if isinstance(status_or_status_text, int):
try:
data = status_codes[status_or_status_text].encode("utf-8")
except: #invalid status
raise RuntimeError("\"%d\" Is not an valid Status Code" % status_or_status_text)
elif isinstance(status_text, str):
data = status_text.encode("utf-8")
elif isinstance(status_text, bytes):
data = status_text
else:
data = json.dumps(status_text).encode("utf-8")
lib.uws_res_write_status(self.SSL, self.res, data, len(data))
return self
@ -322,9 +362,13 @@ class AppResponse:
key_data = key.encode("utf-8")
if isinstance(value, int):
lib.uws_res_write_header_int(self.SSL, self.res, key_data, len(key_data), ffi.cast("uint64_t", value))
else:
elif isinstance(value, str):
value_data = value.encode("utf-8")
lib.uws_res_write_header(self.SSL, self.res, key_data, len(key_data), value_data, len(value_data))
elif isinstance(value, bytes):
value_data = value
else:
value_data = json.dumps(value).encode("utf-8")
lib.uws_res_write_header(self.SSL, self.res, key_data, len(key_data), value_data, len(value_data))
return self
def end_without_body(self):
@ -334,13 +378,23 @@ class AppResponse:
def write(self, message):
if not self.aborted:
data = message.encode("utf-8")
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
else:
data = json.dumps(message).encode("utf-8")
lib.uws_res_write(self.SSL, self.res, data, len(data))
return self
def get_write_offset(self, message):
if not self.aborted:
data = message.encode("utf-8")
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
else:
data = json.dumps(message).encode("utf-8")
return int(lib.uws_res_get_write_offset(self.SSL, self.res, data, len(data)))
return 0
@ -356,9 +410,13 @@ class AppResponse:
def on_aborted(self, handler):
if hasattr(handler, '__call__'):
self.grab_aborted_handler()
self._aborted_handler = handler
return self
def __del__(self):
self.res = ffi.NULL
self._ptr = ffi.NULL
# void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data);
# void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws);
# void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data);
@ -389,59 +447,60 @@ class App:
if bool(lib.uws_constructor_failed(self.SSL, self.app)):
raise RuntimeError("Failed to create connection")
self.handlers = []
self.loop = Loop()
def get(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_get(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def post(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_post(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def options(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_options(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def delete(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_delete(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def patch(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_patch(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def put(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_put(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def head(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_head(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def connect(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_connect(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def trace(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_trace(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def any(self, path, handler):
user_data = ffi.new_handle(handler)
user_data = ffi.new_handle((handler, self))
self.handlers.append(user_data) #Keep alive handler
lib.uws_app_any(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data)
return self
def listen(self, port_or_options, handler):
def listen(self, port_or_options, handler=None):
self._listen_handler = handler
if isinstance(port_or_options, int):
lib.uws_app_listen(self.SSL, self.app, ffi.cast("int", port_or_options), uws_generic_listen_handler, self._ptr)
@ -456,8 +515,16 @@ class App:
return self
def get_loop():
return self.loop.loop
def run_async(self, task):
return self.loop.run_async(task)
def run(self):
self.loop.start()
lib.uws_app_run(self.SSL, self.app)
self.loop.stop()
return self
def close(self):
@ -465,6 +532,7 @@ class App:
if not self.socket == ffi.NULL:
lib.us_listen_socket_close(self.SSL, self.socket)
return self
def __del__(self):
lib.uws_app_destroy(self.SSL, self.app)
@ -495,3 +563,4 @@ class AppOptions:
self.ca_file_name = ca_file_name
self.ssl_ciphers = ssl_ciphers
self.ssl_prefer_low_memory_usage = ssl_prefer_low_memory_usage

Wyświetl plik

@ -0,0 +1,65 @@
status_codes = {
100 : "100 Continue",
101 : "101 Switching Protocols",
102 : "102 Processing",
103 : "103 Early Hints",
200 : "200 OK",
201 : "201 Created",
202 : "202 Accepted",
203 : "203 Non-Authoritative Information",
204 : "204 No Content",
205 : "205 Reset Content",
206 : "206 Partial Content",
207 : "207 Multi-Status",
208 : "208 Already Reported",
226 : "226 IM Used (HTTP Delta encoding)",
300 : "300 Multiple Choices",
301 : "301 Moved Permanently",
302 : "302 Found",
303 : "303 See Other",
304 : "304 Not Modified",
305 : "305 Use Proxy Deprecated",
306 : "306 unused",
307 : "307 Temporary Redirect",
308 : "308 Permanent Redirect",
400 : "400 Bad Request",
401 : "401 Unauthorized",
402 : "402 Payment Required Experimental",
403 : "403 Forbidden",
404 : "404 Not Found",
405 : "405 Method Not Allowed",
406 : "406 Not Acceptable",
407 : "407 Proxy Authentication Required",
408 : "408 Request Timeout",
409 : "409 Conflict",
410 : "410 Gone",
411 : "411 Length Required",
412 : "412 Precondition Failed",
413 : "413 Payload Too Large",
414 : "414 URI Too Long",
415 : "415 Unsupported Media Type",
416 : "416 Range Not Satisfiable",
417 : "417 Expectation Failed",
418 : "418 I'm a teapot",
421 : "421 Misdirected Request",
422 : "422 Unprocessable Entity",
423 : "423 Locked",
424 : "424 Failed Dependency",
425 : "425 Too Early Experimental",
426 : "426 Upgrade Required",
428 : "428 Precondition Required",
429 : "429 Too Many Requests",
431 : "431 Request Header Fields Too Large",
451 : "451 Unavailable For Legal Reasons",
500 : "500 Internal Server Error",
501 : "501 Not Implemented",
502 : "502 Bad Gateway",
503 : "503 Service Unavailable",
504 : "504 Gateway Timeout",
505 : "505 HTTP Version Not Supported",
506 : "506 Variant Also Negotiates",
507 : "507 Insufficient Storage",
508 : "508 Loop Detected",
510 : "510 Not Extended",
511 : "511 Network Authentication Required"
}

30
tests/async.py 100644
Wyświetl plik

@ -0,0 +1,30 @@
from socketify import App, AppOptions, AppListenOptions
import asyncio
app = App()
async def delayed_hello(delay, res):
await asyncio.sleep(delay) #do something async
res.end("Hello with delay!")
def home(res, req):
#request objecy only lives during the live time of this call
#get parameters, query, headers anything you need here
delay = req.get_query("delay")
delay = 0 if delay == None else float(delay)
#tell response to run this in the event loop
#abort handler is grabed here, so responses only will be send if res.aborted == False
res.run_async(delayed_hello(delay, res))
async def json_message(res, req):
#req maybe will not be available in direct attached async functions
#but if you dont care about req info you can do it
await asyncio.sleep(2) #do something async
res.write_header("Content-Type", "application/json")
res.end({ "message": "I'm delayed!"})
app.get("/", home)
app.get("/json_message", json_message)
app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port)))
app.run()

Wyświetl plik

@ -1,20 +0,0 @@
import uws
import asyncio
# Integrate with asyncio
# asyncio.set_event_loop(uws.Loop())
app = uws.App()
def getHandler(res, req):
res.end("Hello Python!")
app.get("/*", getHandler)
def listenHandler():
print("Listening to port 3000")
app.listen(3000, listenHandler)
app.run()
# Run asyncio event loop
# asyncio.get_event_loop().run_forever()

15
tests/shutdown.py 100644
Wyświetl plik

@ -0,0 +1,15 @@
from socketify import App, AppOptions, AppListenOptions
app = App()
def shutdown(res, req):
res.end("Good bye!")
app.close()
app.get("/", lambda res, req: res.end("Hello!"))
app.get("/shutdown", shutdown)
app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port)))
app.run()
print("App Closed!")

Wyświetl plik

@ -1,11 +1,16 @@
import threading
from datetime import datetime
import time
import os
import sys
import asyncio
#import uvloop
import threading
import time
from json import dumps as json
from datetime import datetime
from socketify import App, AppOptions, AppListenOptions
#import uvloop
# from ujson import dumps as json
# from zzzjson import stringify as json #is too slow with CFFI
# from orjson import dumps
@ -50,19 +55,6 @@ def applicationjson(res, req):
res.end(json({"message":"Hello, World!"}))
def async_route(handler):
return lambda res,req: asyncio.run(handler(res, req))
async def plaintext_async(res, req):
# await asyncio.sleep(1)
res.write_header("Date", current_http_date)
res.write_header("Server", "socketify")
res.write_header("Content-Type", "text/plain")
res.end("Hello, World!")
async def test():
await asyncio.sleep(1)
print("Hello!")
def run_app():
timing = threading.Thread(target=time_thread, args=())
@ -71,7 +63,6 @@ def run_app():
app.get("/", plaintext)
# app.get("/json", applicationjson)
# app.get("/plaintext", plaintext)
# app.get("/", async_route(plaintext_async))
app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port)))
# app.listen(AppListenOptions(port=3000, host="0.0.0.0"), lambda config: print("Listening on port http://%s:%d now\n" % (config.host, config.port)))
@ -83,11 +74,11 @@ def run_app():
# app.run()
# asyncio.get_event_loop().run_forever()
def create_fork():
n = os.fork()
# n greater than 0 means parent process
if not n > 0:
run_app()
# def create_fork():
# n = os.fork()
# # n greater than 0 means parent process
# if not n > 0:
# run_app()
# for index in range(3):
# create_fork()
@ -116,3 +107,6 @@ run_app()
#pypy3 -m pip install rapidjson (not working with pypy)
#https://github.com/MagicStack/uvloop/issues/380
#https://foss.heptapod.net/pypy/pypy/-/issues/3740
# make shared -C ./src/socketify/uWebSockets/capi
#cp ./src/socketify/uWebSockets/capi/libuwebsockets.so ./src/socketify/libuwebsockets.so