diff --git a/nodes/node_balancer/README.md b/nodes/node_balancer/README.md index 895865b4..62fd4de8 100644 --- a/nodes/node_balancer/README.md +++ b/nodes/node_balancer/README.md @@ -1,49 +1,17 @@ # Node Balancer application -## Installation and configuration +# Installation -- Prepare environment variables, according with `sample.env`. +- Prepare environment variables - Build application ```bash go build -o nodebalancer . ``` -- Generate configuration +# Work with nodebalancer -```bash -nodebalancer generate-config -``` - -- Modify configuration. Tags should NOT repeat blockchain, as it is specified in `blockchain` key. Example of configuration: - -```bash -[ - { - "blockchain": "ethereum", - "endpoint": "http://127.0.0.1:8545", - "tags": ["local"] - }, - { - "blockchain": "ethereum", - "endpoint": "http://127.0.0.1:9585", - "tags": ["local"] - }, - { - "blockchain": "ethereum", - "endpoint": "https://cool-name.quiknode.pro/y0urn0de1den1f1cat0r/", - "tags": ["external"] - } -] -``` - -So if with request will be specified tag `local` will be returned node with corresponding tag. - -## Work with nodebalancer - -**IMPORTANT** Do not use flag `-debug` in production. - -### add-access +## add-access Add new access for user: @@ -57,7 +25,7 @@ nodebalancer add-access \ --blockchain--access true ``` -### delete-access +## delete-access Delete user access: @@ -69,7 +37,7 @@ nodebalancer delete-access \ If `access-id` not specified, all user accesses will be deleted. -### users +## users ```bash nodebalancer users | jq . @@ -99,7 +67,7 @@ This command will return a list of bugout resources of registered users to acces `extended_methods` - boolean which allow you to call not whitelisted method to blockchain node, by default for new user this is equal to `false` -### server +## server ```bash nodebalancer server -host 0.0.0.0 -port 8544 -healthcheck @@ -108,17 +76,17 @@ nodebalancer server -host 0.0.0.0 -port 8544 -healthcheck Flag `--healthcheck` will execute background process to ping-pong available nodes to keep their status and current block number. Flag `--debug` will extend output of each request to server and healthchecks summary. -## Work with node +# Work with node Common request to fetch block number ```bash -curl --request POST 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=' \ +curl --request GET 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=' \ --header 'Content-Type: application/json' \ --data-raw '{ "jsonrpc":"2.0", "method":"eth_getBlockByNumber", - "params":["latest", false], + "params":["0xb71b64", false], "id":1 }' ``` @@ -129,16 +97,3 @@ For Web3 providers `access_id` and `data_source` could be specified in headers --header 'x-node-balancer-data-source: ' --header 'x-node-balancer-access-id: ' ``` - -Same request to fetch specific nodes using tags - -```bash -curl --request POST 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=&tag=&tag=' \ - --header 'Content-Type: application/json' \ - --data-raw '{ - "jsonrpc":"2.0", - "method":"eth_getBlockByNumber", - "params":["latest", false], - "id":1 - }' -``` diff --git a/nodes/node_balancer/cmd/nodebalancer/balancer.go b/nodes/node_balancer/cmd/nodebalancer/balancer.go index cf1de7dc..a79f838c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/balancer.go +++ b/nodes/node_balancer/cmd/nodebalancer/balancer.go @@ -1,5 +1,5 @@ /* -Load balancer logic. +Load balancer, based on https://github.com/kasvith/simplelb/ */ package main @@ -19,9 +19,10 @@ import ( // Main variable of pool of blockchains which contains pool of nodes // for each blockchain we work during session. -var blockchainPools map[string]*NodePool +var blockchainPool BlockchainPool // Node structure with +// StatusURL for status server at node endpoint // Endpoint for geth/bor/etc node http.server endpoint type Node struct { Endpoint *url.URL @@ -35,16 +36,16 @@ type Node struct { GethReverseProxy *httputil.ReverseProxy } -type TopNodeBlock struct { - Block uint64 - Node *Node +type NodePool struct { + Blockchain string + Nodes []*Node + + // Counter to observe all nodes + Current uint64 } -type NodePool struct { - NodesMap map[string][]*Node - NodesSet []*Node - - TopNode TopNodeBlock +type BlockchainPool struct { + Blockchains []*NodePool } // Node status response struct for HealthCheck @@ -57,25 +58,24 @@ type NodeStatusResponse struct { } // AddNode to the nodes pool -func AddNode(blockchain string, tags []string, node *Node) { - if blockchainPools == nil { - blockchainPools = make(map[string]*NodePool) - } - if blockchainPools[blockchain] == nil { - blockchainPools[blockchain] = &NodePool{} - } - if blockchainPools[blockchain].NodesMap == nil { - blockchainPools[blockchain].NodesMap = make(map[string][]*Node) - } - blockchainPools[blockchain].NodesSet = append(blockchainPools[blockchain].NodesSet, node) - - for _, tag := range tags { - blockchainPools[blockchain].NodesMap[tag] = append( - blockchainPools[blockchain].NodesMap[tag], - node, - ) +func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) { + var nodePool *NodePool + for _, b := range bpool.Blockchains { + if b.Blockchain == blockchain { + nodePool = b + } } + // Check if blockchain not yet in pool + if nodePool == nil { + nodePool = &NodePool{ + Blockchain: blockchain, + } + nodePool.Nodes = append(nodePool.Nodes, node) + bpool.Blockchains = append(bpool.Blockchains, nodePool) + } else { + nodePool.Nodes = append(nodePool.Nodes, node) + } } // SetAlive with mutex for exact node @@ -105,76 +105,71 @@ func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter return callCounter } -// FilterTagsNodes returns nodes with provided tags -func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) { - nodesMap := npool.NodesMap - nodesSet := npool.NodesSet - - tagSet := make(map[string]map[*Node]bool) - - for tag, nodes := range nodesMap { - if tagSet[tag] == nil { - tagSet[tag] = make(map[*Node]bool) - } - for _, node := range nodes { - tagSet[tag][node] = true - } +// IncreaseCallCounter increased to 1 each time node called +func (node *Node) IncreaseCallCounter() { + node.mux.Lock() + if node.CallCounter >= NB_MAX_COUNTER_NUMBER { + log.Printf("Number of calls for node %s reached %d limit, reset the counter.", node.Endpoint, NB_MAX_COUNTER_NUMBER) + node.CallCounter = uint64(0) + } else { + node.CallCounter++ } - - topNode := TopNodeBlock{} - - var filteredNodes []*Node - for _, node := range nodesSet { - accept := true - for _, tag := range tags { - if tagSet[tag][node] != true { - accept = false - break - } - } - if accept { - filteredNodes = append(filteredNodes, node) - currentBlock := node.CurrentBlock - if currentBlock >= npool.TopNode.Block { - topNode.Block = currentBlock - topNode.Node = node - } - } - } - - return filteredNodes, topNode + node.mux.Unlock() } // GetNextNode returns next active peer to take a connection -// Loop through entire nodes to find out an alive one and chose one with small CallCounter -func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node { - nextNode := topNode.Node +// Loop through entire nodes to find out an alive one +func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { + highestBlock := uint64(0) - for _, node := range nodes { - if node.IsAlive() { - currentBlock := node.CurrentBlock - if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT { - // Bypass too outdated nodes - continue - } - if node.CallCounter < nextNode.CallCounter { - nextNode = node + // Get NodePool with correct blockchain + var np *NodePool + for _, b := range bpool.Blockchains { + if b.Blockchain == blockchain { + np = b + for _, n := range b.Nodes { + if n.CurrentBlock > highestBlock { + highestBlock = n.CurrentBlock + } } } } - if nextNode != nil { - // Increase CallCounter value with 1 - atomic.AddUint64(&nextNode.CallCounter, uint64(1)) - } + // Increase Current value with 1 + currentInc := atomic.AddUint64(&np.Current, uint64(1)) - return nextNode + // next is an Atomic incrementer, value always in range from 0 to slice length, + // it returns an index of slice + next := int(currentInc % uint64(len(np.Nodes))) + + // Start from next one and move full cycle + l := len(np.Nodes) + next + + for i := next; i < l; i++ { + // Take an index by modding with length + idx := i % len(np.Nodes) + // If we have an alive one, use it and store if its not the original one + if np.Nodes[idx].IsAlive() { + if i != next { + // Mark the current one + atomic.StoreUint64(&np.Current, uint64(idx)) + } + // Pass nodes with low blocks + // TODO(kompotkot): Re-write to not rotate through not highest blocks + if np.Nodes[idx].CurrentBlock < highestBlock { + continue + } + + return np.Nodes[idx] + } + } + return nil } // SetNodeStatus modify status of the node -func SetNodeStatus(url *url.URL, alive bool) { - for _, nodes := range blockchainPools { - for _, n := range nodes.NodesSet { +func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { + for _, b := range bpool.Blockchains { + for _, n := range b.Nodes { if n.Endpoint.String() == url.String() { n.SetAlive(alive) break @@ -185,55 +180,55 @@ func SetNodeStatus(url *url.URL, alive bool) { // StatusLog logs node status // TODO(kompotkot): Print list of alive and dead nodes -func StatusLog() { - for blockchain, nodes := range blockchainPools { - for _, n := range nodes.NodesSet { +func (bpool *BlockchainPool) StatusLog() { + for _, b := range bpool.Blockchains { + for _, n := range b.Nodes { log.Printf( - "Blockchain %s node %s is alive %t", - blockchain, n.Endpoint.Host, n.Alive, + "Blockchain %s node %s is alive %t. Blockchain called %d times", + b.Blockchain, n.Endpoint.Host, n.Alive, b.Current, ) } } } // HealthCheck fetch the node latest block -func HealthCheck() { - for blockchain, nodes := range blockchainPools { - for _, node := range nodes.NodesSet { +func (bpool *BlockchainPool) HealthCheck() { + for _, b := range bpool.Blockchains { + for _, n := range b.Nodes { alive := false httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} resp, err := httpClient.Post( - node.Endpoint.String(), + n.Endpoint.String(), "application/json", bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), ) if err != nil { - node.UpdateNodeState(0, alive) - log.Printf("Unable to reach node: %s", node.Endpoint.Host) + n.UpdateNodeState(0, alive) + log.Printf("Unable to reach node: %s", n.Endpoint.Host) continue } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - node.UpdateNodeState(0, alive) - log.Printf("Unable to parse response from %s node, err %v", node.Endpoint.Host, err) + n.UpdateNodeState(0, alive) + log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) continue } var statusResponse NodeStatusResponse err = json.Unmarshal(body, &statusResponse) if err != nil { - node.UpdateNodeState(0, alive) - log.Printf("Unable to read json response from %s node, err: %v", node.Endpoint.Host, err) + n.UpdateNodeState(0, alive) + log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) continue } blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) if err != nil { - node.UpdateNodeState(0, alive) + n.UpdateNodeState(0, alive) log.Printf("Unable to parse block number from hex to string, err: %v", err) continue } @@ -242,24 +237,10 @@ func HealthCheck() { if blockNumber != 0 { alive = true } - callCounter := node.UpdateNodeState(blockNumber, alive) - - if blockNumber > nodes.TopNode.Block { - nodes.TopNode.Block = blockNumber - nodes.TopNode.Node = node - } - - if node.CallCounter >= NB_MAX_COUNTER_NUMBER { - log.Printf( - "Number of CallCounter for node %s reached %d limit, reset the counter.", - node.Endpoint, NB_MAX_COUNTER_NUMBER, - ) - atomic.StoreUint64(&node.CallCounter, uint64(0)) - } + callCounter := n.UpdateNodeState(blockNumber, alive) log.Printf( - "Blockchain %s node %s is alive: %t with current block: %d called: %d times", - blockchain, node.Endpoint.Host, alive, blockNumber, callCounter, + "Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter, ) } } diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index 15a75aa2..23ccf9fc 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -167,7 +167,7 @@ func (s *StateCLI) populateCLI() { // Common flag pointers for _, fs := range []*flag.FlagSet{s.addAccessCmd, s.generateConfigCmd, s.deleteAccessCmd, s.serverCmd, s.usersCmd, s.versionCmd} { fs.BoolVar(&s.helpFlag, "help", false, "Show help message") - fs.StringVar(&s.configPathFlag, "config", "", "Path to configuration file (default: ~/.nodebalancer/config.json)") + fs.StringVar(&s.configPathFlag, "config", "", "Path to configuration file (default: ~/.nodebalancer/config.txt)") } // Add, delete and list user access subcommand flag pointers diff --git a/nodes/node_balancer/cmd/nodebalancer/clients_test.go b/nodes/node_balancer/cmd/nodebalancer/clients_test.go index 7b7e3e02..e8dfff18 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients_test.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients_test.go @@ -1,3 +1,4 @@ +// TODO(kompotkot): Re-write tests for client package main import ( @@ -6,36 +7,19 @@ import ( "time" ) -func setupSuit(t *testing.T) func(t *testing.T) { - t.Log("Setup suit") - - configBlockchains = map[string]bool{"ethereum": true} - - return func(t *testing.T) { - t.Log("Teardown suit") - } -} - -// TestAddClientNode tests adding new client to client pool func TestAddClientNode(t *testing.T) { - teardownSuit := setupSuit(t) - defer teardownSuit(t) - var cases = []struct { clients map[string]*Client expected string }{ {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"}, } - for _, c := range cases { CreateClientPools() - cpool := GetClientPool("ethereum") - for id, client := range c.clients { - cpool.AddClientNode(id, client.Node) + ethereumClientPool.AddClientNode(id, client.Node) } - for id := range cpool.Client { + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was added") t.Fatal() @@ -44,7 +28,6 @@ func TestAddClientNode(t *testing.T) { } } -// TestGetClientNode tests getting correct client func TestGetClientNode(t *testing.T) { ts := time.Now().Unix() @@ -56,17 +39,15 @@ func TestGetClientNode(t *testing.T) { {map[string]*Client{}, "1", nil}, {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } - for _, c := range cases { CreateClientPools() - cpool := GetClientPool("ethereum") - for id, client := range c.clients { - cpool.AddClientNode(id, client.Node) + ethereumClientPool.Client[id] = client } - clientNode := cpool.GetClientNode(c.id) + clientNode := ethereumClientPool.GetClientNode(c.id) if !reflect.DeepEqual(clientNode, c.expected) { t.Log("Wrong node returned") t.Fatal() @@ -74,7 +55,6 @@ func TestGetClientNode(t *testing.T) { } } -// TestCleanInactiveClientNodes tests cleaning inactive clients func TestCleanInactiveClientNodes(t *testing.T) { ts := time.Now().Unix() @@ -92,14 +72,12 @@ func TestCleanInactiveClientNodes(t *testing.T) { } for _, c := range cases { CreateClientPools() - cpool := GetClientPool("ethereum") - for id, client := range c.clients { - cpool.Client[id] = client + ethereumClientPool.Client[id] = client } - cpool.CleanInactiveClientNodes() - for id := range cpool.Client { + ethereumClientPool.CleanInactiveClientNodes() + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was removed") t.Fatal() diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 84fea272..358eb598 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -18,6 +18,7 @@ var ( nodeConfigs []NodeConfig // Bugout and application configuration + BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL") BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5 NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID") NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN") @@ -34,8 +35,7 @@ var ( NB_MAX_COUNTER_NUMBER = uint64(10000000) // Client configuration - NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds - NB_HIGHEST_BLOCK_SHIFT = uint64(50) // Allowed shift to prefer node with most highest block + NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER") NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER") @@ -60,9 +60,8 @@ func CheckEnvVarSet() { // Nodes configuration type NodeConfig struct { - Blockchain string `json:"blockchain"` - Endpoint string `json:"endpoint"` - Tags []string `json:"tags"` + Blockchain string `json:"blockchain"` + Endpoint string `json:"endpoint"` } func LoadConfig(configPath string) error { @@ -109,7 +108,7 @@ func GetConfigPath(providedPath string) (*ConfigPlacement, error) { return nil, fmt.Errorf("Unable to find user home directory, %v", err) } configDirPath = fmt.Sprintf("%s/.nodebalancer", homeDir) - configPath = fmt.Sprintf("%s/config.json", configDirPath) + configPath = fmt.Sprintf("%s/config.txt", configDirPath) } else { configPath = strings.TrimSuffix(providedPath, "/") configDirPath = filepath.Dir(configPath) @@ -145,7 +144,7 @@ func GenerateDefaultConfig(config *ConfigPlacement) error { if !config.ConfigExists { tempConfig := []NodeConfig{ - {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545", Tags: []string{"local"}}, + {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545"}, } tempConfigJson, err := json.Marshal(tempConfig) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index b5799b60..81811a5e 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -98,14 +98,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) { return removedAccessIds, totalAccessIds } -func initCacheCleaning() { +func initCacheCleaning(debug bool) { t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL) for { select { case <-t.C: removedAccessIds, totalAccessIds := accessIdCache.Cleanup() - if stateCLI.enableDebugFlag { - log.Printf("[DEBUG] Removed %d elements from access id cache", removedAccessIds) + if debug { + log.Printf("Removed %d elements from access id cache", removedAccessIds) } log.Printf("Elements in access id cache: %d", totalAccessIds) } @@ -241,7 +241,7 @@ func logMiddleware(next http.Handler) http.Handler { if stateCLI.enableDebugFlag { if r.URL.RawQuery != "" { - logStr += fmt.Sprintf(" [DEBUG] %s", r.URL.RawQuery) + logStr += fmt.Sprintf(" %s", r.URL.RawQuery) } accessID := extractAccessID(r) if accessID != "" { @@ -269,20 +269,20 @@ func accessMiddleware(next http.Handler) http.Handler { // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources if accessID == NB_CONTROLLER_ACCESS_ID { if stateCLI.enableDebugFlag { - log.Printf("[DEBUG] Access id belongs to internal crawlers") + log.Printf("Access id belongs to internal crawlers") } currentClientAccess = internalCrawlersAccess currentClientAccess.dataSource = dataSource } else if accessIdCache.FindAccessIdInCache(accessID) != "" { if stateCLI.enableDebugFlag { - log.Printf("[DEBUG] Access id found in cache") + log.Printf("Access id found in cache") } currentClientAccess = accessIdCache.accessIds[accessID] currentClientAccess.dataSource = dataSource accessIdCache.UpdateAccessIdAtCache(accessID, dataSource) } else { if stateCLI.enableDebugFlag { - log.Printf("[DEBUG] New access id, looking at Brood resources") + log.Printf("New access id, looking at Brood resources") } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index 19f1c230..b9929369 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -53,12 +53,25 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { return } + // Chose one node + var node *Node + cpool := GetClientPool(blockchain) + node = cpool.GetClientNode(currentClientAccess.AccessID) + if node == nil { + node = blockchainPool.GetNextNode(blockchain) + if node == nil { + http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + return + } + cpool.AddClientNode(currentClientAccess.AccessID, node) + } + // Save origin path, to use in proxyErrorHandler if node will not response r.Header.Add("X-Origin-Path", r.URL.Path) switch { case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): - lbJSONRPCHandler(w, r, blockchain, currentClientAccess) + lbJSONRPCHandler(w, r, blockchain, node, currentClientAccess) return default: http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) @@ -66,7 +79,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } } -func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, currentClientAccess ClientResourceData) { +func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Unable to read body", http.StatusBadRequest) @@ -81,43 +94,6 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, return } - // Get tags from request params, sort and generate from it identifier - var tags []string - queries := r.URL.Query() - for k, v := range queries { - if k == "tag" { - for _, tag := range v { - tags = append(tags, tag) - } - } - } - - // Chose one node - var node *Node - cpool := GetClientPool(blockchain) - node = cpool.GetClientNode(currentClientAccess.AccessID) - if node == nil { - npool := blockchainPools[blockchain] - var nodes []*Node - var topNode TopNodeBlock - if len(tags) != 0 { - nodes, topNode = npool.FilterTagsNodes(tags) - } else { - topNode = npool.TopNode - nodes = npool.NodesSet - } - node = GetNextNode(nodes, topNode) - if node == nil { - http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) - return - } - cpool.AddClientNode(currentClientAccess.AccessID, node) - } - - if stateCLI.enableDebugFlag { - log.Printf("[DEBUG] Used node with endpoint: %s, call counter equals: %d", node.Endpoint, node.CallCounter) - } - switch { case currentClientAccess.dataSource == "blockchain": if currentClientAccess.BlockchainAccess == false { @@ -134,6 +110,8 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, } } + node.IncreaseCallCounter() + // Overwrite Path so response will be returned to correct place r.URL.Path = "/" node.GethReverseProxy.ServeHTTP(w, r) diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index d0533d6e..f7b47c41 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -5,6 +5,7 @@ package main import ( "context" + "encoding/json" "fmt" "log" "net/http" @@ -28,12 +29,12 @@ var ( ) // initHealthCheck runs a routine for check status of the nodes every 5 seconds -func initHealthCheck() { +func initHealthCheck(debug bool) { t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: - HealthCheck() + blockchainPool.HealthCheck() logStr := "Client pool healthcheck." for b := range configBlockchains { cp := clientPool[b] @@ -41,8 +42,8 @@ func initHealthCheck() { logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients) } log.Println(logStr) - if stateCLI.enableDebugFlag { - StatusLog() + if debug { + blockchainPool.StatusLog() } } } @@ -88,7 +89,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { } // After 3 retries, mark this backend as down - SetNodeStatus(url, false) + blockchainPool.SetNodeStatus(url, false) // Set modified path back // TODO(kompotkot): Try r.RequestURI instead of header @@ -128,24 +129,33 @@ func Server() { fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err) os.Exit(1) } - resourcesLog := "Access with resources established." if len(resources.Resources) != 1 { - log.Printf("%s There are no access IDs for users in resources", resourcesLog) - } else { - log.Printf("%s Found user access IDs in resources", resourcesLog) + fmt.Printf("User with provided access identifier has wrong number of resources, err: %v\n", err) + os.Exit(1) + } + resource_data, err := json.Marshal(resources.Resources[0].ResourceData) + if err != nil { + fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err) + os.Exit(1) + } + var clientAccess ClientResourceData + err = json.Unmarshal(resource_data, &clientAccess) + if err != nil { + fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err) + os.Exit(1) } - - // Set internal crawlers access to bypass requests from internal services - // without fetching data from authn Brood server - internalCrawlersUserID := uuid.New().String() internalCrawlersAccess = ClientResourceData{ - UserID: internalCrawlersUserID, - AccessID: NB_CONTROLLER_ACCESS_ID, - Name: "InternalCrawlersAccess", - Description: "Access for internal crawlers.", - BlockchainAccess: true, - ExtendedMethods: true, + UserID: clientAccess.UserID, + AccessID: clientAccess.AccessID, + Name: clientAccess.Name, + Description: clientAccess.Description, + BlockchainAccess: clientAccess.BlockchainAccess, + ExtendedMethods: clientAccess.ExtendedMethods, } + log.Printf( + "Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t", + resources.Resources[0].Id, clientAccess.BlockchainAccess, clientAccess.ExtendedMethods, + ) err = InitDatabaseClient() if err != nil { @@ -185,18 +195,14 @@ func Server() { r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER)) // Change r.Host from nodebalancer's to end host so TLS check will be passed r.Host = r.URL.Host - // Explicit set of r.URL requires, because by default it adds trailing slash and brake some urls - r.URL = endpoint } proxyErrorHandler(proxyToEndpoint, endpoint) - newNode := &Node{ + blockchainPool.AddNode(&Node{ Endpoint: endpoint, Alive: true, GethReverseProxy: proxyToEndpoint, - } - AddNode(nodeConfig.Blockchain, nodeConfig.Tags, newNode) - + }, nodeConfig.Blockchain) log.Printf( "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s", nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host) @@ -205,12 +211,6 @@ func Server() { // Generate map of clients CreateClientPools() - // Start node health checking and current block fetching - HealthCheck() - if stateCLI.enableHealthCheckFlag { - go initHealthCheck() - } - serveMux := http.NewServeMux() serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler))) log.Println("Authentication middleware enabled") @@ -227,8 +227,14 @@ func Server() { WriteTimeout: 40 * time.Second, } + // Start node health checking and current block fetching + blockchainPool.HealthCheck() + if stateCLI.enableHealthCheckFlag { + go initHealthCheck(stateCLI.enableDebugFlag) + } + // Start access id cache cleaning - go initCacheCleaning() + go initCacheCleaning(stateCLI.enableDebugFlag) log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) err = server.ListenAndServe() diff --git a/nodes/node_balancer/cmd/nodebalancer/version.go b/nodes/node_balancer/cmd/nodebalancer/version.go index 0a350a65..0c645931 100644 --- a/nodes/node_balancer/cmd/nodebalancer/version.go +++ b/nodes/node_balancer/cmd/nodebalancer/version.go @@ -1,3 +1,3 @@ package main -var NB_VERSION = "0.2.2" +var NB_VERSION = "0.2.1" diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index 6de59eb5..d1aea3b6 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -1,5 +1,5 @@ # Required environment variables for load balancer -export BUGOUT_BROOD_URL="https://auth.bugout.dev" +export BUGOUT_AUTH_URL="https://auth.bugout.dev" export NB_APPLICATION_ID="" export NB_CONTROLLER_TOKEN="" export NB_CONTROLLER_ACCESS_ID=""