diff --git a/crawlers/deploy/ethereum-synchronize.service b/crawlers/deploy/ethereum-synchronize.service index 825e3f35..60143f14 100644 --- a/crawlers/deploy/ethereum-synchronize.service +++ b/crawlers/deploy/ethereum-synchronize.service @@ -7,7 +7,7 @@ User=ubuntu Group=www-data WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 1 +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 2 SyslogIdentifier=ethereum-synchronize [Install] diff --git a/nodes/node_balancer/cmd/balancer.go b/nodes/node_balancer/cmd/balancer.go index 976849c8..c0939b01 100644 --- a/nodes/node_balancer/cmd/balancer.go +++ b/nodes/node_balancer/cmd/balancer.go @@ -70,9 +70,9 @@ func (node *Node) GetCurrentBlock() (currentBlock uint64) { return currentBlock } -// GetNextPeer returns next active peer to take a connection +// GetNextNode returns next active peer to take a connection // Loop through entire nodes to find out an alive one -func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node { +func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { highestBlock := uint64(0) // Get NodePool with correct blockchain @@ -104,7 +104,7 @@ func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node { // If we have an alive one, use it and store if its not the original one if np.Nodes[idx].IsAlive() { if i != next { - log.Printf("Mark the current one %d", uint64(idx)) + // Mark the current one atomic.StoreUint64(&np.Current, uint64(idx)) } // Pass nodes with low blocks @@ -152,7 +152,7 @@ func (bpool *BlockchainPool) HealthCheck() { n.SetCurrentBlock(0) // Get response from node /ping endpoint - httpClient := http.Client{Timeout: configs.LB_HEALTH_CHECK_CALL_TIMEOUT} + httpClient := http.Client{Timeout: 
configs.NB_HEALTH_CHECK_CALL_TIMEOUT} resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL)) if err != nil { log.Printf("Unable to reach node: %s\n", n.StatusURL) @@ -179,7 +179,7 @@ func (bpool *BlockchainPool) HealthCheck() { n.SetCurrentBlock(statusResponse.CurrentBlock) } - log.Printf("Node %s is alive: %t with current block: %d\n", n.StatusURL, true, statusResponse.CurrentBlock) + log.Printf("Node %s is alive: %t with current block: %d blockchain called: %d times\n", n.StatusURL, true, statusResponse.CurrentBlock, b.Current) } } } diff --git a/nodes/node_balancer/cmd/clients.go b/nodes/node_balancer/cmd/clients.go new file mode 100644 index 00000000..95c9f42d --- /dev/null +++ b/nodes/node_balancer/cmd/clients.go @@ -0,0 +1,107 @@ +package cmd + +import ( + "errors" + "reflect" + "time" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" +) + +var ethereumClientPool ClientPool +var polygonClientPool ClientPool + +// Generate client pools for different blockchains +func CreateClientPools() { + ethereumClientPool.Client = make(map[string]*Client) + polygonClientPool.Client = make(map[string]*Client) +} + +// Return client pool corresponding to blockchain +func GetClientPool(blockchain string) (*ClientPool, error) { + var cpool *ClientPool + if blockchain == "ethereum" { + cpool = &ethereumClientPool + } else if blockchain == "polygon" { + cpool = &polygonClientPool + } else { + return nil, errors.New("Unexisting blockchain provided") + } + return cpool, nil +} + +// Updates client last appeal to node +func (client *Client) UpdateClientLastCall() { + ts := time.Now().Unix() + + client.mux.Lock() + client.LastCallTs = ts + client.mux.Unlock() +} + +// Get number of seconds from current last call to client node +func (client *Client) GetClientLastCallDiff() (lastCallTs int64) { + ts := time.Now().Unix() + + client.mux.RLock() + lastCallTs = ts - client.LastCallTs + client.mux.RUnlock() + + return lastCallTs +} + +// Find client with 
same ID and update timestamp or +// add new one if doesn't exist +// Takes the pool lock, so concurrent request handlers cannot corrupt the map +func (cpool *ClientPool) AddClientNode(id string, node *Node) { + cpool.mux.Lock() + defer cpool.mux.Unlock() + + if cpool.Client[id] != nil { + if reflect.DeepEqual(cpool.Client[id].Node, node) { + cpool.Client[id].UpdateClientLastCall() + return + } + } + cpool.Client[id] = &Client{ + Node: node, + LastCallTs: time.Now().Unix(), + } +} + +// Get client hot node if exists +// An entry not used for NB_CLIENT_NODE_KEEP_ALIVE seconds is deleted and nil is returned +func (cpool *ClientPool) GetClientNode(id string) *Node { + cpool.mux.Lock() + defer cpool.mux.Unlock() + + if cpool.Client[id] != nil { + lastCallTs := cpool.Client[id].GetClientLastCallDiff() + if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + cpool.Client[id].UpdateClientLastCall() + return cpool.Client[id].Node + } + delete(cpool.Client, id) + } + + return nil +} + +// Clean client list of hot outdated nodes +// Returns the number of clients that are still kept in the pool +func (cpool *ClientPool) CleanInactiveClientNodes() int { + cpool.mux.Lock() + defer cpool.mux.Unlock() + + cnt := 0 + for id, client := range cpool.Client { + lastCallTs := client.GetClientLastCallDiff() + if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + delete(cpool.Client, id) + } else { + cnt += 1 + } + } + + return cnt +} diff --git a/nodes/node_balancer/cmd/clients_test.go b/nodes/node_balancer/cmd/clients_test.go new file mode 100644 index 00000000..bcdb0947 --- /dev/null +++ b/nodes/node_balancer/cmd/clients_test.go @@ -0,0 +1,88 @@ +package cmd + +import ( + "reflect" + "testing" + "time" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" +) + +func TestAddClientNode(t *testing.T) { + var cases = []struct { + clients map[string]*Client + expected string + }{ + {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"}, + } + for _, c := range cases { + CreateClientPools() + for id, client := range c.clients { + ethereumClientPool.AddClientNode(id, client.Node) + } + for id := range ethereumClientPool.Client { + if id != c.expected { + t.Log("Wrong client was added") + t.Fatal() + } + } + } +} + +func TestGetClientNode(t *testing.T) { + ts := time.Now().Unix() + + var cases = []struct { + clients map[string]*Client + id 
string + expected *Node + }{ + {map[string]*Client{}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, + {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, + } + for _, c := range cases { + CreateClientPools() + for id, client := range c.clients { + ethereumClientPool.Client[id] = client + } + + clientNode := ethereumClientPool.GetClientNode(c.id) + if !reflect.DeepEqual(clientNode, c.expected) { + t.Log("Wrong node returned") + t.Fatal() + } + } +} + +func TestCleanInactiveClientNodes(t *testing.T) { + ts := time.Now().Unix() + + var cases = []struct { + clients map[string]*Client + expected string + }{ + {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, + {map[string]*Client{"1": {LastCallTs: ts}}, "1"}, + {map[string]*Client{ + "1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, + "2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, + "3": {LastCallTs: ts}, + }, "3"}, + } + for _, c := range cases { + CreateClientPools() + for id, client := range c.clients { + ethereumClientPool.Client[id] = client + } + + ethereumClientPool.CleanInactiveClientNodes() + for id := range ethereumClientPool.Client { + if id != c.expected { + t.Log("Wrong client was removed") + t.Fatal() + } + } + } +} diff --git a/nodes/node_balancer/cmd/data.go b/nodes/node_balancer/cmd/data.go index 323dc59d..0a1d3e0a 100644 --- a/nodes/node_balancer/cmd/data.go +++ b/nodes/node_balancer/cmd/data.go @@ -17,6 +17,23 @@ type NodeStatusResponse struct { CurrentBlock uint64 `json:"current_block"` } +// Node - which one node client worked with +// LastCallTs - timestamp from last call +type Client struct { + Node *Node + LastCallTs int64 + + mux sync.RWMutex +} + +// ClientPool maps a client ID to the node that client was last routed to +// mux guards Client, which is used by request handlers and by the periodic cleanup +type ClientPool struct { + Client map[string]*Client + + mux sync.Mutex +} + // Node structure with 
// StatusURL for status server at node endpoint // GethURL for geth/bor/etc node http.server endpoint diff --git a/nodes/node_balancer/cmd/middleware.go b/nodes/node_balancer/cmd/middleware.go index 60976441..1bc69fda 100644 --- a/nodes/node_balancer/cmd/middleware.go +++ b/nodes/node_balancer/cmd/middleware.go @@ -5,6 +5,7 @@ package cmd import ( "log" + "net" "net/http" humbug "github.com/bugout-dev/humbug/go/pkg" @@ -31,6 +32,11 @@ func panicMiddleware(next http.Handler) http.Handler { func logMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { next.ServeHTTP(w, r) - log.Printf("%s %s %s\n", r.Method, r.URL.Path, r.RemoteAddr) + ip, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + log.Printf("Unable to parse client IP: %s\n", r.RemoteAddr) + } else { + log.Printf("%s %s %s\n", ip, r.Method, r.URL.Path) + } }) } diff --git a/nodes/node_balancer/cmd/routes.go b/nodes/node_balancer/cmd/routes.go index 94108c6c..672d89e4 100644 --- a/nodes/node_balancer/cmd/routes.go +++ b/nodes/node_balancer/cmd/routes.go @@ -6,8 +6,11 @@ package cmd import ( "encoding/json" "fmt" + "log" "net/http" "strings" + + configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) // pingRoute response with status of load balancer server itself @@ -19,6 +22,13 @@ func pingRoute(w http.ResponseWriter, r *http.Request) { // lbHandler load balances the incoming requests to nodes func lbHandler(w http.ResponseWriter, r *http.Request) { + attempts := GetAttemptsFromContext(r) + if attempts > configs.NB_CONNECTION_RETRIES { + log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path) + http.Error(w, "Service not available", http.StatusServiceUnavailable) + return + } + var blockchain string switch { case strings.HasPrefix(r.URL.Path, "/nb/ethereum"): @@ -30,12 +40,29 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { return } + clientId := 
r.Header.Get(configs.MOONSTREAM_CLIENT_ID_HEADER) // the caller sends its ID in the request headers, not in the response ones + if clientId == "" { + // TODO(kompotkot): After all internal crawlers and services start + // providing client id header, then replace to http.Error + clientId = "none" + } + // Chose one node - peer := blockchainPool.GetNextPeer(blockchain) - if peer == nil { - http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + var node *Node + cpool, err := GetClientPool(blockchain) + if err != nil { + http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest) return } + node = cpool.GetClientNode(clientId) + if node == nil { + node = blockchainPool.GetNextNode(blockchain) + if node == nil { + http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + return + } + cpool.AddClientNode(clientId, node) + } // Save origin path, to use in proxyErrorHandler if node will not response r.Header.Add("X-Origin-Path", r.URL.Path) @@ -43,11 +70,11 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { switch { case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/ping", blockchain)): r.URL.Path = "/ping" - peer.StatusReverseProxy.ServeHTTP(w, r) + node.StatusReverseProxy.ServeHTTP(w, r) return case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): r.URL.Path = "/" - peer.GethReverseProxy.ServeHTTP(w, r) + node.GethReverseProxy.ServeHTTP(w, r) return default: http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) diff --git a/nodes/node_balancer/cmd/server.go b/nodes/node_balancer/cmd/server.go index adc97885..be57b3ec 100644 --- a/nodes/node_balancer/cmd/server.go +++ b/nodes/node_balancer/cmd/server.go @@ -22,11 +22,14 @@ var reporter *humbug.HumbugReporter // initHealthCheck runs a routine for check status of the nodes every 5 seconds func initHealthCheck(debug bool) { - t := time.NewTicker(configs.LB_HEALTH_CHECK_INTERVAL) + t := 
time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: blockchainPool.HealthCheck() + ethereumClients := ethereumClientPool.CleanInactiveClientNodes() + polygonClients := polygonClientPool.CleanInactiveClientNodes() + log.Printf("Active etehereum clients: %d, polygon clients: %d\n", ethereumClients, polygonClients) if debug { blockchainPool.StatusLog() } @@ -60,13 +63,13 @@ func GetRetryFromContext(r *http.Request) int { func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { retries := GetRetryFromContext(r) - if retries < configs.LB_CONNECTION_RETRIES { + if retries < configs.NB_CONNECTION_RETRIES { log.Printf( "An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n", - url, retries+1, configs.LB_CONNECTION_RETRIES, e.Error(), + url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(), ) select { - case <-time.After(configs.LB_CONNECTION_RETRIES_INTERVAL): + case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL): ctx := context.WithValue(r.Context(), Retry, retries+1) proxy.ServeHTTP(w, r.WithContext(ctx)) } @@ -107,6 +110,9 @@ func InitServer() { return } + // Generate map of clients + CreateClientPools() + // Configure Humbug reporter to handle errors var err error sessionID := uuid.New().String() diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/configs/settings.go index e36ad79f..5eb88113 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/configs/settings.go @@ -35,6 +35,7 @@ var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IP var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR") var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT") var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT") +var MOONSTREAM_CLIENT_ID_HEADER = 
os.Getenv("MOONSTREAM_CLIENT_ID_HEADER") func checkEnvVarSet() { if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" { @@ -50,7 +51,11 @@ func checkEnvVarSet() { if MOONSTREAM_NODE_POLYGON_B_IPC_ADDR == "" { MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal" } - + + if MOONSTREAM_CLIENT_ID_HEADER == "" { + MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id" + } + if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" { log.Fatal("Some of environment variables not set") } @@ -90,10 +95,13 @@ func (nc *NodeConfigList) InitNodeConfigList() { } } -var LB_CONNECTION_RETRIES = 2 -var LB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10 -var LB_HEALTH_CHECK_INTERVAL = time.Second * 5 -var LB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 +var NB_CONNECTION_RETRIES = 2 +var NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10 +var NB_HEALTH_CHECK_INTERVAL = time.Second * 5 +var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2 + +// Client config +var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds // Humbug config var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")