diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 074d577..1e785a5 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -1,5 +1,6 @@ import asyncio import logging +import threading from .tasks import create_task, TaskFactory from .uv import UVLoop @@ -45,6 +46,7 @@ class Loop: self.exception_handler = None self.started = False + self.uv_thread_started = None if is_pypy: # PyPy async Optimizations if task_factory_max_items > 0: # Only available in PyPy for now self._task_factory = TaskFactory(task_factory_max_items) @@ -76,6 +78,16 @@ class Loop: def create_future(self): return self.loop.create_future() + def start_uv_loop(self): + if not self.uv_thread_started: + t1 = threading.Thread(target=self.uv_loop.run, daemon=True) + t1.start() + self.uv_thread_started = True + + def stop_uv_loop(self): + self.uv_loop.stop() + self.uv_thread_started = False + def _keep_alive(self): if self.started: relax = False @@ -85,17 +97,19 @@ class Loop: self._idle_count += 1 else: relax = True - + self.is_idle = True - + if relax: - self.uv_loop.run_nowait() + #self.uv_loop.run() + #self.uv_loop.run_nowait() + #self.loop.call_later(10, self._keep_alive) self.loop.call_later(0.001, self._keep_alive) else: - self.uv_loop.run_nowait() + #self.uv_loop.run_nowait() # be more agressive when needed self.loop.call_soon(self._keep_alive) - + def create_task(self, *args, **kwargs): # this is not using optimized create_task yet return self.loop.create_task(*args, **kwargs) @@ -109,10 +123,21 @@ class Loop: future = self.ensure_future(task) else: future = None - self.loop.call_soon(self._keep_alive) + """ + this function used by on_start, on_stop handlers + seems shouldnt need the uvloop running yet, otherwise + could start here, the start_uv_loop function is idempotent + self.start_uv_loop() + + shouldnt need to wake the loop as run_until_complete ought to + take care of this. + """ + # self.loop.call_soon(self._keep_alive) + #self.wake_asyncio_loop() self.loop.run_until_complete(future) + #self.wake_asyncio_loop() # clean up uvloop - self.uv_loop.stop() + self.stop_uv_loop() return future def run(self, task=None): @@ -121,12 +146,24 @@ class Loop: future = self.ensure_future(task) else: future = None - self.loop.call_soon(self._keep_alive) - self.loop.run_forever() + + self.start_uv_loop() + + if not self.loop.is_running(): + self.loop.run_forever() + # clean up uvloop - self.uv_loop.stop() + self.stop_uv_loop() return future + def wake_asyncio_loop(self): + """ trigger event loop self._write_to_self to wake up, + useful after future.set_result so that the async runtime knows to + check for and process new events """ + # These two lines will both call self._write_to_self and wake the event loop + #asyncio.run_coroutine_threadsafe(asyncio.sleep(0), self._dataFuture.get_loop()) + self.loop.call_soon_threadsafe(lambda: 2) + def run_once(self): # run one step of asyncio # if loop._run_once is not available use loop.run_forever + loop.call_soon(loop.stop) @@ -209,3 +246,4 @@ class Loop: # asyncio.set_event_loop(uws.Loop()) # asyncio.get_event_loop().run_forever() + diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 6ddc68e..8038cab 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -1605,6 +1605,7 @@ class AppResponse: ) # if aborted set to done True and ok False except: pass + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result def on_writeble(self, offset): # Here the timeout is off, we can spend as much time before calling try_end we want to @@ -1613,6 +1614,7 @@ class AppResponse: ) if ok: self._chunkFuture.set_result((ok, done)) + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return ok self.on_writable(on_writeble) @@ -1622,6 +1624,7 @@ class AppResponse: self._chunkFuture.set_result( (False, True) ) # if aborted set to done True and ok False + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return self._chunkFuture self._lastChunkOffset = self.get_write_offset() @@ -1629,12 +1632,13 @@ class AppResponse: (ok, done) = self.try_end(buffer, total_size) if ok: self._chunkFuture.set_result((ok, done)) + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return self._chunkFuture # failed to send chunk return self._chunkFuture - def get_data(self): + async def get_data(self): self._dataFuture = self.app.loop.create_future() self._data = BytesIO() @@ -1645,6 +1649,7 @@ class AppResponse: self._dataFuture.set_result(self._data) except: pass + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result def get_chunks(self, chunk, is_end): if chunk is not None: @@ -1652,10 +1657,15 @@ class AppResponse: if is_end: self._dataFuture.set_result(self._data) self._data = None + """ + This wakes up the asyncio event loop so that get_data can act on the + completed _dataFuture + """ + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result self.on_aborted(is_aborted) self.on_data(get_chunks) - return self._dataFuture + return await self._dataFuture def grab_aborted_handler(self): # only needed if is async @@ -3464,4 +3474,3 @@ class App: self.loop = None except: pass -