Balncer with tags based on CallCounter

pull/747/head
kompotkot 2022-12-22 13:02:00 +00:00
rodzic de609da7cd
commit 382d776f8d
6 zmienionych plików z 163 dodań i 109 usunięć

Wyświetl plik

@ -1,5 +1,5 @@
/* /*
Load balancer, based on https://github.com/kasvith/simplelb/ Load balancer logic.
*/ */
package main package main
@ -19,10 +19,9 @@ import (
// Main variable of pool of blockchains which contains pool of nodes // Main variable of pool of blockchains which contains pool of nodes
// for each blockchain we work during session. // for each blockchain we work during session.
var blockchainPool BlockchainPool var blockchainPools map[string]*NodePool
// Node structure with // Node structure with
// StatusURL for status server at node endpoint
// Endpoint for geth/bor/etc node http.server endpoint // Endpoint for geth/bor/etc node http.server endpoint
type Node struct { type Node struct {
Endpoint *url.URL Endpoint *url.URL
@ -36,16 +35,16 @@ type Node struct {
GethReverseProxy *httputil.ReverseProxy GethReverseProxy *httputil.ReverseProxy
} }
type NodePool struct { type TopNodeBlock struct {
Blockchain string Block uint64
Nodes []*Node Node *Node
// Counter to observe all nodes
Current uint64
} }
type BlockchainPool struct { type NodePool struct {
Blockchains []*NodePool NodesMap map[string][]*Node
NodesSet []*Node
TopNode TopNodeBlock
} }
// Node status response struct for HealthCheck // Node status response struct for HealthCheck
@ -58,24 +57,25 @@ type NodeStatusResponse struct {
} }
// AddNode to the nodes pool // AddNode to the nodes pool
func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) { func AddNode(blockchain string, tags []string, node *Node) {
var nodePool *NodePool if blockchainPools == nil {
for _, b := range bpool.Blockchains { blockchainPools = make(map[string]*NodePool)
if b.Blockchain == blockchain { }
nodePool = b if blockchainPools[blockchain] == nil {
} blockchainPools[blockchain] = &NodePool{}
}
if blockchainPools[blockchain].NodesMap == nil {
blockchainPools[blockchain].NodesMap = make(map[string][]*Node)
}
blockchainPools[blockchain].NodesSet = append(blockchainPools[blockchain].NodesSet, node)
for _, tag := range tags {
blockchainPools[blockchain].NodesMap[tag] = append(
blockchainPools[blockchain].NodesMap[tag],
node,
)
} }
// Check if blockchain not yet in pool
if nodePool == nil {
nodePool = &NodePool{
Blockchain: blockchain,
}
nodePool.Nodes = append(nodePool.Nodes, node)
bpool.Blockchains = append(bpool.Blockchains, nodePool)
} else {
nodePool.Nodes = append(nodePool.Nodes, node)
}
} }
// SetAlive with mutex for exact node // SetAlive with mutex for exact node
@ -117,59 +117,86 @@ func (node *Node) IncreaseCallCounter() {
node.mux.Unlock() node.mux.Unlock()
} }
// GetNextNode returns next active peer to take a connection func containsGeneric[T comparable](b []T, e T) bool {
// Loop through entire nodes to find out an alive one for _, v := range b {
func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { if v == e {
highestBlock := uint64(0) return true
}
}
return false
}
// Get NodePool with correct blockchain func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) {
var np *NodePool nodesMap := npool.NodesMap
for _, b := range bpool.Blockchains { nodesSet := npool.NodesSet
if b.Blockchain == blockchain {
np = b tagSet := make(map[string]map[*Node]bool)
for _, n := range b.Nodes {
if n.CurrentBlock > highestBlock { for tag, nodes := range nodesMap {
highestBlock = n.CurrentBlock if tagSet[tag] == nil {
} tagSet[tag] = make(map[*Node]bool)
}
for _, node := range nodes {
tagSet[tag][node] = true
}
}
topNode := TopNodeBlock{}
var filteredNodes []*Node
for _, node := range nodesSet {
accept := true
for _, tag := range tags {
if tagSet[tag][node] != true {
accept = false
break
}
}
if accept {
filteredNodes = append(filteredNodes, node)
currentBlock := node.CurrentBlock
if currentBlock >= npool.TopNode.Block {
topNode.Block = currentBlock
topNode.Node = node
} }
} }
} }
// Increase Current value with 1 return filteredNodes, topNode
currentInc := atomic.AddUint64(&np.Current, uint64(1)) }
// next is an Atomic incrementer, value always in range from 0 to slice length, // GetNextNode returns next active peer to take a connection
// it returns an index of slice // Loop through entire nodes to find out an alive one and chose one with small CallCounter
next := int(currentInc % uint64(len(np.Nodes))) func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
nextNode := topNode.Node
// Start from next one and move full cycle for _, node := range nodes {
l := len(np.Nodes) + next if node.IsAlive() {
currentBlock := node.CurrentBlock
for i := next; i < l; i++ { if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT {
// Take an index by modding with length // Bypass outdated nodes
idx := i % len(np.Nodes)
// If we have an alive one, use it and store if its not the original one
if np.Nodes[idx].IsAlive() {
if i != next {
// Mark the current one
atomic.StoreUint64(&np.Current, uint64(idx))
}
// Pass nodes with low blocks
// TODO(kompotkot): Re-write to not rotate through not highest blocks
if np.Nodes[idx].CurrentBlock < highestBlock {
continue continue
} }
if node.CallCounter < nextNode.CallCounter {
return np.Nodes[idx] nextNode = node
}
} }
} }
return nil
if nextNode == nil {
return nil
}
// Increase CallCounter value with 1
atomic.AddUint64(&nextNode.CallCounter, uint64(1))
return nextNode
} }
// SetNodeStatus modify status of the node // SetNodeStatus modify status of the node
func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { func SetNodeStatus(url *url.URL, alive bool) {
for _, b := range bpool.Blockchains { for _, nodes := range blockchainPools {
for _, n := range b.Nodes { for _, n := range nodes.NodesSet {
if n.Endpoint.String() == url.String() { if n.Endpoint.String() == url.String() {
n.SetAlive(alive) n.SetAlive(alive)
break break
@ -180,21 +207,21 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
// StatusLog logs node status // StatusLog logs node status
// TODO(kompotkot): Print list of alive and dead nodes // TODO(kompotkot): Print list of alive and dead nodes
func (bpool *BlockchainPool) StatusLog() { func StatusLog() {
for _, b := range bpool.Blockchains { for blockchain, nodes := range blockchainPools {
for _, n := range b.Nodes { for _, n := range nodes.NodesSet {
log.Printf( log.Printf(
"Blockchain %s node %s is alive %t. Blockchain called %d times", "Blockchain %s node %s is alive %t",
b.Blockchain, n.Endpoint.Host, n.Alive, b.Current, blockchain, n.Endpoint.Host, n.Alive,
) )
} }
} }
} }
// HealthCheck fetch the node latest block // HealthCheck fetch the node latest block
func (bpool *BlockchainPool) HealthCheck() { func HealthCheck() {
for _, b := range bpool.Blockchains { for blockchain, nodes := range blockchainPools {
for _, n := range b.Nodes { for _, n := range nodes.NodesSet {
alive := false alive := false
httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
@ -239,8 +266,13 @@ func (bpool *BlockchainPool) HealthCheck() {
} }
callCounter := n.UpdateNodeState(blockNumber, alive) callCounter := n.UpdateNodeState(blockNumber, alive)
if blockNumber > nodes.TopNode.Block {
nodes.TopNode.Block = blockNumber
nodes.TopNode.Node = n
}
log.Printf( log.Printf(
"Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter, "In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter,
) )
} }
} }

Wyświetl plik

@ -34,7 +34,8 @@ var (
NB_MAX_COUNTER_NUMBER = uint64(10000000) NB_MAX_COUNTER_NUMBER = uint64(10000000)
// Client configuration // Client configuration
NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds NB_CLIENT_NODE_KEEP_ALIVE = int64(1) // How long to store node in hot list for client in seconds
NB_HIGHEST_BLOCK_SHIFT = uint64(50) // Allowed shift to prefer node with most highest block
NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER") NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER")
NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER") NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER")

Wyświetl plik

@ -98,13 +98,13 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
return removedAccessIds, totalAccessIds return removedAccessIds, totalAccessIds
} }
func initCacheCleaning(debug bool) { func initCacheCleaning() {
t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL) t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL)
for { for {
select { select {
case <-t.C: case <-t.C:
removedAccessIds, totalAccessIds := accessIdCache.Cleanup() removedAccessIds, totalAccessIds := accessIdCache.Cleanup()
if debug { if stateCLI.enableDebugFlag {
log.Printf("Removed %d elements from access id cache", removedAccessIds) log.Printf("Removed %d elements from access id cache", removedAccessIds)
} }
log.Printf("Elements in access id cache: %d", totalAccessIds) log.Printf("Elements in access id cache: %d", totalAccessIds)

Wyświetl plik

@ -53,25 +53,12 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// Chose one node
var node *Node
cpool := GetClientPool(blockchain)
node = cpool.GetClientNode(currentClientAccess.AccessID)
if node == nil {
node = blockchainPool.GetNextNode(blockchain)
if node == nil {
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
return
}
cpool.AddClientNode(currentClientAccess.AccessID, node)
}
// Save origin path, to use in proxyErrorHandler if node will not response // Save origin path, to use in proxyErrorHandler if node will not response
r.Header.Add("X-Origin-Path", r.URL.Path) r.Header.Add("X-Origin-Path", r.URL.Path)
switch { switch {
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
lbJSONRPCHandler(w, r, blockchain, node, currentClientAccess) lbJSONRPCHandler(w, r, blockchain, currentClientAccess)
return return
default: default:
http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
@ -79,7 +66,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) { func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, currentClientAccess ClientResourceData) {
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
http.Error(w, "Unable to read body", http.StatusBadRequest) http.Error(w, "Unable to read body", http.StatusBadRequest)
@ -94,6 +81,39 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
return return
} }
// Get tags from request params, sort and generate from it identifier
var tags []string
queries := r.URL.Query()
for k, v := range queries {
if k == "tag" {
for _, tag := range v {
tags = append(tags, tag)
}
}
}
// Chose one node
var node *Node
cpool := GetClientPool(blockchain)
node = cpool.GetClientNode(currentClientAccess.AccessID)
if node == nil {
npool := blockchainPools[blockchain]
var nodes []*Node
var topNode TopNodeBlock
if len(tags) != 0 {
nodes, topNode = npool.FilterTagsNodes(tags)
} else {
topNode = npool.TopNode
nodes = npool.NodesSet
}
node = GetNextNode(nodes, topNode)
if node == nil {
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
return
}
cpool.AddClientNode(currentClientAccess.AccessID, node)
}
switch { switch {
case currentClientAccess.dataSource == "blockchain": case currentClientAccess.dataSource == "blockchain":
if currentClientAccess.BlockchainAccess == false { if currentClientAccess.BlockchainAccess == false {

Wyświetl plik

@ -5,7 +5,6 @@ package main
import ( import (
"context" "context"
// "encoding/json"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
@ -29,12 +28,12 @@ var (
) )
// initHealthCheck runs a routine for check status of the nodes every 5 seconds // initHealthCheck runs a routine for check status of the nodes every 5 seconds
func initHealthCheck(debug bool) { func initHealthCheck() {
t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL) t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL)
for { for {
select { select {
case <-t.C: case <-t.C:
blockchainPool.HealthCheck() HealthCheck()
logStr := "Client pool healthcheck." logStr := "Client pool healthcheck."
for b := range configBlockchains { for b := range configBlockchains {
cp := clientPool[b] cp := clientPool[b]
@ -42,8 +41,8 @@ func initHealthCheck(debug bool) {
logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients) logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients)
} }
log.Println(logStr) log.Println(logStr)
if debug { if stateCLI.enableDebugFlag {
blockchainPool.StatusLog() StatusLog()
} }
} }
} }
@ -89,7 +88,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
} }
// After 3 retries, mark this backend as down // After 3 retries, mark this backend as down
blockchainPool.SetNodeStatus(url, false) SetNodeStatus(url, false)
// Set modified path back // Set modified path back
// TODO(kompotkot): Try r.RequestURI instead of header // TODO(kompotkot): Try r.RequestURI instead of header
@ -189,11 +188,13 @@ func Server() {
} }
proxyErrorHandler(proxyToEndpoint, endpoint) proxyErrorHandler(proxyToEndpoint, endpoint)
blockchainPool.AddNode(&Node{ newNode := &Node{
Endpoint: endpoint, Endpoint: endpoint,
Alive: true, Alive: true,
GethReverseProxy: proxyToEndpoint, GethReverseProxy: proxyToEndpoint,
}, nodeConfig.Blockchain) }
AddNode(nodeConfig.Blockchain, nodeConfig.Tags, newNode)
log.Printf( log.Printf(
"Added new %s proxy blockchain under index %d from config file with geth url: %s://%s", "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s",
nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host) nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host)
@ -202,6 +203,12 @@ func Server() {
// Generate map of clients // Generate map of clients
CreateClientPools() CreateClientPools()
// Start node health checking and current block fetching
HealthCheck()
if stateCLI.enableHealthCheckFlag {
go initHealthCheck()
}
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler))) serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler)))
log.Println("Authentication middleware enabled") log.Println("Authentication middleware enabled")
@ -218,14 +225,8 @@ func Server() {
WriteTimeout: 40 * time.Second, WriteTimeout: 40 * time.Second,
} }
// Start node health checking and current block fetching
blockchainPool.HealthCheck()
if stateCLI.enableHealthCheckFlag {
go initHealthCheck(stateCLI.enableDebugFlag)
}
// Start access id cache cleaning // Start access id cache cleaning
go initCacheCleaning(stateCLI.enableDebugFlag) go initCacheCleaning()
log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
err = server.ListenAndServe() err = server.ListenAndServe()

Wyświetl plik

@ -1,6 +1,6 @@
module github.com/bugout-dev/moonstream/nodes/node_balancer module github.com/bugout-dev/moonstream/nodes/node_balancer
go 1.17 go 1.18
require ( require (
github.com/bugout-dev/bugout-go v0.3.4 github.com/bugout-dev/bugout-go v0.3.4