fixed asyncio integration (kinda) and added gracefull shutdown on SIGINT

pull/39/head
Ciro 2022-06-02 16:00:42 -03:00
rodzic ce72919ebc
commit 9f4bb0de72
12 zmienionych plików z 355 dodań i 23 usunięć

3
.gitignore vendored
Wyświetl plik

@ -3,4 +3,5 @@ __pycache__
/build
/dist
/src/socketify/*.so
*.so
*.so
*.o

Wyświetl plik

@ -19,12 +19,21 @@ import subprocess
_ROOT = pathlib.Path(__file__).parent
UWS_CAPI_DIR = str(_ROOT / "build" / "uWebSockets" / "capi")
UWS_CAPI_DIR = str(_ROOT / "build" / "uWebSockets" / "capi")
UWS_LIB_PATH = str(_ROOT / "build" / "uWebSockets" / "capi" / "libuwebsockets.so")
UWS_DIR = str(_ROOT / "src" / "socketify" /"uWebSockets")
UWS_BUILD_DIR = str(_ROOT / "build" /"uWebSockets")
UWS_LIB_OUTPUT = str(_ROOT / "src" / "socketify" / "libuwebsockets.so")
NATIVE_CAPI_DIR = str(_ROOT / "build" / "native")
NATIVE_LIB_PATH = str(_ROOT / "build" / "native" / "libsocketify.so")
NATIVE_DIR = str(_ROOT / "src" / "socketify" /"native")
NATIVE_BUILD_DIR = str(_ROOT / "build" /"native")
NATIVE_LIB_OUTPUT = str(_ROOT / "src" / "socketify" / "native"/ "libsocketify.so")
class Prepare(sdist):
def run(self):
super().run()
@ -39,6 +48,15 @@ class Makefile(build_ext):
subprocess.run(["make", "shared"], cwd=UWS_CAPI_DIR, env=env, check=True)
shutil.move(UWS_LIB_PATH, UWS_LIB_OUTPUT)
if os.path.exists(NATIVE_CAPI_DIR):
shutil.rmtree(NATIVE_CAPI_DIR)
shutil.copytree(NATIVE_DIR, NATIVE_CAPI_DIR)
subprocess.run(["make"], cwd=NATIVE_CAPI_DIR, env=env, check=True)
shutil.move(NATIVE_LIB_PATH, NATIVE_LIB_OUTPUT)
super().run()
@ -66,7 +84,7 @@ setuptools.setup(
],
packages=["socketify"],
package_dir={"": "src"},
package_data={"": ['./*.so', './uWebSockets/*','./uWebSockets/*/*','./uWebSockets/*/*/*']},
package_data={"": ['./*.so', './uWebSockets/*','./uWebSockets/*/*','./uWebSockets/*/*/*', './native/*','./native/*/*','./native/*/*/*']},
python_requires=">=3.7",
install_requires=["cffi>=1.0.0"],
has_ext_modules=lambda: True,

Wyświetl plik

@ -3,10 +3,8 @@ import asyncio
import threading
import time
def loop_thread(loop, exception_handler):
if hasattr(exception_handler, '__call__'):
loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None))
loop.run_forever()
from .native import UVLoop
def future_handler(future, loop, exception_handler, response):
try:
@ -27,6 +25,7 @@ def future_handler(future, loop, exception_handler, response):
class Loop:
def __init__(self, exception_handler=None):
self.loop = asyncio.new_event_loop()
self.uv_loop = UVLoop()
if hasattr(exception_handler, '__call__'):
self.exception_handler = exception_handler
self.loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None))
@ -34,24 +33,46 @@ class Loop:
self.exception_handler = None
asyncio.set_event_loop(self.loop)
self.loop_thread = None
self.started = False
# self.loop_thread = None
def start(self):
self.loop_thread = threading.Thread(target=loop_thread, args=(self.loop,self.exception_handler), daemon=True)
self.loop_thread.start()
self.started = True
self.timer = self.uv_loop.create_timer(0, 100, lambda loop: loop.run_once_asyncio(), self)
def run(self):
self.uv_loop.run()
def run_once(self):
self.uv_loop.run_once()
def run_once_asyncio(self):
#run only one step
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
def stop(self):
#stop loop
self.loop.call_soon_threadsafe(self.loop.stop)
#wait loop thread to stops
self.loop_thread.join()
if(self.started):
self.timer.stop()
self.started = False
#unbind run_once
#if is still running stops
if self.loop.is_running():
self.loop.stop()
# Find all running tasks in main thread:
pending = asyncio.all_tasks(self.loop)
# Run loop until tasks done
self.loop.run_until_complete(asyncio.gather(*pending))
#Exposes native loop for uWS
def get_native_loop(self):
return self.uv_loop.get_native_loop()
def run_async(self, task, response=None):
future = asyncio.run_coroutine_threadsafe(task, self.loop)
#with run_once
future = asyncio.ensure_future(task, loop=self.loop)
#with threads
future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response))
return future

Wyświetl plik

@ -0,0 +1,5 @@
LIBRARY_NAME := libsocketify
default:
$(CC) -c -O3 -luv -flto -fPIC -I ./src ./src/$(LIBRARY_NAME).c
$(CC) -shared -o $(LIBRARY_NAME).so $(LIBRARY_NAME).o -luv

Wyświetl plik

@ -0,0 +1 @@
from .uv import UVLoop

Wyświetl plik

@ -0,0 +1,133 @@
#include "uv.h"
#include "libsocketify.h"
#include <stdlib.h>
#include <stdio.h>
void socketify_generic_prepare_callback(uv_prepare_t *prepare){
socketify_loop* loop = (socketify_loop*)uv_handle_get_data((uv_handle_t*)prepare);
loop->on_prepare_handler(loop->on_prepare_data);
}
void socketify_generic_timer_callback(uv_timer_t *timer){
socketify_timer* loop_data = (socketify_timer*)uv_handle_get_data((uv_handle_t*)timer);
loop_data->handler(loop_data->user_data);
}
void* socketify_get_native_loop(socketify_loop* loop){
return loop->uv_loop;
}
socketify_loop * socketify_create_loop(){
socketify_loop* loop = malloc(sizeof(uv_prepare_t));
loop->uv_loop = NULL;
loop->on_prepare_handler = NULL;
loop->uv_prepare_ptr = NULL;
uv_loop_t* uv_loop = malloc(sizeof(uv_loop_t));
if(uv_loop_init(uv_loop)){
free(uv_loop);
return loop;
}
loop->uv_loop = uv_loop;
return loop;
}
bool socketify_constructor_failed(socketify_loop* loop){
return loop->uv_loop == NULL;
}
bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data){
if (loop->uv_prepare_ptr != NULL) return false;
if(handler == NULL) return false;
uv_prepare_t* prepare = malloc(sizeof(uv_prepare_t));
if(uv_prepare_init(loop->uv_loop, prepare)){
free(prepare);
return false;
}
loop->on_prepare_handler = handler;
loop->on_prepare_data = user_data;
loop->uv_prepare_ptr = prepare;
uv_handle_set_data((uv_handle_t*)prepare, loop);
uv_prepare_start(prepare, socketify_generic_prepare_callback);
return true;
// uv_unref((uv_handle_t *) loop->uv_pre);
// loop->uv_pre->data = loop;
}
bool socketify_prepare_unbind(socketify_loop* loop){
if(loop->uv_prepare_ptr == NULL) return false;
uv_prepare_stop(loop->uv_prepare_ptr);
free(loop->uv_prepare_ptr);
loop->uv_prepare_ptr = NULL;
return true;
}
int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode){
return uv_run(loop->uv_loop, (uv_run_mode)mode);
}
void socketify_loop_stop(socketify_loop* loop){
if(uv_loop_alive(loop->uv_loop)){
uv_stop(loop->uv_loop);
}
}
void socketify_destroy_loop(socketify_loop* loop){
socketify_loop_stop(loop);
uv_loop_close(loop->uv_loop);
free(loop->uv_loop);
if(loop->uv_prepare_ptr){
free(loop->uv_prepare_ptr);
}
free(loop);
}
socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data){
uv_timer_t* uv_timer = malloc(sizeof(uv_timer_t));
// uv_timer_init(loop->uv_loop, uv_timer);
if(uv_timer_init(loop->uv_loop, uv_timer)){
free(uv_timer);
return NULL;
}
socketify_timer* timer = malloc(sizeof(socketify_timer));
timer->uv_timer_ptr = uv_timer;
timer->user_data = user_data;
timer->handler = handler;
uv_handle_set_data((uv_handle_t*)uv_timer, timer);
uv_timer_start(uv_timer, socketify_generic_timer_callback, timeout, repeat);
return timer;
}
//stops and destroy timer info
void socketify_timer_destroy(socketify_timer* timer){
uv_timer_stop(timer->uv_timer_ptr);
free(timer->uv_timer_ptr);
free(timer);
}
// int socketify_set_timeout(socketify_loop* loop, int64_t timeout, socketify_timer_handler handler, void* user_data){
// uv_timer_t* timer = malloc(sizeof(uv_timer_t));
// if(!uv_timer_init(loop->uv_loop, timer)){
// free(timer);
// return -1;
// }
// uv_handle_set_data((uv_handle_t*)timer, handler);
// uv_timer_start(timer, socketify_generic_timer_callback, timeout, 0);
// return 0;
// }
// int uv_timer_init(uv_loop_t *loop, uv_timer_t *handle)
// int uv_timer_start(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)

Wyświetl plik

@ -0,0 +1,41 @@
#ifndef SOCKETIFY_CAPI_HEADER
#define SOCKETIFY_CAPI_HEADER
#include "uv.h"
#include <stdbool.h>
typedef void (*socketify_prepare_handler)(void* user_data);
typedef void (*socketify_timer_handler)(void* user_data);
typedef enum {
SOCKETIFY_RUN_DEFAULT = 0,
SOCKETIFY_RUN_ONCE,
SOCKETIFY_RUN_NOWAIT
} socketify_run_mode;
typedef struct {
void* uv_prepare_ptr;
socketify_prepare_handler on_prepare_handler;
void* on_prepare_data;
void* uv_loop;
} socketify_loop;
typedef struct{
void* uv_timer_ptr;
socketify_timer_handler handler;
void* user_data;
} socketify_timer;
socketify_loop * socketify_create_loop();
bool socketify_constructor_failed(socketify_loop* loop);
bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data);
bool socketify_prepare_unbind(socketify_loop* loop);
void socketify_destroy_loop(socketify_loop* loop);
void* socketify_get_native_loop(socketify_loop* loop);
int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode);
void socketify_loop_stop(socketify_loop* loop);
socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data);
void socketify_timer_destroy(socketify_timer* timer);
#endif

Wyświetl plik

@ -0,0 +1,102 @@
import cffi
import os
ffi = cffi.FFI()
ffi.cdef("""
typedef void (*socketify_prepare_handler)(void* user_data);
typedef void (*socketify_timer_handler)(void* user_data);
typedef enum {
SOCKETIFY_RUN_DEFAULT = 0,
SOCKETIFY_RUN_ONCE,
SOCKETIFY_RUN_NOWAIT
} socketify_run_mode;
typedef struct {
void* uv_prepare_ptr;
socketify_prepare_handler on_prepare_handler;
void* on_prepare_data;
void* uv_loop;
} socketify_loop;
typedef struct{
void* uv_timer_ptr;
socketify_timer_handler handler;
void* user_data;
} socketify_timer;
socketify_loop * socketify_create_loop();
bool socketify_constructor_failed(socketify_loop* loop);
bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data);
bool socketify_prepare_unbind(socketify_loop* loop);
void socketify_destroy_loop(socketify_loop* loop);
void* socketify_get_native_loop(socketify_loop* loop);
int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode);
void socketify_loop_stop(socketify_loop* loop);
socketify_timer* socketify_create_timer(socketify_loop* loop, int64_t timeout, int64_t repeat, socketify_timer_handler handler, void* user_data);
void socketify_timer_destroy(socketify_timer* timer);
""")
library_path = os.path.join(os.path.dirname(__file__), "libsocketify.so")
lib = ffi.dlopen(library_path)
@ffi.callback("void(void *)")
def socketify_generic_handler(data):
if not data == ffi.NULL:
(handler, user_data) = ffi.from_handle(data)
handler(user_data)
class UVTimer:
def __init__(self, loop, timeout, repeat, handler, user_data):
self._handler_data = ffi.new_handle((handler, user_data))
self._ptr = lib.socketify_create_timer(loop, ffi.cast("int64_t", timeout), ffi.cast("int64_t", repeat), socketify_generic_handler, self._handler_data)
def stop(self):
lib.socketify_timer_destroy(self._ptr)
self._handler_data = None
self._ptr = ffi.NULL
def __del__(self):
if self._ptr != ffi.NULL:
lib.socketify_timer_destroy(self._ptr)
self.self._handler_data = None
class UVLoop:
def __init__(self, exception_handler=None):
self._loop = lib.socketify_create_loop()
if bool(lib.socketify_constructor_failed(self._loop)):
raise RuntimeError("Failed to create socketify uv loop")
def on_prepare(self, handler, user_data):
self._handler_data = ffi.new_handle((handler, user_data))
lib.socketify_on_prepare(self._loop, socketify_generic_handler, self._handler_data)
def create_timer(self, timeout, repeat, handler, user_data):
return UVTimer(self._loop, timeout, repeat, handler, user_data)
def prepare_unbind(self):
lib.socketify_prepare_unbind(self._loop)
def get_native_loop(self):
return lib.socketify_get_native_loop(self._loop)
def __del__(self):
lib.socketify_destroy_loop(self._loop)
self.self._handler_data = None
def run(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT)
def run_once(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE)
def stop(self):
lib.socketify_loop_stop(self._loop)

Wyświetl plik

@ -4,7 +4,7 @@ from .loop import Loop
from .status_codes import status_codes
import json
import inspect
import threading
import signal
ffi = cffi.FFI()
ffi.cdef("""
@ -59,7 +59,7 @@ struct us_listen_socket_t {
void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls);
int us_socket_local_port(int ssl, struct us_listen_socket_t *ls);
struct us_loop_t *uws_get_loop();
struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop);
typedef enum
{
_COMPRESSOR_MASK = 0x00FF,
@ -466,12 +466,19 @@ class App:
else:
self.is_ssl = False
self.SSL = ffi.cast("int", 0)
self.loop = Loop(lambda loop, context, response: self.trigger_error(context, response, None))
#set async loop to be the last created (is thread_local), App must be one per thread otherwise will use only the lasted loop
#needs to be called before uws_create_app or otherwise will create another loop and will not receive the right one
lib.uws_get_loop_with_native(self.loop.get_native_loop())
self.app = lib.uws_create_app(self.SSL, socket_options)
self._ptr = ffi.new_handle(self)
if bool(lib.uws_constructor_failed(self.SSL, self.app)):
raise RuntimeError("Failed to create connection")
self.handlers = []
self.loop = Loop(lambda loop, context, response: self.trigger_error(context, response, None))
self.error_handler = None
def get(self, path, handler):
@ -547,15 +554,17 @@ class App:
return self.loop.run_async(task, response)
def run(self):
signal.signal(signal.SIGINT, lambda sig, frame: self.close())
self.loop.start()
lib.uws_app_run(self.SSL, self.app)
self.loop.stop()
self.loop.run()
# lib.uws_app_run(self.SSL, self.app)
return self
def close(self):
if hasattr(self, "socket"):
if not self.socket == ffi.NULL:
lib.us_listen_socket_close(self.SSL, self.socket)
self.loop.stop()
return self
def set_error_handler(self, handler):

@ -1 +1 @@
Subproject commit c168734e80daa0c91123ed44172f193b1ba8e365
Subproject commit 91129232631c108a975278cfd7892d4351ccf71d

Wyświetl plik

@ -30,4 +30,5 @@ app.get("/json", json)
app.any("/*", not_found)
app.listen(3000, lambda config: print("Listening on port http://localhost:%s now\n" % str(config.port)))
app.run()

Wyświetl plik

@ -4,7 +4,7 @@ import multiprocessing
def run_app():
app = App()
app.get("/", lambda res, req: res.end("Hello World socketify from Python!"))
app.get("/", lambda res, req: res.end("Hello, World!"))
app.listen(3000, lambda config: print("PID %d Listening on port http://localhost:%d now\n" % (os.getpid(), config.port)))
app.run()