Mutex and blockchain respect for client node balancer

pull/519/head
kompotkot 2022-01-17 11:09:44 +00:00
rodzic 491a374437
commit aab3b8f4ac
5 zmienionych plików z 82 dodań i 53 usunięć

Wyświetl plik

@ -1,6 +1,7 @@
package cmd package cmd
import ( import (
"errors"
"log" "log"
"reflect" "reflect"
"time" "time"
@ -8,46 +9,71 @@ import (
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
var clientPool ClientPool var ethereumClientPool ClientPool
var polygonClientPool ClientPool
// func (client *Client) GetClientNode() (clientNode *Node) { // Generate client pools for different blockchains
// client.mux.RLock() func CreateClientPools() {
// clientNode = client.Node ethereumClientPool.Client = make(map[string]*Client)
// client.mux.RUnlock() polygonClientPool.Client = make(map[string]*Client)
// return clientNode }
// }
// Add client node and client itself if doesn't exist // Return client pool correspongin to blockchain
// TODO(kompotkot): Add mutex as for balancer func GetClientPool(blockchain string) (*ClientPool, error) {
func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) { var cpool *ClientPool
if blockchain == "ethereum" {
cpool = &ethereumClientPool
} else if blockchain == "polygon" {
cpool = &polygonClientPool
} else {
return nil, errors.New("Unexisting blockchain provided")
}
return cpool, nil
}
// Updates client last appeal to node
func (client *Client) UpdateClientLastCall() {
ts := time.Now().Unix() ts := time.Now().Unix()
// Find clint with same ID and update timestamp or client.mux.Lock()
// add new one if doesn't exist client.LastCallTs = ts
client.mux.Unlock()
}
// Get number of seconds from current last call to client node
func (client *Client) GetClientLastCallDiff() (lastCallTs int64) {
ts := time.Now().Unix()
client.mux.RLock()
lastCallTs = ts - client.LastCallTs
client.mux.RUnlock()
return lastCallTs
}
// Find clint with same ID and update timestamp or
// add new one if doesn't exist
func (cpool *ClientPool) AddClientNode(id string, node *Node) {
if cpool.Client[id] != nil { if cpool.Client[id] != nil {
if reflect.DeepEqual(cpool.Client[id].Node, node) { if reflect.DeepEqual(cpool.Client[id].Node, node) {
cpool.Client[id].LastCallTs = ts cpool.Client[id].UpdateClientLastCall()
return return
} }
} }
cpool.Client[id] = &Client{ cpool.Client[id] = &Client{
Blockchain: blockchain,
Node: node, Node: node,
LastCallTs: ts, LastCallTs: time.Now().Unix(),
} }
} }
// Get client hot node if exists // Get client hot node if exists
// TODO(kompotkot): Add mutex as for balancer
func (cpool *ClientPool) GetClientNode(id string) *Node { func (cpool *ClientPool) GetClientNode(id string) *Node {
ts := time.Now().Unix() if cpool.Client[id] != nil {
lastCallTs := cpool.Client[id].GetClientLastCallDiff()
currentClient := cpool.Client[id] if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
cpool.Client[id].UpdateClientLastCall()
if currentClient != nil { return cpool.Client[id].Node
if ts-currentClient.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
currentClient.LastCallTs = ts
return currentClient.Node
} }
delete(cpool.Client, id) delete(cpool.Client, id)
} }
@ -55,14 +81,12 @@ func (cpool *ClientPool) GetClientNode(id string) *Node {
return nil return nil
} }
// Clean client list of hot nodes from outdated // Clean client list of hot outdated nodes
// TODO(kompotkot): Add mutex as for balancer
func (cpool *ClientPool) CleanInactiveClientNodes() { func (cpool *ClientPool) CleanInactiveClientNodes() {
ts := time.Now().Unix()
cnt := 0 cnt := 0
for id, client := range cpool.Client { for id, client := range cpool.Client {
if ts-client.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { lastCallTs := client.GetClientLastCallDiff()
if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
delete(cpool.Client, id) delete(cpool.Client, id)
} else { } else {
cnt += 1 cnt += 1

Wyświetl plik

@ -13,14 +13,14 @@ func TestAddClientNode(t *testing.T) {
clients map[string]*Client clients map[string]*Client
expected string expected string
}{ }{
{map[string]*Client{"1": {Blockchain: "ethereum", Node: &Node{Alive: true}}}, "1"}, {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"},
} }
for _, c := range cases { for _, c := range cases {
clientPool.Client = make(map[string]*Client) CreateClientPools()
for id, client := range c.clients { for id, client := range c.clients {
clientPool.AddClientNode(id, client.Blockchain, client.Node) ethereumClientPool.AddClientNode(id, client.Node)
} }
for id := range clientPool.Client { for id := range ethereumClientPool.Client {
if id != c.expected { if id != c.expected {
t.Log("Wrong client was added") t.Log("Wrong client was added")
t.Fatal() t.Fatal()
@ -38,17 +38,17 @@ func TestGetClientNode(t *testing.T) {
expected *Node expected *Node
}{ }{
{map[string]*Client{}, "1", nil}, {map[string]*Client{}, "1", nil},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}},
{map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil},
} }
for _, c := range cases { for _, c := range cases {
clientPool.Client = make(map[string]*Client) CreateClientPools()
for id, client := range c.clients { for id, client := range c.clients {
clientPool.Client[id] = client ethereumClientPool.Client[id] = client
} }
clientNode := clientPool.GetClientNode(c.id) clientNode := ethereumClientPool.GetClientNode(c.id)
if !reflect.DeepEqual(clientNode, c.expected) { if !reflect.DeepEqual(clientNode, c.expected) {
t.Log("Wrong node returned") t.Log("Wrong node returned")
t.Fatal() t.Fatal()
@ -63,22 +63,22 @@ func TestCleanInactiveClientNodes(t *testing.T) {
clients map[string]*Client clients map[string]*Client
expected string expected string
}{ }{
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts}}, "1"}, {map[string]*Client{"1": {LastCallTs: ts}}, "1"},
{map[string]*Client{ {map[string]*Client{
"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, "1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE},
"2": {Blockchain: "polygon", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, "2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10},
"3": {Blockchain: "stellar", LastCallTs: ts}, "3": {LastCallTs: ts},
}, "3"}, }, "3"},
} }
for _, c := range cases { for _, c := range cases {
clientPool.Client = make(map[string]*Client) CreateClientPools()
for id, client := range c.clients { for id, client := range c.clients {
clientPool.Client[id] = client ethereumClientPool.Client[id] = client
} }
clientPool.CleanInactiveClientNodes() ethereumClientPool.CleanInactiveClientNodes()
for id := range clientPool.Client { for id := range ethereumClientPool.Client {
if id != c.expected { if id != c.expected {
t.Log("Wrong client was removed") t.Log("Wrong client was removed")
t.Fatal() t.Fatal()

Wyświetl plik

@ -20,7 +20,6 @@ type NodeStatusResponse struct {
// Node - which one node client worked with // Node - which one node client worked with
// LastCallTs - timestamp from last call // LastCallTs - timestamp from last call
type Client struct { type Client struct {
Blockchain string
Node *Node Node *Node
LastCallTs int64 LastCallTs int64

Wyświetl plik

@ -39,14 +39,19 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
} }
var node *Node var node *Node
node = clientPool.GetClientNode(ip) cpool, err := GetClientPool(blockchain)
if err != nil {
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
return
}
cpool.GetClientNode(ip)
if node == nil { if node == nil {
node = blockchainPool.GetNextNode(blockchain) node = blockchainPool.GetNextNode(blockchain)
if node == nil { if node == nil {
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
return return
} }
clientPool.AddClientNode(ip, blockchain, node) cpool.AddClientNode(ip, node)
} }
// Save origin path, to use in proxyErrorHandler if node will not response // Save origin path, to use in proxyErrorHandler if node will not response

Wyświetl plik

@ -27,7 +27,8 @@ func initHealthCheck(debug bool) {
select { select {
case <-t.C: case <-t.C:
blockchainPool.HealthCheck() blockchainPool.HealthCheck()
clientPool.CleanInactiveClientNodes() ethereumClientPool.CleanInactiveClientNodes()
polygonClientPool.CleanInactiveClientNodes()
if debug { if debug {
blockchainPool.StatusLog() blockchainPool.StatusLog()
} }
@ -109,7 +110,7 @@ func InitServer() {
} }
// Generate map of clients // Generate map of clients
clientPool.Client = make(map[string]*Client) CreateClientPools()
// Configure Humbug reporter to handle errors // Configure Humbug reporter to handle errors
var err error var err error