socketify.py/src/socketify/loop.py

116 wiersze
3.4 KiB
Python
Czysty Zwykły widok Historia

import asyncio
import threading
import time
from queue import Queue
from .native import UVLoop
2022-05-31 20:53:20 +00:00
def future_handler(future, loop, exception_handler, response):
try:
future.result()
return None
except Exception as error:
if hasattr(exception_handler, '__call__'):
exception_handler(loop, error, response)
else:
try:
#just log in console the error to call attention
print("Uncaught Exception: %s" % str(error))
if response != None:
response.write_status(500).end("Internal Error")
finally:
return
class Loop:
2022-05-31 20:53:20 +00:00
def __init__(self, exception_handler=None):
self.loop = asyncio.new_event_loop()
self.uv_loop = UVLoop()
self.queue = Queue()
2022-05-31 20:53:20 +00:00
if hasattr(exception_handler, '__call__'):
self.exception_handler = exception_handler
self.loop.set_exception_handler(lambda loop, context: exception_handler(loop, context, None))
else:
self.exception_handler = None
asyncio.set_event_loop(self.loop)
self.started = False
self.last_defer = False
2022-10-25 12:53:18 +00:00
def set_timeout(self, timeout, callback, user_data):
return self.uv_loop.create_timer(timeout, 0, callback, user_data)
def enqueue(self, callback, user_data):
self.queue.put((callback, user_data))
def create_future(self):
return self.loop.create_future()
def start(self):
self.started = True
#run asyncio once per tick
def tick(loop):
#only call one item of the queue per tick
if not loop.queue.empty():
(callback, user_data) = loop.queue.get(False)
callback(user_data)
loop.queue.task_done()
#run once asyncio
loop.run_once_asyncio()
#use check for calling asyncio once per tick
self.timer = self.uv_loop.create_check(tick, self)
def run(self):
self.uv_loop.run()
def run_once(self):
self.uv_loop.run_once()
def run_once_asyncio(self):
#run only one step
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
def stop(self):
if(self.started):
self.timer.stop()
self.started = False
#unbind run_once
#if is still running stops
if self.loop.is_running():
self.loop.stop()
self.last_defer = None
# Find all running tasks in main thread:
pending = asyncio.all_tasks(self.loop)
# Run loop until tasks done
self.loop.run_until_complete(asyncio.gather(*pending))
#Exposes native loop for uWS
def get_native_loop(self):
return self.uv_loop.get_native_loop()
2022-05-31 20:53:20 +00:00
def run_async(self, task, response=None):
#with run_once
future = asyncio.ensure_future(task, loop=self.loop)
#with threads
2022-05-31 20:53:20 +00:00
future.add_done_callback(lambda f: future_handler(f, self.loop, self.exception_handler, response))
#force asyncio run once to enable req in async functions before first await
self.run_once_asyncio()
2022-10-24 17:15:46 +00:00
2022-10-25 14:05:11 +00:00
#if response != None: #set auto cork
# response.needs_cork = True
2022-05-31 20:53:20 +00:00
return future
2022-06-01 23:00:05 +00:00
# if sys.version_info >= (3, 11)
# with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
# runner.run(main())
# else:
# uvloop.install()
# asyncio.run(main())