working low energy loop

pull/183/head
Anthony Leung 2024-06-28 08:23:13 +00:00
rodzic 484d973475
commit e3da9cfa5f
2 zmienionych plików z 105 dodań i 11 usunięć

Wyświetl plik

@ -1,5 +1,7 @@
import asyncio
import logging
from time import sleep
import threading
from .tasks import create_task, TaskFactory
from .uv import UVLoop
@ -76,8 +78,46 @@ class Loop:
def create_future(self):
return self.loop.create_future()
def run_uv(self, uv_loop):
uv_loop.run()
#import time
#while True:
# time.sleep(1)
# uv_loop.run_nowait()
def start_uvloop(self):
'''
import time
if not self.started:
time.sleep(1)
self._keep_alive()
'''
if not hasattr(self, 'thread_started'):
logging.info('starting _keep_alive thread')
print('starting _keep_alive thread')
t1 = threading.Thread(target=self.run_uv, daemon=True, args=[self.uv_loop])
t1.start()
self.thread_started = True
def _keep_alive(self):
'''if not self.started:
time.sleep(1)
self._keep_alive()
'''
'''
if not hasattr(self, 'thread_started'):
logging.info('starting _keep_alive thread')
t1 = threading.Thread(target=self.run_uv, daemon=True, args=[self.uv_loop])
t1.start()
'''
self.thread_started = True
if self.started:
#sleep(5) # TODO does this still run?
#asyncio.sleep(5)
#logging.info('Commencing self.started loop checking ')
#print('in _k_a')
"""
relax = False
if not self.is_idle:
self._idle_count = 0
@ -85,17 +125,26 @@ class Loop:
self._idle_count += 1
else:
relax = True
#print(self._idle_count, relax)
self.is_idle = True
"""
self.loop.call_later(0.01, self._keep_alive)
return
if relax:
self.uv_loop.run_nowait()
#self.uv_loop.run()
#self.uv_loop.run_nowait()
#self.loop.call_later(10, self._keep_alive)
self.loop.call_later(0.001, self._keep_alive)
else:
self.uv_loop.run_nowait()
#self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)
async def slp(self):
await asyncio.sleep(999)
def create_task(self, *args, **kwargs):
# this is not using optimized create_task yet
return self.loop.create_task(*args, **kwargs)
@ -109,10 +158,14 @@ class Loop:
future = self.ensure_future(task)
else:
future = None
print('RUC', flush=True)
# not sure if this method is used. if so,
# might want to use self.start_uvloop() here
# as well?
self.loop.call_soon(self._keep_alive)
self.loop.run_until_complete(future)
# clean up uvloop
self.uv_loop.stop()
#self.uv_loop.stop()
return future
def run(self, task=None):
@ -121,12 +174,38 @@ class Loop:
future = self.ensure_future(task)
else:
future = None
self.loop.call_soon(self._keep_alive)
print('RUN1', flush=True)
#self.uv_loop.run()
print('after uvr')
self.start_uvloop()
print('started uvloop', flush=True)
#asyncio.run(self._keep_alive)
#self.loop.call_soon(self._keep_alive)
#self.loop.call_soon(self.slp)
#self.ensure_future(Task(self.slp()))
print('b4 rf')
#await self.slp()
print('after slp')
#l = asyncio.get_event_loop()
#l.run_until_complete(self.slp)
#asyncio.create_task(self.slp)
if not self.loop.is_running():
print('run forever')
self.loop.run_forever()
print('after rf')
#asyncio.create_task(self.slp)
# clean up uvloop
self.uv_loop.stop()
"""self.uv_loop.stop()"""
return future
def wake_asyncio_loop(self):
""" trigger event loop self._write_to_self to wake up,
useful after future.set_result so that the async runtime knows to
check for and process new events """
# These two lines will both call self._write_to_self and wake the event loop
#asyncio.run_coroutine_threadsafe(asyncio.sleep(0), self._dataFuture.get_loop())
self.loop.call_soon_threadsafe(lambda: 2)
def run_once(self):
# run one step of asyncio
# if loop._run_once is not available use loop.run_forever + loop.call_soon(loop.stop)

Wyświetl plik

@ -1605,6 +1605,7 @@ class AppResponse:
) # if aborted set to done True and ok False
except:
pass
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
def on_writeble(self, offset):
# Here the timeout is off, we can spend as much time before calling try_end we want to
@ -1613,6 +1614,7 @@ class AppResponse:
)
if ok:
self._chunkFuture.set_result((ok, done))
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return ok
self.on_writable(on_writeble)
@ -1622,6 +1624,7 @@ class AppResponse:
self._chunkFuture.set_result(
(False, True)
) # if aborted set to done True and ok False
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return self._chunkFuture
self._lastChunkOffset = self.get_write_offset()
@ -1629,12 +1632,15 @@ class AppResponse:
(ok, done) = self.try_end(buffer, total_size)
if ok:
self._chunkFuture.set_result((ok, done))
""" Call self._write_to_self on the loop via one of these two lines
so that the application code can pick up on the new completed data """
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
return self._chunkFuture
# failed to send chunk
return self._chunkFuture
def get_data(self):
async def get_data(self):
self._dataFuture = self.app.loop.create_future()
self._data = BytesIO()
@ -1645,17 +1651,26 @@ class AppResponse:
self._dataFuture.set_result(self._data)
except:
pass
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
def get_chunks(self, chunk, is_end):
if chunk is not None:
self._data.write(chunk)
if is_end:
self._dataFuture.set_result(self._data)
self._data = None
"""
This wakes up the asyncio event loop so that get_data can act on the
completed _dataFuture
"""
self.app.loop.wake_asyncio_loop() # wake up async loop after set_result
self.on_aborted(is_aborted)
self.on_data(get_chunks)
return self._dataFuture
return await self._dataFuture
def grab_aborted_handler(self):
# only needed if is async