From a0e50b3722f747ceb253d212f213947266b925e2 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jan 2022 18:53:15 +0000 Subject: [PATCH 01/13] Implemented clients with IPs with cleaning after inactive time --- nodes/node_balancer/cmd/balancer.go | 7 +- nodes/node_balancer/cmd/clients.go | 97 +++++++++++++++++++++++++ nodes/node_balancer/cmd/clients_test.go | 88 ++++++++++++++++++++++ nodes/node_balancer/cmd/data.go | 17 +++++ nodes/node_balancer/cmd/routes.go | 22 ++++-- nodes/node_balancer/cmd/server.go | 12 ++- nodes/node_balancer/configs/settings.go | 13 ++-- 7 files changed, 239 insertions(+), 17 deletions(-) create mode 100644 nodes/node_balancer/cmd/clients.go create mode 100644 nodes/node_balancer/cmd/clients_test.go diff --git a/nodes/node_balancer/cmd/balancer.go b/nodes/node_balancer/cmd/balancer.go index 976849c8..e23b5543 100644 --- a/nodes/node_balancer/cmd/balancer.go +++ b/nodes/node_balancer/cmd/balancer.go @@ -70,9 +70,9 @@ func (node *Node) GetCurrentBlock() (currentBlock uint64) { return currentBlock } -// GetNextPeer returns next active peer to take a connection +// GetNextNode returns next active peer to take a connection // Loop through entire nodes to find out an alive one -func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node { +func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { highestBlock := uint64(0) // Get NodePool with correct blockchain @@ -113,6 +113,7 @@ func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node { continue } + fmt.Printf("Used node: %s with hi block: %d\n", np.Nodes[idx].GethURL, np.Nodes[idx].CurrentBlock) return np.Nodes[idx] } } @@ -152,7 +153,7 @@ func (bpool *BlockchainPool) HealthCheck() { n.SetCurrentBlock(0) // Get response from node /ping endpoint - httpClient := http.Client{Timeout: configs.LB_HEALTH_CHECK_CALL_TIMEOUT} + httpClient := http.Client{Timeout: configs.NB_HEALTH_CHECK_CALL_TIMEOUT} resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL)) if err != nil { log.Printf("Unable to reach node: %s\n", n.StatusURL) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go new file mode 100644 index 00000000..40974129 --- /dev/null +++ b/nodes/node_balancer/cmd/clients.go @@ -0,0 +1,97 @@ +package cmd + +import ( + "fmt" + "log" + "reflect" + "time" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" +) + +var clientPool ClientPool + +func (cpool *ClientPool) AddClient(ip string) { + var client *Client + for _, c := range cpool.Clients { + if c.IP == ip { + client = c + } + } + + if client == nil { + fmt.Println("Adding new client") + client = &Client{ + IP: ip, + } + cpool.Clients = append(cpool.Clients, client) + } +} + +func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { + ts := time.Now().Unix() + + for _, c := range cpool.Clients { + if c.IP == ip { + newNode := true + + for _, cn := range c.ClientNodes { + if reflect.DeepEqual(cn.Node, node) { + cn.LastCallTs = ts + newNode = false + } + } + + if newNode { + c.ClientNodes = append(c.ClientNodes, ClientNode{ + Blockchain: blockchain, + Node: node, + LastCallTs: ts, + }) + } + } + } +} + +func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { + ts := time.Now().Unix() + + for _, c := range cpool.Clients { + if c.IP == ip { + for j, cn := range c.ClientNodes { + if cn.Blockchain == blockchain { + if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + cn.LastCallTs = ts + fmt.Println("Hot client node found, re-use it") + return cn.Node + } else { + fmt.Println("Client node outdated, remove it") + c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) + } + } + } + } + } + + return nil +} + +func (cpool *ClientPool) CleanInactiveClientNodes() { + ts := time.Now().Unix() + + for i, c := range cpool.Clients { + for j, cn := range c.ClientNodes { + if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + fmt.Println("Removing client node") + c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) + } + } + if len(c.ClientNodes) == 0 { + fmt.Println("Removing client itself") + cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...) + } + } +} +func (cpool *ClientPool) StatusLog() { + log.Printf("Active clients: %d", len(cpool.Clients)) +} diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go new file mode 100644 index 00000000..ae91a8c2 --- /dev/null +++ b/nodes/node_balancer/cmd/clients_test.go @@ -0,0 +1,88 @@ +package cmd + +import ( + "reflect" + "testing" + "time" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" +) + +func TestAddClient(t *testing.T) { + var cases = []struct { + clients []Client + ip string + expected int + }{ + {[]Client{}, "localhost", 1}, + {[]Client{{IP: "localhost"}}, "192.168.1.2", 2}, + } + for _, c := range cases { + clientPool.Clients = []*Client{} + for _, client := range c.clients { + clientPool.Clients = append(clientPool.Clients, &client) + } + + clientPool.AddClient(c.ip) + if len(clientPool.Clients) != c.expected { + t.Log("Wrong number of clients") + t.Fatal() + } + } +} + +func TestCleanInactiveClientNodes(t *testing.T) { + ts := time.Now().Unix() + + var cases = []struct { + clients []Client + expected int + }{ + {[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}}}, 0}, + {[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts}}}}, 1}, + {[]Client{ + {IP: "localhost", ClientNodes: []ClientNode{ + {LastCallTs: ts, Blockchain: "polygon"}, + {LastCallTs: ts, Blockchain: "ethereum"}, + {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 1, Blockchain: "solana"}, + }}}, 2}, + } + for _, c := range cases { + clientPool.Clients = []*Client{} + for _, client := range c.clients { + clientPool.Clients = append(clientPool.Clients, &client) + } + + clientPool.CleanInactiveClientNodes() + for _, client := range clientPool.Clients { + if len(client.ClientNodes) != c.expected { + t.Log("Wrong number of client nodes") + t.Fatal() + } + } + + } +} + +func TestGetClientNode(t *testing.T) { + var cases = []struct { + clients []Client + blockchain string + ip string + expected *Node + }{ + {[]Client{{IP: "localhost"}}, "ethereum", "192.168.1.2", nil}, + } + for _, c := range cases { + clientPool.Clients = []*Client{} + for _, client := range c.clients { + clientPool.Clients = append(clientPool.Clients, &client) + } + + clientNode := clientPool.GetClientNode(c.blockchain, c.ip) + if !reflect.DeepEqual(clientNode, c.expected) { + t.Log("Wrong value") + t.Fatal() + } + } +} diff --git a/nodes/node_balancer/cmd/data.go b/nodes/node_balancer/cmd/data.go index 323dc59d..7070c5fa 100644 --- a/nodes/node_balancer/cmd/data.go +++ b/nodes/node_balancer/cmd/data.go @@ -17,6 +17,23 @@ type NodeStatusResponse struct { CurrentBlock uint64 `json:"current_block"` } +// Node - which one node client worked with +// LastCallTs - timestamp from last call +type ClientNode struct { + Blockchain string + Node *Node + LastCallTs int64 +} + +type Client struct { + IP string + ClientNodes []ClientNode +} + +type ClientPool struct { + Clients []*Client +} + // Node structure with // StatusURL for status server at node endpoint // GethURL for geth/bor/etc node http.server endpoint diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index 94108c6c..6c33284d 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -6,6 +6,7 @@ package cmd import ( "encoding/json" "fmt" + "net" "net/http" "strings" ) @@ -31,23 +32,34 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } // Chose one node - peer := blockchainPool.GetNextPeer(blockchain) - if peer == nil { - http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + ip, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + http.Error(w, "Unable to parse client IP", http.StatusInternalServerError) return } + var node *Node + node = clientPool.GetClientNode(blockchain, ip) + if node == nil { + node = blockchainPool.GetNextNode(blockchain) + if node == nil { + http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + return + } + clientPool.AddClientNode(ip, blockchain, node) + } + // Save origin path, to use in proxyErrorHandler if node will not response r.Header.Add("X-Origin-Path", r.URL.Path) switch { case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/ping", blockchain)): r.URL.Path = "/ping" - peer.StatusReverseProxy.ServeHTTP(w, r) + node.StatusReverseProxy.ServeHTTP(w, r) return case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): r.URL.Path = "/" - peer.GethReverseProxy.ServeHTTP(w, r) + node.GethReverseProxy.ServeHTTP(w, r) return default: http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index adc97885..063520d1 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -22,13 +22,15 @@ var reporter *humbug.HumbugReporter // initHealthCheck runs a routine for check status of the nodes every 5 seconds func initHealthCheck(debug bool) { - t := time.NewTicker(configs.LB_HEALTH_CHECK_INTERVAL) + t := time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: blockchainPool.HealthCheck() + clientPool.CleanInactiveClientNodes() if debug { blockchainPool.StatusLog() + clientPool.StatusLog() } } } @@ -60,13 +62,13 @@ func GetRetryFromContext(r *http.Request) int { func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { retries := GetRetryFromContext(r) - if retries < configs.LB_CONNECTION_RETRIES { + if retries < configs.NB_CONNECTION_RETRIES { log.Printf( "An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n", - url, retries+1, configs.LB_CONNECTION_RETRIES, e.Error(), + url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(), ) select { - case <-time.After(configs.LB_CONNECTION_RETRIES_INTERVAL): + case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL): ctx := context.WithValue(r.Context(), Retry, retries+1) proxy.ServeHTTP(w, r.WithContext(ctx)) } @@ -107,6 +109,8 @@ func InitServer() { return } + clientPool.AddClient("localhost") + // Configure Humbug reporter to handle errors var err error sessionID := uuid.New().String() diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index e36ad79f..62452890 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -50,7 +50,7 @@ func checkEnvVarSet() { if MOONSTREAM_NODE_POLYGON_B_IPC_ADDR == "" { MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal" } - + if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" { log.Fatal("Some of environment variables not set") } @@ -90,10 +90,13 @@ func (nc *NodeConfigList) InitNodeConfigList() { } } -var LB_CONNECTION_RETRIES = 2 -var LB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10 -var LB_HEALTH_CHECK_INTERVAL = time.Second * 5 -var LB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 +var NB_CONNECTION_RETRIES = 2 +var NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10 +var NB_HEALTH_CHECK_INTERVAL = time.Second * 5 +var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 + +// Client config +var NB_CLIENT_NODE_KEEP_ALIVE = int64(10) // Seconds // Humbug config var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN") From f031de68a400b90e1cc57a6d82e35f44bb0b60ff Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jan 2022 20:45:43 +0000 Subject: [PATCH 02/13] AddClient with AddClientNode united --- nodes/node_balancer/cmd/clients.go | 41 ++++++++++--------------- nodes/node_balancer/cmd/clients_test.go | 23 -------------- nodes/node_balancer/cmd/server.go | 4 +-- nodes/node_balancer/configs/settings.go | 2 +- 4 files changed, 19 insertions(+), 51 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 40974129..703e5fee 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -11,7 +11,9 @@ import ( var clientPool ClientPool -func (cpool *ClientPool) AddClient(ip string) { +func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { + ts := time.Now().Unix() + var client *Client for _, c := range cpool.Clients { if c.IP == ip { @@ -26,31 +28,22 @@ func (cpool *ClientPool) AddClient(ip string) { } cpool.Clients = append(cpool.Clients, client) } -} -func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { - ts := time.Now().Unix() - - for _, c := range cpool.Clients { - if c.IP == ip { - newNode := true - - for _, cn := range c.ClientNodes { - if reflect.DeepEqual(cn.Node, node) { - cn.LastCallTs = ts - newNode = false - } - } - - if newNode { - c.ClientNodes = append(c.ClientNodes, ClientNode{ - Blockchain: blockchain, - Node: node, - LastCallTs: ts, - }) - } + newNode := true + for _, cn := range client.ClientNodes { + if reflect.DeepEqual(cn.Node, node) { + cn.LastCallTs = ts + newNode = false } } + + if newNode { + client.ClientNodes = append(client.ClientNodes, ClientNode{ + Blockchain: blockchain, + Node: node, + LastCallTs: ts, + }) + } } func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { @@ -62,7 +55,7 @@ func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { if cn.Blockchain == blockchain { if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { cn.LastCallTs = ts - fmt.Println("Hot client node found, re-use it") + fmt.Printf("Hot client node found: %s, re-use it", cn.Node.GethURL) return cn.Node } else { fmt.Println("Client node outdated, remove it") diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go index ae91a8c2..9d98064e 100644 --- a/nodes/node_balancer/cmd/clients_test.go +++ b/nodes/node_balancer/cmd/clients_test.go @@ -8,29 +8,6 @@ import ( configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) -func TestAddClient(t *testing.T) { - var cases = []struct { - clients []Client - ip string - expected int - }{ - {[]Client{}, "localhost", 1}, - {[]Client{{IP: "localhost"}}, "192.168.1.2", 2}, - } - for _, c := range cases { - clientPool.Clients = []*Client{} - for _, client := range c.clients { - clientPool.Clients = append(clientPool.Clients, &client) - } - - clientPool.AddClient(c.ip) - if len(clientPool.Clients) != c.expected { - t.Log("Wrong number of clients") - t.Fatal() - } - } -} - func TestCleanInactiveClientNodes(t *testing.T) { ts := time.Now().Unix() diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 063520d1..13e00717 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -29,7 +29,7 @@ func initHealthCheck(debug bool) { blockchainPool.HealthCheck() clientPool.CleanInactiveClientNodes() if debug { - blockchainPool.StatusLog() + // blockchainPool.StatusLog() clientPool.StatusLog() } } @@ -109,8 +109,6 @@ func InitServer() { return } - clientPool.AddClient("localhost") - // Configure Humbug reporter to handle errors var err error sessionID := uuid.New().String() diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index 62452890..a2c917f6 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -96,7 +96,7 @@ var NB_HEALTH_CHECK_INTERVAL = time.Second * 5 var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 // Client config -var NB_CLIENT_NODE_KEEP_ALIVE = int64(10) // Seconds +var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // Seconds // Humbug config var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN") From de0ef498258e724ecd7c7c2ba2a426513eb57c23 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jan 2022 21:04:21 +0000 Subject: [PATCH 03/13] Extended client nodebalancer with additional comments --- nodes/node_balancer/cmd/clients.go | 31 +++++++++++++++---------- nodes/node_balancer/cmd/server.go | 3 +-- nodes/node_balancer/configs/settings.go | 2 +- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 703e5fee..4942070a 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -1,7 +1,6 @@ package cmd import ( - "fmt" "log" "reflect" "time" @@ -11,9 +10,12 @@ import ( var clientPool ClientPool +// Add client node and client itself if doesn't exist +// TODO(kompotkot): Add mutes as for balancer func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { ts := time.Now().Unix() + // Find in list clint with same IP var client *Client for _, c := range cpool.Clients { if c.IP == ip { @@ -21,8 +23,8 @@ func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { } } + // Add new client if doesn't exist if client == nil { - fmt.Println("Adding new client") client = &Client{ IP: ip, } @@ -46,6 +48,8 @@ func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { } } +// Get client hot node if exists +// TODO(kompotkot): Add mutes as for balancer func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { ts := time.Now().Unix() @@ -54,13 +58,14 @@ func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { for j, cn := range c.ClientNodes { if cn.Blockchain == blockchain { if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { - cn.LastCallTs = ts - fmt.Printf("Hot client node found: %s, re-use it", cn.Node.GethURL) - return cn.Node - } else { - fmt.Println("Client node outdated, remove it") - c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) + // Hot node for client found, use it + if cn.Node.IsAlive() { + cn.LastCallTs = ts + return cn.Node + } } + // Remove outdated hot node from client hot nodes list + c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) } } } @@ -69,22 +74,24 @@ func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { return nil } +// Clean client list of hot nodes from outdated +// TODO(kompotkot): Add mutes as for balancer func (cpool *ClientPool) CleanInactiveClientNodes() { ts := time.Now().Unix() for i, c := range cpool.Clients { for j, cn := range c.ClientNodes { if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { - fmt.Println("Removing client node") + // Remove client's node c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) } } + + // If there are no hot nodes under client, remove client if len(c.ClientNodes) == 0 { - fmt.Println("Removing client itself") cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...) } } -} -func (cpool *ClientPool) StatusLog() { + log.Printf("Active clients: %d", len(cpool.Clients)) } diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 13e00717..257bd39c 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -29,8 +29,7 @@ func initHealthCheck(debug bool) { blockchainPool.HealthCheck() clientPool.CleanInactiveClientNodes() if debug { - // blockchainPool.StatusLog() - clientPool.StatusLog() + blockchainPool.StatusLog() } } } diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index a2c917f6..664995b6 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -96,7 +96,7 @@ var NB_HEALTH_CHECK_INTERVAL = time.Second * 5 var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 // Client config -var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // Seconds +var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds // Humbug config var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN") From d0ebd4782f1cc4a30d3fbc32b5ed8ec1f548867c Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jan 2022 21:21:50 +0000 Subject: [PATCH 04/13] Updated logs for middleware and status of nodes at node balancer --- nodes/node_balancer/cmd/balancer.go | 3 +-- nodes/node_balancer/cmd/clients.go | 2 +- nodes/node_balancer/cmd/middleware.go | 8 +++++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/nodes/node_balancer/cmd/balancer.go b/nodes/node_balancer/cmd/balancer.go index e23b5543..e7fcb210 100644 --- a/nodes/node_balancer/cmd/balancer.go +++ b/nodes/node_balancer/cmd/balancer.go @@ -113,7 +113,6 @@ func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { continue } - fmt.Printf("Used node: %s with hi block: %d\n", np.Nodes[idx].GethURL, np.Nodes[idx].CurrentBlock) return np.Nodes[idx] } } @@ -180,7 +179,7 @@ func (bpool *BlockchainPool) HealthCheck() { n.SetCurrentBlock(statusResponse.CurrentBlock) } - log.Printf("Node %s is alive: %t with current block: %d\n", n.StatusURL, true, statusResponse.CurrentBlock) + log.Printf("Node %s is alive: %t with current block: %d blockchain called: %d times\n", n.StatusURL, true, statusResponse.CurrentBlock, b.Current) } } } diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 4942070a..43b7650b 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -93,5 +93,5 @@ func (cpool *ClientPool) CleanInactiveClientNodes() { } } - log.Printf("Active clients: %d", len(cpool.Clients)) + log.Printf("Active clients: %d\n", len(cpool.Clients)) } diff --git a/nodes/node_balancer/cmd/middleware.go b/nodes/node_balancer/cmd/middleware.go index 60976441..1bc69fda 100644 --- a/nodes/node_balancer/cmd/middleware.go +++ b/nodes/node_balancer/cmd/middleware.go @@ -5,6 +5,7 @@ package cmd import ( "log" + "net" "net/http" humbug "github.com/bugout-dev/humbug/go/pkg" @@ -31,6 +32,11 @@ func panicMiddleware(next http.Handler) http.Handler { func logMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { next.ServeHTTP(w, r) - log.Printf("%s %s %s\n", r.Method, r.URL.Path, r.RemoteAddr) + ip, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + log.Printf("Unable to parse client IP: %s\n", r.RemoteAddr) + } else { + log.Printf("%s %s %s\n", ip, r.Method, r.URL.Path) + } }) } From dd91735887f4ff71a7461ff3297b2211354c5bca Mon Sep 17 00:00:00 2001 From: kompotkot Date: Fri, 14 Jan 2022 14:14:48 +0000 Subject: [PATCH 05/13] ClientPool as map instead of slice --- nodes/node_balancer/cmd/clients.go | 99 ++++++++++--------------- nodes/node_balancer/cmd/clients_test.go | 85 ++++++++++----------- nodes/node_balancer/cmd/data.go | 9 +-- nodes/node_balancer/cmd/routes.go | 2 +- nodes/node_balancer/cmd/server.go | 3 + 5 files changed, 89 insertions(+), 109 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 43b7650b..02d3de6e 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -1,6 +1,7 @@ package cmd import ( + // "fmt" "log" "reflect" "time" @@ -10,88 +11,66 @@ import ( var clientPool ClientPool +func (client *Client) GetClientNode() (clientNode *Node) { + client.mux.RLock() + clientNode = client.Node + client.mux.RUnlock() + return clientNode +} + // Add client node and client itself if doesn't exist -// TODO(kompotkot): Add mutes as for balancer -func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) { +// TODO(kompotkot): Add mutex as for balancer +func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) *Client { ts := time.Now().Unix() - // Find in list clint with same IP - var client *Client - for _, c := range cpool.Clients { - if c.IP == ip { - client = c + currentClient := cpool.Client[id] + + // Find clint with same ID and update timestamp or + // add new one if doesn't exist + if currentClient != nil { + if reflect.DeepEqual(currentClient.Node, node) { + currentClient.LastCallTs = ts + return currentClient } } - // Add new client if doesn't exist - if client == nil { - client = &Client{ - IP: ip, - } - cpool.Clients = append(cpool.Clients, client) - } - - newNode := true - for _, cn := range client.ClientNodes { - if reflect.DeepEqual(cn.Node, node) { - cn.LastCallTs = ts - newNode = false - } - } - - if newNode { - client.ClientNodes = append(client.ClientNodes, ClientNode{ - Blockchain: blockchain, - Node: node, - LastCallTs: ts, - }) - } + currentClient.Blockchain = blockchain + currentClient.Node = node + currentClient.LastCallTs = ts + return currentClient } // Get client hot node if exists -// TODO(kompotkot): Add mutes as for balancer -func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node { +// TODO(kompotkot): Add mutex as for balancer +func (cpool *ClientPool) GetClientNode(id string) *Node { ts := time.Now().Unix() - for _, c := range cpool.Clients { - if c.IP == ip { - for j, cn := range c.ClientNodes { - if cn.Blockchain == blockchain { - if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { - // Hot node for client found, use it - if cn.Node.IsAlive() { - cn.LastCallTs = ts - return cn.Node - } - } - // Remove outdated hot node from client hot nodes list - c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) - } - } + currentClient := cpool.Client[id] + + if currentClient != nil { + if ts-currentClient.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + currentClient.LastCallTs = ts + return currentClient.Node } + delete(cpool.Client, id) } return nil } // Clean client list of hot nodes from outdated -// TODO(kompotkot): Add mutes as for balancer +// TODO(kompotkot): Add mutex as for balancer func (cpool *ClientPool) CleanInactiveClientNodes() { ts := time.Now().Unix() - for i, c := range cpool.Clients { - for j, cn := range c.ClientNodes { - if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { - // Remove client's node - c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...) - } - } - - // If there are no hot nodes under client, remove client - if len(c.ClientNodes) == 0 { - cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...) + cnt := 0 + for id, client := range cpool.Client { + if ts-client.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + delete(cpool.Client, id) + } else { + cnt += 1 } } - log.Printf("Active clients: %d\n", len(cpool.Clients)) + log.Printf("Active clients: %d\n", cnt) } diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go index 9d98064e..31448b1e 100644 --- a/nodes/node_balancer/cmd/clients_test.go +++ b/nodes/node_balancer/cmd/clients_test.go @@ -8,58 +8,59 @@ import ( configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) -func TestCleanInactiveClientNodes(t *testing.T) { +func TestGetClientNode(t *testing.T) { ts := time.Now().Unix() var cases = []struct { - clients []Client - expected int + clients map[string]*Client + id string + expected *Node }{ - {[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}}}, 0}, - {[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts}}}}, 1}, - {[]Client{ - {IP: "localhost", ClientNodes: []ClientNode{ - {LastCallTs: ts, Blockchain: "polygon"}, - {LastCallTs: ts, Blockchain: "ethereum"}, - {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 1, Blockchain: "solana"}, - }}}, 2}, + {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, + {map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } for _, c := range cases { - clientPool.Clients = []*Client{} - for _, client := range c.clients { - clientPool.Clients = append(clientPool.Clients, &client) + clientPool.Client = make(map[string]*Client) + for id, client := range c.clients { + clientPool.Client[id] = client } - clientPool.CleanInactiveClientNodes() - for _, client := range clientPool.Clients { - if len(client.ClientNodes) != c.expected { - t.Log("Wrong number of client nodes") - t.Fatal() - } - } - - } -} - -func TestGetClientNode(t *testing.T) { - var cases = []struct { - clients []Client - blockchain string - ip string - expected *Node - }{ - {[]Client{{IP: "localhost"}}, "ethereum", "192.168.1.2", nil}, - } - for _, c := range cases { - clientPool.Clients = []*Client{} - for _, client := range c.clients { - clientPool.Clients = append(clientPool.Clients, &client) - } - - clientNode := clientPool.GetClientNode(c.blockchain, c.ip) + clientNode := clientPool.GetClientNode(c.id) if !reflect.DeepEqual(clientNode, c.expected) { - t.Log("Wrong value") + t.Log("Wrong node returned") t.Fatal() } } } + +func TestCleanInactiveClientNodes(t *testing.T) { + ts := time.Now().Unix() + + var cases = []struct { + clients map[string]*Client + expected string + }{ + {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, + {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts}}, "1"}, + {map[string]*Client{ + "1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, + "2": {Blockchain: "polygon", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, + "3": {Blockchain: "stellar", LastCallTs: ts}, + }, "3"}, + } + for _, c := range cases { + clientPool.Client = make(map[string]*Client) + for id, client := range c.clients { + clientPool.Client[id] = client + } + + clientPool.CleanInactiveClientNodes() + for key := range clientPool.Client { + if key != c.expected { + t.Log("Wrong client was removed") + t.Fatal() + } + } + } +} diff --git a/nodes/node_balancer/cmd/data.go b/nodes/node_balancer/cmd/data.go index 7070c5fa..9a578e72 100644 --- a/nodes/node_balancer/cmd/data.go +++ b/nodes/node_balancer/cmd/data.go @@ -19,19 +19,16 @@ type NodeStatusResponse struct { // Node - which one node client worked with // LastCallTs - timestamp from last call -type ClientNode struct { +type Client struct { Blockchain string Node *Node LastCallTs int64 -} -type Client struct { - IP string - ClientNodes []ClientNode + mux sync.RWMutex } type ClientPool struct { - Clients []*Client + Client map[string]*Client } // Node structure with diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index 6c33284d..e8c2d7e0 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -39,7 +39,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } var node *Node - node = clientPool.GetClientNode(blockchain, ip) + node = clientPool.GetClientNode(ip) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 257bd39c..13cd15a7 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -108,6 +108,9 @@ func InitServer() { return } + // Generate map of clients + clientPool.Client = make(map[string]*Client) + // Configure Humbug reporter to handle errors var err error sessionID := uuid.New().String() From 491a3744372de16745cf09e9642aa44f1c928122 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Fri, 14 Jan 2022 18:10:23 +0000 Subject: [PATCH 06/13] Fixed add client node --- nodes/node_balancer/cmd/clients.go | 35 +++++++++++-------------- nodes/node_balancer/cmd/clients_test.go | 26 ++++++++++++++++-- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 02d3de6e..2e0b300c 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -1,7 +1,6 @@ package cmd import ( - // "fmt" "log" "reflect" "time" @@ -11,33 +10,31 @@ import ( var clientPool ClientPool -func (client *Client) GetClientNode() (clientNode *Node) { - client.mux.RLock() - clientNode = client.Node - client.mux.RUnlock() - return clientNode -} +// func (client *Client) GetClientNode() (clientNode *Node) { +// client.mux.RLock() +// clientNode = client.Node +// client.mux.RUnlock() +// return clientNode +// } // Add client node and client itself if doesn't exist // TODO(kompotkot): Add mutex as for balancer -func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) *Client { +func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) { ts := time.Now().Unix() - currentClient := cpool.Client[id] - // Find clint with same ID and update timestamp or // add new one if doesn't exist - if currentClient != nil { - if reflect.DeepEqual(currentClient.Node, node) { - currentClient.LastCallTs = ts - return currentClient + if cpool.Client[id] != nil { + if reflect.DeepEqual(cpool.Client[id].Node, node) { + cpool.Client[id].LastCallTs = ts + return } } - - currentClient.Blockchain = blockchain - currentClient.Node = node - currentClient.LastCallTs = ts - return currentClient + cpool.Client[id] = &Client{ + Blockchain: blockchain, + Node: node, + LastCallTs: ts, + } } // Get client hot node if exists diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go index 31448b1e..9ad1f7e7 100644 --- a/nodes/node_balancer/cmd/clients_test.go +++ b/nodes/node_balancer/cmd/clients_test.go @@ -8,6 +8,27 @@ import ( configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) +func TestAddClientNode(t *testing.T) { + var cases = []struct { + clients map[string]*Client + expected string + }{ + {map[string]*Client{"1": {Blockchain: "ethereum", Node: &Node{Alive: true}}}, "1"}, + } + for _, c := range cases { + clientPool.Client = make(map[string]*Client) + for id, client := range c.clients { + clientPool.AddClientNode(id, client.Blockchain, client.Node) + } + for id := range clientPool.Client { + if id != c.expected { + t.Log("Wrong client was added") + t.Fatal() + } + } + } +} + func TestGetClientNode(t *testing.T) { ts := time.Now().Unix() @@ -16,6 +37,7 @@ func TestGetClientNode(t *testing.T) { id string expected *Node }{ + {map[string]*Client{}, "1", nil}, {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, @@ -56,8 +78,8 @@ func TestCleanInactiveClientNodes(t *testing.T) { } clientPool.CleanInactiveClientNodes() - for key := range clientPool.Client { - if key != c.expected { + for id := range clientPool.Client { + if id != c.expected { t.Log("Wrong client was removed") t.Fatal() } From aab3b8f4ac4bb00a85fb595ba15ccce84777a70e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:09:44 +0000 Subject: [PATCH 07/13] Mutex and blockchain respect for client node balancer --- nodes/node_balancer/cmd/clients.go | 82 ++++++++++++++++--------- nodes/node_balancer/cmd/clients_test.go | 38 ++++++------ nodes/node_balancer/cmd/data.go | 1 - nodes/node_balancer/cmd/routes.go | 9 ++- nodes/node_balancer/cmd/server.go | 5 +- 5 files changed, 82 insertions(+), 53 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index 2e0b300c..c591de24 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -1,6 +1,7 @@ package cmd import ( + "errors" "log" "reflect" "time" @@ -8,46 +9,71 @@ import ( configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) -var clientPool ClientPool +var ethereumClientPool ClientPool +var polygonClientPool ClientPool -// func (client *Client) GetClientNode() (clientNode *Node) { -// client.mux.RLock() -// clientNode = client.Node -// client.mux.RUnlock() -// return clientNode -// } +// Generate client pools for different blockchains +func CreateClientPools() { + ethereumClientPool.Client = make(map[string]*Client) + polygonClientPool.Client = make(map[string]*Client) +} -// Add client node and client itself if doesn't exist -// TODO(kompotkot): Add mutex as for balancer -func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) { +// Return client pool correspongin to blockchain +func GetClientPool(blockchain string) (*ClientPool, error) { + var cpool *ClientPool + if blockchain == "ethereum" { + cpool = ðereumClientPool + } else if blockchain == "polygon" { + cpool = &polygonClientPool + } else { + return nil, errors.New("Unexisting blockchain provided") + } + return cpool, nil +} + +// Updates client last appeal to node +func (client *Client) UpdateClientLastCall() { ts := time.Now().Unix() - // Find clint with same ID and update timestamp or - // add new one if doesn't exist + client.mux.Lock() + client.LastCallTs = ts + client.mux.Unlock() +} + +// Get number of seconds from current last call to client node +func (client *Client) GetClientLastCallDiff() (lastCallTs int64) { + ts := time.Now().Unix() + + client.mux.RLock() + lastCallTs = ts - client.LastCallTs + client.mux.RUnlock() + + return lastCallTs +} + +// Find clint with same ID and update timestamp or +// add new one if doesn't exist +func (cpool *ClientPool) AddClientNode(id string, node *Node) { + if cpool.Client[id] != nil { if reflect.DeepEqual(cpool.Client[id].Node, node) { - cpool.Client[id].LastCallTs = ts + cpool.Client[id].UpdateClientLastCall() return } } cpool.Client[id] = &Client{ - Blockchain: blockchain, Node: node, - LastCallTs: ts, + LastCallTs: time.Now().Unix(), } } // Get client hot node if exists -// TODO(kompotkot): Add mutex as for balancer func (cpool *ClientPool) GetClientNode(id string) *Node { - ts := time.Now().Unix() - - currentClient := cpool.Client[id] - - if currentClient != nil { - if ts-currentClient.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { - currentClient.LastCallTs = ts - return currentClient.Node + if cpool.Client[id] != nil { + lastCallTs := cpool.Client[id].GetClientLastCallDiff() + if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + cpool.Client[id].UpdateClientLastCall() + return cpool.Client[id].Node } delete(cpool.Client, id) } @@ -55,14 +81,12 @@ func (cpool *ClientPool) GetClientNode(id string) *Node { return nil } -// Clean client list of hot nodes from outdated -// TODO(kompotkot): Add mutex as for balancer +// Clean client list of hot outdated nodes func (cpool *ClientPool) CleanInactiveClientNodes() { - ts := time.Now().Unix() - cnt := 0 for id, client := range cpool.Client { - if ts-client.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + lastCallTs := client.GetClientLastCallDiff() + if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { delete(cpool.Client, id) } else { cnt += 1 diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go index 9ad1f7e7..bcdb0947 100644 --- a/nodes/node_balancer/cmd/clients_test.go +++ b/nodes/node_balancer/cmd/clients_test.go @@ -13,14 +13,14 @@ func TestAddClientNode(t *testing.T) { clients map[string]*Client expected string }{ - {map[string]*Client{"1": {Blockchain: "ethereum", Node: &Node{Alive: true}}}, "1"}, + {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.AddClientNode(id, client.Blockchain, client.Node) + ethereumClientPool.AddClientNode(id, client.Node) } - for id := range clientPool.Client { + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was added") t.Fatal() @@ -38,17 +38,17 @@ func TestGetClientNode(t *testing.T) { expected *Node }{ {map[string]*Client{}, "1", nil}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, - {map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, + {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.Client[id] = client + ethereumClientPool.Client[id] = client } - clientNode := clientPool.GetClientNode(c.id) + clientNode := ethereumClientPool.GetClientNode(c.id) if !reflect.DeepEqual(clientNode, c.expected) { t.Log("Wrong node returned") t.Fatal() @@ -63,22 +63,22 @@ func TestCleanInactiveClientNodes(t *testing.T) { clients map[string]*Client expected string }{ - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, - {map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts}}, "1"}, + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, + {map[string]*Client{"1": {LastCallTs: ts}}, "1"}, {map[string]*Client{ - "1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, - "2": {Blockchain: "polygon", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, - "3": {Blockchain: "stellar", LastCallTs: ts}, + "1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, + "2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, + "3": {LastCallTs: ts}, }, "3"}, } for _, c := range cases { - clientPool.Client = make(map[string]*Client) + CreateClientPools() for id, client := range c.clients { - clientPool.Client[id] = client + ethereumClientPool.Client[id] = client } - clientPool.CleanInactiveClientNodes() - for id := range clientPool.Client { + ethereumClientPool.CleanInactiveClientNodes() + for id := range ethereumClientPool.Client { if id != c.expected { t.Log("Wrong client was removed") t.Fatal() diff --git a/nodes/node_balancer/cmd/data.go b/nodes/node_balancer/cmd/data.go index 9a578e72..0a1d3e0a 100644 --- a/nodes/node_balancer/cmd/data.go +++ b/nodes/node_balancer/cmd/data.go @@ -20,7 +20,6 @@ type NodeStatusResponse struct { // Node - which one node client worked with // LastCallTs - timestamp from last call type Client struct { - Blockchain string Node *Node LastCallTs int64 diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index e8c2d7e0..efb2e9fe 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -39,14 +39,19 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } var node *Node - node = clientPool.GetClientNode(ip) + cpool, err := GetClientPool(blockchain) + if err != nil { + http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest) + return + } + cpool.GetClientNode(ip) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) return } - clientPool.AddClientNode(ip, blockchain, node) + cpool.AddClientNode(ip, node) } // Save origin path, to use in proxyErrorHandler if node will not response diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index 13cd15a7..c25166e1 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -27,7 +27,8 @@ func initHealthCheck(debug bool) { select { case <-t.C: blockchainPool.HealthCheck() - clientPool.CleanInactiveClientNodes() + ethereumClientPool.CleanInactiveClientNodes() + polygonClientPool.CleanInactiveClientNodes() if debug { blockchainPool.StatusLog() } @@ -109,7 +110,7 @@ func InitServer() { } // Generate map of clients - clientPool.Client = make(map[string]*Client) + CreateClientPools() // Configure Humbug reporter to handle errors var err error From c2f95e9217812bfa24a94648efb3cd25022108ad Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:16:31 +0000 Subject: [PATCH 08/13] Correct active clients logging --- nodes/node_balancer/cmd/clients.go | 5 ++--- nodes/node_balancer/cmd/server.go | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go index c591de24..95c9f42d 100644 --- a/nodes/node_balancer/cmd/clients.go +++ b/nodes/node_balancer/cmd/clients.go @@ -2,7 +2,6 @@ package cmd import ( "errors" - "log" "reflect" "time" @@ -82,7 +81,7 @@ func (cpool *ClientPool) GetClientNode(id string) *Node { } // Clean client list of hot outdated nodes -func (cpool *ClientPool) CleanInactiveClientNodes() { +func (cpool *ClientPool) CleanInactiveClientNodes() int { cnt := 0 for id, client := range cpool.Client { lastCallTs := client.GetClientLastCallDiff() @@ -93,5 +92,5 @@ func (cpool *ClientPool) CleanInactiveClientNodes() { } } - log.Printf("Active clients: %d\n", cnt) + return cnt } diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index c25166e1..be57b3ec 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -27,8 +27,9 @@ func initHealthCheck(debug bool) { select { case <-t.C: blockchainPool.HealthCheck() - ethereumClientPool.CleanInactiveClientNodes() - polygonClientPool.CleanInactiveClientNodes() + ethereumClients := ethereumClientPool.CleanInactiveClientNodes() + polygonClients := polygonClientPool.CleanInactiveClientNodes() + log.Printf("Active etehereum clients: %d, polygon clients: %d\n", ethereumClients, polygonClients) if debug { blockchainPool.StatusLog() } From 723c9db6be0f614a2fd6b3024669e893b3f88dc7 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:37:08 +0000 Subject: [PATCH 09/13] Work with client id header instead of ip --- nodes/node_balancer/cmd/routes.go | 19 ++++++++++++------- nodes/node_balancer/configs/settings.go | 5 +++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index efb2e9fe..695a7205 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -6,9 +6,12 @@ package cmd import ( "encoding/json" "fmt" + "log" "net" "net/http" "strings" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) // pingRoute response with status of load balancer server itself @@ -31,27 +34,29 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { return } - // Chose one node - ip, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - http.Error(w, "Unable to parse client IP", http.StatusInternalServerError) - return + clientId := w.Header().Get(configs.MOONSTREAM_CLIENT_ID_HEADER) + if clientId == "" { + log.Printf("Empty client id provided") + // TODO(kompotkot): After all internal crawlers and services start + // providing client id header, then replace to http.Error + clientId = "none" } + // Chose one node var node *Node cpool, err := GetClientPool(blockchain) if err != nil { http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest) return } - cpool.GetClientNode(ip) + cpool.GetClientNode(clientId) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) return } - cpool.AddClientNode(ip, node) + cpool.AddClientNode(clientId, node) } // Save origin path, to use in proxyErrorHandler if node will not response diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index 664995b6..5eb88113 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -35,6 +35,7 @@ var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IP var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR") var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT") var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT") +var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER") func checkEnvVarSet() { if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" { @@ -51,6 +52,10 @@ func checkEnvVarSet() { MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal" } + if MOONSTREAM_CLIENT_ID_HEADER == "" { + MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id" + } + if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" { log.Fatal("Some of environment variables not set") } From 3d11433056d0a4e401e53c51a9aff9ba3b246343 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:38:18 +0000 Subject: [PATCH 10/13] Fixed imports --- nodes/node_balancer/cmd/routes.go | 1 - 1 file changed, 1 deletion(-) diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index 695a7205..d6123c8e 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "log" - "net" "net/http" "strings" From 0e5fb1307e72ae456b69c84a8452c1f975842c44 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jan 2022 11:41:35 +0000 Subject: [PATCH 11/13] Reduced logging --- nodes/node_balancer/cmd/balancer.go | 2 +- nodes/node_balancer/cmd/routes.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/nodes/node_balancer/cmd/balancer.go b/nodes/node_balancer/cmd/balancer.go index e7fcb210..c0939b01 100644 --- a/nodes/node_balancer/cmd/balancer.go +++ b/nodes/node_balancer/cmd/balancer.go @@ -104,7 +104,7 @@ func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { // If we have an alive one, use it and store if its not the original one if np.Nodes[idx].IsAlive() { if i != next { - log.Printf("Mark the current one %d", uint64(idx)) + // Mark the current one atomic.StoreUint64(&np.Current, uint64(idx)) } // Pass nodes with low blocks diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index d6123c8e..d3004291 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -6,7 +6,6 @@ package cmd import ( "encoding/json" "fmt" - "log" "net/http" "strings" @@ -35,7 +34,6 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { clientId := w.Header().Get(configs.MOONSTREAM_CLIENT_ID_HEADER) if clientId == "" { - log.Printf("Empty client id provided") // TODO(kompotkot): After all internal crawlers and services start // providing client id header, then replace to http.Error clientId = "none" @@ -48,7 +46,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest) return } - cpool.GetClientNode(clientId) + node = cpool.GetClientNode(clientId) if node == nil { node = blockchainPool.GetNextNode(blockchain) if node == nil { From 1179ea7138a3c817a8d5194ccbf9326cf650f8f8 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jan 2022 11:36:24 +0000 Subject: [PATCH 12/13] Fixed bug with infinity attempts --- nodes/node_balancer/cmd/routes.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index d3004291..672d89e4 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -6,6 +6,7 @@ package cmd import ( "encoding/json" "fmt" + "log" "net/http" "strings" @@ -21,6 +22,13 @@ func pingRoute(w http.ResponseWriter, r *http.Request) { // lbHandler load balances the incoming requests to nodes func lbHandler(w http.ResponseWriter, r *http.Request) { + attempts := GetAttemptsFromContext(r) + if attempts > configs.NB_CONNECTION_RETRIES { + log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path) + http.Error(w, "Service not available", http.StatusServiceUnavailable) + return + } + var blockchain string switch { case strings.HasPrefix(r.URL.Path, "/nb/ethereum"): From 30c5f6da9bc1abe303bf6c68035bdf07a1874f65 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jan 2022 11:38:58 +0000 Subject: [PATCH 13/13] Ethereum sync service in multi threads --- crawlers/deploy/ethereum-synchronize.service | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/deploy/ethereum-synchronize.service b/crawlers/deploy/ethereum-synchronize.service index 825e3f35..60143f14 100644 --- a/crawlers/deploy/ethereum-synchronize.service +++ b/crawlers/deploy/ethereum-synchronize.service @@ -7,7 +7,7 @@ User=ubuntu Group=www-data WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 1 +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 2 SyslogIdentifier=ethereum-synchronize [Install]