From dc0dfdfab0dcd3e65fe2d69b9acd6c98ad29538c Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 6 Jun 2022 12:58:33 +0000 Subject: [PATCH] Go routine cleaning access id cache --- nodes/node_balancer/cmd/middleware.go | 42 +++++++++++++++++++++---- nodes/node_balancer/cmd/server.go | 3 ++ nodes/node_balancer/configs/settings.go | 3 ++ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/nodes/node_balancer/cmd/middleware.go b/nodes/node_balancer/cmd/middleware.go index 4a3a08e5..d2120d94 100644 --- a/nodes/node_balancer/cmd/middleware.go +++ b/nodes/node_balancer/cmd/middleware.go @@ -39,19 +39,18 @@ func CreateAccessCache() { // Get access id from cache if exists func (ac *AccessCache) FindAccessIdInCache(accessId string) string { - // tsNow = time.Now().Unix() - var searchAccessId string + var detectedId string ac.mux.RLock() - for id, _ := range ac.accessIds { + for id := range ac.accessIds { if id == accessId { - searchAccessId = id + detectedId = id break } } ac.mux.RUnlock() - return searchAccessId + return detectedId } // Update last call access timestamp and datasource for access id @@ -84,6 +83,38 @@ func (ac *AccessCache) AddAccessIdToCache(clientResourceData ClientResourceData, ac.mux.Unlock() } +// Check each access id in cache if it exceeds lifetime +func (ac *AccessCache) Cleanup() (int64, int64) { + var removedAccessIds, totalAccessIds int64 + tsNow := time.Now().Unix() + ac.mux.Lock() + for aId, aData := range ac.accessIds { + fmt.Println(tsNow, aData.LastAccessTs, configs.NB_CACHE_ACCESS_ID_LIFETIME) + if tsNow-aData.LastAccessTs > configs.NB_CACHE_ACCESS_ID_LIFETIME { + delete(ac.accessIds, aId) + removedAccessIds++ + } else { + totalAccessIds++ + } + } + ac.mux.Unlock() + return removedAccessIds, totalAccessIds +} + +func initCacheCleaning(debug bool) { + t := time.NewTicker(configs.NB_CACHE_CLEANING_INTERVAL) + for { + select { + case <-t.C: + removedAccessIds, totalAccessIds := accessIdCache.Cleanup() + if debug { + log.Printf("Removed %d elements from access id cache", removedAccessIds) + } + log.Printf("Elements in access id cache: %d", totalAccessIds) + } + } +} + // Extract access_id from header and query. Query takes precedence over header. func extractAccessID(r *http.Request) string { var accessID string @@ -210,7 +241,6 @@ func accessMiddleware(next http.Handler) http.Handler { if stateCLI.enableDebugFlag { log.Printf("Access id found in cache") } - // Access id found in cache currentClientAccess = accessIdCache.accessIds[accessID] currentClientAccess.dataSource = dataSource accessIdCache.UpdateAccessIdAtCache(accessID, dataSource) diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 4b63674d..d265ce6b 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -222,6 +222,9 @@ func Server() { go initHealthCheck(stateCLI.enableDebugFlag) } + // Start access id cache cleaning + go initCacheCleaning(stateCLI.enableDebugFlag) + log.Printf("Starting node load balancer HTTP server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) err = server.ListenAndServe() if err != nil { diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index c66afd55..6bb13b40 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -23,6 +23,9 @@ var ( NB_HEALTH_CHECK_INTERVAL = time.Second * 5 NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 + NB_CACHE_CLEANING_INTERVAL = time.Second * 10 + NB_CACHE_ACCESS_ID_LIFETIME = int64(120) + // Client configuration NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds