Mirror of https://github.com/bugout-dev/moonstream

Implemented clients identified by IP, with cleanup after a period of inactivity

parent e1a2d48d33
commit a0e50b3722
@@ -70,9 +70,9 @@ func (node *Node) GetCurrentBlock() (currentBlock uint64) {
 	return currentBlock
 }
 
-// GetNextPeer returns next active peer to take a connection
+// GetNextNode returns next active peer to take a connection
 // Loop through entire nodes to find out an alive one
-func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node {
+func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node {
 	highestBlock := uint64(0)
 
 	// Get NodePool with correct blockchain
@@ -113,6 +113,7 @@ func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node {
 			continue
 		}
 
+		fmt.Printf("Used node: %s with hi block: %d\n", np.Nodes[idx].GethURL, np.Nodes[idx].CurrentBlock)
 		return np.Nodes[idx]
 	}
 }
@@ -152,7 +153,7 @@ func (bpool *BlockchainPool) HealthCheck() {
 			n.SetCurrentBlock(0)
 
 			// Get response from node /ping endpoint
-			httpClient := http.Client{Timeout: configs.LB_HEALTH_CHECK_CALL_TIMEOUT}
+			httpClient := http.Client{Timeout: configs.NB_HEALTH_CHECK_CALL_TIMEOUT}
 			resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL))
 			if err != nil {
 				log.Printf("Unable to reach node: %s\n", n.StatusURL)
@@ -0,0 +1,97 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+	"reflect"
+	"time"
+
+	configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
+)
+
+var clientPool ClientPool
+
+func (cpool *ClientPool) AddClient(ip string) {
+	var client *Client
+	for _, c := range cpool.Clients {
+		if c.IP == ip {
+			client = c
+		}
+	}
+
+	if client == nil {
+		fmt.Println("Adding new client")
+		client = &Client{
+			IP: ip,
+		}
+		cpool.Clients = append(cpool.Clients, client)
+	}
+}
+
+func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
+	ts := time.Now().Unix()
+
+	for _, c := range cpool.Clients {
+		if c.IP == ip {
+			newNode := true
+
+			for _, cn := range c.ClientNodes {
+				if reflect.DeepEqual(cn.Node, node) {
+					cn.LastCallTs = ts
+					newNode = false
+				}
+			}
+
+			if newNode {
+				c.ClientNodes = append(c.ClientNodes, ClientNode{
+					Blockchain: blockchain,
+					Node:       node,
+					LastCallTs: ts,
+				})
+			}
+		}
+	}
+}
+
+func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
+	ts := time.Now().Unix()
+
+	for _, c := range cpool.Clients {
+		if c.IP == ip {
+			for j, cn := range c.ClientNodes {
+				if cn.Blockchain == blockchain {
+					if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
+						cn.LastCallTs = ts
+						fmt.Println("Hot client node found, re-use it")
+						return cn.Node
+					} else {
+						fmt.Println("Client node outdated, remove it")
+						c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+func (cpool *ClientPool) CleanInactiveClientNodes() {
+	ts := time.Now().Unix()
+
+	for i, c := range cpool.Clients {
+		for j, cn := range c.ClientNodes {
+			if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
+				fmt.Println("Removing client node")
+				c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
+			}
+		}
+		if len(c.ClientNodes) == 0 {
+			fmt.Println("Removing client itself")
+			cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...)
+		}
+	}
+}
+func (cpool *ClientPool) StatusLog() {
+	log.Printf("Active clients: %d", len(cpool.Clients))
+}
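A minimal usage sketch of the new ClientPool API added above, assuming it lives in the same package cmd; exampleClientPoolFlow, the IP address, and the empty Node value are illustrative placeholders, not part of this commit:

package cmd

import "fmt"

// exampleClientPoolFlow walks through the client lifecycle: register a client
// by IP, pin it to a node, and reuse that node while the client stays active.
func exampleClientPoolFlow() {
	var pool ClientPool
	node := &Node{} // placeholder; in the balancer this comes from blockchainPool.GetNextNode

	pool.AddClient("192.168.1.2")
	pool.AddClientNode("192.168.1.2", "ethereum", node)

	// Within NB_CLIENT_NODE_KEEP_ALIVE seconds of the last call, the client is
	// routed back to the same node.
	if n := pool.GetClientNode("ethereum", "192.168.1.2"); n != nil {
		fmt.Println("client re-uses its pinned node")
	}

	// The periodic health-check loop calls CleanInactiveClientNodes to drop
	// assignments whose last call is older than the keep-alive window.
	pool.CleanInactiveClientNodes()
}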
@@ -0,0 +1,88 @@
+package cmd
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
+)
+
+func TestAddClient(t *testing.T) {
+	var cases = []struct {
+		clients  []Client
+		ip       string
+		expected int
+	}{
+		{[]Client{}, "localhost", 1},
+		{[]Client{{IP: "localhost"}}, "192.168.1.2", 2},
+	}
+	for _, c := range cases {
+		clientPool.Clients = []*Client{}
+		for _, client := range c.clients {
+			clientPool.Clients = append(clientPool.Clients, &client)
+		}
+
+		clientPool.AddClient(c.ip)
+		if len(clientPool.Clients) != c.expected {
+			t.Log("Wrong number of clients")
+			t.Fatal()
+		}
+	}
+}
+
+func TestCleanInactiveClientNodes(t *testing.T) {
+	ts := time.Now().Unix()
+
+	var cases = []struct {
+		clients  []Client
+		expected int
+	}{
+		{[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}}}, 0},
+		{[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts}}}}, 1},
+		{[]Client{
+			{IP: "localhost", ClientNodes: []ClientNode{
+				{LastCallTs: ts, Blockchain: "polygon"},
+				{LastCallTs: ts, Blockchain: "ethereum"},
+				{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 1, Blockchain: "solana"},
+			}}}, 2},
+	}
+	for _, c := range cases {
+		clientPool.Clients = []*Client{}
+		for _, client := range c.clients {
+			clientPool.Clients = append(clientPool.Clients, &client)
+		}
+
+		clientPool.CleanInactiveClientNodes()
+		for _, client := range clientPool.Clients {
+			if len(client.ClientNodes) != c.expected {
+				t.Log("Wrong number of client nodes")
+				t.Fatal()
+			}
+		}
+
+	}
+}
+
+func TestGetClientNode(t *testing.T) {
+	var cases = []struct {
+		clients    []Client
+		blockchain string
+		ip         string
+		expected   *Node
+	}{
+		{[]Client{{IP: "localhost"}}, "ethereum", "192.168.1.2", nil},
+	}
+	for _, c := range cases {
+		clientPool.Clients = []*Client{}
+		for _, client := range c.clients {
+			clientPool.Clients = append(clientPool.Clients, &client)
+		}
+
+		clientNode := clientPool.GetClientNode(c.blockchain, c.ip)
+		if !reflect.DeepEqual(clientNode, c.expected) {
+			t.Log("Wrong value")
+			t.Fatal()
+		}
+	}
+}
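The new tests above cover AddClient, CleanInactiveClientNodes, and GetClientNode. Below is a hypothetical companion test, not part of this commit, sketching how AddClientNode could be covered in the same style; it relies on re-adding the same node pointer refreshing the existing entry rather than duplicating it:

package cmd

import "testing"

// TestAddClientNodeSketch is a hypothetical example test (not in this commit)
// checking that AddClientNode stores a node once per client and does not
// duplicate it when the same node is added again.
func TestAddClientNodeSketch(t *testing.T) {
	node := &Node{}
	clientPool.Clients = []*Client{{IP: "localhost"}}

	clientPool.AddClientNode("localhost", "ethereum", node)
	if len(clientPool.Clients[0].ClientNodes) != 1 {
		t.Fatal("Wrong number of client nodes")
	}

	// Re-adding the same node should refresh its timestamp, not append a duplicate.
	clientPool.AddClientNode("localhost", "ethereum", node)
	if len(clientPool.Clients[0].ClientNodes) != 1 {
		t.Fatal("Client node was duplicated")
	}
}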
@@ -17,6 +17,23 @@ type NodeStatusResponse struct {
 	CurrentBlock uint64 `json:"current_block"`
 }
 
+// Node - which one node client worked with
+// LastCallTs - timestamp from last call
+type ClientNode struct {
+	Blockchain string
+	Node       *Node
+	LastCallTs int64
+}
+
+type Client struct {
+	IP          string
+	ClientNodes []ClientNode
+}
+
+type ClientPool struct {
+	Clients []*Client
+}
+
 // Node structure with
 // StatusURL for status server at node endpoint
 // GethURL for geth/bor/etc node http.server endpoint
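For orientation, a small sketch of how the new structures nest: one ClientPool holds clients keyed by IP, and each client records which node it used per blockchain and when it last called. The function name and all values are illustrative placeholders:

package cmd

import "time"

// exampleClientPoolLayout is a hypothetical illustration of the new types;
// the IP, blockchain name, and empty Node are placeholders.
func exampleClientPoolLayout() ClientPool {
	return ClientPool{
		Clients: []*Client{
			{
				IP: "192.168.1.2",
				ClientNodes: []ClientNode{
					{
						Blockchain: "ethereum",
						Node:       &Node{},           // node the client worked with
						LastCallTs: time.Now().Unix(), // timestamp of the last call
					},
				},
			},
		},
	}
}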
@@ -6,6 +6,7 @@ package cmd
 import (
 	"encoding/json"
 	"fmt"
+	"net"
 	"net/http"
 	"strings"
 )
@@ -31,11 +32,22 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// Chose one node
-	peer := blockchainPool.GetNextPeer(blockchain)
-	if peer == nil {
+	ip, _, err := net.SplitHostPort(r.RemoteAddr)
+	if err != nil {
+		http.Error(w, "Unable to parse client IP", http.StatusInternalServerError)
+		return
+	}
+
+	var node *Node
+	node = clientPool.GetClientNode(blockchain, ip)
+	if node == nil {
+		node = blockchainPool.GetNextNode(blockchain)
+		if node == nil {
 			http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
 			return
 		}
+		clientPool.AddClientNode(ip, blockchain, node)
+	}
 
 	// Save origin path, to use in proxyErrorHandler if node will not response
 	r.Header.Add("X-Origin-Path", r.URL.Path)
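The handler now derives the client IP from r.RemoteAddr using net.SplitHostPort from the Go standard library. A standalone sketch of that call, with an illustrative address:

package main

import (
	"fmt"
	"net"
)

func main() {
	// r.RemoteAddr has the form "host:port"; SplitHostPort separates the two
	// and returns an error for malformed addresses.
	ip, port, err := net.SplitHostPort("192.168.1.2:54321")
	if err != nil {
		fmt.Println("Unable to parse client IP:", err)
		return
	}
	fmt.Println(ip, port) // 192.168.1.2 54321
}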
@@ -43,11 +55,11 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
 	switch {
 	case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/ping", blockchain)):
 		r.URL.Path = "/ping"
-		peer.StatusReverseProxy.ServeHTTP(w, r)
+		node.StatusReverseProxy.ServeHTTP(w, r)
 		return
 	case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
 		r.URL.Path = "/"
-		peer.GethReverseProxy.ServeHTTP(w, r)
+		node.GethReverseProxy.ServeHTTP(w, r)
 		return
 	default:
 		http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
@@ -22,13 +22,15 @@ var reporter *humbug.HumbugReporter
 
 // initHealthCheck runs a routine for check status of the nodes every 5 seconds
 func initHealthCheck(debug bool) {
-	t := time.NewTicker(configs.LB_HEALTH_CHECK_INTERVAL)
+	t := time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL)
 	for {
 		select {
 		case <-t.C:
 			blockchainPool.HealthCheck()
+			clientPool.CleanInactiveClientNodes()
 			if debug {
 				blockchainPool.StatusLog()
+				clientPool.StatusLog()
 			}
 		}
 	}
@@ -60,13 +62,13 @@ func GetRetryFromContext(r *http.Request) int {
 func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
 	proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
 		retries := GetRetryFromContext(r)
-		if retries < configs.LB_CONNECTION_RETRIES {
+		if retries < configs.NB_CONNECTION_RETRIES {
 			log.Printf(
 				"An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n",
-				url, retries+1, configs.LB_CONNECTION_RETRIES, e.Error(),
+				url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(),
 			)
 			select {
-			case <-time.After(configs.LB_CONNECTION_RETRIES_INTERVAL):
+			case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL):
 				ctx := context.WithValue(r.Context(), Retry, retries+1)
 				proxy.ServeHTTP(w, r.WithContext(ctx))
 			}
@@ -107,6 +109,8 @@ func InitServer() {
 		return
 	}
 
+	clientPool.AddClient("localhost")
+
 	// Configure Humbug reporter to handle errors
 	var err error
 	sessionID := uuid.New().String()
@@ -90,10 +90,13 @@ func (nc *NodeConfigList) InitNodeConfigList() {
 	}
 }
 
-var LB_CONNECTION_RETRIES = 2
-var LB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
-var LB_HEALTH_CHECK_INTERVAL = time.Second * 5
-var LB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
+var NB_CONNECTION_RETRIES = 2
+var NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
+var NB_HEALTH_CHECK_INTERVAL = time.Second * 5
+var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
 
+// Client config
+var NB_CLIENT_NODE_KEEP_ALIVE = int64(10) // Seconds
+
 // Humbug config
 var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
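NB_CLIENT_NODE_KEEP_ALIVE is the number of seconds a client-to-node assignment stays valid after its last call. A minimal sketch of the comparison used in GetClientNode and CleanInactiveClientNodes; the helper name is hypothetical and assumes the same package cmd:

package cmd

import (
	"time"

	configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)

// isClientNodeStale is a hypothetical helper mirroring the staleness check:
// an assignment is dropped once NB_CLIENT_NODE_KEEP_ALIVE seconds have passed
// since the client's last call.
func isClientNodeStale(lastCallTs int64) bool {
	return time.Now().Unix()-lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE
}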