Optimised mux lock during healthcheck

pull/637/head
kompotkot 2022-07-12 21:04:25 +00:00
rodzic ce439f0f51
commit f753e7d137
2 zmienionych plików z 25 dodań i 25 usunięć

Wyświetl plik

@ -29,6 +29,7 @@ type Node struct {
Alive bool Alive bool
CurrentBlock uint64 CurrentBlock uint64
CallCounter uint64
mux sync.RWMutex mux sync.RWMutex
@ -92,19 +93,23 @@ func (node *Node) IsAlive() (alive bool) {
return alive return alive
} }
// SetCurrentBlock with mutex for exact node // UpdateNodeState updates block number and live status,
func (node *Node) SetCurrentBlock(currentBlock uint64) { // also it returns number of time node appeal
func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter uint64) {
node.mux.Lock() node.mux.Lock()
node.CurrentBlock = currentBlock node.CurrentBlock = currentBlock
node.Alive = alive
callCounter = node.CallCounter
node.mux.Unlock() node.mux.Unlock()
return callCounter
} }
// GetCurrentBlock returns block number // IncreaseCallCounter increased to 1 each time node called
func (node *Node) GetCurrentBlock() (currentBlock uint64) { func (node *Node) IncreaseCallCounter() {
node.mux.RLock() node.mux.Lock()
currentBlock = node.CurrentBlock node.CallCounter++
node.mux.RUnlock() node.mux.Unlock()
return currentBlock
} }
// GetNextNode returns next active peer to take a connection // GetNextNode returns next active peer to take a connection
@ -125,7 +130,7 @@ func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node {
} }
} }
// Increase Current value with 1 to be able to track node appeals // Increase Current value with 1
currentInc := atomic.AddUint64(&np.Current, uint64(1)) currentInc := atomic.AddUint64(&np.Current, uint64(1))
// next is an Atomic incrementer, value always in range from 0 to slice length, // next is an Atomic incrementer, value always in range from 0 to slice length,
@ -185,6 +190,8 @@ func (bpool *BlockchainPool) StatusLog() {
func (bpool *BlockchainPool) HealthCheck() { func (bpool *BlockchainPool) HealthCheck() {
for _, b := range bpool.Blockchains { for _, b := range bpool.Blockchains {
for _, n := range b.Nodes { for _, n := range b.Nodes {
alive := false
httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
resp, err := httpClient.Post( resp, err := httpClient.Post(
n.Endpoint.String(), n.Endpoint.String(),
@ -192,8 +199,7 @@ func (bpool *BlockchainPool) HealthCheck() {
bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)),
) )
if err != nil { if err != nil {
n.SetAlive(false) n.UpdateNodeState(0, alive)
n.SetCurrentBlock(0)
log.Printf("Unable to reach node: %s", n.Endpoint.Host) log.Printf("Unable to reach node: %s", n.Endpoint.Host)
continue continue
} }
@ -201,8 +207,7 @@ func (bpool *BlockchainPool) HealthCheck() {
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
n.SetAlive(false) n.UpdateNodeState(0, alive)
n.SetCurrentBlock(0)
log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err)
continue continue
} }
@ -210,8 +215,7 @@ func (bpool *BlockchainPool) HealthCheck() {
var statusResponse NodeStatusResponse var statusResponse NodeStatusResponse
err = json.Unmarshal(body, &statusResponse) err = json.Unmarshal(body, &statusResponse)
if err != nil { if err != nil {
n.SetAlive(false) n.UpdateNodeState(0, alive)
n.SetCurrentBlock(0)
log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err)
continue continue
} }
@ -219,25 +223,19 @@ func (bpool *BlockchainPool) HealthCheck() {
blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1)
blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64)
if err != nil { if err != nil {
n.SetAlive(false) n.UpdateNodeState(0, alive)
n.SetCurrentBlock(0)
log.Printf("Unable to parse block number from hex to string, err: %v", err) log.Printf("Unable to parse block number from hex to string, err: %v", err)
continue continue
} }
// Mark node in list of nodes as alive or not and update current block // Mark node in list of pool as alive and update current block
var alive bool
if blockNumber != 0 { if blockNumber != 0 {
alive = true alive = true
} else {
alive = false
} }
n.SetAlive(alive) callCounter := n.UpdateNodeState(blockNumber, alive)
n.SetCurrentBlock(blockNumber)
log.Printf( log.Printf(
"Node %s is alive: %t with current block: %d blockchain called: %d times", "Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter,
n.Endpoint.Host, alive, blockNumber, b.Current,
) )
} }
} }

Wyświetl plik

@ -118,6 +118,8 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
} }
} }
node.IncreaseCallCounter()
// Overwrite Path so response will be returned to correct place // Overwrite Path so response will be returned to correct place
r.URL.Path = "/" r.URL.Path = "/"
node.GethReverseProxy.ServeHTTP(w, r) node.GethReverseProxy.ServeHTTP(w, r)