kopia lustrzana https://github.com/bugout-dev/moonstream
Extended client nodebalancer with additional comments
rodzic
f031de68a4
commit
de0ef49825
|
@ -1,7 +1,6 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
@ -11,9 +10,12 @@ import (
|
||||||
|
|
||||||
var clientPool ClientPool
|
var clientPool ClientPool
|
||||||
|
|
||||||
|
// Add client node and client itself if doesn't exist
|
||||||
|
// TODO(kompotkot): Add mutes as for balancer
|
||||||
func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
|
func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
|
||||||
ts := time.Now().Unix()
|
ts := time.Now().Unix()
|
||||||
|
|
||||||
|
// Find in list clint with same IP
|
||||||
var client *Client
|
var client *Client
|
||||||
for _, c := range cpool.Clients {
|
for _, c := range cpool.Clients {
|
||||||
if c.IP == ip {
|
if c.IP == ip {
|
||||||
|
@ -21,8 +23,8 @@ func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add new client if doesn't exist
|
||||||
if client == nil {
|
if client == nil {
|
||||||
fmt.Println("Adding new client")
|
|
||||||
client = &Client{
|
client = &Client{
|
||||||
IP: ip,
|
IP: ip,
|
||||||
}
|
}
|
||||||
|
@ -46,6 +48,8 @@ func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get client hot node if exists
|
||||||
|
// TODO(kompotkot): Add mutes as for balancer
|
||||||
func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
|
func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
|
||||||
ts := time.Now().Unix()
|
ts := time.Now().Unix()
|
||||||
|
|
||||||
|
@ -54,13 +58,14 @@ func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
|
||||||
for j, cn := range c.ClientNodes {
|
for j, cn := range c.ClientNodes {
|
||||||
if cn.Blockchain == blockchain {
|
if cn.Blockchain == blockchain {
|
||||||
if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
||||||
cn.LastCallTs = ts
|
// Hot node for client found, use it
|
||||||
fmt.Printf("Hot client node found: %s, re-use it", cn.Node.GethURL)
|
if cn.Node.IsAlive() {
|
||||||
return cn.Node
|
cn.LastCallTs = ts
|
||||||
} else {
|
return cn.Node
|
||||||
fmt.Println("Client node outdated, remove it")
|
}
|
||||||
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
|
|
||||||
}
|
}
|
||||||
|
// Remove outdated hot node from client hot nodes list
|
||||||
|
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,22 +74,24 @@ func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean client list of hot nodes from outdated
|
||||||
|
// TODO(kompotkot): Add mutes as for balancer
|
||||||
func (cpool *ClientPool) CleanInactiveClientNodes() {
|
func (cpool *ClientPool) CleanInactiveClientNodes() {
|
||||||
ts := time.Now().Unix()
|
ts := time.Now().Unix()
|
||||||
|
|
||||||
for i, c := range cpool.Clients {
|
for i, c := range cpool.Clients {
|
||||||
for j, cn := range c.ClientNodes {
|
for j, cn := range c.ClientNodes {
|
||||||
if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
|
||||||
fmt.Println("Removing client node")
|
// Remove client's node
|
||||||
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
|
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If there are no hot nodes under client, remove client
|
||||||
if len(c.ClientNodes) == 0 {
|
if len(c.ClientNodes) == 0 {
|
||||||
fmt.Println("Removing client itself")
|
|
||||||
cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...)
|
cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
func (cpool *ClientPool) StatusLog() {
|
|
||||||
log.Printf("Active clients: %d", len(cpool.Clients))
|
log.Printf("Active clients: %d", len(cpool.Clients))
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,7 @@ func initHealthCheck(debug bool) {
|
||||||
blockchainPool.HealthCheck()
|
blockchainPool.HealthCheck()
|
||||||
clientPool.CleanInactiveClientNodes()
|
clientPool.CleanInactiveClientNodes()
|
||||||
if debug {
|
if debug {
|
||||||
// blockchainPool.StatusLog()
|
blockchainPool.StatusLog()
|
||||||
clientPool.StatusLog()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ var NB_HEALTH_CHECK_INTERVAL = time.Second * 5
|
||||||
var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||||
|
|
||||||
// Client config
|
// Client config
|
||||||
var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // Seconds
|
var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds
|
||||||
|
|
||||||
// Humbug config
|
// Humbug config
|
||||||
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
|
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
|
||||||
|
|
Ładowanie…
Reference in New Issue