Optimized cache reuse and set default values in config

pull/787/head
kompotkot 2023-05-24 16:49:41 +00:00
rodzic 7f89784f2f
commit bfdc2eb718
4 zmienionych plików z 81 dodań i 67 usunięć

Wyświetl plik

@ -48,7 +48,10 @@ var (
HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN") HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN")
// Moonstream resources types // Moonstream resources types
BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access" BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access"
DEFAULT_AUTOGENERATED_USER_PERMISSIONS = []string{"read", "update", "delete"}
DEFAULT_AUTOGENERATED_PERIOD_DURATION = int64(86400)
DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD = int64(1000)
) )
func CheckEnvVarSet() { func CheckEnvVarSet() {

Wyświetl plik

@ -25,6 +25,11 @@ var (
accessCache AccessCache accessCache AccessCache
) )
// AccessCache caches client identification for fast access to nodes
//
// If authorization passed with Bearer token, then it triggers to fetch Brood resource with access ID
// or create new one. After it under key `accessIds` and `authorizationTokens` will be added similar
// address pointers to one `ClientAccess`.
type AccessCache struct { type AccessCache struct {
accessIds map[string]*ClientAccess accessIds map[string]*ClientAccess
authorizationTokens map[string]*ClientAccess authorizationTokens map[string]*ClientAccess
@ -40,30 +45,14 @@ func CreateAccessCache() {
} }
} }
// Get access id from cache if exists // FindAccessIdInCache looking for user access in `accessIds` cache
func (ac *AccessCache) FindAccessIdInCache(accessId string) string { func (ac *AccessCache) isAccessIdInCache(accessId string) bool {
var detectedId string detected := false
ac.mux.RLock() ac.mux.RLock()
for id := range ac.accessIds { for id := range ac.accessIds {
if id == accessId { if id == accessId {
detectedId = id detected = true
break
}
}
ac.mux.RUnlock()
return detectedId
}
// Get access id from cache if exists
func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) string {
var detected string
ac.mux.RLock()
for id := range ac.authorizationTokens {
if id == authorizationToken {
detected = id
break break
} }
} }
@ -72,16 +61,38 @@ func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string)
return detected return detected
} }
// Update last call access timestamp and datasource for access id // FindAuthorizationTokenInCache looking for user access in `authorizationTokens` cache
func (ac *AccessCache) isAuthorizationTokenInCache(authorizationToken string) bool {
detected := false
ac.mux.RLock()
for id := range ac.authorizationTokens {
if id == authorizationToken {
detected = true
break
}
}
ac.mux.RUnlock()
return detected
}
// Update last call access timestamp and datasource for user access
func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) { func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) {
ac.mux.Lock() ac.mux.Lock()
var accessToModify *ClientAccess var accessToModify *ClientAccess
if access, ok := ac.accessIds[accessId]; ok {
accessToModify = access
if accessId != "" {
if access, ok := ac.accessIds[accessId]; ok {
accessToModify = access
}
} }
if access, ok := ac.authorizationTokens[authorizationToken]; ok {
accessToModify = access if authorizationToken != "" {
if access, ok := ac.authorizationTokens[authorizationToken]; ok {
accessToModify = access
}
} }
accessToModify.LastAccessTs = tsNow accessToModify.LastAccessTs = tsNow
@ -91,10 +102,10 @@ func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, request
ac.mux.Unlock() ac.mux.Unlock()
} }
// Add new access ID with data to cache // Add new user access identifier with data to cache
func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) { func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) {
ac.mux.Lock() ac.mux.Lock()
access := ClientAccess{ access := &ClientAccess{
ResourceID: clientAccess.ResourceID, ResourceID: clientAccess.ResourceID,
authorizationToken: clientAccess.authorizationToken, authorizationToken: clientAccess.authorizationToken,
@ -121,9 +132,9 @@ func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64)
requestedDataSource: clientAccess.requestedDataSource, requestedDataSource: clientAccess.requestedDataSource,
} }
ac.accessIds[clientAccess.ClientResourceData.AccessID] = &access ac.accessIds[clientAccess.ClientResourceData.AccessID] = access
if clientAccess.authorizationToken != "" { if clientAccess.authorizationToken != "" {
ac.authorizationTokens[clientAccess.authorizationToken] = &access ac.authorizationTokens[clientAccess.authorizationToken] = access
} }
ac.mux.Unlock() ac.mux.Unlock()
} }
@ -189,22 +200,6 @@ func initCacheCleaning(debug bool) {
} }
} }
func parseClientAccess(resource brood.Resource) (*ClientAccess, error) {
var clientAccess ClientAccess
resourceData, err := json.Marshal(resource.ResourceData)
if err != nil {
return nil, err
}
clientAccess.ResourceID = resource.Id
err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData)
if err != nil {
return nil, err
}
return &clientAccess, nil
}
// fetchResources fetch resources with access ID or authorization token and generate new one if there no one // fetchResources fetch resources with access ID or authorization token and generate new one if there no one
func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) { func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) {
var err error var err error
@ -247,9 +242,9 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res
BlockchainAccess: true, BlockchainAccess: true,
ExtendedMethods: false, ExtendedMethods: false,
PeriodDuration: 86400, PeriodDuration: DEFAULT_AUTOGENERATED_PERIOD_DURATION,
PeriodStartTs: tsNow, PeriodStartTs: tsNow,
MaxCallsPerPeriod: 1000, MaxCallsPerPeriod: DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD,
CallsPerPeriod: 0, CallsPerPeriod: 0,
Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS,
@ -264,7 +259,7 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res
NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{ NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{
Id: user.Id, Id: user.Id,
HolderType: "user", HolderType: "user",
Permissions: []string{"read", "update", "delete"}, Permissions: DEFAULT_AUTOGENERATED_USER_PERMISSIONS,
}, },
) )
if err != nil { if err != nil {
@ -459,18 +454,17 @@ func accessMiddleware(next http.Handler) http.Handler {
// If access id does not belong to internal crawlers, then check cache or find it in Bugout resources // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources
if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID {
currentClientAccess = internalUsageAccess
if stateCLI.enableDebugFlag { if stateCLI.enableDebugFlag {
log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID)
} }
currentClientAccess = internalUsageAccess
currentClientAccess.LastAccessTs = tsNow currentClientAccess.LastAccessTs = tsNow
currentClientAccess.requestedDataSource = requestedDataSource currentClientAccess.requestedDataSource = requestedDataSource
} else if accessID != "" && accessCache.FindAccessIdInCache(accessID) != "" { } else if accessID != "" && accessCache.isAccessIdInCache(accessID) {
currentClientAccess = *accessCache.accessIds[accessID]
if stateCLI.enableDebugFlag { if stateCLI.enableDebugFlag {
log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID)
} }
// Check if limit of calls not exceeded currentClientAccess = *accessCache.accessIds[accessID]
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
if !isClientAllowedToGetAccess { if !isClientAllowedToGetAccess {
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
@ -478,20 +472,21 @@ func accessMiddleware(next http.Handler) http.Handler {
} }
currentClientAccess.requestedDataSource = requestedDataSource currentClientAccess.requestedDataSource = requestedDataSource
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
} else if accessID == "" && accessCache.FindAuthorizationTokenInCache(authorizationToken) != "" { } else if accessID == "" && accessCache.isAuthorizationTokenInCache(authorizationToken) {
if stateCLI.enableDebugFlag {
log.Printf("Client connected with Authorization token")
}
currentClientAccess = *accessCache.authorizationTokens[authorizationToken] currentClientAccess = *accessCache.authorizationTokens[authorizationToken]
// Check if limit of calls not exceeded
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
if !isClientAllowedToGetAccess { if !isClientAllowedToGetAccess {
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
return return
} }
fmt.Println(currentClientAccess.ResourceID)
currentClientAccess.requestedDataSource = requestedDataSource currentClientAccess.requestedDataSource = requestedDataSource
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
} else { } else {
if stateCLI.enableDebugFlag { if stateCLI.enableDebugFlag {
log.Printf("No access in cache found, looking at Brood resources") log.Printf("No access identity found in cache, looking at Brood resources")
} }
resource, err := fetchResource(accessID, authorizationToken, tsNow) resource, err := fetchResource(accessID, authorizationToken, tsNow)
@ -500,22 +495,38 @@ func accessMiddleware(next http.Handler) http.Handler {
return return
} }
clientAccess, err := parseClientAccess(*resource) var clientAccessRaw ClientAccess
resourceData, err := json.Marshal(resource.ResourceData)
if err != nil {
http.Error(w, "Unable to parse resource data to access identifier", http.StatusInternalServerError)
return
}
err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData)
if err != nil { if err != nil {
http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError) http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError)
return return
} }
currentClientAccess = ClientAccess(*clientAccess)
currentClientAccess.authorizationToken = authorizationToken
currentClientAccess.requestedDataSource = requestedDataSource
// Check if limit of calls not exceeded isClientAllowedToGetAccess := clientAccessRaw.CheckClientCallPeriodLimits(tsNow)
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
if !isClientAllowedToGetAccess { if !isClientAllowedToGetAccess {
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
return return
} }
accessCache.AddAccessToCache(currentClientAccess, tsNow) currentClientAccess = ClientAccess(clientAccessRaw)
currentClientAccess.ResourceID = resource.Id
currentClientAccess.authorizationToken = authorizationToken
currentClientAccess.requestedDataSource = requestedDataSource
// If client logged in before with access ID and it exists in cache, then re-use it
// else create new instances in cache
if authorizationToken != "" && accessCache.isAccessIdInCache(currentClientAccess.ClientResourceData.AccessID) {
accessCache.authorizationTokens[authorizationToken] = accessCache.accessIds[currentClientAccess.ClientResourceData.AccessID]
} else {
if stateCLI.enableDebugFlag {
log.Printf("Adding new access identifier in cache")
}
accessCache.AddAccessToCache(currentClientAccess, tsNow)
}
} }
ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess) ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess)

Wyświetl plik

@ -3,7 +3,7 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer
go 1.17 go 1.17
require ( require (
github.com/bugout-dev/bugout-go v0.4.1 github.com/bugout-dev/bugout-go v0.4.2
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
) )

Wyświetl plik

@ -23,8 +23,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bugout-dev/bugout-go v0.4.1 h1:idZ4k+/skHj217/q8OmHBoYdzwJrqCY5Vd7S8FM6zlo= github.com/bugout-dev/bugout-go v0.4.2 h1:oADFQzZ4iZeQOz8dDaO/+25eQkrCYG8SqjA8mRSQl7k=
github.com/bugout-dev/bugout-go v0.4.1/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8= github.com/bugout-dev/bugout-go v0.4.2/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8=
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ=
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=