diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 5911dcf..3e67392 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -3,14 +3,41 @@ import asyncio import threading import time +def loop_thread(loop, exception_handler): + if hasattr(exception_handler, '__call__'): + loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None)) + loop.run_forever() + +def future_handler(future, loop, exception_handler, response): + try: + future.result() + return None + except Exception as error: + if hasattr(exception_handler, '__call__'): + exception_handler(loop, error, response) + else: + try: + #just log in console the error to call attention + print("Uncaught Exception: %s" % str(error)) + if response != None: + response.write_status(500).end("Internal Error") + finally: + return + class Loop: - def __init__(self): + def __init__(self, exception_handler=None): self.loop = asyncio.new_event_loop() + if hasattr(exception_handler, '__call__'): + self.exception_handler = exception_handler + self.loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None)) + else: + self.exception_handler = None + asyncio.set_event_loop(self.loop) self.loop_thread = None def start(self): - self.loop_thread = threading.Thread(target=lambda loop: loop.run_forever(), args=(self.loop,), daemon=True) + self.loop_thread = threading.Thread(target=loop_thread, args=(self.loop,self.exception_handler), daemon=True) self.loop_thread.start() def stop(self): @@ -23,6 +50,7 @@ class Loop: # Run loop until tasks done self.loop.run_until_complete(asyncio.gather(*pending)) - def run_async(self, task): - asyncio.run_coroutine_threadsafe(task, self.loop) - return True + def run_async(self, task, response=None): + future = asyncio.run_coroutine_threadsafe(task, self.loop) + future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response)) + return future diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index c5918b7..ca8e3f9 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -215,11 +215,15 @@ def uws_generic_method_handler(res, req, user_data): (handler, app) = ffi.from_handle(user_data) response = AppResponse(res, app.loop, False) request = AppRequest(req) - if inspect.iscoroutinefunction(handler): + try: + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + app.run_async(handler(response, request), response) + else: + handler(response, request) + except Exception as err: response.grab_aborted_handler() - app.run_async(handler(response, request)) - else: - handler(response, request) + app.trigger_error(err, response, request) @ffi.callback("void(uws_res_t *, uws_req_t *, void *)") def uws_generic_ssl_method_handler(res, req, user_data): @@ -227,21 +231,24 @@ def uws_generic_ssl_method_handler(res, req, user_data): (handler, app) = ffi.from_handle(user_data) response = AppResponse(res, app.loop, True) request = AppRequest(req) - task = handler(response, request) - if inspect.iscoroutinefunction(handler): + try: + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + app.run_async(handler(response, request), response) + else: + handler(response, request) + except Exception as err: response.grab_aborted_handler() - app.run_async(handler(response, request)) - else: - handler(response, request) + app.trigger_error(err, response, request) @ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)") def uws_generic_listen_handler(listen_socket, config, user_data): + if listen_socket == ffi.NULL: + raise RuntimeError("Failed to listen on port %d" % int(config.port)) if not user_data == ffi.NULL: app = ffi.from_handle(user_data) if hasattr(app, "_listen_handler") and hasattr(app._listen_handler, '__call__'): app.socket = listen_socket - if listen_socket == ffi.NULL: - raise RuntimeError("Failed to listen on port %d" % int(config.port)) app._listen_handler(None if config == ffi.NULL else AppListenOptions(port=int(config.port),host=None if config.host == ffi.NULL else ffi.string(config.host).decode("utf-8"), options=int(config.options))) @ffi.callback("void(uws_res_t *, void*)") @@ -308,7 +315,7 @@ class AppResponse: def run_async(self, task): self.grab_aborted_handler() - return self.loop.run_async(task) + return self.loop.run_async(task, self) def grab_aborted_handler(self): #only needed if is async @@ -443,13 +450,13 @@ class App: else: self.is_ssl = False self.SSL = ffi.cast("int", 0) - self.app = lib.uws_create_app(self.SSL, socket_options) self._ptr = ffi.new_handle(self) if bool(lib.uws_constructor_failed(self.SSL, self.app)): raise RuntimeError("Failed to create connection") self.handlers = [] - self.loop = Loop() + self.loop = Loop(lambda loop, context, response: self.trigger_error(context, response, None)) + self.error_handler = None def get(self, path, handler): user_data = ffi.new_handle((handler, self)) @@ -520,8 +527,8 @@ class App: def get_loop(): return self.loop.loop - def run_async(self, task): - return self.loop.run_async(task) + def run_async(self, task, response=None): + return self.loop.run_async(task, response) def run(self): self.loop.start() @@ -535,6 +542,34 @@ class App: lib.us_listen_socket_close(self.SSL, self.socket) return self + def set_error_handler(self, handler): + if hasattr(handler, '__call__'): + self.error_handler = handler + else: + self.error_handler = None + + def trigger_error(self, error, response, request): + if self.error_handler == None: + try: + print("Uncaught Exception: %s" % str(error)) #just log in console the error to call attention + response.write_status(500).end("Internal Error") + finally: + return + else: + try: + if inspect.iscoroutinefunction(self.error_handler ): + self.run_async(self.error_handler(error, response, request), response) + else: + self.error_handler(error, response, request) + except Exception as error: + try: + #Error handler got an error :D + print("Uncaught Exception: %s" % str(error)) #just log in console the error to call attention + response.write_status(500).end("Internal Error") + finally: + pass + + def __del__(self): lib.uws_app_destroy(self.SSL, self.app) diff --git a/tests/examples/error_handler.py b/tests/examples/error_handler.py new file mode 100644 index 0000000..db82345 --- /dev/null +++ b/tests/examples/error_handler.py @@ -0,0 +1,28 @@ +from socketify import App, AppOptions, AppListenOptions +import asyncio + +app = App() + +def xablau(res, req): + raise RuntimeError("Xablau!") + +async def async_xablau(res, req): + await asyncio.sleep(1) + raise RuntimeError("Async Xablau!") + +#this can be async no problems +def on_error(error, res, req): + #here you can log properly the error and doa pretty response to your clients + print("Somethind goes %s" % str(error)) + #response and request can be None if the error is in an async function + if res != None: + res.write_status(500) + res.end("Sorry we did something wrong") + +app.get("/", xablau) +app.get("/async", async_xablau) + +app.set_error_handler(on_error) + +app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port))) +app.run() \ No newline at end of file diff --git a/tests/examples/hello_world.py b/tests/examples/hello_world.py new file mode 100644 index 0000000..80d037c --- /dev/null +++ b/tests/examples/hello_world.py @@ -0,0 +1,6 @@ +from socketify import App + +app = App() +app.get("/", lambda res, req: res.end("Hello World socketify from Python!")) +app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port)) +app.run() \ No newline at end of file diff --git a/tests/examples/not_found.py b/tests/examples/not_found.py index ef51983..967987c 100644 --- a/tests/examples/not_found.py +++ b/tests/examples/not_found.py @@ -2,7 +2,7 @@ from socketify import App, AppOptions, AppListenOptions app = App() -def home(res, req): +async def home(res, req): res.end("Hello, World!") def user(res, req):