kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #519 from bugout-dev/node-balancer-clients
Node balancer re-use client hot nodepull/532/head
commit
cb8b263ce6
|
@ -7,7 +7,7 @@ User=ubuntu
|
|||
Group=www-data
|
||||
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
|
||||
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 1
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 2
|
||||
SyslogIdentifier=ethereum-synchronize
|
||||
|
||||
[Install]
|
||||
|
|
|
@ -70,9 +70,9 @@ func (node *Node) GetCurrentBlock() (currentBlock uint64) {
|
|||
return currentBlock
|
||||
}
|
||||
|
||||
// GetNextPeer returns next active peer to take a connection
|
||||
// GetNextNode returns next active peer to take a connection
|
||||
// Loop through entire nodes to find out an alive one
|
||||
func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node {
|
||||
func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node {
|
||||
highestBlock := uint64(0)
|
||||
|
||||
// Get NodePool with correct blockchain
|
||||
|
@ -104,7 +104,7 @@ func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node {
|
|||
// If we have an alive one, use it and store if its not the original one
|
||||
if np.Nodes[idx].IsAlive() {
|
||||
if i != next {
|
||||
log.Printf("Mark the current one %d", uint64(idx))
|
||||
// Mark the current one
|
||||
atomic.StoreUint64(&np.Current, uint64(idx))
|
||||
}
|
||||
// Pass nodes with low blocks
|
||||
|
@ -152,7 +152,7 @@ func (bpool *BlockchainPool) HealthCheck() {
|
|||
n.SetCurrentBlock(0)
|
||||
|
||||
// Get response from node /status endpoint
|
||||
httpClient := http.Client{Timeout: configs.LB_HEALTH_CHECK_CALL_TIMEOUT}
|
||||
httpClient := http.Client{Timeout: configs.NB_HEALTH_CHECK_CALL_TIMEOUT}
|
||||
resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL))
|
||||
if err != nil {
|
||||
log.Printf("Unable to reach node: %s\n", n.StatusURL)
|
||||
|
@ -179,7 +179,7 @@ func (bpool *BlockchainPool) HealthCheck() {
|
|||
n.SetCurrentBlock(statusResponse.CurrentBlock)
|
||||
}
|
||||
|
||||
log.Printf("Node %s is alive: %t with current block: %d\n", n.StatusURL, true, statusResponse.CurrentBlock)
|
||||
log.Printf("Node %s is alive: %t with current block: %d blockchain called: %d times\n", n.StatusURL, true, statusResponse.CurrentBlock, b.Current)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
var ethereumClientPool ClientPool
|
||||
var polygonClientPool ClientPool
|
||||
|
||||
// Generate client pools for different blockchains
|
||||
func CreateClientPools() {
|
||||
ethereumClientPool.Client = make(map[string]*Client)
|
||||
polygonClientPool.Client = make(map[string]*Client)
|
||||
}
|
||||
|
||||
// Return client pool correspongin to blockchain
|
||||
func GetClientPool(blockchain string) (*ClientPool, error) {
|
||||
var cpool *ClientPool
|
||||
if blockchain == "ethereum" {
|
||||
cpool = ðereumClientPool
|
||||
} else if blockchain == "polygon" {
|
||||
cpool = &polygonClientPool
|
||||
} else {
|
||||
return nil, errors.New("Unexisting blockchain provided")
|
||||
}
|
||||
return cpool, nil
|
||||
}
|
||||
|
||||
// Updates client last appeal to node
|
||||
func (client *Client) UpdateClientLastCall() {
|
||||
ts := time.Now().Unix()
|
||||
|
||||
client.mux.Lock()
|
||||
client.LastCallTs = ts
|
||||
client.mux.Unlock()
|
||||
}
|
||||
|
||||
// Get number of seconds from current last call to client node
|
||||
func (client *Client) GetClientLastCallDiff() (lastCallTs int64) {
|
||||
ts := time.Now().Unix()
|
||||
|
||||
client.mux.RLock()
|
||||
lastCallTs = ts - client.LastCallTs
|
||||
client.mux.RUnlock()
|
||||
|
||||
return lastCallTs
|
||||
}
|
||||
|
||||
// Find clint with same ID and update timestamp or
|
||||
// add new one if doesn't exist
|
||||
func (cpool *ClientPool) AddClientNode(id string, node *Node) {
|
||||
|
||||
if cpool.Client[id] != nil {
|
||||
if reflect.DeepEqual(cpool.Client[id].Node, node) {
|
||||
cpool.Client[id].UpdateClientLastCall()
|
||||
return
|
||||
}
|
||||
}
|
||||
cpool.Client[id] = &Client{
|
||||
Node: node,
|
||||
LastCallTs: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get client hot node if exists
|
||||
func (cpool *ClientPool) GetClientNode(id string) *Node {
|
||||
if cpool.Client[id] != nil {
|
||||
lastCallTs := cpool.Client[id].GetClientLastCallDiff()
|
||||
if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
||||
cpool.Client[id].UpdateClientLastCall()
|
||||
return cpool.Client[id].Node
|
||||
}
|
||||
delete(cpool.Client, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clean client list of hot outdated nodes
|
||||
func (cpool *ClientPool) CleanInactiveClientNodes() int {
|
||||
cnt := 0
|
||||
for id, client := range cpool.Client {
|
||||
lastCallTs := client.GetClientLastCallDiff()
|
||||
if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
||||
delete(cpool.Client, id)
|
||||
} else {
|
||||
cnt += 1
|
||||
}
|
||||
}
|
||||
|
||||
return cnt
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
func TestAddClientNode(t *testing.T) {
|
||||
var cases = []struct {
|
||||
clients map[string]*Client
|
||||
expected string
|
||||
}{
|
||||
{map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"},
|
||||
}
|
||||
for _, c := range cases {
|
||||
CreateClientPools()
|
||||
for id, client := range c.clients {
|
||||
ethereumClientPool.AddClientNode(id, client.Node)
|
||||
}
|
||||
for id := range ethereumClientPool.Client {
|
||||
if id != c.expected {
|
||||
t.Log("Wrong client was added")
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetClientNode(t *testing.T) {
|
||||
ts := time.Now().Unix()
|
||||
|
||||
var cases = []struct {
|
||||
clients map[string]*Client
|
||||
id string
|
||||
expected *Node
|
||||
}{
|
||||
{map[string]*Client{}, "1", nil},
|
||||
{map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}},
|
||||
{map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil},
|
||||
{map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil},
|
||||
}
|
||||
for _, c := range cases {
|
||||
CreateClientPools()
|
||||
for id, client := range c.clients {
|
||||
ethereumClientPool.Client[id] = client
|
||||
}
|
||||
|
||||
clientNode := ethereumClientPool.GetClientNode(c.id)
|
||||
if !reflect.DeepEqual(clientNode, c.expected) {
|
||||
t.Log("Wrong node returned")
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanInactiveClientNodes(t *testing.T) {
|
||||
ts := time.Now().Unix()
|
||||
|
||||
var cases = []struct {
|
||||
clients map[string]*Client
|
||||
expected string
|
||||
}{
|
||||
{map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""},
|
||||
{map[string]*Client{"1": {LastCallTs: ts}}, "1"},
|
||||
{map[string]*Client{
|
||||
"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE},
|
||||
"2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10},
|
||||
"3": {LastCallTs: ts},
|
||||
}, "3"},
|
||||
}
|
||||
for _, c := range cases {
|
||||
CreateClientPools()
|
||||
for id, client := range c.clients {
|
||||
ethereumClientPool.Client[id] = client
|
||||
}
|
||||
|
||||
ethereumClientPool.CleanInactiveClientNodes()
|
||||
for id := range ethereumClientPool.Client {
|
||||
if id != c.expected {
|
||||
t.Log("Wrong client was removed")
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,19 @@ type NodeStatusResponse struct {
|
|||
CurrentBlock uint64 `json:"current_block"`
|
||||
}
|
||||
|
||||
// Client records which node a balancer client last worked with.
//
// Node - which one node client worked with
// LastCallTs - timestamp from last call
type Client struct {
	Node       *Node
	LastCallTs int64 // Unix seconds of the most recent call; writes are guarded by mux

	mux sync.RWMutex
}

// ClientPool maps a client id to its pinned node state for one blockchain.
type ClientPool struct {
	Client map[string]*Client
}
|
||||
|
||||
// Node structure with
|
||||
// StatusURL for status server at node endpoint
|
||||
// GethURL for geth/bor/etc node http.server endpoint
|
||||
|
|
|
@ -5,6 +5,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
humbug "github.com/bugout-dev/humbug/go/pkg"
|
||||
|
@ -31,6 +32,11 @@ func panicMiddleware(next http.Handler) http.Handler {
|
|||
func logMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
next.ServeHTTP(w, r)
|
||||
log.Printf("%s %s %s\n", r.Method, r.URL.Path, r.RemoteAddr)
|
||||
ip, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||
if err != nil {
|
||||
log.Printf("Unable to parse client IP: %s\n", r.RemoteAddr)
|
||||
} else {
|
||||
log.Printf("%s %s %s\n", ip, r.Method, r.URL.Path)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,8 +6,11 @@ package cmd
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
// pingRoute responds with the status of the load balancer server itself
|
||||
|
@ -19,6 +22,13 @@ func pingRoute(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// lbHandler load balances the incoming requests to nodes
|
||||
func lbHandler(w http.ResponseWriter, r *http.Request) {
|
||||
attempts := GetAttemptsFromContext(r)
|
||||
if attempts > configs.NB_CONNECTION_RETRIES {
|
||||
log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path)
|
||||
http.Error(w, "Service not available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
var blockchain string
|
||||
switch {
|
||||
case strings.HasPrefix(r.URL.Path, "/nb/ethereum"):
|
||||
|
@ -30,12 +40,29 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
clientId := w.Header().Get(configs.MOONSTREAM_CLIENT_ID_HEADER)
|
||||
if clientId == "" {
|
||||
// TODO(kompotkot): After all internal crawlers and services start
|
||||
// providing client id header, then replace to http.Error
|
||||
clientId = "none"
|
||||
}
|
||||
|
||||
// Choose one node
|
||||
peer := blockchainPool.GetNextPeer(blockchain)
|
||||
if peer == nil {
|
||||
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
|
||||
var node *Node
|
||||
cpool, err := GetClientPool(blockchain)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
node = cpool.GetClientNode(clientId)
|
||||
if node == nil {
|
||||
node = blockchainPool.GetNextNode(blockchain)
|
||||
if node == nil {
|
||||
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
cpool.AddClientNode(clientId, node)
|
||||
}
|
||||
|
||||
// Save origin path, to use in proxyErrorHandler if node will not response
|
||||
r.Header.Add("X-Origin-Path", r.URL.Path)
|
||||
|
@ -43,11 +70,11 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
switch {
|
||||
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/ping", blockchain)):
|
||||
r.URL.Path = "/ping"
|
||||
peer.StatusReverseProxy.ServeHTTP(w, r)
|
||||
node.StatusReverseProxy.ServeHTTP(w, r)
|
||||
return
|
||||
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
|
||||
r.URL.Path = "/"
|
||||
peer.GethReverseProxy.ServeHTTP(w, r)
|
||||
node.GethReverseProxy.ServeHTTP(w, r)
|
||||
return
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
|
||||
|
|
|
@ -22,11 +22,14 @@ var reporter *humbug.HumbugReporter
|
|||
|
||||
// initHealthCheck runs a routine to check the status of the nodes every 5 seconds
|
||||
func initHealthCheck(debug bool) {
|
||||
t := time.NewTicker(configs.LB_HEALTH_CHECK_INTERVAL)
|
||||
t := time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
blockchainPool.HealthCheck()
|
||||
ethereumClients := ethereumClientPool.CleanInactiveClientNodes()
|
||||
polygonClients := polygonClientPool.CleanInactiveClientNodes()
|
||||
log.Printf("Active etehereum clients: %d, polygon clients: %d\n", ethereumClients, polygonClients)
|
||||
if debug {
|
||||
blockchainPool.StatusLog()
|
||||
}
|
||||
|
@ -60,13 +63,13 @@ func GetRetryFromContext(r *http.Request) int {
|
|||
func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
|
||||
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
|
||||
retries := GetRetryFromContext(r)
|
||||
if retries < configs.LB_CONNECTION_RETRIES {
|
||||
if retries < configs.NB_CONNECTION_RETRIES {
|
||||
log.Printf(
|
||||
"An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n",
|
||||
url, retries+1, configs.LB_CONNECTION_RETRIES, e.Error(),
|
||||
url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(),
|
||||
)
|
||||
select {
|
||||
case <-time.After(configs.LB_CONNECTION_RETRIES_INTERVAL):
|
||||
case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL):
|
||||
ctx := context.WithValue(r.Context(), Retry, retries+1)
|
||||
proxy.ServeHTTP(w, r.WithContext(ctx))
|
||||
}
|
||||
|
@ -107,6 +110,9 @@ func InitServer() {
|
|||
return
|
||||
}
|
||||
|
||||
// Generate map of clients
|
||||
CreateClientPools()
|
||||
|
||||
// Configure Humbug reporter to handle errors
|
||||
var err error
|
||||
sessionID := uuid.New().String()
|
||||
|
|
|
@ -35,6 +35,7 @@ var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IP
|
|||
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
|
||||
var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT")
|
||||
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
|
||||
var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER")
|
||||
|
||||
func checkEnvVarSet() {
|
||||
if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" {
|
||||
|
@ -50,7 +51,11 @@ func checkEnvVarSet() {
|
|||
if MOONSTREAM_NODE_POLYGON_B_IPC_ADDR == "" {
|
||||
MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal"
|
||||
}
|
||||
|
||||
|
||||
if MOONSTREAM_CLIENT_ID_HEADER == "" {
|
||||
MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id"
|
||||
}
|
||||
|
||||
if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" {
|
||||
log.Fatal("Some of environment variables not set")
|
||||
}
|
||||
|
@ -90,10 +95,13 @@ func (nc *NodeConfigList) InitNodeConfigList() {
|
|||
}
|
||||
}
|
||||
|
||||
var LB_CONNECTION_RETRIES = 2
|
||||
var LB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
|
||||
var LB_HEALTH_CHECK_INTERVAL = time.Second * 5
|
||||
var LB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||
var NB_CONNECTION_RETRIES = 2
|
||||
var NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
|
||||
var NB_HEALTH_CHECK_INTERVAL = time.Second * 5
|
||||
var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||
|
||||
// Client config
|
||||
var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds
|
||||
|
||||
// Humbug config
|
||||
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
|
||||
|
|
Ładowanie…
Reference in New Issue