low energy usage loop

pull/185/head
Anthony Leung 2024-07-05 10:38:31 +00:00
rodzic 2a398a3543
commit 1f95795cdc
2 zmienionych plików z 60 dodań i 13 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
import asyncio
import logging
import threading
from .tasks import create_task, TaskFactory
from .uv import UVLoop
@ -45,6 +46,7 @@ class Loop:
self.exception_handler = None
self.started = False
self.uv_thread_started = None
if is_pypy: # PyPy async Optimizations
if task_factory_max_items > 0: # Only available in PyPy for now
self._task_factory = TaskFactory(task_factory_max_items)
@ -76,6 +78,16 @@ class Loop:
def create_future(self):
return self.loop.create_future()
def start_uv_loop(self):
if not self.uv_thread_started:
t1 = threading.Thread(target=self.uv_loop.run, daemon=True)
t1.start()
self.uv_thread_started = True
def stop_uv_loop(self):
self.uv_loop.stop()
self.uv_thread_started = False
def _keep_alive(self):
if self.started:
relax = False
@ -85,17 +97,19 @@ class Loop:
self._idle_count += 1
else:
relax = True
self.is_idle = True
if relax:
self.uv_loop.run_nowait()
#self.uv_loop.run()
#self.uv_loop.run_nowait()
#self.loop.call_later(10, self._keep_alive)
self.loop.call_later(0.001, self._keep_alive)
else:
self.uv_loop.run_nowait()
#self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)
def create_task(self, *args, **kwargs):
# this is not using optimized create_task yet
return self.loop.create_task(*args, **kwargs)
@ -109,10 +123,21 @@ class Loop:
future = self.ensure_future(task)
else:
future = None
self.loop.call_soon(self._keep_alive)
"""
this function used by on_start, on_stop handlers
seems shouldnt need the uvloop running yet, otherwise
could start here, the start_uv_loop function is idempotent
self.start_uv_loop()
shouldnt need to wake the loop as run_until_complete ought to
take care of this.
"""
# self.loop.call_soon(self._keep_alive)
#self.wake_asyncio_loop()
self.loop.run_until_complete(future)
#self.wake_asyncio_loop()
# clean up uvloop
self.uv_loop.stop()
self.stop_uv_loop()
return future
def run(self, task=None):
@ -121,12 +146,24 @@ class Loop:
future = self.ensure_future(task)
else:
future = None
self.loop.call_soon(self._keep_alive)
self.loop.run_forever()
self.start_uv_loop()
if not self.loop.is_running():
self.loop.run_forever()
# clean up uvloop
self.uv_loop.stop()
self.stop_uv_loop()
return future
def wake_asyncio_loop(self):
""" trigger event loop self._write_to_self to wake up,
useful after future.set_result so that the async runtime knows to
check for and process new events """
# These two lines will both call self._write_to_self and wake the event loop
#asyncio.run_coroutine_threadsafe(asyncio.sleep(0), self._dataFuture.get_loop())
self.loop.call_soon_threadsafe(lambda: 2)
def run_once(self):
# run one step of asyncio
# if loop._run_once is not available use loop.run_forever + loop.call_soon(loop.stop)
@ -209,3 +246,4 @@ class Loop:
# asyncio.set_event_loop(uws.Loop())
# asyncio.get_event_loop().run_forever()

Wyświetl plik

@ -1605,6 +1605,7 @@ class AppResponse:
) # if aborted set to done True and ok False
except:
pass
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
def on_writeble(self, offset):
# Here the timeout is off, we can spend as much time before calling try_end we want to
@ -1613,6 +1614,7 @@ class AppResponse:
)
if ok:
self._chunkFuture.set_result((ok, done))
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return ok
self.on_writable(on_writeble)
@ -1622,6 +1624,7 @@ class AppResponse:
self._chunkFuture.set_result(
(False, True)
) # if aborted set to done True and ok False
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return self._chunkFuture
self._lastChunkOffset = self.get_write_offset()
@ -1629,12 +1632,13 @@ class AppResponse:
(ok, done) = self.try_end(buffer, total_size)
if ok:
self._chunkFuture.set_result((ok, done))
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return self._chunkFuture
# failed to send chunk
return self._chunkFuture
def get_data(self):
async def get_data(self):
self._dataFuture = self.app.loop.create_future()
self._data = BytesIO()
@ -1645,6 +1649,7 @@ class AppResponse:
self._dataFuture.set_result(self._data)
except:
pass
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
def get_chunks(self, chunk, is_end):
if chunk is not None:
@ -1652,10 +1657,15 @@ class AppResponse:
if is_end:
self._dataFuture.set_result(self._data)
self._data = None
"""
This wakes up the asyncio event loop so that get_data can act on the
completed _dataFuture
"""
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
self.on_aborted(is_aborted)
self.on_data(get_chunks)
return self._dataFuture
return await self._dataFuture
def grab_aborted_handler(self):
# only needed if is async
@ -3464,4 +3474,3 @@ class App:
self.loop = None
except:
pass