diff --git a/nodes/node_balancer/cmd/nodebalancer/balancer.go b/nodes/node_balancer/cmd/nodebalancer/balancer.go index 267af421..cf1de7dc 100644 --- a/nodes/node_balancer/cmd/nodebalancer/balancer.go +++ b/nodes/node_balancer/cmd/nodebalancer/balancer.go @@ -105,27 +105,7 @@ func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter return callCounter } -// IncreaseCallCounter increased to 1 each time node called -func (node *Node) IncreaseCallCounter() { - node.mux.Lock() - if node.CallCounter >= NB_MAX_COUNTER_NUMBER { - log.Printf("Number of calls for node %s reached %d limit, reset the counter.", node.Endpoint, NB_MAX_COUNTER_NUMBER) - node.CallCounter = uint64(0) - } else { - node.CallCounter++ - } - node.mux.Unlock() -} - -func containsGeneric[T comparable](b []T, e T) bool { - for _, v := range b { - if v == e { - return true - } - } - return false -} - +// FilterTagsNodes returns nodes with provided tags func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) { nodesMap := npool.NodesMap nodesSet := npool.NodesSet @@ -174,7 +154,7 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node { if node.IsAlive() { currentBlock := node.CurrentBlock if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT { - // Bypass outdated nodes + // Bypass too outdated nodes continue } if node.CallCounter < nextNode.CallCounter { @@ -183,13 +163,11 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node { } } - if nextNode == nil { - return nil + if nextNode != nil { + // Increase CallCounter value with 1 + atomic.AddUint64(&nextNode.CallCounter, uint64(1)) } - // Increase CallCounter value with 1 - atomic.AddUint64(&nextNode.CallCounter, uint64(1)) - return nextNode } @@ -221,41 +199,41 @@ func StatusLog() { // HealthCheck fetch the node latest block func HealthCheck() { for blockchain, nodes := range blockchainPools { - for _, n := range nodes.NodesSet { + for _, node := range nodes.NodesSet { alive := false httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} resp, err := httpClient.Post( - n.Endpoint.String(), + node.Endpoint.String(), "application/json", bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), ) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to reach node: %s", n.Endpoint.Host) + node.UpdateNodeState(0, alive) + log.Printf("Unable to reach node: %s", node.Endpoint.Host) continue } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) + node.UpdateNodeState(0, alive) + log.Printf("Unable to parse response from %s node, err %v", node.Endpoint.Host, err) continue } var statusResponse NodeStatusResponse err = json.Unmarshal(body, &statusResponse) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) + node.UpdateNodeState(0, alive) + log.Printf("Unable to read json response from %s node, err: %v", node.Endpoint.Host, err) continue } blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) if err != nil { - n.UpdateNodeState(0, alive) + node.UpdateNodeState(0, alive) log.Printf("Unable to parse block number from hex to string, err: %v", err) continue } @@ -264,15 +242,24 @@ func HealthCheck() { if blockNumber != 0 { alive = true } - callCounter := n.UpdateNodeState(blockNumber, alive) + callCounter := node.UpdateNodeState(blockNumber, alive) if blockNumber > nodes.TopNode.Block { nodes.TopNode.Block = blockNumber - nodes.TopNode.Node = n + nodes.TopNode.Node = node + } + + if node.CallCounter >= NB_MAX_COUNTER_NUMBER { + log.Printf( + "Number of CallCounter for node %s reached %d limit, reset the counter.", + node.Endpoint, NB_MAX_COUNTER_NUMBER, + ) + atomic.StoreUint64(&node.CallCounter, uint64(0)) } log.Printf( - "In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter, + "Blockchain %s node %s is alive: %t with current block: %d called: %d times", + blockchain, node.Endpoint.Host, alive, blockNumber, callCounter, ) } } diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index 586d9f00..57a5e6a8 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -130,8 +130,6 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, } } - node.IncreaseCallCounter() - // Overwrite Path so response will be returned to correct place r.URL.Path = "/" node.GethReverseProxy.ServeHTTP(w, r) diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index d3d693e0..d0533d6e 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -128,10 +128,11 @@ func Server() { fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err) os.Exit(1) } + resourcesLog := "Access with resources established." if len(resources.Resources) != 1 { - log.Println("There are no access IDs for users in resources") + log.Printf("%s There are no access IDs for users in resources", resourcesLog) } else { - log.Println("Found user access IDs in resources") + log.Printf("%s Found user access IDs in resources", resourcesLog) } // Set internal crawlers access to bypass requests from internal services @@ -145,7 +146,6 @@ func Server() { BlockchainAccess: true, ExtendedMethods: true, } - log.Printf("Internal crawlers access set with user ID: %s", internalCrawlersUserID) err = InitDatabaseClient() if err != nil { @@ -185,6 +185,8 @@ func Server() { r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER)) // Change r.Host from nodebalancer's to end host so TLS check will be passed r.Host = r.URL.Host + // Explicit set of r.URL requires, because by default it adds trailing slash and brake some urls + r.URL = endpoint } proxyErrorHandler(proxyToEndpoint, endpoint) diff --git a/nodes/node_balancer/go.mod b/nodes/node_balancer/go.mod index 73b2432c..a2e51e1f 100644 --- a/nodes/node_balancer/go.mod +++ b/nodes/node_balancer/go.mod @@ -1,6 +1,6 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer -go 1.18 +go 1.17 require ( github.com/bugout-dev/bugout-go v0.3.4