mirror of https://github.com/cirospaciari/socketify.py
removed src/helpers
parent 54c1a14da4
commit c80805801f
src/helpers/memory_cache.py
@@ -1,24 +0,0 @@
import datetime


class MemoryCacheItem:
    def __init__(self, expires, value):
        # store the absolute expiration timestamp (now + TTL in seconds)
        self.expires = datetime.datetime.utcnow().timestamp() + expires
        self.value = value

    def is_expired(self):
        return datetime.datetime.utcnow().timestamp() > self.expires


class MemoryCache:
    def __init__(self):
        self.cache = {}

    # Redis-like SETEX: store a value with a TTL in seconds
    def setex(self, key, expires, value):
        self.cache[key] = MemoryCacheItem(expires, value)

    # return the cached value, or None on a miss or an expired entry
    def get(self, key):
        try:
            cache = self.cache[key]
            if cache.is_expired():
                return None
            return cache.value
        except KeyError:
            return None
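For context, a minimal usage sketch of the removed MemoryCache (not part of the commit; the key name and the three-second TTL below are illustrative only):

cache = MemoryCache()
cache.setex("user:42", 3, {"name": "Alice"})  # keep for 3 seconds
print(cache.get("user:42"))                   # the cached dict while still fresh
print(cache.get("missing-key"))               # None on a miss (or after the TTL expires)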
@@ -1,70 +0,0 @@
import asyncio

from .memory_cache import MemoryCache


# Two-level cache: Redis to share among workers, in-process memory to be much faster
class TwoLevelCache:
    def __init__(self, redis_conection, memory_expiration_time=3, redis_expiration_time=10):
        self.memory_cache = MemoryCache()
        self.redis_conection = redis_conection
        self.memory_expiration_time = memory_expiration_time
        self.redis_expiration_time = redis_expiration_time

    # store the value in both Redis and the memory cache
    def set(self, key, data):
        try:
            # never cache invalid data
            if data is None:
                return False
            self.redis_conection.setex(key, self.redis_expiration_time, data)
            self.memory_cache.setex(key, self.memory_expiration_time, data)
            return True
        except Exception as err:
            print(err)
            return False

    def get(self, key):
        try:
            value = self.memory_cache.get(key)
            if value is not None:
                return value
            # no memory cache hit, so go to Redis
            value = self.redis_conection.get(key)
            if value is not None:
                # refresh the memory cache to speed up subsequent reads
                self.memory_cache.setex(key, self.memory_expiration_time, value)
            return value
        except Exception:
            return None

    # if more than one worker/request runs this at the same time, only one calls
    # the executor (e.g. the Model) and the others get the result from the cache
    async def run_once(self, key, timeout, executor, *args):
        result = None
        try:
            lock = self.redis_conection.lock(f"lock-{key}", blocking_timeout=timeout)
            # wait for the lock (another request has not finished yet)
            while lock.locked():
                await asyncio.sleep(0)
            try:
                lock.acquire(blocking=False)
                # always check the cache first
                cached = self.get(key)
                if cached is not None:
                    return cached
                result = await executor(*args)
                if result is not None:
                    self.set(key, result)
            except Exception:
                # the lock wasn't acquired
                pass
            finally:
                lock.release()
        except Exception:
            # could not even create or release the lock
            pass
        finally:
            # if the result is None, try the cache one last time
            if result is None:
                cache = self.get(key)
                if cache is not None:
                    return cache
            return result
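For context, a minimal usage sketch of the removed TwoLevelCache (not part of the commit). It assumes the synchronous redis-py client and a Redis server reachable with default settings; load_profile, the key name, and the timeouts are illustrative only:

import asyncio
import redis


async def load_profile(user_id):
    # stand-in for the expensive call that run_once is meant to deduplicate
    return f"profile-{user_id}"


async def main():
    cache = TwoLevelCache(redis.Redis(), memory_expiration_time=3, redis_expiration_time=10)
    # concurrent callers share one execution of load_profile; the rest read the cache
    value = await cache.run_once("profile:42", 1, load_profile, 42)
    print(value)


asyncio.run(main())

The memory layer answers repeated reads within a single worker, while the Redis layer keeps workers consistent over its longer expiration window.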