diff --git a/src/socketify/loop.py b/src/socketify/loop.py new file mode 100644 index 0000000..5911dcf --- /dev/null +++ b/src/socketify/loop.py @@ -0,0 +1,28 @@ + +import asyncio +import threading +import time + +class Loop: + def __init__(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop_thread = None + + def start(self): + self.loop_thread = threading.Thread(target=lambda loop: loop.run_forever(), args=(self.loop,), daemon=True) + self.loop_thread.start() + + def stop(self): + #stop loop + self.loop.call_soon_threadsafe(self.loop.stop) + #wait loop thread to stops + self.loop_thread.join() + # Find all running tasks in main thread: + pending = asyncio.all_tasks(self.loop) + # Run loop until tasks done + self.loop.run_until_complete(asyncio.gather(*pending)) + + def run_async(self, task): + asyncio.run_coroutine_threadsafe(task, self.loop) + return True diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 397e12d..1c4374e 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -1,5 +1,10 @@ import cffi import os +from .loop import Loop +from .status_codes import status_codes +import json +import inspect + ffi = cffi.FFI() ffi.cdef(""" @@ -207,18 +212,27 @@ lib = ffi.dlopen(library_path) @ffi.callback("void(uws_res_t *, uws_req_t *, void *)") def uws_generic_method_handler(res, req, user_data): if not user_data == ffi.NULL: - handler = ffi.from_handle(user_data) - response = AppResponse(res, False) + (handler, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, False) request = AppRequest(req) - handler(response, request) + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + app.run_async(handler(response, request)) + else: + handler(response, request) @ffi.callback("void(uws_res_t *, uws_req_t *, void *)") def uws_generic_ssl_method_handler(res, req, user_data): if not user_data == ffi.NULL: - handler = ffi.from_handle(user_data) - response = AppResponse(res, True) + (handler, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, True) request = AppRequest(req) - handler(response, request) + task = handler(response, request) + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + app.run_async(handler(response, request)) + else: + handler(response, request) @ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)") def uws_generic_listen_handler(listen_socket, config, user_data): @@ -283,16 +297,31 @@ class AppRequest: return bool(lib.uws_req_is_ancient(self.req)) class AppResponse: - def __init__(self, response, is_ssl): + def __init__(self, response, loop, is_ssl): self.res = response self.SSL = ffi.cast("int", 1 if is_ssl else 0) self.aborted = False - self._ptr = ffi.new_handle(self) - lib.uws_res_on_aborted(self.SSL, response, uws_generic_abord_handler, self._ptr) + self._ptr = ffi.NULL + self.loop = loop + def run_async(self, task): + self.grab_aborted_handler() + return self.loop.run_async(task) + + def grab_aborted_handler(self): + #only needed if is async + if self._ptr == ffi.NULL: + self._ptr = ffi.new_handle(self) + lib.uws_res_on_aborted(self.SSL, self.res, uws_generic_abord_handler, self._ptr) + def end(self, message, end_connection=False): if not self.aborted: - data = message.encode("utf-8") + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + data = json.dumps(message).encode("utf-8") lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0) return self @@ -311,9 +340,20 @@ class AppResponse: lib.uws_res_write_continue(self.SSL, self.res) return self - def write_status(self, status_text): + def write_status(self, status_or_status_text): if not self.aborted: - data = status_text.encode("utf-8") + if isinstance(status_or_status_text, int): + try: + data = status_codes[status_or_status_text].encode("utf-8") + except: #invalid status + raise RuntimeError("\"%d\" Is not an valid Status Code" % status_or_status_text) + elif isinstance(status_text, str): + data = status_text.encode("utf-8") + elif isinstance(status_text, bytes): + data = status_text + else: + data = json.dumps(status_text).encode("utf-8") + lib.uws_res_write_status(self.SSL, self.res, data, len(data)) return self @@ -322,9 +362,13 @@ class AppResponse: key_data = key.encode("utf-8") if isinstance(value, int): lib.uws_res_write_header_int(self.SSL, self.res, key_data, len(key_data), ffi.cast("uint64_t", value)) - else: + elif isinstance(value, str): value_data = value.encode("utf-8") - lib.uws_res_write_header(self.SSL, self.res, key_data, len(key_data), value_data, len(value_data)) + elif isinstance(value, bytes): + value_data = value + else: + value_data = json.dumps(value).encode("utf-8") + lib.uws_res_write_header(self.SSL, self.res, key_data, len(key_data), value_data, len(value_data)) return self def end_without_body(self): @@ -334,13 +378,23 @@ class AppResponse: def write(self, message): if not self.aborted: - data = message.encode("utf-8") + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + data = json.dumps(message).encode("utf-8") lib.uws_res_write(self.SSL, self.res, data, len(data)) return self def get_write_offset(self, message): if not self.aborted: - data = message.encode("utf-8") + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + else: + data = json.dumps(message).encode("utf-8") return int(lib.uws_res_get_write_offset(self.SSL, self.res, data, len(data))) return 0 @@ -356,9 +410,13 @@ class AppResponse: def on_aborted(self, handler): if hasattr(handler, '__call__'): + self.grab_aborted_handler() self._aborted_handler = handler return self - + + def __del__(self): + self.res = ffi.NULL + self._ptr = ffi.NULL # void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data); # void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws); # void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data); @@ -389,59 +447,60 @@ class App: if bool(lib.uws_constructor_failed(self.SSL, self.app)): raise RuntimeError("Failed to create connection") self.handlers = [] + self.loop = Loop() def get(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_get(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def post(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_post(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def options(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_options(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def delete(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_delete(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def patch(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_patch(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def put(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_put(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def head(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_head(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def connect(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_connect(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def trace(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_trace(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self def any(self, path, handler): - user_data = ffi.new_handle(handler) + user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler lib.uws_app_any(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) return self - def listen(self, port_or_options, handler): + def listen(self, port_or_options, handler=None): self._listen_handler = handler if isinstance(port_or_options, int): lib.uws_app_listen(self.SSL, self.app, ffi.cast("int", port_or_options), uws_generic_listen_handler, self._ptr) @@ -456,8 +515,16 @@ class App: return self + def get_loop(): + return self.loop.loop + + def run_async(self, task): + return self.loop.run_async(task) + def run(self): + self.loop.start() lib.uws_app_run(self.SSL, self.app) + self.loop.stop() return self def close(self): @@ -465,6 +532,7 @@ class App: if not self.socket == ffi.NULL: lib.us_listen_socket_close(self.SSL, self.socket) return self + def __del__(self): lib.uws_app_destroy(self.SSL, self.app) @@ -494,4 +562,5 @@ class AppOptions: self.dh_params_file_name = dh_params_file_name self.ca_file_name = ca_file_name self.ssl_ciphers = ssl_ciphers - self.ssl_prefer_low_memory_usage = ssl_prefer_low_memory_usage \ No newline at end of file + self.ssl_prefer_low_memory_usage = ssl_prefer_low_memory_usage + diff --git a/src/socketify/status_codes.py b/src/socketify/status_codes.py new file mode 100644 index 0000000..4c82561 --- /dev/null +++ b/src/socketify/status_codes.py @@ -0,0 +1,65 @@ +status_codes = { + 100 : "100 Continue", + 101 : "101 Switching Protocols", + 102 : "102 Processing", + 103 : "103 Early Hints", + 200 : "200 OK", + 201 : "201 Created", + 202 : "202 Accepted", + 203 : "203 Non-Authoritative Information", + 204 : "204 No Content", + 205 : "205 Reset Content", + 206 : "206 Partial Content", + 207 : "207 Multi-Status", + 208 : "208 Already Reported", + 226 : "226 IM Used (HTTP Delta encoding)", + 300 : "300 Multiple Choices", + 301 : "301 Moved Permanently", + 302 : "302 Found", + 303 : "303 See Other", + 304 : "304 Not Modified", + 305 : "305 Use Proxy Deprecated", + 306 : "306 unused", + 307 : "307 Temporary Redirect", + 308 : "308 Permanent Redirect", + 400 : "400 Bad Request", + 401 : "401 Unauthorized", + 402 : "402 Payment Required Experimental", + 403 : "403 Forbidden", + 404 : "404 Not Found", + 405 : "405 Method Not Allowed", + 406 : "406 Not Acceptable", + 407 : "407 Proxy Authentication Required", + 408 : "408 Request Timeout", + 409 : "409 Conflict", + 410 : "410 Gone", + 411 : "411 Length Required", + 412 : "412 Precondition Failed", + 413 : "413 Payload Too Large", + 414 : "414 URI Too Long", + 415 : "415 Unsupported Media Type", + 416 : "416 Range Not Satisfiable", + 417 : "417 Expectation Failed", + 418 : "418 I'm a teapot", + 421 : "421 Misdirected Request", + 422 : "422 Unprocessable Entity", + 423 : "423 Locked", + 424 : "424 Failed Dependency", + 425 : "425 Too Early Experimental", + 426 : "426 Upgrade Required", + 428 : "428 Precondition Required", + 429 : "429 Too Many Requests", + 431 : "431 Request Header Fields Too Large", + 451 : "451 Unavailable For Legal Reasons", + 500 : "500 Internal Server Error", + 501 : "501 Not Implemented", + 502 : "502 Bad Gateway", + 503 : "503 Service Unavailable", + 504 : "504 Gateway Timeout", + 505 : "505 HTTP Version Not Supported", + 506 : "506 Variant Also Negotiates", + 507 : "507 Insufficient Storage", + 508 : "508 Loop Detected", + 510 : "510 Not Extended", + 511 : "511 Network Authentication Required" +} \ No newline at end of file diff --git a/tests/async.py b/tests/async.py new file mode 100644 index 0000000..1772074 --- /dev/null +++ b/tests/async.py @@ -0,0 +1,30 @@ +from socketify import App, AppOptions, AppListenOptions +import asyncio + +app = App() + +async def delayed_hello(delay, res): + await asyncio.sleep(delay) #do something async + res.end("Hello with delay!") + +def home(res, req): + #request objecy only lives during the live time of this call + #get parameters, query, headers anything you need here + delay = req.get_query("delay") + delay = 0 if delay == None else float(delay) + #tell response to run this in the event loop + #abort handler is grabed here, so responses only will be send if res.aborted == False + res.run_async(delayed_hello(delay, res)) + +async def json_message(res, req): + #req maybe will not be available in direct attached async functions + #but if you dont care about req info you can do it + await asyncio.sleep(2) #do something async + res.write_header("Content-Type", "application/json") + res.end({ "message": "I'm delayed!"}) + +app.get("/", home) +app.get("/json_message", json_message) + +app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port))) +app.run() \ No newline at end of file diff --git a/tests/original.py b/tests/original.py deleted file mode 100644 index dd57016..0000000 --- a/tests/original.py +++ /dev/null @@ -1,20 +0,0 @@ -import uws -import asyncio - -# Integrate with asyncio -# asyncio.set_event_loop(uws.Loop()) - -app = uws.App() - -def getHandler(res, req): - res.end("Hello Python!") - -app.get("/*", getHandler) - -def listenHandler(): - print("Listening to port 3000") - -app.listen(3000, listenHandler) -app.run() -# Run asyncio event loop -# asyncio.get_event_loop().run_forever() \ No newline at end of file diff --git a/tests/shutdown.py b/tests/shutdown.py new file mode 100644 index 0000000..bdd529d --- /dev/null +++ b/tests/shutdown.py @@ -0,0 +1,15 @@ +from socketify import App, AppOptions, AppListenOptions + +app = App() + +def shutdown(res, req): + res.end("Good bye!") + app.close() + +app.get("/", lambda res, req: res.end("Hello!")) +app.get("/shutdown", shutdown) + + +app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port))) +app.run() +print("App Closed!") \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py index fcea40a..c352949 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,11 +1,16 @@ -import threading -from datetime import datetime -import time + import os +import sys import asyncio -#import uvloop +import threading +import time from json import dumps as json +from datetime import datetime + from socketify import App, AppOptions, AppListenOptions + +#import uvloop + # from ujson import dumps as json # from zzzjson import stringify as json #is too slow with CFFI # from orjson import dumps @@ -50,19 +55,6 @@ def applicationjson(res, req): res.end(json({"message":"Hello, World!"})) -def async_route(handler): - return lambda res,req: asyncio.run(handler(res, req)) - -async def plaintext_async(res, req): - # await asyncio.sleep(1) - res.write_header("Date", current_http_date) - res.write_header("Server", "socketify") - res.write_header("Content-Type", "text/plain") - res.end("Hello, World!") - -async def test(): - await asyncio.sleep(1) - print("Hello!") def run_app(): timing = threading.Thread(target=time_thread, args=()) @@ -71,7 +63,6 @@ def run_app(): app.get("/", plaintext) # app.get("/json", applicationjson) # app.get("/plaintext", plaintext) - # app.get("/", async_route(plaintext_async)) app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port))) # app.listen(AppListenOptions(port=3000, host="0.0.0.0"), lambda config: print("Listening on port http://%s:%d now\n" % (config.host, config.port))) @@ -83,11 +74,11 @@ def run_app(): # app.run() # asyncio.get_event_loop().run_forever() -def create_fork(): - n = os.fork() - # n greater than 0 means parent process - if not n > 0: - run_app() +# def create_fork(): +# n = os.fork() +# # n greater than 0 means parent process +# if not n > 0: +# run_app() # for index in range(3): # create_fork() @@ -115,4 +106,7 @@ run_app() #pypy3 -m pip install cysimdjson (uses simdjson) is parse only #pypy3 -m pip install rapidjson (not working with pypy) #https://github.com/MagicStack/uvloop/issues/380 -#https://foss.heptapod.net/pypy/pypy/-/issues/3740 \ No newline at end of file +#https://foss.heptapod.net/pypy/pypy/-/issues/3740 + +# make shared -C ./src/socketify/uWebSockets/capi +#cp ./src/socketify/uWebSockets/capi/libuwebsockets.so ./src/socketify/libuwebsockets.so \ No newline at end of file