diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index b5fba9f1..6364fd98 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -253,6 +253,8 @@ func cli() { PeriodStartTs: time.Now().Unix(), MaxCallsPerPeriod: stateCLI.MaxCallsPerPeriodFlag, CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, } _, err := bugoutClient.Brood.FindUser( NB_CONTROLLER_TOKEN, diff --git a/nodes/node_balancer/cmd/nodebalancer/clients.go b/nodes/node_balancer/cmd/nodebalancer/clients.go index ebc0a3a1..56cb1e17 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients.go @@ -15,6 +15,8 @@ var ( type ClientAccess struct { ResourceID string + authorizationToken string + ClientResourceData ClientResourceData LastAccessTs int64 @@ -36,6 +38,8 @@ type ClientResourceData struct { PeriodStartTs int64 `json:"period_start_ts"` MaxCallsPerPeriod int64 `json:"max_calls_per_period"` CallsPerPeriod int64 `json:"calls_per_period"` + + Type string `json:"type"` } // CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 27d51a3e..bd41f1fb 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -16,23 +16,27 @@ import ( "sync" "time" + "github.com/bugout-dev/bugout-go/pkg/brood" humbug "github.com/bugout-dev/humbug/go/pkg" + "github.com/google/uuid" ) var ( - accessIdCache AccessCache + accessCache AccessCache ) type AccessCache struct { - accessIds map[string]ClientAccess + accessIds map[string]*ClientAccess + authorizationTokens map[string]*ClientAccess mux sync.RWMutex } // CreateAccessCache generates empty cache of client access func CreateAccessCache() { - accessIdCache = AccessCache{ - accessIds: make(map[string]ClientAccess), + accessCache = AccessCache{ + accessIds: make(map[string]*ClientAccess), + authorizationTokens: make(map[string]*ClientAccess), } } @@ -52,24 +56,47 @@ func (ac *AccessCache) FindAccessIdInCache(accessId string) string { return detectedId } -// Update last call access timestamp and datasource for access id -func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) { - ac.mux.Lock() - if accessData, ok := ac.accessIds[accessId]; ok { - accessData.LastAccessTs = tsNow - accessData.requestedDataSource = requestedDataSource - accessData.LastSessionCallsCounter++ +// Get access id from cache if exists +func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) string { + var detected string - ac.accessIds[accessId] = accessData + ac.mux.RLock() + for id := range ac.authorizationTokens { + if id == authorizationToken { + detected = id + break + } } + ac.mux.RUnlock() + + return detected +} + +// Update last call access timestamp and datasource for access id +func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) { + ac.mux.Lock() + var accessToModify *ClientAccess + if access, ok := ac.accessIds[accessId]; ok { + accessToModify = access + + } + if access, ok := ac.authorizationTokens[authorizationToken]; ok { + accessToModify = access + } + + accessToModify.LastAccessTs = tsNow + accessToModify.requestedDataSource = requestedDataSource + accessToModify.LastSessionCallsCounter++ + ac.mux.Unlock() } // Add new access ID with data to cache -func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) { +func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) { ac.mux.Lock() - ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{ - ResourceID: clientAccess.ResourceID, + access := ClientAccess{ + ResourceID: clientAccess.ResourceID, + authorizationToken: clientAccess.authorizationToken, ClientResourceData: ClientResourceData{ UserID: clientAccess.ClientResourceData.UserID, @@ -83,6 +110,8 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs, MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod, CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod, + + Type: clientAccess.ClientResourceData.Type, }, LastAccessTs: tsNow, @@ -91,6 +120,11 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 requestedDataSource: clientAccess.requestedDataSource, } + + ac.accessIds[clientAccess.ClientResourceData.AccessID] = &access + if clientAccess.authorizationToken != "" { + ac.authorizationTokens[clientAccess.authorizationToken] = &access + } ac.mux.Unlock() } @@ -100,9 +134,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) { tsNow := time.Now().Unix() ac.mux.Lock() + for aId, clientAccess := range ac.accessIds { + totalAccessIds++ + removedUserId := "" + if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { // Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period + removedUserId = clientAccess.ClientResourceData.UserID delete(ac.accessIds, aId) removedAccessIds++ err := clientAccess.UpdateClientResourceCallCounter(tsNow) @@ -110,6 +149,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) { log.Printf("Unable to update Brood resource, err: %v\n", err) } } else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME { + removedUserId = clientAccess.ClientResourceData.UserID // Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME delete(ac.accessIds, aId) removedAccessIds++ @@ -117,12 +157,21 @@ func (ac *AccessCache) Cleanup() (int64, int64) { if err != nil { log.Printf("Unable to update Brood resource, err: %v\n", err) } - } else { - totalAccessIds++ + } + + if removedUserId != "" { + for aToken, clientAccess := range ac.authorizationTokens { + if clientAccess.ClientResourceData.UserID == removedUserId { + delete(ac.authorizationTokens, aToken) + } + } + removedUserId = "" } } ac.mux.Unlock() + totalAccessIds = totalAccessIds - removedAccessIds + return removedAccessIds, totalAccessIds } @@ -131,7 +180,7 @@ func initCacheCleaning(debug bool) { for { select { case <-t.C: - removedAccessIds, totalAccessIds := accessIdCache.Cleanup() + removedAccessIds, totalAccessIds := accessCache.Cleanup() if debug { log.Printf("Removed %d clients from access cache", removedAccessIds) } @@ -140,6 +189,102 @@ func initCacheCleaning(debug bool) { } } +func parseClientAccess(resource brood.Resource) (*ClientAccess, error) { + var clientAccess ClientAccess + + resourceData, err := json.Marshal(resource.ResourceData) + if err != nil { + return nil, err + } + clientAccess.ResourceID = resource.Id + err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData) + if err != nil { + return nil, err + } + + return &clientAccess, nil +} + +// fetchResources fetch resources with access ID or authorization token and generate new one if there no one +func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) { + var err error + var resources brood.Resources + + queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS} + if accessID != "" { + queryParameters["access_id"] = accessID + } + + token := NB_CONTROLLER_TOKEN + if authorizationToken != "" { + token = authorizationToken + } + + resources, err = bugoutClient.Brood.GetResources( + token, + NB_APPLICATION_ID, + queryParameters, + ) + if err != nil { + log.Printf("Unable to get resources, err: %v", err) + return nil, fmt.Errorf("unable to get access identifiers") + } + + if len(resources.Resources) == 0 { + if authorizationToken != "" { + // Generate new autogenerated access resource with default parameters and grant user permissions to work with it + user, err := bugoutClient.Brood.GetUser(authorizationToken) + if err != nil { + log.Printf("Unable to get user, err: %v", err) + return nil, fmt.Errorf("unable to find user with provided authorization token") + } + newResource, err := bugoutClient.Brood.CreateResource( + NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, ClientResourceData{ + UserID: user.Id, + AccessID: uuid.New().String(), + Name: user.Username, + Description: "Autogenerated access ID", + BlockchainAccess: true, + ExtendedMethods: false, + + PeriodDuration: 86400, + PeriodStartTs: tsNow, + MaxCallsPerPeriod: 1000, + CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, + }, + ) + if err != nil { + log.Printf("Unable to create resource with autogenerated access for user with ID %s, err: %v", user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + resourceHolderPermissions, err := bugoutClient.Brood.AddResourceHolderPermissions( + NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{ + Id: user.Id, + HolderType: "user", + Permissions: []string{"read", "update", "delete"}, + }, + ) + if err != nil { + log.Printf("Unable to grant permissions to user with ID %s at resource with ID %s, err: %v", newResource.Id, user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + log.Printf("Created new resource with ID %s with autogenerated access for user with ID %s", resourceHolderPermissions.ResourceId, user.Id) + resources.Resources = append(resources.Resources, newResource) + } else { + return nil, fmt.Errorf("there are no provided access identifier") + } + } else if len(resources.Resources) > 1 { + // TODO(kompotkot): Write support of multiple resources, be careful, because NB_CONTROLLER has several resources + return nil, fmt.Errorf("there are no provided access identifier") + } + + return &resources.Resources[0], nil +} + // Extract access_id from header and query. Query takes precedence over header. func extractAccessID(r *http.Request) string { var accessID string @@ -289,23 +434,39 @@ func accessMiddleware(next http.Handler) http.Handler { accessID := extractAccessID(r) requestedDataSource := extractRequestedDataSource(r) - if accessID == "" { - http.Error(w, "No access id passed with request", http.StatusForbidden) - return + // Extract Authorization token if Bearer header provided + var authorizationTokenRaw string + authorizationTokenHeaders := r.Header[strings.Title("authorization")] + for _, h := range authorizationTokenHeaders { + authorizationTokenRaw = h + } + var authorizationToken string + if authorizationTokenRaw != "" { + authorizationTokenSlice := strings.Split(authorizationTokenRaw, " ") + if len(authorizationTokenSlice) != 2 || authorizationTokenSlice[0] != "Bearer" || authorizationTokenSlice[1] == "" { + http.Error(w, "Wrong authorization token provided", http.StatusForbidden) + return + } + authorizationToken = authorizationTokenSlice[1] } tsNow := time.Now().Unix() + if accessID == "" && authorizationToken == "" { + http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden) + return + } + // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources - if accessID == NB_CONTROLLER_ACCESS_ID { + if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { currentClientAccess = internalUsageAccess if stateCLI.enableDebugFlag { log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) } currentClientAccess.LastAccessTs = tsNow currentClientAccess.requestedDataSource = requestedDataSource - } else if accessIdCache.FindAccessIdInCache(accessID) != "" { - currentClientAccess = accessIdCache.accessIds[accessID] + } else if accessID != "" && accessCache.FindAccessIdInCache(accessID) != "" { + currentClientAccess = *accessCache.accessIds[accessID] if stateCLI.enableDebugFlag { log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) } @@ -316,41 +477,37 @@ func accessMiddleware(next http.Handler) http.Handler { return } currentClientAccess.requestedDataSource = requestedDataSource - accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow) + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) + } else if accessID == "" && accessCache.FindAuthorizationTokenInCache(authorizationToken) != "" { + currentClientAccess = *accessCache.authorizationTokens[authorizationToken] + // Check if limit of calls not exceeded + isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + if !isClientAllowedToGetAccess { + http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) + return + } + fmt.Println(currentClientAccess.ResourceID) + currentClientAccess.requestedDataSource = requestedDataSource + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) } else { if stateCLI.enableDebugFlag { - log.Printf("New access id, looking at Brood resources") + log.Printf("No access in cache found, looking at Brood resources") } - resources, err := bugoutClient.Brood.GetResources( - NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, - map[string]string{"access_id": accessID}, - ) + + resource, err := fetchResource(accessID, authorizationToken, tsNow) if err != nil { - http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden) + http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden) return } - resourcesLen := len(resources.Resources) - if resourcesLen == 0 { - http.Error(w, "User with provided access identifier not found", http.StatusForbidden) - return - } - if resourcesLen > 1 { - http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError) - return - } - resourceData, err := json.Marshal(resources.Resources[0].ResourceData) + + clientAccess, err := parseClientAccess(*resource) if err != nil { - http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError) + http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError) return } - currentClientAccess.ResourceID = resources.Resources[0].Id + currentClientAccess = ClientAccess(*clientAccess) + currentClientAccess.authorizationToken = authorizationToken currentClientAccess.requestedDataSource = requestedDataSource - err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData) - if err != nil { - http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError) - return - } // Check if limit of calls not exceeded isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) @@ -358,7 +515,7 @@ func accessMiddleware(next http.Handler) http.Handler { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow) + accessCache.AddAccessToCache(currentClientAccess, tsNow) } ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess)