diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index c4f1646d..b5fba9f1 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -241,7 +241,7 @@ func cli() { stateCLI.addAccessCmd.Parse(os.Args[2:]) stateCLI.checkRequirements() - proposedUserAccess := ClientResourceData{ + proposedClientResourceData := ClientResourceData{ UserID: stateCLI.userIDFlag, AccessID: stateCLI.accessIDFlag, Name: stateCLI.accessNameFlag, @@ -257,7 +257,7 @@ func cli() { _, err := bugoutClient.Brood.FindUser( NB_CONTROLLER_TOKEN, map[string]string{ - "user_id": proposedUserAccess.UserID, + "user_id": proposedClientResourceData.UserID, "application_id": NB_APPLICATION_ID, }, ) @@ -265,7 +265,7 @@ func cli() { fmt.Printf("User does not exists, err: %v\n", err) os.Exit(1) } - resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedUserAccess) + resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedClientResourceData) if err != nil { fmt.Printf("Unable to create user access, err: %v\n", err) os.Exit(1) @@ -275,7 +275,7 @@ func cli() { fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) os.Exit(1) } - var newUserAccess ClientResourceData + var newUserAccess ClientAccess err = json.Unmarshal(resourceData, &newUserAccess) if err != nil { fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) @@ -321,44 +321,45 @@ func cli() { } resource := resources.Resources[0] - resource_data, err := json.Marshal(resource.ResourceData) + resourceData, err := json.Marshal(resource.ResourceData) if err != nil { fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) os.Exit(1) } - var currentUserAccess ClientResourceData - err = json.Unmarshal(resource_data, ¤tUserAccess) + + var currentClientAccess ClientAccess + currentClientAccess.ResourceID = resource.Id + err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData) if err != nil { fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) os.Exit(1) } - currentUserAccess.ResourceID = resource.Id // TODO(kompotkot): Since we are using bool flags I moved with ugly solution. // Let's find better one when have free time or will re-write flag Set. update := make(map[string]interface{}) - if stateCLI.accessNameFlag != currentUserAccess.Name && stateCLI.accessNameFlag != DEFAULT_ACCESS_NAME { + if stateCLI.accessNameFlag != currentClientAccess.ClientResourceData.Name && stateCLI.accessNameFlag != DEFAULT_ACCESS_NAME { update["name"] = stateCLI.accessNameFlag } - if stateCLI.accessDescriptionFlag != currentUserAccess.Description && stateCLI.accessDescriptionFlag != DEFAULT_ACCESS_DESCRIPTION { + if stateCLI.accessDescriptionFlag != currentClientAccess.ClientResourceData.Description && stateCLI.accessDescriptionFlag != DEFAULT_ACCESS_DESCRIPTION { update["description"] = stateCLI.accessDescriptionFlag } - if stateCLI.blockchainAccessFlag != currentUserAccess.BlockchainAccess && stateCLI.blockchainAccessFlag != DEFAULT_BLOCKCHAIN_ACCESS { + if stateCLI.blockchainAccessFlag != currentClientAccess.ClientResourceData.BlockchainAccess && stateCLI.blockchainAccessFlag != DEFAULT_BLOCKCHAIN_ACCESS { update["blockchain_access"] = stateCLI.blockchainAccessFlag } - if stateCLI.extendedMethodsFlag != currentUserAccess.ExtendedMethods && stateCLI.extendedMethodsFlag != DEFAULT_EXTENDED_METHODS { + if stateCLI.extendedMethodsFlag != currentClientAccess.ClientResourceData.ExtendedMethods && stateCLI.extendedMethodsFlag != DEFAULT_EXTENDED_METHODS { update["extended_methods"] = stateCLI.extendedMethodsFlag } - if stateCLI.PeriodDurationFlag != currentUserAccess.PeriodDuration && stateCLI.PeriodDurationFlag != DEFAULT_PERIOD_DURATION { + if stateCLI.PeriodDurationFlag != currentClientAccess.ClientResourceData.PeriodDuration && stateCLI.PeriodDurationFlag != DEFAULT_PERIOD_DURATION { update["period_duration"] = stateCLI.PeriodDurationFlag } - if stateCLI.MaxCallsPerPeriodFlag != currentUserAccess.MaxCallsPerPeriod && stateCLI.MaxCallsPerPeriodFlag != DEFAULT_MAX_CALLS_PER_PERIOD { + if stateCLI.MaxCallsPerPeriodFlag != currentClientAccess.ClientResourceData.MaxCallsPerPeriod && stateCLI.MaxCallsPerPeriodFlag != DEFAULT_MAX_CALLS_PER_PERIOD { update["max_calls_per_period"] = stateCLI.MaxCallsPerPeriodFlag } - if stateCLI.PeriodStartTsFlag != currentUserAccess.PeriodStartTs && stateCLI.PeriodStartTsFlag != 0 { + if stateCLI.PeriodStartTsFlag != currentClientAccess.ClientResourceData.PeriodStartTs && stateCLI.PeriodStartTsFlag != 0 { update["period_start_ts"] = stateCLI.PeriodStartTsFlag } - if stateCLI.CallsPerPeriodFlag != currentUserAccess.CallsPerPeriod && stateCLI.CallsPerPeriodFlag != 0 { + if stateCLI.CallsPerPeriodFlag != currentClientAccess.ClientResourceData.CallsPerPeriod && stateCLI.CallsPerPeriodFlag != 0 { update["calls_per_period"] = stateCLI.CallsPerPeriodFlag } @@ -372,13 +373,13 @@ func cli() { fmt.Printf("Unable to update Bugout resource, err: %v\n", err) os.Exit(1) } - + updatedResourceData, err := json.Marshal(updatedResource.ResourceData) if err != nil { fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) os.Exit(1) } - var updatedUserAccess ClientResourceData + var updatedUserAccess ClientAccess err = json.Unmarshal(updatedResourceData, &updatedUserAccess) if err != nil { fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) @@ -413,7 +414,7 @@ func cli() { os.Exit(1) } - var userAccesses []ClientResourceData + var userAccesses []ClientAccess for _, resource := range resources.Resources { deletedResource, err := bugoutClient.Brood.DeleteResource(NB_CONTROLLER_TOKEN, resource.Id) if err != nil { @@ -425,7 +426,7 @@ func cli() { fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) continue } - var deletedUserAccess ClientResourceData + var deletedUserAccess ClientAccess err = json.Unmarshal(deletedResourceData, &deletedUserAccess) if err != nil { fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) @@ -471,7 +472,7 @@ func cli() { os.Exit(1) } - var userAccesses []ClientResourceData + var clientAccesses []ClientAccess offset := stateCLI.offsetFlag if stateCLI.offsetFlag > len(resources.Resources) { @@ -483,20 +484,21 @@ func cli() { } for _, resource := range resources.Resources[offset:limit] { - resource_data, err := json.Marshal(resource.ResourceData) + resourceData, err := json.Marshal(resource.ResourceData) if err != nil { fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) continue } - var userAccess ClientResourceData - err = json.Unmarshal(resource_data, &userAccess) + var clientAccess ClientAccess + clientAccess.ResourceID = resource.Id + err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData) if err != nil { fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) continue } - userAccesses = append(userAccesses, userAccess) + clientAccesses = append(clientAccesses, clientAccess) } - userAccessesJson, err := json.Marshal(userAccesses) + userAccessesJson, err := json.Marshal(clientAccesses) if err != nil { fmt.Printf("Unable to marshal user accesses struct, err: %v\n", err) os.Exit(1) diff --git a/nodes/node_balancer/cmd/nodebalancer/clients.go b/nodes/node_balancer/cmd/nodebalancer/clients.go index 8ef24f1b..58ca26ad 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients.go @@ -1,6 +1,7 @@ package main import ( + "log" "reflect" "sync" "time" @@ -11,9 +12,19 @@ var ( ) // Structure to define user access according with Brood resources +type ClientAccess struct { + ResourceID string + + ClientResourceData ClientResourceData + + LastAccessTs int64 + LastSessionAccessTs int64 // When last session with nodebalancer where started + LastSessionCallsCounter int64 + + requestedDataSource string +} + type ClientResourceData struct { - ResourceID string `json:"resource_id"` - UserID string `json:"user_id"` AccessID string `json:"access_id"` Name string `json:"name"` @@ -25,12 +36,50 @@ type ClientResourceData struct { PeriodStartTs int64 `json:"period_start_ts"` MaxCallsPerPeriod int64 `json:"max_calls_per_period"` CallsPerPeriod int64 `json:"calls_per_period"` +} - LastAccessTs int64 `json:"last_access_ts"` - LastSessionAccessTs int64 `json:"last_session_access_ts"` // When last session with nodebalancer where started - LastSessionCallsCounter int64 `json:"last_session_calls_counter"` +// CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded +// If client passed this check, we will add this client to cache and let him operates until cache will be +// cleaned with go-routine and resource will be updated +func (ca *ClientAccess) CheckClientCallPeriodLimits(tsNow int64) bool { + isClientAllowedToGetAccess := false + if tsNow-ca.ClientResourceData.PeriodStartTs < ca.ClientResourceData.PeriodDuration { + // Client operates in period + if ca.ClientResourceData.CallsPerPeriod < ca.ClientResourceData.MaxCallsPerPeriod { + // Client's limit of calls not reached + isClientAllowedToGetAccess = true + } + } else { + // Client period should be refreshed + if stateCLI.enableDebugFlag { + log.Printf("Refresh client's period_start_ts with time.now() and reset calls_per_period") + } + ca.ClientResourceData.CallsPerPeriod = 0 + ca.ClientResourceData.PeriodStartTs = tsNow + isClientAllowedToGetAccess = true + } + return isClientAllowedToGetAccess +} - dataSource string +// UpdateClientResourceCallCounter updates Brood resource where increase calls counter to node +// with current number of calls during last session. +func (ca *ClientAccess) UpdateClientResourceCallCounter(tsNow int64) error { + update := make(map[string]interface{}) + update["period_start_ts"] = ca.ClientResourceData.PeriodStartTs + update["calls_per_period"] = ca.ClientResourceData.CallsPerPeriod + ca.LastSessionCallsCounter + + updatedResource, err := bugoutClient.Brood.UpdateResource( + NB_CONTROLLER_TOKEN, + ca.ResourceID, + update, + []string{}, + ) + if err != nil { + return err + } + log.Printf("Resource %s updated\n", updatedResource.Id) + + return nil } // Node - which one node client worked with diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 50fce247..dee31d25 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -30,9 +30,9 @@ var ( NB_HEALTH_CHECK_INTERVAL = time.Millisecond * 5000 NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 - NB_CACHE_CLEANING_INTERVAL = time.Second * 10 - NB_CACHE_ACCESS_ID_LIFETIME = int64(120) - NB_CACHE_ACCESS_ID_SESSION_LIFETIME = int64(600) + NB_CACHE_CLEANING_INTERVAL = time.Second * 10 + NB_CACHE_ACCESS_ID_LIFETIME = int64(120) // TODO(kompotkot): Set to 2 mins + NB_CACHE_ACCESS_ID_SESSION_LIFETIME = int64(600) // TODO(kompotkot): Set to 10 mins NB_MAX_COUNTER_NUMBER = uint64(10000000) diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index bcaa4331..7838b48c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -24,7 +24,7 @@ var ( ) type AccessCache struct { - accessIds map[string]ClientResourceData + accessIds map[string]ClientAccess mux sync.RWMutex } @@ -32,7 +32,7 @@ type AccessCache struct { // CreateAccessCache generates empty cache of client access func CreateAccessCache() { accessIdCache = AccessCache{ - accessIds: make(map[string]ClientResourceData), + accessIds: make(map[string]ClientAccess), } } @@ -53,34 +53,43 @@ func (ac *AccessCache) FindAccessIdInCache(accessId string) string { } // Update last call access timestamp and datasource for access id -func (ac *AccessCache) UpdateAccessIdAtCache(accessId, dataSource string) { +func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) { ac.mux.Lock() if accessData, ok := ac.accessIds[accessId]; ok { - accessData.LastAccessTs = time.Now().Unix() - accessData.dataSource = dataSource + accessData.LastAccessTs = tsNow + accessData.requestedDataSource = requestedDataSource + accessData.LastSessionCallsCounter++ ac.accessIds[accessId] = accessData } ac.mux.Unlock() } -// Add new access id with data to cache -func (ac *AccessCache) AddAccessIdToCache(clientResourceData ClientResourceData, dataSource string) { - tsNow := time.Now().Unix() - +// Add new access ID with data to cache +func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) { ac.mux.Lock() - ac.accessIds[clientResourceData.AccessID] = ClientResourceData{ - UserID: clientResourceData.UserID, - AccessID: clientResourceData.AccessID, - Name: clientResourceData.Name, - Description: clientResourceData.Description, - BlockchainAccess: clientResourceData.BlockchainAccess, - ExtendedMethods: clientResourceData.ExtendedMethods, + ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{ + ResourceID: clientAccess.ResourceID, - LastAccessTs: tsNow, - LastSessionAccessTs: tsNow, + ClientResourceData: ClientResourceData{ + UserID: clientAccess.ClientResourceData.UserID, + AccessID: clientAccess.ClientResourceData.AccessID, + Name: clientAccess.ClientResourceData.Name, + Description: clientAccess.ClientResourceData.Description, + BlockchainAccess: clientAccess.ClientResourceData.BlockchainAccess, + ExtendedMethods: clientAccess.ClientResourceData.ExtendedMethods, - dataSource: dataSource, + PeriodDuration: clientAccess.ClientResourceData.PeriodDuration, + PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs, + MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod, + CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod, + }, + + LastAccessTs: tsNow, + LastSessionAccessTs: tsNow, + LastSessionCallsCounter: 1, + + requestedDataSource: clientAccess.requestedDataSource, } ac.mux.Unlock() } @@ -89,19 +98,31 @@ func (ac *AccessCache) AddAccessIdToCache(clientResourceData ClientResourceData, func (ac *AccessCache) Cleanup() (int64, int64) { var removedAccessIds, totalAccessIds int64 tsNow := time.Now().Unix() + ac.mux.Lock() - for aId, aData := range ac.accessIds { - if tsNow-aData.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { + for aId, clientAccess := range ac.accessIds { + if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { + // Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period delete(ac.accessIds, aId) removedAccessIds++ - } else if tsNow-aData.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME { + err := clientAccess.UpdateClientResourceCallCounter(tsNow) + if err != nil { + log.Printf("Unable to update Brood resource, err: %v\n", err) + } + } else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME { + // Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME delete(ac.accessIds, aId) removedAccessIds++ + err := clientAccess.UpdateClientResourceCallCounter(tsNow) + if err != nil { + log.Printf("Unable to update Brood resource, err: %v\n", err) + } } else { totalAccessIds++ } } ac.mux.Unlock() + return removedAccessIds, totalAccessIds } @@ -112,9 +133,9 @@ func initCacheCleaning(debug bool) { case <-t.C: removedAccessIds, totalAccessIds := accessIdCache.Cleanup() if debug { - log.Printf("Removed %d elements from access id cache", removedAccessIds) + log.Printf("Removed %d clients from access cache", removedAccessIds) } - log.Printf("Elements in access id cache: %d", totalAccessIds) + log.Printf("Clients in access cache: %d", totalAccessIds) } } } @@ -139,22 +160,22 @@ func extractAccessID(r *http.Request) string { } // Extract data_source from header and query. Query takes precedence over header. -func extractDataSource(r *http.Request) string { - dataSource := "database" +func extractRequestedDataSource(r *http.Request) string { + requestedDataSource := "database" - dataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)] - for _, h := range dataSources { - dataSource = h + requestedDataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)] + for _, h := range requestedDataSources { + requestedDataSource = h } queries := r.URL.Query() for k, v := range queries { if k == "data_source" { - dataSource = v[0] + requestedDataSource = v[0] } } - return dataSource + return requestedDataSource } // Handle panic errors to prevent server shutdown @@ -252,7 +273,7 @@ func logMiddleware(next http.Handler) http.Handler { } accessID := extractAccessID(r) if accessID != "" { - dataSource := extractDataSource(r) + dataSource := extractRequestedDataSource(r) logStr += fmt.Sprintf(" %s %s", dataSource, accessID) } } @@ -263,30 +284,38 @@ func logMiddleware(next http.Handler) http.Handler { // Check access id was provided correctly and save user access configuration to request context func accessMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var currentClientAccess ClientResourceData + var currentClientAccess ClientAccess accessID := extractAccessID(r) - dataSource := extractDataSource(r) + requestedDataSource := extractRequestedDataSource(r) if accessID == "" { http.Error(w, "No access id passed with request", http.StatusForbidden) return } + tsNow := time.Now().Unix() + // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources if accessID == NB_CONTROLLER_ACCESS_ID { - if stateCLI.enableDebugFlag { - log.Printf("Access id belongs to internal crawlers") - } currentClientAccess = internalCrawlersAccess - currentClientAccess.dataSource = dataSource - } else if accessIdCache.FindAccessIdInCache(accessID) != "" { if stateCLI.enableDebugFlag { - log.Printf("Access id found in cache") + log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) } + currentClientAccess.requestedDataSource = requestedDataSource + } else if accessIdCache.FindAccessIdInCache(accessID) != "" { currentClientAccess = accessIdCache.accessIds[accessID] - currentClientAccess.dataSource = dataSource - accessIdCache.UpdateAccessIdAtCache(accessID, dataSource) + if stateCLI.enableDebugFlag { + log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) + } + // Check if limit of calls not exceeded + isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + if !isClientAllowedToGetAccess { + http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) + return + } + currentClientAccess.requestedDataSource = requestedDataSource + accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow) } else { if stateCLI.enableDebugFlag { log.Printf("New access id, looking at Brood resources") @@ -300,33 +329,35 @@ func accessMiddleware(next http.Handler) http.Handler { http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden) return } - if len(resources.Resources) == 0 { + resourcesLen := len(resources.Resources) + if resourcesLen == 0 { http.Error(w, "User with provided access identifier not found", http.StatusForbidden) return } - resource_data, err := json.Marshal(resources.Resources[0].ResourceData) + if resourcesLen > 1 { + http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError) + return + } + resourceData, err := json.Marshal(resources.Resources[0].ResourceData) if err != nil { http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError) return } - var clientResourceData ClientResourceData - err = json.Unmarshal(resource_data, &clientResourceData) + currentClientAccess.ResourceID = resources.Resources[0].Id + currentClientAccess.requestedDataSource = requestedDataSource + err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData) if err != nil { http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError) return } - currentClientAccess = ClientResourceData{ - UserID: clientResourceData.UserID, - AccessID: clientResourceData.AccessID, - Name: clientResourceData.Name, - Description: clientResourceData.Description, - BlockchainAccess: clientResourceData.BlockchainAccess, - ExtendedMethods: clientResourceData.ExtendedMethods, - dataSource: dataSource, + // Check if limit of calls not exceeded + isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + if !isClientAllowedToGetAccess { + http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) + return } - - accessIdCache.AddAccessIdToCache(clientResourceData, dataSource) + accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow) } ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess) diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index 2cc2d32e..b280615f 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -27,7 +27,7 @@ func pingRoute(w http.ResponseWriter, r *http.Request) { // lbHandler load balances the incoming requests to nodes func lbHandler(w http.ResponseWriter, r *http.Request) { currentClientAccessRaw := r.Context().Value("currentClientAccess") - currentClientAccess, ok := currentClientAccessRaw.(ClientResourceData) + currentClientAccess, ok := currentClientAccessRaw.(ClientAccess) if !ok { http.Error(w, "Internal server error", http.StatusInternalServerError) return @@ -55,14 +55,14 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { // Chose one node var node *Node cpool := GetClientPool(blockchain) - node = cpool.GetClientNode(currentClientAccess.AccessID) + node = cpool.GetClientNode(currentClientAccess.ClientResourceData.AccessID) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) return } - cpool.AddClientNode(currentClientAccess.AccessID, node) + cpool.AddClientNode(currentClientAccess.ClientResourceData.AccessID, node) } // Save origin path, to use in proxyErrorHandler if node will not response @@ -78,7 +78,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } } -func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) { +func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientAccess) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Unable to read body", http.StatusBadRequest) @@ -94,12 +94,12 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, } switch { - case currentClientAccess.dataSource == "blockchain": - if !currentClientAccess.BlockchainAccess { + case currentClientAccess.requestedDataSource == "blockchain": + if !currentClientAccess.ClientResourceData.BlockchainAccess { http.Error(w, "Access to blockchain node not allowed with provided access id", http.StatusForbidden) return } - if !currentClientAccess.ExtendedMethods { + if !currentClientAccess.ClientResourceData.ExtendedMethods { for _, jsonrpcRequest := range jsonrpcRequests { _, exists := ALLOWED_METHODS[jsonrpcRequest.Method] if !exists { @@ -115,11 +115,11 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, r.URL.Path = "/" node.GethReverseProxy.ServeHTTP(w, r) return - case currentClientAccess.dataSource == "database": + case currentClientAccess.requestedDataSource == "database": http.Error(w, "Database access under development", http.StatusInternalServerError) return default: - http.Error(w, fmt.Sprintf("Unacceptable data source %s", currentClientAccess.dataSource), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Unacceptable data source %s", currentClientAccess.requestedDataSource), http.StatusBadRequest) return } } diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index 5355060a..a5324eab 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -20,7 +20,7 @@ import ( ) var ( - internalCrawlersAccess ClientResourceData + internalCrawlersAccess ClientAccess // Crash reporter reporter *humbug.HumbugReporter @@ -137,23 +137,25 @@ func Server() { fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err) os.Exit(1) } - var clientAccess ClientResourceData - err = json.Unmarshal(resource_data, &clientAccess) + var clientResourceData ClientResourceData + err = json.Unmarshal(resource_data, &clientResourceData) if err != nil { fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err) os.Exit(1) } - internalCrawlersAccess = ClientResourceData{ - UserID: clientAccess.UserID, - AccessID: clientAccess.AccessID, - Name: clientAccess.Name, - Description: clientAccess.Description, - BlockchainAccess: clientAccess.BlockchainAccess, - ExtendedMethods: clientAccess.ExtendedMethods, + internalCrawlersAccess = ClientAccess{ + ClientResourceData: ClientResourceData{ + UserID: clientResourceData.UserID, + AccessID: clientResourceData.AccessID, + Name: clientResourceData.Name, + Description: clientResourceData.Description, + BlockchainAccess: clientResourceData.BlockchainAccess, + ExtendedMethods: clientResourceData.ExtendedMethods, + }, } log.Printf( "Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t", - resources.Resources[0].Id, clientAccess.BlockchainAccess, clientAccess.ExtendedMethods, + resources.Resources[0].Id, clientResourceData.BlockchainAccess, clientResourceData.ExtendedMethods, ) // Fill NodeConfigList with initial nodes from environment variables