diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 2eecc45..e0dd1bd 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -2,6 +2,7 @@ import asyncio import threading import time +from queue import Queue from .native import UVLoop @@ -26,6 +27,8 @@ class Loop: def __init__(self, exception_handler=None): self.loop = asyncio.new_event_loop() self.uv_loop = UVLoop() + self.queue = Queue() + if hasattr(exception_handler, '__call__'): self.exception_handler = exception_handler self.loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None)) @@ -39,13 +42,25 @@ class Loop: def set_timeout(self, timeout, callback, user_data): return self.uv_loop.create_timer(timeout, 0, callback, user_data) + def enqueue(self, callback, user_data): + self.queue.put((callback, user_data)) + def create_future(self): return self.loop.create_future() def start(self): self.started = True - #start relaxed until first task - self.timer = self.uv_loop.create_timer(0, 1, lambda loop: loop.run_once_asyncio(), self) + #run asyncio once per tick + def tick(loop): + #only call one item of the queue per tick + if not loop.queue.empty(): + (callback, user_data) = loop.queue.get(False) + callback(user_data) + loop.queue.task_done() + #run once asyncio + loop.run_once_asyncio() + #use check for calling asyncio once per tick + self.timer = self.uv_loop.create_check(tick, self) def run(self): self.uv_loop.run() diff --git a/src/socketify/native/src/libsocketify.c b/src/socketify/native/src/libsocketify.c index 578612f..1238612 100644 --- a/src/socketify/native/src/libsocketify.c +++ b/src/socketify/native/src/libsocketify.c @@ -15,6 +15,11 @@ void socketify_generic_timer_callback(uv_timer_t *timer){ loop_data->handler(loop_data->user_data); } +void socketify_generic_check_callback(uv_check_t *timer){ + socketify_timer* loop_data = (socketify_timer*)uv_handle_get_data((uv_handle_t*)timer); + loop_data->handler(loop_data->user_data); +} + void* socketify_get_native_loop(socketify_loop* loop){ return loop->uv_loop; @@ -104,7 +109,7 @@ socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, timer->handler = handler; uv_handle_set_data((uv_handle_t*)uv_timer, timer); - uv_timer_start(uv_timer, socketify_generic_timer_callback, timeout, repeat); + uv_timer_start(uv_timer, socketify_generic_timer_callback, timeout, repeat); return timer; } @@ -114,10 +119,37 @@ void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat){ } - //stops and destroy timer info void socketify_timer_destroy(socketify_timer* timer){ uv_timer_stop(timer->uv_timer_ptr); free(timer->uv_timer_ptr); free(timer); +} + + + +socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data){ + + uv_check_t* uv_timer = malloc(sizeof(uv_check_t)); + if(uv_check_init(loop->uv_loop, uv_timer)){ + free(uv_timer); + return NULL; + } + + socketify_timer* timer = malloc(sizeof(socketify_timer)); + timer->uv_timer_ptr = uv_timer; + timer->user_data = user_data; + timer->handler = handler; + + uv_handle_set_data((uv_handle_t*)uv_timer, timer); + uv_check_start(uv_timer, socketify_generic_check_callback); + + return timer; +} + +//stops and destroy timer info +void socketify_check_destroy(socketify_timer* timer){ + uv_check_stop(timer->uv_timer_ptr); + free(timer->uv_timer_ptr); + free(timer); } \ No newline at end of file diff --git a/src/socketify/native/src/libsocketify.h b/src/socketify/native/src/libsocketify.h index 45679f1..38ccd15 100644 --- a/src/socketify/native/src/libsocketify.h +++ b/src/socketify/native/src/libsocketify.h @@ -42,4 +42,6 @@ socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, void socketify_timer_destroy(socketify_timer* timer); void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat); +socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data); +void socketify_check_destroy(socketify_timer* timer); #endif \ No newline at end of file diff --git a/src/socketify/native/uv.py b/src/socketify/native/uv.py index 45b2ac6..7b8f29c 100644 --- a/src/socketify/native/uv.py +++ b/src/socketify/native/uv.py @@ -50,6 +50,9 @@ void socketify_timer_destroy(socketify_timer* timer); bool socketify_async_call(socketify_loop* loop, socketify_async_handler handler, void* user_data); void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat); + +socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data); +void socketify_check_destroy(socketify_timer* timer); """) library_path = os.path.join(os.path.dirname(__file__), "libsocketify.so") @@ -60,12 +63,26 @@ def socketify_generic_handler(data): if not data == ffi.NULL: (handler, user_data) = ffi.from_handle(data) handler(user_data) + + +class UVCheck: + def __init__(self, loop, handler, user_data): + self._handler_data = ffi.new_handle((handler, user_data)) + self._ptr = lib.socketify_create_check(loop, socketify_generic_handler, self._handler_data) + def stop(self): + lib.socketify_check_destroy(self._ptr) + self._handler_data = None + self._ptr = ffi.NULL + + def __del__(self): + if self._ptr != ffi.NULL: + lib.socketify_check_destroy(self._ptr) + self._handler_data = None class UVTimer: def __init__(self, loop, timeout, repeat, handler, user_data): self._handler_data = ffi.new_handle((handler, user_data)) self._ptr = lib.socketify_create_timer(loop, ffi.cast("uint64_t", timeout), ffi.cast("uint64_t", repeat), socketify_generic_handler, self._handler_data) - def stop(self): lib.socketify_timer_destroy(self._ptr) self._handler_data = None @@ -93,6 +110,9 @@ class UVLoop: def create_timer(self, timeout, repeat, handler, user_data): return UVTimer(self._loop, timeout, repeat, handler, user_data) + def create_check(self, handler, user_data): + return UVCheck(self._loop, handler, user_data) + def prepare_unbind(self): lib.socketify_prepare_unbind(self._loop) diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index a1a2f7f..96287e6 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -418,11 +418,12 @@ class AppResponse: def cork(self, callback): if not self.aborted: self._cork_handler = callback - if is_python: - lib.uws_res_cork(self.SSL, self.res, uws_generic_cork_handler, self._ptr) - else: #just add to uv loop in next tick to garantee corking works properly in pypy3 + if is_python: #call is enqueued to garantee corking works properly in python3 + self.loop.enqueue(lambda instance: lib.uws_res_cork(instance.SSL, instance.res, uws_generic_cork_handler, instance._ptr), self) + else: #just add to uvloop in next tick to garantee corking works properly in pypy3 self.loop.set_timeout(0, lambda instance: lib.uws_res_cork(instance.SSL, instance.res, uws_generic_cork_handler, instance._ptr), self) + def set_cookie(self, name, value, options={}): if self._write_jar == None: self._write_jar = cookies.SimpleCookie() diff --git a/src/tests.py b/src/tests.py index f462b97..36b7570 100644 --- a/src/tests.py +++ b/src/tests.py @@ -28,7 +28,7 @@ import asyncio async def home(res, req): # res.write_header("Content-Type", "plain/text") - # await asyncio.sleep(0) + await asyncio.sleep(0) def corked(res): res.write("Test ")