diff --git a/.gitignore b/.gitignore index a936517..d733439 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ __pycache__ /build /dist /src/socketify/*.so -*.so \ No newline at end of file +*.so +*.o \ No newline at end of file diff --git a/setup.py b/setup.py index 99e4845..5c9ee4a 100644 --- a/setup.py +++ b/setup.py @@ -19,12 +19,21 @@ import subprocess _ROOT = pathlib.Path(__file__).parent UWS_CAPI_DIR = str(_ROOT / "build" / "uWebSockets" / "capi") -UWS_CAPI_DIR = str(_ROOT / "build" / "uWebSockets" / "capi") UWS_LIB_PATH = str(_ROOT / "build" / "uWebSockets" / "capi" / "libuwebsockets.so") UWS_DIR = str(_ROOT / "src" / "socketify" /"uWebSockets") UWS_BUILD_DIR = str(_ROOT / "build" /"uWebSockets") UWS_LIB_OUTPUT = str(_ROOT / "src" / "socketify" / "libuwebsockets.so") + +NATIVE_CAPI_DIR = str(_ROOT / "build" / "native") +NATIVE_LIB_PATH = str(_ROOT / "build" / "native" / "libsocketify.so") +NATIVE_DIR = str(_ROOT / "src" / "socketify" /"native") +NATIVE_BUILD_DIR = str(_ROOT / "build" /"native") +NATIVE_LIB_OUTPUT = str(_ROOT / "src" / "socketify" / "native"/ "libsocketify.so") + + + + class Prepare(sdist): def run(self): super().run() @@ -39,6 +48,15 @@ class Makefile(build_ext): subprocess.run(["make", "shared"], cwd=UWS_CAPI_DIR, env=env, check=True) shutil.move(UWS_LIB_PATH, UWS_LIB_OUTPUT) + + + if os.path.exists(NATIVE_CAPI_DIR): + shutil.rmtree(NATIVE_CAPI_DIR) + shutil.copytree(NATIVE_DIR, NATIVE_CAPI_DIR) + + subprocess.run(["make"], cwd=NATIVE_CAPI_DIR, env=env, check=True) + shutil.move(NATIVE_LIB_PATH, NATIVE_LIB_OUTPUT) + super().run() @@ -66,7 +84,7 @@ setuptools.setup( ], packages=["socketify"], package_dir={"": "src"}, - package_data={"": ['./*.so', './uWebSockets/*','./uWebSockets/*/*','./uWebSockets/*/*/*']}, + package_data={"": ['./*.so', './uWebSockets/*','./uWebSockets/*/*','./uWebSockets/*/*/*', './native/*','./native/*/*','./native/*/*/*']}, python_requires=">=3.7", install_requires=["cffi>=1.0.0"], has_ext_modules=lambda: True, diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 0e1ff6f..883a25a 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -3,10 +3,8 @@ import asyncio import threading import time -def loop_thread(loop, exception_handler): - if hasattr(exception_handler, '__call__'): - loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None)) - loop.run_forever() +from .native import UVLoop + def future_handler(future, loop, exception_handler, response): try: @@ -27,6 +25,7 @@ def future_handler(future, loop, exception_handler, response): class Loop: def __init__(self, exception_handler=None): self.loop = asyncio.new_event_loop() + self.uv_loop = UVLoop() if hasattr(exception_handler, '__call__'): self.exception_handler = exception_handler self.loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None)) @@ -34,24 +33,46 @@ class Loop: self.exception_handler = None asyncio.set_event_loop(self.loop) - self.loop_thread = None + self.started = False + # self.loop_thread = None def start(self): - self.loop_thread = threading.Thread(target=loop_thread, args=(self.loop,self.exception_handler), daemon=True) - self.loop_thread.start() - + self.started = True + self.timer = self.uv_loop.create_timer(0, 100, lambda loop: loop.run_once_asyncio(), self) + + def run(self): + self.uv_loop.run() + + def run_once(self): + self.uv_loop.run_once() + + def run_once_asyncio(self): + #run only one step + self.loop.call_soon(self.loop.stop) + self.loop.run_forever() def stop(self): - #stop loop - self.loop.call_soon_threadsafe(self.loop.stop) - #wait loop thread to stops - self.loop_thread.join() + if(self.started): + self.timer.stop() + self.started = False + #unbind run_once + #if is still running stops + if self.loop.is_running(): + self.loop.stop() + # Find all running tasks in main thread: pending = asyncio.all_tasks(self.loop) # Run loop until tasks done self.loop.run_until_complete(asyncio.gather(*pending)) + + #Exposes native loop for uWS + def get_native_loop(self): + return self.uv_loop.get_native_loop() def run_async(self, task, response=None): - future = asyncio.run_coroutine_threadsafe(task, self.loop) + #with run_once + future = asyncio.ensure_future(task, loop=self.loop) + + #with threads future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response)) return future diff --git a/src/socketify/native/Makefile b/src/socketify/native/Makefile new file mode 100644 index 0000000..0fdd6b1 --- /dev/null +++ b/src/socketify/native/Makefile @@ -0,0 +1,5 @@ +LIBRARY_NAME := libsocketify + +default: + $(CC) -c -O3 -luv -flto -fPIC -I ./src ./src/$(LIBRARY_NAME).c + $(CC) -shared -o $(LIBRARY_NAME).so $(LIBRARY_NAME).o -luv \ No newline at end of file diff --git a/src/socketify/native/__init__.py b/src/socketify/native/__init__.py new file mode 100644 index 0000000..4001ef1 --- /dev/null +++ b/src/socketify/native/__init__.py @@ -0,0 +1 @@ +from .uv import UVLoop \ No newline at end of file diff --git a/src/socketify/native/src/libsocketify.c b/src/socketify/native/src/libsocketify.c new file mode 100644 index 0000000..24c0426 --- /dev/null +++ b/src/socketify/native/src/libsocketify.c @@ -0,0 +1,133 @@ +#include "uv.h" +#include "libsocketify.h" + +#include +#include + + +void socketify_generic_prepare_callback(uv_prepare_t *prepare){ + socketify_loop* loop = (socketify_loop*)uv_handle_get_data((uv_handle_t*)prepare); + loop->on_prepare_handler(loop->on_prepare_data); +} + +void socketify_generic_timer_callback(uv_timer_t *timer){ + socketify_timer* loop_data = (socketify_timer*)uv_handle_get_data((uv_handle_t*)timer); + loop_data->handler(loop_data->user_data); +} + + +void* socketify_get_native_loop(socketify_loop* loop){ + return loop->uv_loop; +} + +socketify_loop * socketify_create_loop(){ + socketify_loop* loop = malloc(sizeof(uv_prepare_t)); + loop->uv_loop = NULL; + loop->on_prepare_handler = NULL; + loop->uv_prepare_ptr = NULL; + + uv_loop_t* uv_loop = malloc(sizeof(uv_loop_t)); + if(uv_loop_init(uv_loop)){ + free(uv_loop); + return loop; + } + loop->uv_loop = uv_loop; + return loop; +} + +bool socketify_constructor_failed(socketify_loop* loop){ + return loop->uv_loop == NULL; +} + +bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data){ + if (loop->uv_prepare_ptr != NULL) return false; + if(handler == NULL) return false; + uv_prepare_t* prepare = malloc(sizeof(uv_prepare_t)); + if(uv_prepare_init(loop->uv_loop, prepare)){ + free(prepare); + return false; + } + + loop->on_prepare_handler = handler; + loop->on_prepare_data = user_data; + loop->uv_prepare_ptr = prepare; + uv_handle_set_data((uv_handle_t*)prepare, loop); + uv_prepare_start(prepare, socketify_generic_prepare_callback); + + return true; + // uv_unref((uv_handle_t *) loop->uv_pre); + // loop->uv_pre->data = loop; +} + +bool socketify_prepare_unbind(socketify_loop* loop){ + if(loop->uv_prepare_ptr == NULL) return false; + uv_prepare_stop(loop->uv_prepare_ptr); + + free(loop->uv_prepare_ptr); + loop->uv_prepare_ptr = NULL; + return true; +} + +int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode){ + return uv_run(loop->uv_loop, (uv_run_mode)mode); +} + +void socketify_loop_stop(socketify_loop* loop){ + if(uv_loop_alive(loop->uv_loop)){ + uv_stop(loop->uv_loop); + } +} + +void socketify_destroy_loop(socketify_loop* loop){ + socketify_loop_stop(loop); + + uv_loop_close(loop->uv_loop); + free(loop->uv_loop); + if(loop->uv_prepare_ptr){ + free(loop->uv_prepare_ptr); + } + free(loop); +} + +socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data){ + + uv_timer_t* uv_timer = malloc(sizeof(uv_timer_t)); + // uv_timer_init(loop->uv_loop, uv_timer); + if(uv_timer_init(loop->uv_loop, uv_timer)){ + free(uv_timer); + return NULL; + } + + socketify_timer* timer = malloc(sizeof(socketify_timer)); + timer->uv_timer_ptr = uv_timer; + timer->user_data = user_data; + timer->handler = handler; + + uv_handle_set_data((uv_handle_t*)uv_timer, timer); + uv_timer_start(uv_timer, socketify_generic_timer_callback, timeout, repeat); + + return timer; +} + +//stops and destroy timer info +void socketify_timer_destroy(socketify_timer* timer){ + uv_timer_stop(timer->uv_timer_ptr); + free(timer->uv_timer_ptr); + free(timer); +} + +// int socketify_set_timeout(socketify_loop* loop, int64_t timeout, socketify_timer_handler handler, void* user_data){ + +// uv_timer_t* timer = malloc(sizeof(uv_timer_t)); +// if(!uv_timer_init(loop->uv_loop, timer)){ +// free(timer); +// return -1; +// } +// uv_handle_set_data((uv_handle_t*)timer, handler); +// uv_timer_start(timer, socketify_generic_timer_callback, timeout, 0); +// return 0; +// } + +// int uv_timer_init(uv_loop_t *loop, uv_timer_t *handle) + +// int uv_timer_start(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat) diff --git a/src/socketify/native/src/libsocketify.h b/src/socketify/native/src/libsocketify.h new file mode 100644 index 0000000..fb0e06d --- /dev/null +++ b/src/socketify/native/src/libsocketify.h @@ -0,0 +1,41 @@ +#ifndef SOCKETIFY_CAPI_HEADER +#define SOCKETIFY_CAPI_HEADER +#include "uv.h" +#include + + +typedef void (*socketify_prepare_handler)(void* user_data); +typedef void (*socketify_timer_handler)(void* user_data); + +typedef enum { + SOCKETIFY_RUN_DEFAULT = 0, + SOCKETIFY_RUN_ONCE, + SOCKETIFY_RUN_NOWAIT +} socketify_run_mode; + +typedef struct { + void* uv_prepare_ptr; + socketify_prepare_handler on_prepare_handler; + void* on_prepare_data; + void* uv_loop; +} socketify_loop; + +typedef struct{ + void* uv_timer_ptr; + socketify_timer_handler handler; + void* user_data; +} socketify_timer; + +socketify_loop * socketify_create_loop(); +bool socketify_constructor_failed(socketify_loop* loop); +bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data); +bool socketify_prepare_unbind(socketify_loop* loop); +void socketify_destroy_loop(socketify_loop* loop); +void* socketify_get_native_loop(socketify_loop* loop); + +int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode); +void socketify_loop_stop(socketify_loop* loop); + +socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data); +void socketify_timer_destroy(socketify_timer* timer); +#endif \ No newline at end of file diff --git a/src/socketify/native/uv.py b/src/socketify/native/uv.py new file mode 100644 index 0000000..1085c83 --- /dev/null +++ b/src/socketify/native/uv.py @@ -0,0 +1,102 @@ + +import cffi +import os + +ffi = cffi.FFI() +ffi.cdef(""" + + +typedef void (*socketify_prepare_handler)(void* user_data); +typedef void (*socketify_timer_handler)(void* user_data); + +typedef enum { + SOCKETIFY_RUN_DEFAULT = 0, + SOCKETIFY_RUN_ONCE, + SOCKETIFY_RUN_NOWAIT +} socketify_run_mode; + +typedef struct { + void* uv_prepare_ptr; + socketify_prepare_handler on_prepare_handler; + void* on_prepare_data; + void* uv_loop; +} socketify_loop; + +typedef struct{ + void* uv_timer_ptr; + socketify_timer_handler handler; + void* user_data; +} socketify_timer; + +socketify_loop * socketify_create_loop(); +bool socketify_constructor_failed(socketify_loop* loop); +bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data); +bool socketify_prepare_unbind(socketify_loop* loop); +void socketify_destroy_loop(socketify_loop* loop); +void* socketify_get_native_loop(socketify_loop* loop); + +int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode); +void socketify_loop_stop(socketify_loop* loop); + +socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data); +void socketify_timer_destroy(socketify_timer* timer); + + +""") +library_path = os.path.join(os.path.dirname(__file__), "libsocketify.so") + +lib = ffi.dlopen(library_path) + +@ffi.callback("void(void *)") +def socketify_generic_handler(data): + if not data == ffi.NULL: + (handler, user_data) = ffi.from_handle(data) + handler(user_data) + +class UVTimer: + def __init__(self, loop, timeout, repeat, handler, user_data): + self._handler_data = ffi.new_handle((handler, user_data)) + self._ptr = lib.socketify_create_timer(loop, ffi.cast("int64_t", timeout), ffi.cast("int64_t", repeat), socketify_generic_handler, self._handler_data) + + def stop(self): + lib.socketify_timer_destroy(self._ptr) + self._handler_data = None + self._ptr = ffi.NULL + + def __del__(self): + if self._ptr != ffi.NULL: + lib.socketify_timer_destroy(self._ptr) + self.self._handler_data = None + + +class UVLoop: + def __init__(self, exception_handler=None): + self._loop = lib.socketify_create_loop() + if bool(lib.socketify_constructor_failed(self._loop)): + raise RuntimeError("Failed to create socketify uv loop") + + def on_prepare(self, handler, user_data): + self._handler_data = ffi.new_handle((handler, user_data)) + lib.socketify_on_prepare(self._loop, socketify_generic_handler, self._handler_data) + + def create_timer(self, timeout, repeat, handler, user_data): + return UVTimer(self._loop, timeout, repeat, handler, user_data) + + def prepare_unbind(self): + lib.socketify_prepare_unbind(self._loop) + + def get_native_loop(self): + return lib.socketify_get_native_loop(self._loop) + + def __del__(self): + lib.socketify_destroy_loop(self._loop) + self.self._handler_data = None + + def run(self): + return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT) + + def run_once(self): + return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE) + + def stop(self): + lib.socketify_loop_stop(self._loop) \ No newline at end of file diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 9606c13..e5904a7 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -4,7 +4,7 @@ from .loop import Loop from .status_codes import status_codes import json import inspect -import threading +import signal ffi = cffi.FFI() ffi.cdef(""" @@ -59,7 +59,7 @@ struct us_listen_socket_t { void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls); int us_socket_local_port(int ssl, struct us_listen_socket_t *ls); struct us_loop_t *uws_get_loop(); - +struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop); typedef enum { _COMPRESSOR_MASK = 0x00FF, @@ -466,12 +466,19 @@ class App: else: self.is_ssl = False self.SSL = ffi.cast("int", 0) + + self.loop = Loop(lambda loop, context, response: self.trigger_error(context, response, None)) + + #set async loop to be the last created (is thread_local), App must be one per thread otherwise will use only the lasted loop + #needs to be called before uws_create_app or otherwise will create another loop and will not receive the right one + lib.uws_get_loop_with_native(self.loop.get_native_loop()) self.app = lib.uws_create_app(self.SSL, socket_options) self._ptr = ffi.new_handle(self) if bool(lib.uws_constructor_failed(self.SSL, self.app)): raise RuntimeError("Failed to create connection") + + self.handlers = [] - self.loop = Loop(lambda loop, context, response: self.trigger_error(context, response, None)) self.error_handler = None def get(self, path, handler): @@ -547,15 +554,17 @@ class App: return self.loop.run_async(task, response) def run(self): + signal.signal(signal.SIGINT, lambda sig, frame: self.close()) self.loop.start() - lib.uws_app_run(self.SSL, self.app) - self.loop.stop() + self.loop.run() + # lib.uws_app_run(self.SSL, self.app) return self def close(self): if hasattr(self, "socket"): if not self.socket == ffi.NULL: lib.us_listen_socket_close(self.SSL, self.socket) + self.loop.stop() return self def set_error_handler(self, handler): diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index c168734..9112923 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit c168734e80daa0c91123ed44172f193b1ba8e365 +Subproject commit 91129232631c108a975278cfd7892d4351ccf71d diff --git a/tests/examples/async.py b/tests/examples/async.py index 5cba560..60fc5c7 100644 --- a/tests/examples/async.py +++ b/tests/examples/async.py @@ -30,4 +30,5 @@ app.get("/json", json) app.any("/*", not_found) app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port))) + app.run() \ No newline at end of file diff --git a/tests/examples/forks.py b/tests/examples/forks.py index 752850e..3007a3e 100644 --- a/tests/examples/forks.py +++ b/tests/examples/forks.py @@ -4,7 +4,7 @@ import multiprocessing def run_app(): app = App() - app.get("/", lambda res, req: res.end("Hello World socketify from Python!")) + app.get("/", lambda res, req: res.end("Hello, World!")) app.listen(3000, lambda config: print("PID %d Listening on port http://localhost:%d now\n" % (os.getpid(), config.port))) app.run()