From e3da9cfa5f3285ad25fca3453aae54d119ddcb0e Mon Sep 17 00:00:00 2001 From: Anthony Leung Date: Fri, 28 Jun 2024 08:23:13 +0000 Subject: [PATCH] working low energy loop --- src/socketify/loop.py | 97 ++++++++++++++++++++++++++++++++++---- src/socketify/socketify.py | 19 +++++++- 2 files changed, 105 insertions(+), 11 deletions(-) diff --git a/src/socketify/loop.py b/src/socketify/loop.py index 074d577..65137c5 100644 --- a/src/socketify/loop.py +++ b/src/socketify/loop.py @@ -1,5 +1,7 @@ import asyncio import logging +from time import sleep +import threading from .tasks import create_task, TaskFactory from .uv import UVLoop @@ -76,8 +78,46 @@ class Loop: def create_future(self): return self.loop.create_future() + def run_uv(self, uv_loop): + uv_loop.run() + #import time + #while True: + # time.sleep(1) + # uv_loop.run_nowait() + + def start_uvloop(self): + ''' + import time + if not self.started: + time.sleep(1) + self._keep_alive() + ''' + + if not hasattr(self, 'thread_started'): + logging.info('starting _keep_alive thread') + print('starting _keep_alive thread') + t1 = threading.Thread(target=self.run_uv, daemon=True, args=[self.uv_loop]) + t1.start() + self.thread_started = True + def _keep_alive(self): + '''if not self.started: + time.sleep(1) + self._keep_alive() + ''' + ''' + if not hasattr(self, 'thread_started'): + logging.info('starting _keep_alive thread') + t1 = threading.Thread(target=self.run_uv, daemon=True, args=[self.uv_loop]) + t1.start() + ''' + self.thread_started = True if self.started: + #sleep(5) # TODO does this still run? + #asyncio.sleep(5) + #logging.info('Commencing self.started loop checking ') + #print('in _k_a') + """ relax = False if not self.is_idle: self._idle_count = 0 @@ -85,17 +125,26 @@ class Loop: self._idle_count += 1 else: relax = True - + #print(self._idle_count, relax) + self.is_idle = True - + """ + + self.loop.call_later(0.01, self._keep_alive) + return if relax: - self.uv_loop.run_nowait() + #self.uv_loop.run() + #self.uv_loop.run_nowait() + #self.loop.call_later(10, self._keep_alive) self.loop.call_later(0.001, self._keep_alive) else: - self.uv_loop.run_nowait() + #self.uv_loop.run_nowait() # be more agressive when needed self.loop.call_soon(self._keep_alive) - + + async def slp(self): + await asyncio.sleep(999) + def create_task(self, *args, **kwargs): # this is not using optimized create_task yet return self.loop.create_task(*args, **kwargs) @@ -109,10 +158,14 @@ class Loop: future = self.ensure_future(task) else: future = None + print('RUC', flush=True) + # not sure if this method is used. if so, + # might want to use self.start_uvloop() here + # as well? self.loop.call_soon(self._keep_alive) self.loop.run_until_complete(future) # clean up uvloop - self.uv_loop.stop() + #self.uv_loop.stop() return future def run(self, task=None): @@ -121,12 +174,38 @@ class Loop: future = self.ensure_future(task) else: future = None - self.loop.call_soon(self._keep_alive) - self.loop.run_forever() + print('RUN1', flush=True) + #self.uv_loop.run() + print('after uvr') + self.start_uvloop() + print('started uvloop', flush=True) + #asyncio.run(self._keep_alive) + #self.loop.call_soon(self._keep_alive) + #self.loop.call_soon(self.slp) + #self.ensure_future(Task(self.slp())) + print('b4 rf') + #await self.slp() + print('after slp') + #l = asyncio.get_event_loop() + #l.run_until_complete(self.slp) + #asyncio.create_task(self.slp) + if not self.loop.is_running(): + print('run forever') + self.loop.run_forever() + print('after rf') + #asyncio.create_task(self.slp) # clean up uvloop - self.uv_loop.stop() + """self.uv_loop.stop()""" return future + def wake_asyncio_loop(self): + """ trigger event loop self._write_to_self to wake up, + useful after future.set_result so that the async runtime knows to + check for and process new events """ + # These two lines will both call self._write_to_self and wake the event loop + #asyncio.run_coroutine_threadsafe(asyncio.sleep(0), self._dataFuture.get_loop()) + self.loop.call_soon_threadsafe(lambda: 2) + def run_once(self): # run one step of asyncio # if loop._run_once is not available use loop.run_forever + loop.call_soon(loop.stop) diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 71770b4..7afd52e 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -1605,6 +1605,7 @@ class AppResponse: ) # if aborted set to done True and ok False except: pass + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result def on_writeble(self, offset): # Here the timeout is off, we can spend as much time before calling try_end we want to @@ -1613,6 +1614,7 @@ class AppResponse: ) if ok: self._chunkFuture.set_result((ok, done)) + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return ok self.on_writable(on_writeble) @@ -1622,6 +1624,7 @@ class AppResponse: self._chunkFuture.set_result( (False, True) ) # if aborted set to done True and ok False + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return self._chunkFuture self._lastChunkOffset = self.get_write_offset() @@ -1629,12 +1632,15 @@ class AppResponse: (ok, done) = self.try_end(buffer, total_size) if ok: self._chunkFuture.set_result((ok, done)) + """ Call self._write_to_self on the loop via one of these two lines + so that the application code can pick up on the new completed data """ + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result return self._chunkFuture # failed to send chunk return self._chunkFuture - def get_data(self): + async def get_data(self): self._dataFuture = self.app.loop.create_future() self._data = BytesIO() @@ -1645,17 +1651,26 @@ class AppResponse: self._dataFuture.set_result(self._data) except: pass + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result def get_chunks(self, chunk, is_end): if chunk is not None: self._data.write(chunk) + if is_end: self._dataFuture.set_result(self._data) self._data = None + """ + This wakes up the asyncio event loop so that get_data can act on the + completed _dataFuture + """ + self.app.loop.wake_asyncio_loop() # wake up async loop after set_result + self.on_aborted(is_aborted) self.on_data(get_chunks) - return self._dataFuture + + return await self._dataFuture def grab_aborted_handler(self): # only needed if is async