kopia lustrzana https://github.com/bugout-dev/moonstream
Support of two caches for access IDs and Auth tokens
rodzic
2d42ac4cd5
commit
7f89784f2f
|
@ -253,6 +253,8 @@ func cli() {
|
|||
PeriodStartTs: time.Now().Unix(),
|
||||
MaxCallsPerPeriod: stateCLI.MaxCallsPerPeriodFlag,
|
||||
CallsPerPeriod: 0,
|
||||
|
||||
Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS,
|
||||
}
|
||||
_, err := bugoutClient.Brood.FindUser(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
|
|
|
@ -15,6 +15,8 @@ var (
|
|||
type ClientAccess struct {
|
||||
ResourceID string
|
||||
|
||||
authorizationToken string
|
||||
|
||||
ClientResourceData ClientResourceData
|
||||
|
||||
LastAccessTs int64
|
||||
|
@ -36,6 +38,8 @@ type ClientResourceData struct {
|
|||
PeriodStartTs int64 `json:"period_start_ts"`
|
||||
MaxCallsPerPeriod int64 `json:"max_calls_per_period"`
|
||||
CallsPerPeriod int64 `json:"calls_per_period"`
|
||||
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded
|
||||
|
|
|
@ -16,23 +16,27 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bugout-dev/bugout-go/pkg/brood"
|
||||
humbug "github.com/bugout-dev/humbug/go/pkg"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var (
|
||||
accessIdCache AccessCache
|
||||
accessCache AccessCache
|
||||
)
|
||||
|
||||
type AccessCache struct {
|
||||
accessIds map[string]ClientAccess
|
||||
accessIds map[string]*ClientAccess
|
||||
authorizationTokens map[string]*ClientAccess
|
||||
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
// CreateAccessCache generates empty cache of client access
|
||||
func CreateAccessCache() {
|
||||
accessIdCache = AccessCache{
|
||||
accessIds: make(map[string]ClientAccess),
|
||||
accessCache = AccessCache{
|
||||
accessIds: make(map[string]*ClientAccess),
|
||||
authorizationTokens: make(map[string]*ClientAccess),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,24 +56,47 @@ func (ac *AccessCache) FindAccessIdInCache(accessId string) string {
|
|||
return detectedId
|
||||
}
|
||||
|
||||
// Update last call access timestamp and datasource for access id
|
||||
func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
if accessData, ok := ac.accessIds[accessId]; ok {
|
||||
accessData.LastAccessTs = tsNow
|
||||
accessData.requestedDataSource = requestedDataSource
|
||||
accessData.LastSessionCallsCounter++
|
||||
// Get access id from cache if exists
|
||||
func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) string {
|
||||
var detected string
|
||||
|
||||
ac.accessIds[accessId] = accessData
|
||||
ac.mux.RLock()
|
||||
for id := range ac.authorizationTokens {
|
||||
if id == authorizationToken {
|
||||
detected = id
|
||||
break
|
||||
}
|
||||
}
|
||||
ac.mux.RUnlock()
|
||||
|
||||
return detected
|
||||
}
|
||||
|
||||
// Update last call access timestamp and datasource for access id
|
||||
func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
var accessToModify *ClientAccess
|
||||
if access, ok := ac.accessIds[accessId]; ok {
|
||||
accessToModify = access
|
||||
|
||||
}
|
||||
if access, ok := ac.authorizationTokens[authorizationToken]; ok {
|
||||
accessToModify = access
|
||||
}
|
||||
|
||||
accessToModify.LastAccessTs = tsNow
|
||||
accessToModify.requestedDataSource = requestedDataSource
|
||||
accessToModify.LastSessionCallsCounter++
|
||||
|
||||
ac.mux.Unlock()
|
||||
}
|
||||
|
||||
// Add new access ID with data to cache
|
||||
func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) {
|
||||
func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{
|
||||
ResourceID: clientAccess.ResourceID,
|
||||
access := ClientAccess{
|
||||
ResourceID: clientAccess.ResourceID,
|
||||
authorizationToken: clientAccess.authorizationToken,
|
||||
|
||||
ClientResourceData: ClientResourceData{
|
||||
UserID: clientAccess.ClientResourceData.UserID,
|
||||
|
@ -83,6 +110,8 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64
|
|||
PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs,
|
||||
MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod,
|
||||
CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod,
|
||||
|
||||
Type: clientAccess.ClientResourceData.Type,
|
||||
},
|
||||
|
||||
LastAccessTs: tsNow,
|
||||
|
@ -91,6 +120,11 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64
|
|||
|
||||
requestedDataSource: clientAccess.requestedDataSource,
|
||||
}
|
||||
|
||||
ac.accessIds[clientAccess.ClientResourceData.AccessID] = &access
|
||||
if clientAccess.authorizationToken != "" {
|
||||
ac.authorizationTokens[clientAccess.authorizationToken] = &access
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
}
|
||||
|
||||
|
@ -100,9 +134,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
tsNow := time.Now().Unix()
|
||||
|
||||
ac.mux.Lock()
|
||||
|
||||
for aId, clientAccess := range ac.accessIds {
|
||||
totalAccessIds++
|
||||
removedUserId := ""
|
||||
|
||||
if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME {
|
||||
// Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period
|
||||
removedUserId = clientAccess.ClientResourceData.UserID
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
err := clientAccess.UpdateClientResourceCallCounter(tsNow)
|
||||
|
@ -110,6 +149,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME {
|
||||
removedUserId = clientAccess.ClientResourceData.UserID
|
||||
// Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
|
@ -117,12 +157,21 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
if err != nil {
|
||||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
totalAccessIds++
|
||||
}
|
||||
|
||||
if removedUserId != "" {
|
||||
for aToken, clientAccess := range ac.authorizationTokens {
|
||||
if clientAccess.ClientResourceData.UserID == removedUserId {
|
||||
delete(ac.authorizationTokens, aToken)
|
||||
}
|
||||
}
|
||||
removedUserId = ""
|
||||
}
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
|
||||
totalAccessIds = totalAccessIds - removedAccessIds
|
||||
|
||||
return removedAccessIds, totalAccessIds
|
||||
}
|
||||
|
||||
|
@ -131,7 +180,7 @@ func initCacheCleaning(debug bool) {
|
|||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
removedAccessIds, totalAccessIds := accessIdCache.Cleanup()
|
||||
removedAccessIds, totalAccessIds := accessCache.Cleanup()
|
||||
if debug {
|
||||
log.Printf("Removed %d clients from access cache", removedAccessIds)
|
||||
}
|
||||
|
@ -140,6 +189,102 @@ func initCacheCleaning(debug bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func parseClientAccess(resource brood.Resource) (*ClientAccess, error) {
|
||||
var clientAccess ClientAccess
|
||||
|
||||
resourceData, err := json.Marshal(resource.ResourceData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clientAccess.ResourceID = resource.Id
|
||||
err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &clientAccess, nil
|
||||
}
|
||||
|
||||
// fetchResources fetch resources with access ID or authorization token and generate new one if there no one
|
||||
func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) {
|
||||
var err error
|
||||
var resources brood.Resources
|
||||
|
||||
queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS}
|
||||
if accessID != "" {
|
||||
queryParameters["access_id"] = accessID
|
||||
}
|
||||
|
||||
token := NB_CONTROLLER_TOKEN
|
||||
if authorizationToken != "" {
|
||||
token = authorizationToken
|
||||
}
|
||||
|
||||
resources, err = bugoutClient.Brood.GetResources(
|
||||
token,
|
||||
NB_APPLICATION_ID,
|
||||
queryParameters,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to get resources, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to get access identifiers")
|
||||
}
|
||||
|
||||
if len(resources.Resources) == 0 {
|
||||
if authorizationToken != "" {
|
||||
// Generate new autogenerated access resource with default parameters and grant user permissions to work with it
|
||||
user, err := bugoutClient.Brood.GetUser(authorizationToken)
|
||||
if err != nil {
|
||||
log.Printf("Unable to get user, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to find user with provided authorization token")
|
||||
}
|
||||
newResource, err := bugoutClient.Brood.CreateResource(
|
||||
NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, ClientResourceData{
|
||||
UserID: user.Id,
|
||||
AccessID: uuid.New().String(),
|
||||
Name: user.Username,
|
||||
Description: "Autogenerated access ID",
|
||||
BlockchainAccess: true,
|
||||
ExtendedMethods: false,
|
||||
|
||||
PeriodDuration: 86400,
|
||||
PeriodStartTs: tsNow,
|
||||
MaxCallsPerPeriod: 1000,
|
||||
CallsPerPeriod: 0,
|
||||
|
||||
Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to create resource with autogenerated access for user with ID %s, err: %v", user.Id, err)
|
||||
return nil, fmt.Errorf("unable to create resource with autogenerated access for user")
|
||||
}
|
||||
|
||||
resourceHolderPermissions, err := bugoutClient.Brood.AddResourceHolderPermissions(
|
||||
NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{
|
||||
Id: user.Id,
|
||||
HolderType: "user",
|
||||
Permissions: []string{"read", "update", "delete"},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to grant permissions to user with ID %s at resource with ID %s, err: %v", newResource.Id, user.Id, err)
|
||||
return nil, fmt.Errorf("unable to create resource with autogenerated access for user")
|
||||
}
|
||||
|
||||
log.Printf("Created new resource with ID %s with autogenerated access for user with ID %s", resourceHolderPermissions.ResourceId, user.Id)
|
||||
resources.Resources = append(resources.Resources, newResource)
|
||||
} else {
|
||||
return nil, fmt.Errorf("there are no provided access identifier")
|
||||
}
|
||||
} else if len(resources.Resources) > 1 {
|
||||
// TODO(kompotkot): Write support of multiple resources, be careful, because NB_CONTROLLER has several resources
|
||||
return nil, fmt.Errorf("there are no provided access identifier")
|
||||
}
|
||||
|
||||
return &resources.Resources[0], nil
|
||||
}
|
||||
|
||||
// Extract access_id from header and query. Query takes precedence over header.
|
||||
func extractAccessID(r *http.Request) string {
|
||||
var accessID string
|
||||
|
@ -289,23 +434,39 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
accessID := extractAccessID(r)
|
||||
requestedDataSource := extractRequestedDataSource(r)
|
||||
|
||||
if accessID == "" {
|
||||
http.Error(w, "No access id passed with request", http.StatusForbidden)
|
||||
return
|
||||
// Extract Authorization token if Bearer header provided
|
||||
var authorizationTokenRaw string
|
||||
authorizationTokenHeaders := r.Header[strings.Title("authorization")]
|
||||
for _, h := range authorizationTokenHeaders {
|
||||
authorizationTokenRaw = h
|
||||
}
|
||||
var authorizationToken string
|
||||
if authorizationTokenRaw != "" {
|
||||
authorizationTokenSlice := strings.Split(authorizationTokenRaw, " ")
|
||||
if len(authorizationTokenSlice) != 2 || authorizationTokenSlice[0] != "Bearer" || authorizationTokenSlice[1] == "" {
|
||||
http.Error(w, "Wrong authorization token provided", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
authorizationToken = authorizationTokenSlice[1]
|
||||
}
|
||||
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
if accessID == "" && authorizationToken == "" {
|
||||
http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
// If access id does not belong to internal crawlers, then check cache or find it in Bugout resources
|
||||
if accessID == NB_CONTROLLER_ACCESS_ID {
|
||||
if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID {
|
||||
currentClientAccess = internalUsageAccess
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
currentClientAccess.LastAccessTs = tsNow
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
} else if accessIdCache.FindAccessIdInCache(accessID) != "" {
|
||||
currentClientAccess = accessIdCache.accessIds[accessID]
|
||||
} else if accessID != "" && accessCache.FindAccessIdInCache(accessID) != "" {
|
||||
currentClientAccess = *accessCache.accessIds[accessID]
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
|
@ -316,41 +477,37 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
return
|
||||
}
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow)
|
||||
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
|
||||
} else if accessID == "" && accessCache.FindAuthorizationTokenInCache(authorizationToken) != "" {
|
||||
currentClientAccess = *accessCache.authorizationTokens[authorizationToken]
|
||||
// Check if limit of calls not exceeded
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
fmt.Println(currentClientAccess.ResourceID)
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
|
||||
} else {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("New access id, looking at Brood resources")
|
||||
log.Printf("No access in cache found, looking at Brood resources")
|
||||
}
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
map[string]string{"access_id": accessID},
|
||||
)
|
||||
|
||||
resource, err := fetchResource(accessID, authorizationToken, tsNow)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden)
|
||||
http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
resourcesLen := len(resources.Resources)
|
||||
if resourcesLen == 0 {
|
||||
http.Error(w, "User with provided access identifier not found", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if resourcesLen > 1 {
|
||||
http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
resourceData, err := json.Marshal(resources.Resources[0].ResourceData)
|
||||
|
||||
clientAccess, err := parseClientAccess(*resource)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError)
|
||||
http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
currentClientAccess.ResourceID = resources.Resources[0].Id
|
||||
currentClientAccess = ClientAccess(*clientAccess)
|
||||
currentClientAccess.authorizationToken = authorizationToken
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if limit of calls not exceeded
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
|
@ -358,7 +515,7 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow)
|
||||
accessCache.AddAccessToCache(currentClientAccess, tsNow)
|
||||
}
|
||||
|
||||
ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess)
|
||||
|
|
Ładowanie…
Reference in New Issue