kopia lustrzana https://github.com/cirospaciari/socketify.py
				
				
				
			added api fetch and caching samples
							rodzic
							
								
									7908575ebf
								
							
						
					
					
						commit
						d9b2a6ae18
					
				|  | @ -0,0 +1,24 @@ | |||
| import datetime | ||||
| 
 | ||||
class MemoryCacheItem:
    """A single cached value together with its absolute expiration time."""

    def __init__(self, expires, value):
        # `expires` is a relative TTL in seconds; store the absolute deadline.
        # datetime.now(timezone.utc) replaces the deprecated utcnow() and
        # yields the same UTC timestamp.
        self.expires = datetime.datetime.now(datetime.timezone.utc).timestamp() + expires
        self.value = value

    def is_expired(self):
        """Return True once the current time has passed the stored deadline."""
        return datetime.datetime.now(datetime.timezone.utc).timestamp() > self.expires
| 
 | ||||
class MemoryCache:
    """In-process cache mapping key -> MemoryCacheItem with per-entry TTLs."""

    def __init__(self):
        # key -> MemoryCacheItem
        self.cache = {}

    def setex(self, key, expires, value):
        """Store `value` under `key` for `expires` seconds (mirrors redis SETEX)."""
        self.cache[key] = MemoryCacheItem(expires, value)

    def get(self, key):
        """Return the cached value, or None when the key is missing or expired.

        Expired entries are evicted on access; the original implementation
        left them in the dict forever, so dead keys accumulated without bound.
        """
        item = self.cache.get(key)
        if item is None:
            return None
        if item.is_expired():
            # lazy eviction keeps the dict from growing with stale entries
            self.cache.pop(key, None)
            return None
        return item.value
|  | @ -0,0 +1,71 @@ | |||
| import asyncio | ||||
| import json | ||||
| from .memory_cache import MemoryCache | ||||
| 
 | ||||
# 2 LEVEL CACHE (Redis to share among workers, Memory to be much faster)
class TwoLevelCache:
    """Two-level cache: a fast per-process memory level backed by shared redis."""

    def __init__(self, redis_conection, memory_expiration_time=3, redis_expiration_time=10):
        self.memory_cache = MemoryCache()
        self.redis_conection = redis_conection
        # TTLs in seconds for each level
        self.memory_expiration_time = memory_expiration_time
        self.redis_expiration_time = redis_expiration_time

    # set cache to redis and memory
    def set(self, key, data):
        """Write `data` to both levels; returns True on success, False otherwise."""
        try:
            # never cache invalid data
            if data is None:
                return False
            self.redis_conection.setex(key, self.redis_expiration_time, data)
            self.memory_cache.setex(key, self.memory_expiration_time, data)
            return True
        except Exception as err:
            print(err)
            return False

    def get(self, key):
        """Read `key` from memory first, then redis; None on miss or error."""
        try:
            value = self.memory_cache.get(key)
            if value is not None:
                return value
            # no memory cache, so go to redis
            value = self.redis_conection.get(key)
            if value is not None:
                # BUG FIX: original referenced an undefined name `data` here,
                # raising NameError (swallowed below) so a redis hit returned
                # None and the memory level was never refreshed.
                self.memory_cache.setex(key, self.memory_expiration_time, value)
            return value
        except Exception:
            # best effort: a cache failure must not break the request
            return None

    # if more than 1 worker/request try to do this request, only one will
    # call the Model and the others will get the value from cache
    async def run_once(self, key, timeout, executor, *args):
        """Run `executor(*args)` at most once per key across workers.

        Uses a redis lock named "lock-{key}"; while another request holds it
        we poll, then re-check the cache before executing. Returns the
        json-decoded cached value on a hit, otherwise the raw executor result
        (which is cached as JSON for the other waiters). Returns None when
        both the executor and the cache fail.
        """
        result = None
        try:
            lock = self.redis_conection.lock(f"lock-{key}", blocking_timeout=timeout)
            # wait while some other request is still producing the value
            while lock.locked():
                await asyncio.sleep(0.5)
            acquired = False
            try:
                acquired = lock.acquire(blocking=False)
                # always check cache first
                cached = self.get(key)
                if cached is not None:
                    return json.loads(cached)
                result = await executor(*args)
                if result is not None:
                    self.set(key, json.dumps(result))
            except Exception:
                # executor failed; fall through to the cache retry below
                pass
            finally:
                # BUG FIX: only release a lock we actually hold — the original
                # called release() unconditionally, raising when acquire failed
                if acquired:
                    lock.release()
        except Exception:
            # cannot even create the lock
            pass
        finally:
            # if result is None, try cache one last time
            if result is None:
                cached = self.get(key)
                if cached is not None:
                    return json.loads(cached)
            return result
|  | @ -0,0 +1,86 @@ | |||
| from socketify import App | ||||
| import redis | ||||
| import aiohttp | ||||
| import asyncio | ||||
| import json | ||||
| from helpers.twolevel_cache import TwoLevelCache | ||||
| 
 | ||||
# create redis pool + connections (shared by all routes below)
redis_pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_conection = redis.Redis(connection_pool=redis_pool)
# 2 LEVEL CACHE (Redis to share among workers, Memory to be much faster)
# both levels use a 10s TTL here (arguments: memory=10, redis=10)
cache = TwoLevelCache(redis_conection, 10, 10)
| 
 | ||||
| ### | ||||
| # Model | ||||
| ### | ||||
| 
 | ||||
async def get_pokemon(session, number):
    """Fetch one pokemon by pokedex number and return its name."""
    url = f'https://pokeapi.co/api/v2/pokemon/{number}'
    async with session.get(url) as response:
        body = await response.text()
        # the API returns a JSON object; we only need its "name" field
        return json.loads(body)['name']
| 
 | ||||
async def get_pokemon_by_number(number):
    """Fetch a single pokemon name using a throwaway HTTP session."""
    async with aiohttp.ClientSession() as session:
        return await get_pokemon(session, number)
| 
 | ||||
async def get_original_pokemons():
    """Fetch the names of the 151 original (generation I) pokemon concurrently."""
    async with aiohttp.ClientSession() as session:
        # BUG FIX: range end is exclusive — range(1, 151) stopped at #150 and
        # missed the last of the original 151; 152 covers 1..151 inclusive
        tasks = [
            asyncio.ensure_future(get_pokemon(session, number))
            for number in range(1, 152)
        ]
        return await asyncio.gather(*tasks)
| 
 | ||||
| 
 | ||||
| ### | ||||
| # Routes | ||||
| ### | ||||
def list_original_pokemons(res, req):
    """Route handler for "/": respond with the list of original pokemons,
    serving from cache when possible and fetching asynchronously otherwise."""

    #check cache for faster response
    value = cache.get("original_pokemons")
    if value != None: 
        return res.end(value)
    
    #get asynchronous from Model
    async def get_originals():
        # NOTE(review): run_once returns the json-decoded object on a cache
        # hit but the raw executor result otherwise — confirm res.end handles
        # both payload shapes as intended by socketify
        value = await cache.run_once("original_pokemons", 5, get_original_pokemons)
        res.end(value)

    # hand the coroutine to socketify's event loop; the handler returns
    # immediately and the response is completed asynchronously
    res.run_async(get_originals())
| 
 | ||||
| 
 | ||||
def list_pokemon(res, req):
    """Route handler for "/:number": respond with one pokemon's name,
    using the two-level cache and a redis lock to deduplicate fetches."""

    #get needed parameters
    try:
        number = int(req.get_parameter(0))
    except:
        #invalid number: yield this route so the catch-all 404 handler runs
        return req.set_yield(1) 

    #check cache for faster response
    key = f"pokemon-{number}"
    value = cache.get(key)
    if value != None: 
        return res.end(value)

    #get asynchronous from Model
    async def find_pokemon(number, res):
        #sync with redis lock to run only once
        #if more than 1 worker/request try to do this request, only one will call the Model and the others will get from cache
        value = await cache.run_once(key, 5, get_pokemon_by_number, number)
        # NOTE(review): value may be a decoded object or the raw executor
        # result depending on the cache path — confirm res.end handles both
        res.end(value)

    # complete the response asynchronously on socketify's event loop
    res.run_async(find_pokemon(number, res))
| 
 | ||||
| 
 | ||||
# wire up the HTTP routes and start the server
app = App()
app.get("/", list_original_pokemons)
app.get("/:number", list_pokemon)
# catch-all for anything the routes above did not match (or yielded to)
app.any("/*", lambda res, _: res.write_status(404).end("Not Found"))
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port))
app.run()
|  | @ -0,0 +1,2 @@ | |||
aiohttp
redis
socketify
		Ładowanie…
	
		Reference in New Issue
	
	 Ciro
						Ciro