From aab3b8f4ac4bb00a85fb595ba15ccce84777a70e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:09:44 +0000 Subject: [PATCH] Mutex and blockchain respect for client node balancer --- nodes/node_balancer/cmd/clients.go | 82 ++++++++++++++++--------- nodes/node_balancer/cmd/clients_test.go | 38 ++++++------ nodes/node_balancer/cmd/data.go | 1 - nodes/node_balancer/cmd/routes.go | 9 ++- nodes/node_balancer/cmd/server.go | 5 +- 5 files changed, 82 insertions(+), 53 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 2e0b300c..c591de24 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -1,6 +1,7 @@ package cmd import ( + "errors" "log" "reflect" "time" @@ -8,46 +9,71 @@ import ( configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) -var clientPool ClientPool +var ethereumClientPool ClientPool +var polygonClientPool ClientPool -// func (client *Client) GetClientNode() (clientNode *Node) { -// client.mux.RLock() -// clientNode = client.Node -// client.mux.RUnlock() -// return clientNode -// } +// Generate client pools for different blockchains +func CreateClientPools() { + ethereumClientPool.Client = make(map[string]*Client) + polygonClientPool.Client = make(map[string]*Client) +} -// Add client node and client itself if doesn't exist -// TODO(kompotkot): Add mutex as for balancer -func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) { +// Return client pool correspongin to blockchain +func GetClientPool(blockchain string) (*ClientPool, error) { + var cpool *ClientPool + if blockchain == "ethereum" { + cpool = ðereumClientPool + } else if blockchain == "polygon" { + cpool = &polygonClientPool + } else { + return nil, errors.New("Unexisting blockchain provided") + } + return cpool, nil +} + +// Updates client last appeal to node +func (client *Client) UpdateClientLastCall() { ts := time.Now().Unix() - // Find clint with same ID and update timestamp or - // add new one if doesn't exist + client.mux.Lock() + client.LastCallTs = ts + client.mux.Unlock() +} + +// Get number of seconds from current last call to client node +func (client *Client) GetClientLastCallDiff() (lastCallTs int64) { + ts := time.Now().Unix() + + client.mux.RLock() + lastCallTs = ts - client.LastCallTs + client.mux.RUnlock() + + return lastCallTs +} + +// Find clint with same ID and update timestamp or +// add new one if doesn't exist +func (cpool *ClientPool) AddClientNode(id string, node *Node) { + if cpool.Client[id] != nil { if reflect.DeepEqual(cpool.Client[id].Node, node) { - cpool.Client[id].LastCallTs = ts + cpool.Client[id].UpdateClientLastCall() return } } cpool.Client[id] = &Client{ - Blockchain: blockchain, Node: node, - LastCallTs: ts, + LastCallTs: time.Now().Unix(), } } // Get client hot node if exists -// TODO(kompotkot): Add mutex as for balancer func (cpool *ClientPool) GetClientNode(id string) *Node { - ts := time.Now().Unix() - - currentClient := cpool.Client[id] - - if currentClient != nil { - if ts-currentClient.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { - currentClient.LastCallTs = ts - return currentClient.Node + if cpool.Client[id] != nil { + lastCallTs := cpool.Client[id].GetClientLastCallDiff() + if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + cpool.Client[id].UpdateClientLastCall() + return cpool.Client[id].Node } delete(cpool.Client, id) } @@ -55,14 +81,12 @@ func (cpool *ClientPool) GetClientNode(id string) *Node { return nil } -// Clean client list of hot nodes from outdated -// TODO(kompotkot): Add mutex as for balancer +// Clean client list of hot outdated nodes func (cpool *ClientPool) CleanInactiveClientNodes() { - ts := time.Now().Unix() - cnt := 0 for id, client := range cpool.Client { - if ts-client.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + lastCallTs := client.GetClientLastCallDiff() + if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { delete(cpool.Client, id) } else { cnt += 1 diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go index 9ad1f7e7..bcdb0947 100644 --- a/nodes/node_balancer/cmd/clients_test.go +++ b/nodes/node_balancer/cmd/clients_test.go @@ -13,14 +13,14 @@ func TestAddClientNode(t *testing.T) { clients map[string]*Client expected string }{ - {map[string]*Client{"1": {Blockchain: "ethereum", Node: &Node{Alive: true}}}, "1"}, + {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.AddClientNode(id, client.Blockchain, client.Node) + ethereumClientPool.AddClientNode(id, client.Node) } - for id := range clientPool.Client { + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was added") t.Fatal() @@ -38,17 +38,17 @@ func TestGetClientNode(t *testing.T) { expected *Node }{ {map[string]*Client{}, "1", nil}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, - {map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, + {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.Client[id] = client + ethereumClientPool.Client[id] = client } - clientNode := clientPool.GetClientNode(c.id) + clientNode := ethereumClientPool.GetClientNode(c.id) if !reflect.DeepEqual(clientNode, c.expected) { t.Log("Wrong node returned") t.Fatal() @@ -63,22 +63,22 @@ func TestCleanInactiveClientNodes(t *testing.T) { clients map[string]*Client expected string }{ - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts}}, "1"}, + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, + {map[string]*Client{"1": {LastCallTs: ts}}, "1"}, {map[string]*Client{ - "1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, - "2": {Blockchain: "polygon", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, - "3": {Blockchain: "stellar", LastCallTs: ts}, + "1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, + "2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, + "3": {LastCallTs: ts}, }, "3"}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.Client[id] = client + ethereumClientPool.Client[id] = client } - clientPool.CleanInactiveClientNodes() - for id := range clientPool.Client { + ethereumClientPool.CleanInactiveClientNodes() + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was removed") t.Fatal() diff --git a/nodes/node_balancer/cmd/data.go b/nodes/node_balancer/cmd/data.go index 9a578e72..0a1d3e0a 100644 --- a/nodes/node_balancer/cmd/data.go +++ b/nodes/node_balancer/cmd/data.go @@ -20,7 +20,6 @@ type NodeStatusResponse struct { // Node - which one node client worked with // LastCallTs - timestamp from last call type Client struct { - Blockchain string Node *Node LastCallTs int64 diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index e8c2d7e0..efb2e9fe 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -39,14 +39,19 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } var node *Node - node = clientPool.GetClientNode(ip) + cpool, err := GetClientPool(blockchain) + if err != nil { + http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest) + return + } + cpool.GetClientNode(ip) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) return } - clientPool.AddClientNode(ip, blockchain, node) + cpool.AddClientNode(ip, node) } // Save origin path, to use in proxyErrorHandler if node will not response diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 13cd15a7..c25166e1 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -27,7 +27,8 @@ func initHealthCheck(debug bool) { select { case <-t.C: blockchainPool.HealthCheck() - clientPool.CleanInactiveClientNodes() + ethereumClientPool.CleanInactiveClientNodes() + polygonClientPool.CleanInactiveClientNodes() if debug { blockchainPool.StatusLog() } @@ -109,7 +110,7 @@ func InitServer() { } // Generate map of clients - clientPool.Client = make(map[string]*Client) + CreateClientPools() // Configure Humbug reporter to handle errors var err error