ClientPool as map instead of slice

pull/519/head
kompotkot 2022-01-14 14:14:48 +00:00
rodzic d0ebd4782f
commit dd91735887
5 zmienionych plików z 89 dodań i 109 usunięć

Wyświetl plik

@ -1,6 +1,7 @@
package cmd
import (
// "fmt"
"log"
"reflect"
"time"
@ -10,88 +11,66 @@ import (
var clientPool ClientPool
func (client *Client) GetClientNode() (clientNode *Node) {
client.mux.RLock()
clientNode = client.Node
client.mux.RUnlock()
return clientNode
}
// Add client node and client itself if doesn't exist
// TODO(kompotkot): Add mutes as for balancer
func (cpool *ClientPool) AddClientNode(ip, blockchain string, node *Node) {
// TODO(kompotkot): Add mutex as for balancer
func (cpool *ClientPool) AddClientNode(id, blockchain string, node *Node) *Client {
ts := time.Now().Unix()
// Find in list clint with same IP
var client *Client
for _, c := range cpool.Clients {
if c.IP == ip {
client = c
currentClient := cpool.Client[id]
// Find clint with same ID and update timestamp or
// add new one if doesn't exist
if currentClient != nil {
if reflect.DeepEqual(currentClient.Node, node) {
currentClient.LastCallTs = ts
return currentClient
}
}
// Add new client if doesn't exist
if client == nil {
client = &Client{
IP: ip,
}
cpool.Clients = append(cpool.Clients, client)
}
newNode := true
for _, cn := range client.ClientNodes {
if reflect.DeepEqual(cn.Node, node) {
cn.LastCallTs = ts
newNode = false
}
}
if newNode {
client.ClientNodes = append(client.ClientNodes, ClientNode{
Blockchain: blockchain,
Node: node,
LastCallTs: ts,
})
}
currentClient.Blockchain = blockchain
currentClient.Node = node
currentClient.LastCallTs = ts
return currentClient
}
// Get client hot node if exists
// TODO(kompotkot): Add mutes as for balancer
func (cpool *ClientPool) GetClientNode(blockchain, ip string) *Node {
// TODO(kompotkot): Add mutex as for balancer
func (cpool *ClientPool) GetClientNode(id string) *Node {
ts := time.Now().Unix()
for _, c := range cpool.Clients {
if c.IP == ip {
for j, cn := range c.ClientNodes {
if cn.Blockchain == blockchain {
if ts-cn.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
// Hot node for client found, use it
if cn.Node.IsAlive() {
cn.LastCallTs = ts
return cn.Node
}
}
// Remove outdated hot node from client hot nodes list
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
}
}
currentClient := cpool.Client[id]
if currentClient != nil {
if ts-currentClient.LastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE {
currentClient.LastCallTs = ts
return currentClient.Node
}
delete(cpool.Client, id)
}
return nil
}
// Clean client list of hot nodes from outdated
// TODO(kompotkot): Add mutes as for balancer
// TODO(kompotkot): Add mutex as for balancer
func (cpool *ClientPool) CleanInactiveClientNodes() {
ts := time.Now().Unix()
for i, c := range cpool.Clients {
for j, cn := range c.ClientNodes {
if ts-cn.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
// Remove client's node
c.ClientNodes = append(c.ClientNodes[:j], c.ClientNodes[j+1:]...)
}
}
// If there are no hot nodes under client, remove client
if len(c.ClientNodes) == 0 {
cpool.Clients = append(cpool.Clients[:i], cpool.Clients[i+1:]...)
cnt := 0
for id, client := range cpool.Client {
if ts-client.LastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE {
delete(cpool.Client, id)
} else {
cnt += 1
}
}
log.Printf("Active clients: %d\n", len(cpool.Clients))
log.Printf("Active clients: %d\n", cnt)
}

Wyświetl plik

@ -8,58 +8,59 @@ import (
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
func TestCleanInactiveClientNodes(t *testing.T) {
func TestGetClientNode(t *testing.T) {
ts := time.Now().Unix()
var cases = []struct {
clients []Client
expected int
clients map[string]*Client
id string
expected *Node
}{
{[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}}}, 0},
{[]Client{{IP: "localhost", ClientNodes: []ClientNode{{LastCallTs: ts}}}}, 1},
{[]Client{
{IP: "localhost", ClientNodes: []ClientNode{
{LastCallTs: ts, Blockchain: "polygon"},
{LastCallTs: ts, Blockchain: "ethereum"},
{LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 1, Blockchain: "solana"},
}}}, 2},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}},
{map[string]*Client{"2": {Blockchain: "polygon", LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil},
}
for _, c := range cases {
clientPool.Clients = []*Client{}
for _, client := range c.clients {
clientPool.Clients = append(clientPool.Clients, &client)
clientPool.Client = make(map[string]*Client)
for id, client := range c.clients {
clientPool.Client[id] = client
}
clientPool.CleanInactiveClientNodes()
for _, client := range clientPool.Clients {
if len(client.ClientNodes) != c.expected {
t.Log("Wrong number of client nodes")
t.Fatal()
}
}
}
}
func TestGetClientNode(t *testing.T) {
var cases = []struct {
clients []Client
blockchain string
ip string
expected *Node
}{
{[]Client{{IP: "localhost"}}, "ethereum", "192.168.1.2", nil},
}
for _, c := range cases {
clientPool.Clients = []*Client{}
for _, client := range c.clients {
clientPool.Clients = append(clientPool.Clients, &client)
}
clientNode := clientPool.GetClientNode(c.blockchain, c.ip)
clientNode := clientPool.GetClientNode(c.id)
if !reflect.DeepEqual(clientNode, c.expected) {
t.Log("Wrong value")
t.Log("Wrong node returned")
t.Fatal()
}
}
}
func TestCleanInactiveClientNodes(t *testing.T) {
ts := time.Now().Unix()
var cases = []struct {
clients map[string]*Client
expected string
}{
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""},
{map[string]*Client{"1": {Blockchain: "ethereum", LastCallTs: ts}}, "1"},
{map[string]*Client{
"1": {Blockchain: "ethereum", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE},
"2": {Blockchain: "polygon", LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10},
"3": {Blockchain: "stellar", LastCallTs: ts},
}, "3"},
}
for _, c := range cases {
clientPool.Client = make(map[string]*Client)
for id, client := range c.clients {
clientPool.Client[id] = client
}
clientPool.CleanInactiveClientNodes()
for key := range clientPool.Client {
if key != c.expected {
t.Log("Wrong client was removed")
t.Fatal()
}
}
}
}

Wyświetl plik

@ -19,19 +19,16 @@ type NodeStatusResponse struct {
// Node - which one node client worked with
// LastCallTs - timestamp from last call
type ClientNode struct {
type Client struct {
Blockchain string
Node *Node
LastCallTs int64
}
type Client struct {
IP string
ClientNodes []ClientNode
mux sync.RWMutex
}
type ClientPool struct {
Clients []*Client
Client map[string]*Client
}
// Node structure with

Wyświetl plik

@ -39,7 +39,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
}
var node *Node
node = clientPool.GetClientNode(blockchain, ip)
node = clientPool.GetClientNode(ip)
if node == nil {
node = blockchainPool.GetNextNode(blockchain)
if node == nil {

Wyświetl plik

@ -108,6 +108,9 @@ func InitServer() {
return
}
// Generate map of clients
clientPool.Client = make(map[string]*Client)
// Configure Humbug reporter to handle errors
var err error
sessionID := uuid.New().String()