pull/747/head
kompotkot 2022-12-22 14:09:50 +00:00
rodzic d3e6070ec9
commit e40744c04b
4 zmienionych plików z 32 dodań i 45 usunięć

Wyświetl plik

@ -105,27 +105,7 @@ func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter
return callCounter return callCounter
} }
// IncreaseCallCounter increased to 1 each time node called // FilterTagsNodes returns nodes with provided tags
func (node *Node) IncreaseCallCounter() {
node.mux.Lock()
if node.CallCounter >= NB_MAX_COUNTER_NUMBER {
log.Printf("Number of calls for node %s reached %d limit, reset the counter.", node.Endpoint, NB_MAX_COUNTER_NUMBER)
node.CallCounter = uint64(0)
} else {
node.CallCounter++
}
node.mux.Unlock()
}
func containsGeneric[T comparable](b []T, e T) bool {
for _, v := range b {
if v == e {
return true
}
}
return false
}
func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) { func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) {
nodesMap := npool.NodesMap nodesMap := npool.NodesMap
nodesSet := npool.NodesSet nodesSet := npool.NodesSet
@ -174,7 +154,7 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
if node.IsAlive() { if node.IsAlive() {
currentBlock := node.CurrentBlock currentBlock := node.CurrentBlock
if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT { if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT {
// Bypass outdated nodes // Bypass too outdated nodes
continue continue
} }
if node.CallCounter < nextNode.CallCounter { if node.CallCounter < nextNode.CallCounter {
@ -183,12 +163,10 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
} }
} }
if nextNode == nil { if nextNode != nil {
return nil
}
// Increase CallCounter value with 1 // Increase CallCounter value with 1
atomic.AddUint64(&nextNode.CallCounter, uint64(1)) atomic.AddUint64(&nextNode.CallCounter, uint64(1))
}
return nextNode return nextNode
} }
@ -221,41 +199,41 @@ func StatusLog() {
// HealthCheck fetch the node latest block // HealthCheck fetch the node latest block
func HealthCheck() { func HealthCheck() {
for blockchain, nodes := range blockchainPools { for blockchain, nodes := range blockchainPools {
for _, n := range nodes.NodesSet { for _, node := range nodes.NodesSet {
alive := false alive := false
httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
resp, err := httpClient.Post( resp, err := httpClient.Post(
n.Endpoint.String(), node.Endpoint.String(),
"application/json", "application/json",
bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)),
) )
if err != nil { if err != nil {
n.UpdateNodeState(0, alive) node.UpdateNodeState(0, alive)
log.Printf("Unable to reach node: %s", n.Endpoint.Host) log.Printf("Unable to reach node: %s", node.Endpoint.Host)
continue continue
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
n.UpdateNodeState(0, alive) node.UpdateNodeState(0, alive)
log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) log.Printf("Unable to parse response from %s node, err %v", node.Endpoint.Host, err)
continue continue
} }
var statusResponse NodeStatusResponse var statusResponse NodeStatusResponse
err = json.Unmarshal(body, &statusResponse) err = json.Unmarshal(body, &statusResponse)
if err != nil { if err != nil {
n.UpdateNodeState(0, alive) node.UpdateNodeState(0, alive)
log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) log.Printf("Unable to read json response from %s node, err: %v", node.Endpoint.Host, err)
continue continue
} }
blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1)
blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64)
if err != nil { if err != nil {
n.UpdateNodeState(0, alive) node.UpdateNodeState(0, alive)
log.Printf("Unable to parse block number from hex to string, err: %v", err) log.Printf("Unable to parse block number from hex to string, err: %v", err)
continue continue
} }
@ -264,15 +242,24 @@ func HealthCheck() {
if blockNumber != 0 { if blockNumber != 0 {
alive = true alive = true
} }
callCounter := n.UpdateNodeState(blockNumber, alive) callCounter := node.UpdateNodeState(blockNumber, alive)
if blockNumber > nodes.TopNode.Block { if blockNumber > nodes.TopNode.Block {
nodes.TopNode.Block = blockNumber nodes.TopNode.Block = blockNumber
nodes.TopNode.Node = n nodes.TopNode.Node = node
}
if node.CallCounter >= NB_MAX_COUNTER_NUMBER {
log.Printf(
"Number of CallCounter for node %s reached %d limit, reset the counter.",
node.Endpoint, NB_MAX_COUNTER_NUMBER,
)
atomic.StoreUint64(&node.CallCounter, uint64(0))
} }
log.Printf( log.Printf(
"In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter, "Blockchain %s node %s is alive: %t with current block: %d called: %d times",
blockchain, node.Endpoint.Host, alive, blockNumber, callCounter,
) )
} }
} }

Wyświetl plik

@ -130,8 +130,6 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
} }
} }
node.IncreaseCallCounter()
// Overwrite Path so response will be returned to correct place // Overwrite Path so response will be returned to correct place
r.URL.Path = "/" r.URL.Path = "/"
node.GethReverseProxy.ServeHTTP(w, r) node.GethReverseProxy.ServeHTTP(w, r)

Wyświetl plik

@ -128,10 +128,11 @@ func Server() {
fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err) fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
resourcesLog := "Access with resources established."
if len(resources.Resources) != 1 { if len(resources.Resources) != 1 {
log.Println("There are no access IDs for users in resources") log.Printf("%s There are no access IDs for users in resources", resourcesLog)
} else { } else {
log.Println("Found user access IDs in resources") log.Printf("%s Found user access IDs in resources", resourcesLog)
} }
// Set internal crawlers access to bypass requests from internal services // Set internal crawlers access to bypass requests from internal services
@ -145,7 +146,6 @@ func Server() {
BlockchainAccess: true, BlockchainAccess: true,
ExtendedMethods: true, ExtendedMethods: true,
} }
log.Printf("Internal crawlers access set with user ID: %s", internalCrawlersUserID)
err = InitDatabaseClient() err = InitDatabaseClient()
if err != nil { if err != nil {
@ -185,6 +185,8 @@ func Server() {
r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER)) r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER))
// Change r.Host from nodebalancer's to end host so TLS check will be passed // Change r.Host from nodebalancer's to end host so TLS check will be passed
r.Host = r.URL.Host r.Host = r.URL.Host
// Explicit set of r.URL requires, because by default it adds trailing slash and brake some urls
r.URL = endpoint
} }
proxyErrorHandler(proxyToEndpoint, endpoint) proxyErrorHandler(proxyToEndpoint, endpoint)

Wyświetl plik

@ -1,6 +1,6 @@
module github.com/bugout-dev/moonstream/nodes/node_balancer module github.com/bugout-dev/moonstream/nodes/node_balancer
go 1.18 go 1.17
require ( require (
github.com/bugout-dev/bugout-go v0.3.4 github.com/bugout-dev/bugout-go v0.3.4