kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #787 from bugout-dev/nb-auth-token-access
Nodebalancer access by Bearer access tokenpull/792/head
commit
541779c0b5
|
@ -61,4 +61,5 @@ go.work
|
|||
dev.env
|
||||
prod.env
|
||||
test.env
|
||||
.venv
|
||||
|
||||
|
|
|
@ -113,3 +113,14 @@ For Web3 providers `access_id` and `data_source` could be specified in headers
|
|||
```bash
|
||||
/usr/local/go/bin/go test -run ^TestCleanInactiveClientNodes$ github.com/bugout-dev/moonstream/nodes/node_balancer/cmd/nodebalancer -v -count=1
|
||||
```
|
||||
|
||||
## Migrations
|
||||
|
||||
To run migration:
|
||||
|
||||
```bash
|
||||
python migrations/migrations.py run --key 20230522 \
|
||||
--token-current-owner "$NB_CONTROLLER_TOKEN" \
|
||||
--token-new-owner "$MOONSTREAM_ADMIN_OR_OTHER_CONTROLLER" \
|
||||
--new-application-id "$MOONSTREAM_APPLICATION_ID"
|
||||
```
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func accessCacheSetupSuit(t *testing.T) func(t *testing.T) {
|
||||
t.Log("Setup suit")
|
||||
|
||||
CreateAccessCache()
|
||||
|
||||
return func(t *testing.T) {
|
||||
t.Log("Teardown suit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddAccessToCache(t *testing.T) {
|
||||
teardownSuit := accessCacheSetupSuit(t)
|
||||
defer teardownSuit(t)
|
||||
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
var cases = []struct {
|
||||
prop ClientAccess
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "7378e2b2-b6ac-4738-bf34-fe39aa0d19e9"}},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "000000000000000000000000000000000000"}},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
prop: ClientAccess{ClientResourceData: ClientResourceData{Name: "name-1"}},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
accessId := c.prop.ClientResourceData.AccessID
|
||||
accessCache.AddAccessToCache(c.prop, tsNow)
|
||||
if accessCache.isAccessIdInCache(accessId) != c.expected {
|
||||
t.Logf("Access %s not found in access cache", accessId)
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -253,19 +253,21 @@ func cli() {
|
|||
PeriodStartTs: time.Now().Unix(),
|
||||
MaxCallsPerPeriod: stateCLI.MaxCallsPerPeriodFlag,
|
||||
CallsPerPeriod: 0,
|
||||
|
||||
Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS,
|
||||
}
|
||||
_, err := bugoutClient.Brood.FindUser(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
map[string]string{
|
||||
"user_id": proposedClientResourceData.UserID,
|
||||
"application_id": NB_APPLICATION_ID,
|
||||
"application_id": MOONSTREAM_APPLICATION_ID,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf("User does not exists, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedClientResourceData)
|
||||
resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, proposedClientResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to create user access, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
|
@ -302,7 +304,7 @@ func cli() {
|
|||
}
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
MOONSTREAM_APPLICATION_ID,
|
||||
queryParameters,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -406,7 +408,7 @@ func cli() {
|
|||
}
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
MOONSTREAM_APPLICATION_ID,
|
||||
queryParameters,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -464,7 +466,7 @@ func cli() {
|
|||
}
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
MOONSTREAM_APPLICATION_ID,
|
||||
queryParameters,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -15,6 +15,8 @@ var (
|
|||
type ClientAccess struct {
|
||||
ResourceID string
|
||||
|
||||
authorizationToken string
|
||||
|
||||
ClientResourceData ClientResourceData
|
||||
|
||||
LastAccessTs int64
|
||||
|
@ -36,6 +38,8 @@ type ClientResourceData struct {
|
|||
PeriodStartTs int64 `json:"period_start_ts"`
|
||||
MaxCallsPerPeriod int64 `json:"max_calls_per_period"`
|
||||
CallsPerPeriod int64 `json:"calls_per_period"`
|
||||
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
// TODO(kompotkot): Re-write tests for client
|
||||
package main
|
||||
|
||||
import (
|
||||
|
@ -7,7 +6,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
func setupSuit(t *testing.T) func(t *testing.T) {
|
||||
func clientsSetupSuit(t *testing.T) func(t *testing.T) {
|
||||
t.Log("Setup suit")
|
||||
|
||||
supportedBlockchains = map[string]bool{"ethereum": true}
|
||||
|
@ -18,7 +17,7 @@ func setupSuit(t *testing.T) func(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAddClientNode(t *testing.T) {
|
||||
teardownSuit := setupSuit(t)
|
||||
teardownSuit := clientsSetupSuit(t)
|
||||
defer teardownSuit(t)
|
||||
|
||||
var cases = []struct {
|
||||
|
@ -44,7 +43,7 @@ func TestAddClientNode(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetClientNode(t *testing.T) {
|
||||
teardownSuit := setupSuit(t)
|
||||
teardownSuit := clientsSetupSuit(t)
|
||||
defer teardownSuit(t)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
|
@ -75,7 +74,7 @@ func TestGetClientNode(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCleanInactiveClientNodes(t *testing.T) {
|
||||
teardownSuit := setupSuit(t)
|
||||
teardownSuit := clientsSetupSuit(t)
|
||||
defer teardownSuit(t)
|
||||
|
||||
ts := time.Now().Unix()
|
||||
|
|
|
@ -23,7 +23,7 @@ var (
|
|||
|
||||
// Bugout and application configuration
|
||||
BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5
|
||||
NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID")
|
||||
MOONSTREAM_APPLICATION_ID = os.Getenv("MOONSTREAM_APPLICATION_ID")
|
||||
NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN")
|
||||
NB_CONTROLLER_ACCESS_ID = os.Getenv("NB_CONTROLLER_ACCESS_ID")
|
||||
|
||||
|
@ -46,6 +46,12 @@ var (
|
|||
|
||||
// Humbug configuration
|
||||
HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN")
|
||||
|
||||
// Moonstream resources types
|
||||
BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access"
|
||||
DEFAULT_AUTOGENERATED_USER_PERMISSIONS = []string{"read", "update", "delete"}
|
||||
DEFAULT_AUTOGENERATED_PERIOD_DURATION = int64(86400)
|
||||
DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD = int64(1000)
|
||||
)
|
||||
|
||||
func CheckEnvVarSet() {
|
||||
|
|
|
@ -16,60 +16,104 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bugout-dev/bugout-go/pkg/brood"
|
||||
humbug "github.com/bugout-dev/humbug/go/pkg"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var (
|
||||
accessIdCache AccessCache
|
||||
accessCache AccessCache
|
||||
)
|
||||
|
||||
// AccessCache caches client identification for fast access to nodes
|
||||
//
|
||||
// If authorization passed with Bearer token, then it triggers to fetch Brood resource with access ID
|
||||
// or create new one. After it under key `accessIds` and `authorizationTokens` will be added similar
|
||||
// address pointers to one `ClientAccess`.
|
||||
type AccessCache struct {
|
||||
accessIds map[string]ClientAccess
|
||||
accessIds map[string]*ClientAccess
|
||||
authorizationTokens map[string]*ClientAccess
|
||||
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
// CreateAccessCache generates empty cache of client access
|
||||
func CreateAccessCache() {
|
||||
accessIdCache = AccessCache{
|
||||
accessIds: make(map[string]ClientAccess),
|
||||
accessCache = AccessCache{
|
||||
accessIds: make(map[string]*ClientAccess),
|
||||
authorizationTokens: make(map[string]*ClientAccess),
|
||||
}
|
||||
}
|
||||
|
||||
// Get access id from cache if exists
|
||||
func (ac *AccessCache) FindAccessIdInCache(accessId string) string {
|
||||
var detectedId string
|
||||
// FindAccessIdInCache looking for user access in `accessIds` cache
|
||||
func (ac *AccessCache) isAccessIdInCache(accessId string) bool {
|
||||
detected := false
|
||||
|
||||
ac.mux.RLock()
|
||||
for id := range ac.accessIds {
|
||||
if id == accessId {
|
||||
detectedId = id
|
||||
detected = true
|
||||
break
|
||||
}
|
||||
}
|
||||
ac.mux.RUnlock()
|
||||
|
||||
return detectedId
|
||||
return detected
|
||||
}
|
||||
|
||||
// Update last call access timestamp and datasource for access id
|
||||
func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
if accessData, ok := ac.accessIds[accessId]; ok {
|
||||
accessData.LastAccessTs = tsNow
|
||||
accessData.requestedDataSource = requestedDataSource
|
||||
accessData.LastSessionCallsCounter++
|
||||
// FindAuthorizationTokenInCache looking for user access in `authorizationTokens` cache
|
||||
func (ac *AccessCache) isAuthorizationTokenInCache(authorizationToken string) bool {
|
||||
detected := false
|
||||
|
||||
ac.accessIds[accessId] = accessData
|
||||
ac.mux.RLock()
|
||||
for id := range ac.authorizationTokens {
|
||||
if id == authorizationToken {
|
||||
detected = true
|
||||
break
|
||||
}
|
||||
}
|
||||
ac.mux.RUnlock()
|
||||
|
||||
return detected
|
||||
}
|
||||
|
||||
// Update last call access timestamp and datasource for user access
|
||||
func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
var accessToModify *ClientAccess
|
||||
|
||||
if accessId != "" {
|
||||
if access, ok := ac.accessIds[accessId]; ok {
|
||||
accessToModify = access
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if authorizationToken != "" {
|
||||
if access, ok := ac.authorizationTokens[authorizationToken]; ok {
|
||||
accessToModify = access
|
||||
}
|
||||
}
|
||||
|
||||
accessToModify.LastAccessTs = tsNow
|
||||
accessToModify.requestedDataSource = requestedDataSource
|
||||
accessToModify.LastSessionCallsCounter++
|
||||
|
||||
ac.mux.Unlock()
|
||||
}
|
||||
|
||||
// Add new access ID with data to cache
|
||||
func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) {
|
||||
// Add new user access identifier with data to cache
|
||||
func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) error {
|
||||
_, err := uuid.Parse(clientAccess.ClientResourceData.AccessID)
|
||||
if err != nil {
|
||||
log.Printf("Access ID %s is not valid UUID, err: %v", clientAccess.ClientResourceData.AccessID, err)
|
||||
return fmt.Errorf("access ID is not valid UUID")
|
||||
}
|
||||
|
||||
ac.mux.Lock()
|
||||
ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{
|
||||
ResourceID: clientAccess.ResourceID,
|
||||
access := &ClientAccess{
|
||||
ResourceID: clientAccess.ResourceID,
|
||||
authorizationToken: clientAccess.authorizationToken,
|
||||
|
||||
ClientResourceData: ClientResourceData{
|
||||
UserID: clientAccess.ClientResourceData.UserID,
|
||||
|
@ -83,6 +127,8 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64
|
|||
PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs,
|
||||
MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod,
|
||||
CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod,
|
||||
|
||||
Type: clientAccess.ClientResourceData.Type,
|
||||
},
|
||||
|
||||
LastAccessTs: tsNow,
|
||||
|
@ -91,7 +137,14 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64
|
|||
|
||||
requestedDataSource: clientAccess.requestedDataSource,
|
||||
}
|
||||
|
||||
ac.accessIds[clientAccess.ClientResourceData.AccessID] = access
|
||||
if clientAccess.authorizationToken != "" {
|
||||
ac.authorizationTokens[clientAccess.authorizationToken] = access
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check each access id in cache if it exceeds lifetime
|
||||
|
@ -100,9 +153,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
tsNow := time.Now().Unix()
|
||||
|
||||
ac.mux.Lock()
|
||||
|
||||
for aId, clientAccess := range ac.accessIds {
|
||||
totalAccessIds++
|
||||
removedUserId := ""
|
||||
|
||||
if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME {
|
||||
// Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period
|
||||
removedUserId = clientAccess.ClientResourceData.UserID
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
err := clientAccess.UpdateClientResourceCallCounter(tsNow)
|
||||
|
@ -110,6 +168,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME {
|
||||
removedUserId = clientAccess.ClientResourceData.UserID
|
||||
// Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
|
@ -117,12 +176,21 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
|
|||
if err != nil {
|
||||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
totalAccessIds++
|
||||
}
|
||||
|
||||
if removedUserId != "" {
|
||||
for aToken, clientAccess := range ac.authorizationTokens {
|
||||
if clientAccess.ClientResourceData.UserID == removedUserId {
|
||||
delete(ac.authorizationTokens, aToken)
|
||||
}
|
||||
}
|
||||
removedUserId = ""
|
||||
}
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
|
||||
totalAccessIds = totalAccessIds - removedAccessIds
|
||||
|
||||
return removedAccessIds, totalAccessIds
|
||||
}
|
||||
|
||||
|
@ -131,7 +199,7 @@ func initCacheCleaning(debug bool) {
|
|||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
removedAccessIds, totalAccessIds := accessIdCache.Cleanup()
|
||||
removedAccessIds, totalAccessIds := accessCache.Cleanup()
|
||||
if debug {
|
||||
log.Printf("Removed %d clients from access cache", removedAccessIds)
|
||||
}
|
||||
|
@ -140,6 +208,99 @@ func initCacheCleaning(debug bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// fetchClientAccessFromResources get resources with access ID or authorization token and generate new one if there no one
|
||||
func fetchClientAccessFromResources(accessID, authorizationToken string, tsNow int64) (*ClientAccess, error) {
|
||||
var err error
|
||||
|
||||
queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS}
|
||||
if accessID != "" {
|
||||
queryParameters["access_id"] = accessID
|
||||
}
|
||||
|
||||
token := NB_CONTROLLER_TOKEN
|
||||
if authorizationToken != "" {
|
||||
token = authorizationToken
|
||||
}
|
||||
|
||||
var resources brood.Resources
|
||||
resources, err = bugoutClient.Brood.GetResources(
|
||||
token,
|
||||
MOONSTREAM_APPLICATION_ID,
|
||||
queryParameters,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to get resources, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to get access identifiers")
|
||||
}
|
||||
|
||||
if len(resources.Resources) == 0 {
|
||||
if authorizationToken != "" {
|
||||
// Generate new autogenerated access resource with default parameters and grant user permissions to work with it
|
||||
user, err := bugoutClient.Brood.GetUser(authorizationToken)
|
||||
if err != nil {
|
||||
log.Printf("Unable to get user, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to find user with provided authorization token")
|
||||
}
|
||||
newResource, err := bugoutClient.Brood.CreateResource(
|
||||
NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, ClientResourceData{
|
||||
UserID: user.Id,
|
||||
AccessID: uuid.New().String(),
|
||||
Name: user.Username,
|
||||
Description: "Autogenerated access ID",
|
||||
BlockchainAccess: true,
|
||||
ExtendedMethods: false,
|
||||
|
||||
PeriodDuration: DEFAULT_AUTOGENERATED_PERIOD_DURATION,
|
||||
PeriodStartTs: tsNow,
|
||||
MaxCallsPerPeriod: DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD,
|
||||
CallsPerPeriod: 0,
|
||||
|
||||
Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to create resource with autogenerated access for user with ID %s, err: %v", user.Id, err)
|
||||
return nil, fmt.Errorf("unable to create resource with autogenerated access for user")
|
||||
}
|
||||
|
||||
resourceHolderPermissions, err := bugoutClient.Brood.AddResourceHolderPermissions(
|
||||
NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{
|
||||
Id: user.Id,
|
||||
HolderType: "user",
|
||||
Permissions: DEFAULT_AUTOGENERATED_USER_PERMISSIONS,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to grant permissions to user with ID %s at resource with ID %s, err: %v", newResource.Id, user.Id, err)
|
||||
return nil, fmt.Errorf("unable to create resource with autogenerated access for user")
|
||||
}
|
||||
|
||||
log.Printf("Created new resource with ID %s with autogenerated access for user with ID %s", resourceHolderPermissions.ResourceId, user.Id)
|
||||
resources.Resources = append(resources.Resources, newResource)
|
||||
} else {
|
||||
return nil, fmt.Errorf("there are no provided access identifier")
|
||||
}
|
||||
} else if len(resources.Resources) > 1 {
|
||||
// TODO(kompotkot): Write support of multiple resources, be careful, because NB_CONTROLLER has several resources
|
||||
return nil, fmt.Errorf("there are no provided access identifier")
|
||||
}
|
||||
|
||||
var clientAccessRaw ClientAccess
|
||||
resourceData, err := json.Marshal(&resources.Resources[0].ResourceData)
|
||||
if err != nil {
|
||||
log.Printf("Unable to parse resource data to access identifier, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to parse resource data to access identifier")
|
||||
}
|
||||
err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData)
|
||||
if err != nil {
|
||||
log.Printf("Unable to decode resource data to access identifier, err: %v", err)
|
||||
return nil, fmt.Errorf("unable to decode resource data to access identifier")
|
||||
}
|
||||
clientAccessRaw.ResourceID = resources.Resources[0].Id
|
||||
|
||||
return &clientAccessRaw, nil
|
||||
}
|
||||
|
||||
// Extract access_id from header and query. Query takes precedence over header.
|
||||
func extractAccessID(r *http.Request) string {
|
||||
var accessID string
|
||||
|
@ -289,76 +450,95 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
accessID := extractAccessID(r)
|
||||
requestedDataSource := extractRequestedDataSource(r)
|
||||
|
||||
if accessID == "" {
|
||||
http.Error(w, "No access id passed with request", http.StatusForbidden)
|
||||
// Extract Authorization token if Bearer header provided
|
||||
var authorizationTokenRaw string
|
||||
authorizationTokenHeaders := r.Header[strings.Title("authorization")]
|
||||
for _, h := range authorizationTokenHeaders {
|
||||
authorizationTokenRaw = h
|
||||
}
|
||||
var authorizationToken string
|
||||
if authorizationTokenRaw != "" {
|
||||
authorizationTokenSlice := strings.Split(authorizationTokenRaw, " ")
|
||||
if len(authorizationTokenSlice) != 2 || authorizationTokenSlice[0] != "Bearer" || authorizationTokenSlice[1] == "" {
|
||||
http.Error(w, "Wrong authorization token provided", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
authorizationToken = authorizationTokenSlice[1]
|
||||
}
|
||||
|
||||
if accessID == "" && authorizationToken == "" {
|
||||
http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
// If access id does not belong to internal crawlers, then check cache or find it in Bugout resources
|
||||
if accessID == NB_CONTROLLER_ACCESS_ID {
|
||||
currentClientAccess = internalUsageAccess
|
||||
if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
currentClientAccess = internalUsageAccess
|
||||
currentClientAccess.LastAccessTs = tsNow
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
} else if accessIdCache.FindAccessIdInCache(accessID) != "" {
|
||||
currentClientAccess = accessIdCache.accessIds[accessID]
|
||||
} else if accessID != "" && accessCache.isAccessIdInCache(accessID) {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
// Check if limit of calls not exceeded
|
||||
currentClientAccess = *accessCache.accessIds[accessID]
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow)
|
||||
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
|
||||
} else if accessID == "" && accessCache.isAuthorizationTokenInCache(authorizationToken) {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Client connected with Authorization token")
|
||||
}
|
||||
currentClientAccess = *accessCache.authorizationTokens[authorizationToken]
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow)
|
||||
} else {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("New access id, looking at Brood resources")
|
||||
log.Printf("No access identity found in cache, looking at Brood resources")
|
||||
}
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
map[string]string{"access_id": accessID},
|
||||
)
|
||||
|
||||
clientAccessRaw, err := fetchClientAccessFromResources(accessID, authorizationToken, tsNow)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
resourcesLen := len(resources.Resources)
|
||||
if resourcesLen == 0 {
|
||||
http.Error(w, "User with provided access identifier not found", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if resourcesLen > 1 {
|
||||
http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
resourceData, err := json.Marshal(resources.Resources[0].ResourceData)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
currentClientAccess.ResourceID = resources.Resources[0].Id
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError)
|
||||
http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if limit of calls not exceeded
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
isClientAllowedToGetAccess := clientAccessRaw.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow)
|
||||
currentClientAccess = ClientAccess(*clientAccessRaw)
|
||||
currentClientAccess.authorizationToken = authorizationToken
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
|
||||
// If client logged in before with access ID and it exists in cache, then re-use it
|
||||
// else create new instances in cache
|
||||
if authorizationToken != "" && accessCache.isAccessIdInCache(currentClientAccess.ClientResourceData.AccessID) {
|
||||
accessCache.authorizationTokens[authorizationToken] = accessCache.accessIds[currentClientAccess.ClientResourceData.AccessID]
|
||||
} else {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Adding new access identifier in cache")
|
||||
}
|
||||
err := accessCache.AddAccessToCache(currentClientAccess, tsNow)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to add access ID to cache", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess)
|
||||
|
|
|
@ -121,7 +121,7 @@ func Server() {
|
|||
// Fetch access id for internal usage (crawlers, infrastructure, etc)
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
NB_APPLICATION_ID,
|
||||
MOONSTREAM_APPLICATION_ID,
|
||||
map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID},
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
package main
|
||||
|
||||
var NB_VERSION = "0.2.2"
|
||||
var NB_VERSION = "0.2.3"
|
||||
|
|
|
@ -3,7 +3,7 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/bugout-dev/bugout-go v0.4.1
|
||||
github.com/bugout-dev/bugout-go v0.4.2
|
||||
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205
|
||||
github.com/google/uuid v1.3.0
|
||||
)
|
||||
|
|
|
@ -23,8 +23,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
|
||||
github.com/bugout-dev/bugout-go v0.4.1 h1:idZ4k+/skHj217/q8OmHBoYdzwJrqCY5Vd7S8FM6zlo=
|
||||
github.com/bugout-dev/bugout-go v0.4.1/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8=
|
||||
github.com/bugout-dev/bugout-go v0.4.2 h1:oADFQzZ4iZeQOz8dDaO/+25eQkrCYG8SqjA8mRSQl7k=
|
||||
github.com/bugout-dev/bugout-go v0.4.2/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8=
|
||||
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ=
|
||||
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
import argparse
|
||||
import logging
|
||||
import os
|
||||
|
||||
from bugout.app import Bugout
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def migration_20230522(
|
||||
token_current_owner: str, token_new_owner: str, new_application_id: str
|
||||
) -> None:
|
||||
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
|
||||
|
||||
bc = Bugout(brood_api_url=BUGOUT_BROOD_URL)
|
||||
|
||||
try:
|
||||
resources = bc.list_resources(token=token_current_owner, params={})
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
|
||||
logger.info(f"Found {len(resources.resources)} resources")
|
||||
|
||||
while input("Do you want to continue [y/n]? ") != "y":
|
||||
return
|
||||
|
||||
cnt = 0
|
||||
for resource in resources.resources:
|
||||
resource_data = resource.resource_data
|
||||
resource_data["type"] = "nodebalancer-access"
|
||||
|
||||
try:
|
||||
new_resource = bc.create_resource(
|
||||
token=token_new_owner,
|
||||
application_id=new_application_id,
|
||||
resource_data=resource_data,
|
||||
)
|
||||
cnt += 1
|
||||
logger.info(
|
||||
f"Created resource with ID {new_resource.id} and copied modified resource data"
|
||||
)
|
||||
except Exception as err:
|
||||
logger.error(f"Unable to copy resource with ID {resource.id}, err: {err}")
|
||||
|
||||
user_id = new_resource.resource_data.get("user_id", "")
|
||||
|
||||
try:
|
||||
new_permissions = bc.add_resource_holder_permissions(
|
||||
token=token_new_owner,
|
||||
resource_id=new_resource.id,
|
||||
holder_permissions={
|
||||
"holder_id": user_id,
|
||||
"holder_type": "user",
|
||||
"permissions": ["read", "update", "delete"],
|
||||
},
|
||||
)
|
||||
logger.info(
|
||||
f"Granted permissions for resource with ID {new_permissions.resource_id} to user with ID {user_id}"
|
||||
)
|
||||
except Exception as err:
|
||||
logger.error(
|
||||
f"Unable grant permissions for resource with ID {resource.id} to user with ID {user_id}, err: {err}"
|
||||
)
|
||||
|
||||
logger.info(f"Copied {cnt} resources")
|
||||
|
||||
|
||||
MIGRATIONS_LIST = {
|
||||
"20230522": {
|
||||
"description": "Modify existing Brood resources to Moonstream resources structure "
|
||||
"with `type` key equal to `nodebalancer-access`. And transfer ownership to moonstream admin. "
|
||||
"Then create permissions for user access.",
|
||||
"exec_func": migration_20230522,
|
||||
"required_args": [
|
||||
"token-current-owner",
|
||||
"token-new-owner",
|
||||
"new-application-id",
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def list_handler(args: argparse.Namespace) -> None:
|
||||
return print(MIGRATIONS_LIST)
|
||||
|
||||
|
||||
def run_handler(args: argparse.Namespace) -> None:
|
||||
migration = MIGRATIONS_LIST.get(args.key, None)
|
||||
if migration is None:
|
||||
logger.error(f"Migration with key '{args.key}' not found")
|
||||
return
|
||||
|
||||
migration["exec_func"](
|
||||
token_current_owner=args.token_current_owner,
|
||||
token_new_owner=args.token_new_owner,
|
||||
new_application_id=args.new_application_id,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Moonstream mode balancer migrations CLI"
|
||||
)
|
||||
parser.set_defaults(func=lambda _: parser.print_help())
|
||||
subcommands = parser.add_subparsers(description="Migration commands")
|
||||
|
||||
parser_list = subcommands.add_parser("list", description="List migrations")
|
||||
parser_list.set_defaults(func=list_handler)
|
||||
|
||||
parser_run = subcommands.add_parser("run", description="Run migration")
|
||||
parser_run.add_argument(
|
||||
"-k", "--key", required=True, type=str, help="Key of migration to run"
|
||||
)
|
||||
parser_run.add_argument(
|
||||
"--token-current-owner",
|
||||
type=str,
|
||||
default=argparse.SUPPRESS,
|
||||
help="Bugout access token of current resource owner",
|
||||
)
|
||||
parser_run.add_argument(
|
||||
"--token-new-owner",
|
||||
type=str,
|
||||
default=argparse.SUPPRESS,
|
||||
help="Bugout access token of new resource owner",
|
||||
)
|
||||
parser_run.add_argument(
|
||||
"--new-application-id",
|
||||
type=str,
|
||||
default=argparse.SUPPRESS,
|
||||
help="Bugout application ID to transfer resources",
|
||||
)
|
||||
parser_run.set_defaults(func=run_handler)
|
||||
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,18 @@
|
|||
black==23.3.0
|
||||
bugout==0.2.7
|
||||
certifi==2023.5.7
|
||||
charset-normalizer==3.1.0
|
||||
click==8.1.3
|
||||
idna==3.4
|
||||
isort==5.12.0
|
||||
mypy==1.3.0
|
||||
mypy-extensions==1.0.0
|
||||
packaging==23.1
|
||||
pathspec==0.11.1
|
||||
pkg_resources==0.0.0
|
||||
platformdirs==3.5.1
|
||||
pydantic==1.10.7
|
||||
requests==2.30.0
|
||||
tomli==2.0.1
|
||||
typing_extensions==4.5.0
|
||||
urllib3==2.0.2
|
|
@ -1,6 +1,6 @@
|
|||
# Required environment variables for load balancer
|
||||
export BUGOUT_BROOD_URL="https://auth.bugout.dev"
|
||||
export NB_APPLICATION_ID="<application_id_to_controll_access>"
|
||||
export MOONSTREAM_APPLICATION_ID="<application_id_to_controll_access>"
|
||||
export NB_CONTROLLER_TOKEN="<token_of_controller_user>"
|
||||
export NB_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_usage>"
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue