add cork and update with master

pull/39/head
Ciro 2022-10-24 14:15:46 -03:00
rodzic e754e29c05
commit af7481c13f
15 zmienionych plików z 234 dodań i 85 usunięć

Wyświetl plik

@ -0,0 +1,24 @@
import datetime
class MemoryCacheItem:
def __init__(self, expires, value):
self.expires = datetime.datetime.utcnow().timestamp() + expires
self.value = value
def is_expired(self):
return datetime.datetime.utcnow().timestamp() > self.expires
class MemoryCache:
def __init__(self):
self.cache = {}
def setex(self, key, expires, value):
self.cache[key] = MemoryCacheItem(expires, value)
def get(self, key):
try:
cache = self.cache[key]
if cache.is_expired():
return None
return cache.value
except KeyError:
return None

Wyświetl plik

@ -0,0 +1,70 @@
import asyncio
from .memory_cache import MemoryCache
# 2 LEVEL CACHE (Redis to share amoung worker, Memory to be much faster)
class TwoLevelCache:
def __init__(self, redis_conection, memory_expiration_time=3, redis_expiration_time=10):
self.memory_cache = MemoryCache()
self.redis_conection = redis_conection
self.memory_expiration_time = memory_expiration_time
self.redis_expiration_time = redis_expiration_time
#set cache to redis and memory
def set(self, key, data):
try:
#never cache invalid data
if data == None:
return False
self.redis_conection.setex(key, self.redis_expiration_time, data)
self.memory_cache.setex(key, self.memory_expiration_time, data)
return True
except Exception as err:
print(err)
return False
def get(self, key):
try:
value = self.memory_cache.get(key)
if value != None:
return value
#no memory cache so, got to redis
value = self.redis_conection.get(key)
if value != None:
#refresh memory cache to speed up
self.memory_cache.setex(key, self.memory_expiration_time, data)
return value
except Exception as err:
return None
#if more than 1 worker/request try to do this request, only one will call the Model and the others will get from cache
async def run_once(self, key, timeout, executor, *args):
result = None
try:
lock = self.redis_conection.lock(f"lock-{key}", blocking_timeout=timeout)
#wait lock (some request is yeat not finish)
while lock.locked():
await asyncio.sleep(0)
try:
lock.acquire(blocking=False)
#always check cache first
cached = self.get(key)
if cached != None:
return cached
result = await executor(*args)
if result != None:
self.set(key, result)
except Exception as err:
# the lock wasn't acquired
pass
finally:
lock.release()
except Exception as err:
#cannot even create or release the lock
pass
finally:
#if result is None, try cache one last time
if result == None:
cache = self.get(key)
if cache != None:
return cache
return result

Wyświetl plik

@ -35,7 +35,6 @@ class Loop:
asyncio.set_event_loop(self.loop)
self.started = False
self.last_defer = False
# self.loop_thread = None
def create_future(self):
return self.loop.create_future()
@ -43,7 +42,7 @@ class Loop:
def start(self):
self.started = True
#start relaxed until first task
self.timer = self.uv_loop.create_timer(0, 100, lambda loop: loop.run_once_asyncio(), self)
self.timer = self.uv_loop.create_timer(0, 1, lambda loop: loop.run_once_asyncio(), self)
def run(self):
self.uv_loop.run()
@ -55,12 +54,6 @@ class Loop:
#run only one step
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
if self.started:
pending = len(asyncio.all_tasks(self.loop))
if pending < 1: #relaxed if has no tasks
self.timer.set_repeat(100)
else: #urge when needs
self.timer.set_repeat(1)
def stop(self):
if(self.started):
@ -90,6 +83,9 @@ class Loop:
#force asyncio run once to enable req in async functions before first await
self.run_once_asyncio()
# if response != None: #set auto cork
# response.needs_cork = True
return future

Wyświetl plik

@ -8,6 +8,7 @@ import signal
from http import cookies
from datetime import datetime
from urllib.parse import parse_qs, quote_plus, unquote_plus
from threading import Thread, local, Lock
ffi = cffi.FFI()
ffi.cdef("""
@ -148,6 +149,10 @@ typedef struct
uws_websocket_close_handler close;
} uws_socket_behavior_t;
typedef struct {
bool ok;
bool has_responded;
} uws_try_end_result_t;
typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data);
typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data);
@ -198,8 +203,8 @@ void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res
void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data);
void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data);
void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws);
uws_try_end_result_t uws_res_try_end(int ssl, uws_res_t *res, const char *data, size_t length, uintmax_t total_size);
void uws_res_cork(int ssl, uws_res_t *res,void(*callback)(uws_res_t *res, void* user_data) ,void* user_data);
bool uws_req_is_ancient(uws_req_t *res);
bool uws_req_get_yield(uws_req_t *res);
void uws_req_set_field(uws_req_t *res, bool yield);
@ -282,6 +287,20 @@ def uws_generic_on_writable_handler(res, offset, user_data):
return res.trigger_writable_handler(offset)
return False
global_lock = Lock()
@ffi.callback("void(uws_res_t *, void*)")
def uws_generic_cork_handler(res, user_data):
if not user_data == ffi.NULL:
response = ffi.from_handle(user_data)
try:
if inspect.iscoroutinefunction(response._cork_handler):
raise RuntimeError("Calls inside cork must be sync")
response._cork_handler(response)
except Exception as err:
print("Error on cork handler %s" % str(err))
# response.grab_aborted_handler()
# app.trigger_error(err, response, request)
global_lock.release()
class AppRequest:
def __init__(self, request):
self.req = request
@ -377,13 +396,11 @@ class AppRequest:
def is_ancient(self):
return bool(lib.uws_req_is_ancient(self.req))
class AppResponse:
def __init__(self, response, loop, is_ssl):
self.res = response
self.SSL = ffi.cast("int", 1 if is_ssl else 0)
self.aborted = False
self._ptr = ffi.NULL
self.loop = loop
self._aborted_handler = None
self._writable_handler = None
@ -391,7 +408,15 @@ class AppResponse:
self._ptr = ffi.new_handle(self)
self._grabed_abort_handler_once = False
self._write_jar = None
# self.needs_cork = False
self._cork_handler = None
def cork(self, callback):
if not self.aborted:
global_lock.acquire(True)
self._cork_handler = callback
lib.uws_res_cork(self.SSL, self.res, uws_generic_cork_handler, self._ptr)
def set_cookie(self, name, value, options={}):
if self._write_jar == None:
self._write_jar = cookies.SimpleCookie()
@ -405,8 +430,8 @@ class AppResponse:
def trigger_aborted(self):
self.aborted = True
self.res = ffi.NULL
self._ptr = ffi.NULL
self.res = ffi.NULL
if hasattr(self, "_aborted_handler") and hasattr(self._aborted_handler, '__call__'):
try:
if inspect.iscoroutinefunction(self._aborted_handler):
@ -512,22 +537,23 @@ class AppResponse:
self.end_without_body(False)
def end(self, message, end_connection=False):
if not self.aborted:
if self._write_jar != None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
elif message == None:
self.end_without_body(end_connection)
return self
else:
self.write_header(b'Content-Type', b'application/json')
data = json.dumps(message).encode("utf-8")
lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0)
return self
if not self.aborted:
try:
if self._write_jar != None:
self.write_header("Set-Cookie", self._write_jar.output(header=""))
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
elif message == None:
self.end_without_body(end_connection)
return self
else:
self.write_header(b'Content-Type', b'application/json')
data = json.dumps(message).encode("utf-8")
lib.uws_res_end(self.SSL, self.res, data, len(data), 1 if end_connection else 0)
finally:
return self
def pause(self):
if not self.aborted:

@ -1 +1 @@
Subproject commit 91129232631c108a975278cfd7892d4351ccf71d
Subproject commit 39d96d296c279ef1545489c4c79ac476373a68bd

72
src/tests.py 100644
Wyświetl plik

@ -0,0 +1,72 @@
# https://github.com/Tinche/aiofiles
# https://github.com/uNetworking/uWebSockets/issues/1426
# import os.path
# def in_directory(file, directory):
# #make both absolute
# directory = os.path.join(os.path.realpath(directory), '')
# file = os.path.realpath(file)
# #return true, if the common prefix of both is equal to directory
# #e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
# return os.path.commonprefix([file, directory]) == directory
# application/x-www-form-urlencoded
# application/x-www-form-urlencoded
# multipart/form-data
# try_end
# for_each_header
# https://github.com/uNetworking/uWebSockets.js/blob/master/examples/VideoStreamer.js
from socketify import App
import os
import multiprocessing
import asyncio
def corked(res):
res.write("Test ")
res.end("Hello, World!")
async def home(res, req):
# res.write_header("Content-Type", "plain/text")
await asyncio.sleep(0)
res.cork(corked)
# res.write("Test ")
# res.end("Hello, World!")
# res.end("Hello, World!")
def run_app():
app = App()
app.get("/", home)
app.listen(3000, lambda config: print("PID %d Listening on port http://localhost:%d now\n" % (os.getpid(), config.port)))
app.run()
def create_fork():
n = os.fork()
# n greater than 0 means parent process
if not n > 0:
run_app()
# fork limiting the cpu count - 1
# for i in range(1, multiprocessing.cpu_count()):
# create_fork()
run_app() # run app on the main process too :)
# from datetime import datetime
# raw = "_ga=GA1.1.1871393672.1649875681; affclick=null; __udf_j=d31b9af0d332fec181c1a893320322c0cb33ce95d7bdbd21a4cc4ee66d6d8c23817686b4ba59dd0e015cb95e8196157c"
# jar = Cookies(None)
# jar.set("session_id", "123132", {
# "path": "/",
# "domain": "*.test.com",
# "httponly": True,
# "expires": datetime.now()
# })
# print(jar.output())
# jar = cookies.SimpleCookie(raw)
# print(jar["_gaasasd"])
# print(split_header_words(raw))
#git submodule sync

Wyświetl plik

@ -5,7 +5,7 @@ app = App()
async def delayed_hello(delay, res):
await asyncio.sleep(delay) #do something async
res.end("Hello with delay!")
res.cork(lambda res: res.end("Hello with delay!"))
def home(res, req):
#request object only lives during the life time of this call
@ -22,7 +22,7 @@ async def json(res, req):
user_agent = req.get_header("user-agent")
#req maybe will not be available in direct attached async functions after await
await asyncio.sleep(2) #do something async
res.end({ "message": "I'm delayed!", "user-agent": user_agent})
res.cork(lambda res: res.end({ "message": "I'm delayed!", "user-agent": user_agent}))
def not_found(res, req):
res.write_status(404).end("Not Found")

Wyświetl plik

@ -5,7 +5,7 @@ WORKDIR /usr/src/app
COPY requirements.txt ./
RUN apt-get update
RUN apt install libuv1-dev -y
RUN apt install libuv1-dev libssl-dev -y
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

Wyświetl plik

@ -5,7 +5,7 @@ WORKDIR /usr/src/app
COPY requirements.txt ./
RUN apt-get update
RUN apt install libuv1-dev -y
RUN apt install libuv1-dev libssl-dev -y
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

Wyświetl plik

@ -43,7 +43,7 @@ class TwoLevelCache:
lock = self.redis_conection.lock(f"lock-{key}", blocking_timeout=timeout)
#wait lock (some request is yeat not finish)
while lock.locked():
await asyncio.sleep(0.5)
await asyncio.sleep(0)
try:
lock.acquire(blocking=False)
#always check cache first

Wyświetl plik

@ -45,7 +45,7 @@ def list_original_pokemons(res, req):
#get asynchronous from Model
async def get_originals():
value = await cache.run_once("original_pokemons", 5, get_original_pokemons)
res.end(value)
res.cork(lambda res: res.end(value))
res.run_async(get_originals())
@ -70,7 +70,7 @@ def list_pokemon(res, req):
#sync with redis lock to run only once
#if more than 1 worker/request try to do this request, only one will call the Model and the others will get from cache
value = await cache.run_once(cache_key, 5, get_pokemon, number)
res.end(value)
res.cork(lambda res: res.end(value))
res.run_async(find_pokemon(number, res))

Wyświetl plik

@ -3,4 +3,6 @@ from socketify import App, AppOptions
app = App(AppOptions(key_file_name="./misc/key.pem", cert_file_name="./misc/cert.pem", passphrase="1234"))
app.get("/", lambda res, req: res.end("Hello World socketify from Python!"))
app.listen(3000, lambda config: print("Listening on port https://localhost:%d now\n" % config.port))
app.run()
app.run()
#openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -passout pass:1234 -keyout ./misc/key.pem -out ./misc/cert.pem

Wyświetl plik

@ -1,19 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDETCCAfkCFFn0Tj7tkPnpmhSVj1rDLZ5FHEBTMA0GCSqGSIb3DQEBCwUAMEUx
FDASBgNVBAoMC3VOZXR3b3JraW5nMREwDwYDVQQKDAh1U29ja2V0czEaMBgGA1UE
AwwRc2VsZnNpZ25lZF9jbGllbnQwHhcNMjIwMTE3MTYzOTI4WhcNMjMwMTE3MTYz
OTI4WjBFMRQwEgYDVQQKDAt1TmV0d29ya2luZzERMA8GA1UECgwIdVNvY2tldHMx
GjAYBgNVBAMMEXNlbGZzaWduZWRfY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAv0YnxbShmV9IgmpURNmubYfuk7zi4szJMNAO4HATTO+4lEnE
EWEIWBt+3kbfp2dgp9YRk2HJzaXzYyGd4E2lcCFxsTDyEEbda2mNwpuq8jEUb2AM
prfL6lbmxRcKKL/4I4khfLy+f8vgbLnTS8TAPneDGa4wcHGmpIKDuC3ceS7KgcCy
wVhfYse7h5jWpw4/LVgEXBYwzi1XSDeQjbEWCx7kcB/Xcai2OdmS6iFqG0Dgx6Ka
t0qf2Afd0kw954UbvvP4SCAypLjD3OAEJGlpuSUq7z8OYFmnHYcQIJ29Fm368DXn
RAOuN0jUoc/HW03poWeuKZ+Vj5qUj8AAOXZ0jwIDAQABMA0GCSqGSIb3DQEBCwUA
A4IBAQAi7n9u8/7IGtnI5hojyWXTxJH+jwMgCowU1AUiR5Ysr6FyEcprs51TDlRj
aQm1Lf+cHTLk8DGCT/tBT4PsoA9fgpKWBDkhh7HQG5WIMUyfLZVKWHCr8aNm1iuC
8EgCrp5CYEAPvmb7KQ3WzOQnlVgjQTLl6DcgjLBGi5w45Wk6eWc3YZml492WhjF4
fxUM5xjZ+sqzOFjpp0oSNeKRrBRMe7CBYSD3/ZejZLyxl/C9UFlxkK7PS+ja2CqN
Nnms2uiPkQIgg9UImH00W5hJoGwgvVONA+UTvFjGRj8a4GSnkLihVqy48Yiy7Bez
DuOG90JG17siujTQjx+njbVDv2zX
-----END CERTIFICATE-----

Wyświetl plik

@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAv0YnxbShmV9IgmpURNmubYfuk7zi4szJMNAO4HATTO+4lEnE
EWEIWBt+3kbfp2dgp9YRk2HJzaXzYyGd4E2lcCFxsTDyEEbda2mNwpuq8jEUb2AM
prfL6lbmxRcKKL/4I4khfLy+f8vgbLnTS8TAPneDGa4wcHGmpIKDuC3ceS7KgcCy
wVhfYse7h5jWpw4/LVgEXBYwzi1XSDeQjbEWCx7kcB/Xcai2OdmS6iFqG0Dgx6Ka
t0qf2Afd0kw954UbvvP4SCAypLjD3OAEJGlpuSUq7z8OYFmnHYcQIJ29Fm368DXn
RAOuN0jUoc/HW03poWeuKZ+Vj5qUj8AAOXZ0jwIDAQABAoIBAQC9sHONDIAevHIK
dCyyQzdLBL3D4lUYG4ODVzMJvdxGNo7U8PrzSUmfJ1WAVsVDHbCrgg7YHOine+aN
7y7E3fwt4d0AnsvQ/JZmCb4+u2ai3a2obpbdV/Vwp1IhL6Ixm4AYrcx6CizaTHR7
Hya/Q5Zr3NY1R5xeRze+enjq1QCLY/OXGEToejwJOBNBA2+Ai5sZfR8pGn0Mszb6
C3LBMtHx8rRDkeS+s3X2g/Sseg0aO4F9E1JwFd/+RZPzZtUB3LGbMwKe4b4s+kQn
zAjP2CMthleFs6Ki2s6z5OhCvjOUE+kYe6529+vxvmVm747keuJ9CcO6dFY92o6u
/xwmlPLBAoGBAPtL3D3sIYLzOfsZ+w2lEg6l6e5TO5lxfX1+fMyFLJ45hFaTTvR1
e0PQNahIrSrHTkhuYcicFgbh2iQTfzgnEfXVeWw2YO6fF1HUbhcdByB2+K9xT3Ap
SN2GagIn02Bgh1rOJ7VNGG+Ri/tXgoI0J7IIvp+et2VDLgWGXrGtm/NRAoGBAMLa
r36zXAqj7ISX2IDPJXI7cPuEfLJqx2wED9daxw2mDbNwGIxkP5QjEJN+iZjzaZqX
a6E6UCU09RGRwuHfy9AqCp9IiOXbi+dKo+IwjoHcWWGA3qv/wj/AlT5Hal0kiV+G
4OETAlSckxvFuZ64vrFyuHptGQL4Q5O7xTLjmzHfAoGBALRZGzUtlFdgq8n0OWLv
hugQVrT98xYKhx9becFmCkF70eg4TD/RWKewc/HURsMeyqXc4jyRGJXT3TRq8bCh
CZi+nif1VteqQZguttvLr2OzPoLa9UHvvyWM4+OsJV1TqZCXx5OsQs8/S5EUmstL
FvoEoJn51HDOJ+c7KhamG/ghAoGASkkG6N3GNEREUlR1dL4EP6WLsEfVJkvxFSwD
Qg3Yn0p0JLmSkktRtc8cba6rFIWP+CDMJp5NmbGz0GvqiSRB1m2AuTL1BfSKRLY+
/meWnMl9xd9UhOwviRCJlUGyuinIuYN5TjVqCQncR5U869bw1EOxMvNOusQdN0A5
sOn2668CgYEAjxc386wLww9CsZ9oMAWxh1v4GWEB+c3o6bEuKW7BppNYvaTftxGt
Eww9BgT7ZfORZjyCWtRxlLj+sUHXoZAnRoheTEVVKhkezcWq73SrR6Ij5SeYTuCG
yBK57WSlpRT24D/fR9tTTesQ1LUh/lBLG23KNbScIODStkaIlqO4HYQ=
-----END RSA PRIVATE KEY-----

Wyświetl plik

@ -17,6 +17,9 @@
# multipart/form-data
# try_end
# for_each_header
# https://github.com/uNetworking/uWebSockets.js/blob/master/examples/VideoStreamer.js
from socketify import App
from datetime import datetime
from datetime import timedelta
@ -44,4 +47,6 @@ app.run()
# print(jar.output())
# jar = cookies.SimpleCookie(raw)
# print(jar["_gaasasd"])
# print(split_header_words(raw))
# print(split_header_words(raw))
#git submodule sync