Load balancer for nodes

pull/507/head
kompotkot 2021-12-20 12:53:54 +00:00
rodzic 6f2813710a
commit 905dcbf71a
11 zmienionych plików z 623 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,16 @@
# Node Balancer application
## Installation
- Prepare environment variables
- Build application
```bash
go build -o nodebalancer
```
- Run with the following parameters:
```bash
nodebalancer -host 0.0.0.0 -port 8544 -healthcheck
```

Wyświetl plik

@ -0,0 +1,185 @@
/*
Load balancer, based on https://github.com/kasvith/simplelb/
*/
package cmd
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sync/atomic"
configs "bugout.dev/app-node-balancer/configs"
)
// blockchainPool is the package-level pool of blockchains used during the
// whole session. It holds one pool of nodes per blockchain.
var blockchainPool BlockchainPool
// AddNode appends node to the pool of the given blockchain. When no pool with
// that blockchain name exists yet, a new one holding only node is created and
// registered.
func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) {
	var target *NodePool
	for i := range bpool.Blockchains {
		// No early exit: when several pools share the name, the last one wins.
		if candidate := bpool.Blockchains[i]; candidate.Blockchain == blockchain {
			target = candidate
		}
	}

	if target != nil {
		target.Nodes = append(target.Nodes, node)
		return
	}

	// Blockchain is not yet in the pool
	bpool.Blockchains = append(bpool.Blockchains, &NodePool{
		Blockchain: blockchain,
		Nodes:      []*Node{node},
	})
}
// SetAlive stores the alive flag of the node while holding its write lock.
func (node *Node) SetAlive(alive bool) {
	node.mux.Lock()
	defer node.mux.Unlock()
	node.Alive = alive
}
// IsAlive reports the alive flag of the node, read under its read lock.
func (node *Node) IsAlive() (alive bool) {
	node.mux.RLock()
	defer node.mux.RUnlock()
	return node.Alive
}
// SetCurrentBlock stores the current block number of the node while holding
// its write lock.
func (node *Node) SetCurrentBlock(currentBlock uint64) {
	node.mux.Lock()
	defer node.mux.Unlock()
	node.CurrentBlock = currentBlock
}
// GetCurrentBlock returns the current block number of the node, read under
// its read lock.
func (node *Node) GetCurrentBlock() (currentBlock uint64) {
	node.mux.RLock()
	defer node.mux.RUnlock()
	return node.CurrentBlock
}
// GetNextPeer returns the node that should take the next connection for the
// given blockchain, or nil when no node can be used.
//
// Nodes are visited in round-robin order, starting from a per-blockchain
// counter that is atomically incremented on every call. Among the alive nodes,
// the first one in that order reporting the highest current block is chosen.
// nil is returned when the blockchain is unknown, when its pool holds no
// nodes, or when none of its nodes is alive.
func (bpool *BlockchainPool) GetNextPeer(blockchain string) *Node {
	// Get NodePool with correct blockchain
	var np *NodePool
	for _, b := range bpool.Blockchains {
		if b.Blockchain == blockchain {
			np = b
		}
	}
	// An unknown blockchain or an empty pool has nothing to offer. Going on
	// would dereference a nil pool or take a modulo by zero below.
	if np == nil || len(np.Nodes) == 0 {
		return nil
	}
	nodesCount := len(np.Nodes)

	// Increase Current value with 1 to be able to track node appeals
	currentInc := atomic.AddUint64(&np.Current, uint64(1))
	// next is the index the rotation starts from, always in [0, nodesCount)
	next := int(currentInc % uint64(nodesCount))

	// Single pass over one full cycle. Only alive nodes compete, so a dead
	// node that still carries a high block number cannot disqualify the alive
	// ones. State is read through the locking accessors because the health
	// check may update it concurrently.
	var (
		chosen      *Node
		chosenIdx   int
		chosenBlock uint64
	)
	for i := 0; i < nodesCount; i++ {
		idx := (next + i) % nodesCount
		n := np.Nodes[idx]
		if !n.IsAlive() {
			continue
		}
		// Strict comparison keeps the earliest node in rotation order among
		// those sharing the highest block.
		if block := n.GetCurrentBlock(); chosen == nil || block > chosenBlock {
			chosen, chosenIdx, chosenBlock = n, idx, block
		}
	}
	if chosen == nil {
		return nil
	}

	// Remember the node actually used when it is not the one the rotation
	// started from.
	if chosenIdx != next {
		log.Printf("Mark the current one %d", uint64(chosenIdx))
		atomic.StoreUint64(&np.Current, uint64(chosenIdx))
	}
	return chosen
}
// SetNodeStatus sets the alive flag of the node whose StatusURL or GethURL
// equals the given url. In every blockchain pool only the first matching node
// is updated.
func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
	for _, pool := range bpool.Blockchains {
		for _, candidate := range pool.Nodes {
			matchesStatus := candidate.StatusURL.String() == url.String()
			if !matchesStatus && candidate.GethURL.String() != url.String() {
				continue
			}
			candidate.SetAlive(alive)
			// Stop scanning this pool, the outer loop moves on to the next one.
			break
		}
	}
}
// StatusLog logs, for every node of every blockchain, its status URL, whether
// it is alive, and the value of the blockchain's Current counter.
// TODO(kompotkot): Print list of alive and dead nodes
func (bpool *BlockchainPool) StatusLog() {
	for _, b := range bpool.Blockchains {
		// Current is updated with atomic operations by GetNextPeer, so it is
		// loaded atomically here as well, once per blockchain.
		current := atomic.LoadUint64(&b.Current)
		for _, n := range b.Nodes {
			// The alive flag is guarded by the node mutex; read it through the
			// accessor instead of touching the field directly.
			log.Printf(
				"Blockchain %s node %s is alive %t. Blockchain called %d times",
				b.Blockchain, n.StatusURL, n.IsAlive(), current,
			)
		}
	}
}
// HealthCheck queries the status server of every node in every blockchain pool
// and records the outcome on the node: alive with the reported current block
// on success, not alive with block 0 on any failure.
func (bpool *BlockchainPool) HealthCheck() {
	// One client serves the whole sweep instead of building one per node.
	httpClient := http.Client{Timeout: configs.LB_HEALTH_CHECK_CALL_TIMEOUT}
	for _, b := range bpool.Blockchains {
		for _, n := range b.Nodes {
			// The node state is written only after the check has finished, so
			// a healthy node stays selectable while its check is in flight.
			currentBlock, alive := fetchNodeCurrentBlock(&httpClient, n)
			n.SetCurrentBlock(currentBlock)
			n.SetAlive(alive)
			if alive {
				log.Printf("Node %s is alive: %t with current block: %d\n", n.StatusURL, true, currentBlock)
			}
		}
	}
}

// fetchNodeCurrentBlock calls the /status endpoint of the node's status server
// and returns the current block it reports. alive is false, and currentBlock
// is 0, when the node cannot be reached, answers with a non-2xx status, or
// sends a body that is not valid JSON.
func fetchNodeCurrentBlock(httpClient *http.Client, n *Node) (currentBlock uint64, alive bool) {
	// Get response from node /status endpoint
	resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL))
	if err != nil {
		log.Printf("Unable to reach node: %s\n", n.StatusURL)
		return 0, false
	}
	// Runs when this helper returns, i.e. once per node rather than piling up
	// until the whole sweep is over.
	defer resp.Body.Close()

	// Read the body before looking at the status code so it is always drained.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("Unable to parse response from node: %s\n", n.StatusURL)
		return 0, false
	}
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		log.Printf("Unexpected status code %d from node: %s\n", resp.StatusCode, n.StatusURL)
		return 0, false
	}

	var statusResponse NodeStatusResponse
	if err := json.Unmarshal(body, &statusResponse); err != nil {
		log.Printf("Unable to read json response from node: %s\n", n.StatusURL)
		return 0, false
	}
	return statusResponse.CurrentBlock, true
}

Wyświetl plik

@ -0,0 +1,46 @@
/*
Data structure.
*/
package cmd
import (
"net/http/httputil"
"net/url"
"sync"
)
// PingResponse is the JSON body with a single "status" field.
type PingResponse struct {
Status string `json:"status"`
}
// NodeStatusResponse is the JSON body with a single "current_block" field,
// decoded as an unsigned block number.
type NodeStatusResponse struct {
CurrentBlock uint64 `json:"current_block"`
}
// Node describes one blockchain node together with the reverse proxies that
// forward requests to it.
type Node struct {
// StatusURL is the address of the status server at the node endpoint.
StatusURL *url.URL
// GethURL is the address of the geth/bor/etc node http.server endpoint.
GethURL *url.URL
// Alive tells whether the node is currently considered usable.
Alive bool
// CurrentBlock is the block number stored for the node.
CurrentBlock uint64
// mux is a read/write mutex kept alongside Alive and CurrentBlock.
mux sync.RWMutex
// StatusReverseProxy and GethReverseProxy are the reverse proxies for the
// status server and for the node itself.
StatusReverseProxy *httputil.ReverseProxy
GethReverseProxy *httputil.ReverseProxy
}
// NodePool groups the nodes that belong to a single blockchain.
type NodePool struct {
// Blockchain is the name identifying the blockchain of this pool.
Blockchain string
// Nodes lists the nodes of this blockchain.
Nodes []*Node
// Counter to observe all nodes
Current uint64
}
// BlockchainPool holds one NodePool per blockchain.
type BlockchainPool struct {
Blockchains []*NodePool
}

Wyświetl plik

@ -0,0 +1,36 @@
/*
Server API middlewares.
*/
package cmd
import (
"log"
"net/http"
humbug "github.com/bugout-dev/humbug/go/pkg"
)
// panicMiddleware wraps next so that a panic raised while serving a request is
// logged, reported through the Humbug reporter and answered with a 500
// response instead of propagating.
//
// http.ErrAbortHandler is deliberately re-raised: it is the sentinel the
// standard library uses to abort a response that cannot be completed, and
// writing an error body at that point would append it to a partially sent
// response.
func panicMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}
			if recovered == http.ErrAbortHandler {
				// Not a crash: let the server abort the connection.
				panic(recovered)
			}
			log.Println("recovered", recovered)
			report := humbug.PanicReport(recovered)
			reporter.Publish(report)
			http.Error(w, "Internal server error", http.StatusInternalServerError)
		}()
		// The deferred function above guards every handler further down the chain
		next.ServeHTTP(w, r)
	})
}
// Log access requests in proper format
func logMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
log.Printf("%s %s %s\n", r.Method, r.URL.Path, r.RemoteAddr)
})
}

Wyświetl plik

@ -0,0 +1,56 @@
/*
Handle routes for load balancer API.
*/
package cmd
import (
"encoding/json"
"fmt"
"net/http"
"strings"
)
// pingRoute answers with a JSON body carrying the status "ok" of the load
// balancer server itself.
func pingRoute(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	encoder := json.NewEncoder(w)
	encoder.Encode(PingResponse{Status: "ok"})
}
// lbHandler load balances the incoming requests to nodes.
//
// The blockchain is taken from the path prefix ("/lb/ethereum" or
// "/lb/polygon"). "/lb/<blockchain>/ping" is forwarded as "/ping" through the
// status reverse proxy of the chosen node, and "/lb/<blockchain>/rpc" is
// forwarded as "/" through its geth reverse proxy. It answers 400 for an
// unknown blockchain or sub-path and 503 when no node is available.
func lbHandler(w http.ResponseWriter, r *http.Request) {
	var blockchain string
	switch {
	case strings.HasPrefix(r.URL.Path, "/lb/ethereum"):
		blockchain = "ethereum"
	case strings.HasPrefix(r.URL.Path, "/lb/polygon"):
		blockchain = "polygon"
	default:
		// blockchain is still empty here, so report the offending path.
		http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", r.URL.Path), http.StatusBadRequest)
		return
	}

	// Chose one node
	peer := blockchainPool.GetNextPeer(blockchain)
	if peer == nil {
		http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
		return
	}

	// Save origin path, to use in proxyErrorHandler if node will not response.
	// Set, not Add: a value sent by the client must not take precedence over
	// the real path, and a re-dispatched request must not pile up duplicates.
	r.Header.Set("X-Origin-Path", r.URL.Path)

	switch {
	case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/lb/%s/ping", blockchain)):
		r.URL.Path = "/ping"
		peer.StatusReverseProxy.ServeHTTP(w, r)
	case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/lb/%s/rpc", blockchain)):
		r.URL.Path = "/"
		peer.GethReverseProxy.ServeHTTP(w, r)
	default:
		http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
	}
}

Wyświetl plik

@ -0,0 +1,171 @@
/*
Node load balancer API server initialization.
*/
package cmd
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"time"
configs "bugout.dev/app-node-balancer/configs"
humbug "github.com/bugout-dev/humbug/go/pkg"
"github.com/google/uuid"
)
// reporter is the package-level Humbug reporter; InitServer assigns it.
var reporter *humbug.HumbugReporter
// initHealthCheck checks the status of the nodes once per
// configs.LB_HEALTH_CHECK_INTERVAL and, when debug is set, logs the node
// statuses after every check. It never returns, so it is meant to run in its
// own goroutine.
func initHealthCheck(debug bool) {
	ticker := time.NewTicker(configs.LB_HEALTH_CHECK_INTERVAL)
	for range ticker.C {
		blockchainPool.HealthCheck()
		if !debug {
			continue
		}
		blockchainPool.StatusLog()
	}
}
const (
Attempts int = iota
Retry
)
// GetAttemptsFromContext returns the attempts for request
func GetAttemptsFromContext(r *http.Request) int {
if attempts, ok := r.Context().Value(Attempts).(int); ok {
return attempts
}
return 1
}
// GetRetryFromContext returns the retries for request
func GetRetryFromContext(r *http.Request) int {
if retry, ok := r.Context().Value(Retry).(int); ok {
return retry
}
return 0
}
// proxyErrorHandler installs on proxy an ErrorHandler for failed calls to the
// backend identified by url.
//
// A failed call is retried against the same backend, after
// configs.LB_CONNECTION_RETRIES_INTERVAL, up to configs.LB_CONNECTION_RETRIES
// times. Once the retries are used up the backend is marked as down and the
// request is handed back to lbHandler so another node can serve it. When the
// request context is already done the handler answers 502 and leaves the
// backend status untouched.
// Docs: https://pkg.go.dev/net/http/httputil#ReverseProxy
func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
	proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
		// A finished request context (for instance a client that went away)
		// makes every proxied call fail at once. That says nothing about the
		// backend, so neither retry nor mark it as down.
		if ctxErr := r.Context().Err(); ctxErr != nil {
			log.Printf("Request to %s abandoned, request context is done: %s\n", url, ctxErr)
			w.WriteHeader(http.StatusBadGateway)
			return
		}

		retries := GetRetryFromContext(r)
		if retries < configs.LB_CONNECTION_RETRIES {
			log.Printf(
				"An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n",
				url, retries+1, configs.LB_CONNECTION_RETRIES, e.Error(),
			)
			// Wait before the retry, but stop waiting when the request is
			// cancelled in the meantime.
			select {
			case <-r.Context().Done():
				w.WriteHeader(http.StatusBadGateway)
				return
			case <-time.After(configs.LB_CONNECTION_RETRIES_INTERVAL):
			}
			ctx := context.WithValue(r.Context(), Retry, retries+1)
			proxy.ServeHTTP(w, r.WithContext(ctx))
			return
		}

		// Retries are used up, mark this backend as down
		blockchainPool.SetNodeStatus(url, false)

		// Set modified path back
		// TODO(kompotkot): Try r.RequestURI instead of header
		r.URL.Path = r.Header.Get("X-Origin-Path")

		// If the same request routing for few attempts with different nodes, increase the count
		// of attempts and send request to next peer
		attempts := GetAttemptsFromContext(r)
		log.Printf("Attempting number: %d to fetch node %s\n", attempts, url)
		ctx := context.WithValue(r.Context(), Attempts, attempts+1)
		lbHandler(w, r.WithContext(ctx))
	}
}
// InitServer parses the command-line flags, sets up the Humbug reporter,
// registers every configured node with its reverse proxies, and then serves
// the load balancer API on the requested address. It only returns through
// log.Fatal or panic.
//
// Flags: -host and -port select the listening address, -healthcheck starts
// the periodic node health check, -debug enables extended logging.
func InitServer() {
	var listeningAddr string
	var listeningPort string
	var enableHealthCheck bool
	var enableDebug bool
	flag.StringVar(&listeningAddr, "host", "127.0.0.1", "Server listening address")
	flag.StringVar(&listeningPort, "port", "8544", "Server listening port")
	flag.BoolVar(&enableHealthCheck, "healthcheck", false, "To enable healthcheck set healthcheck flag")
	flag.BoolVar(&enableDebug, "debug", false, "To enable debug mode with extended log set debug flag")
	flag.Parse()

	// Configure Humbug reporter to handle errors
	var err error
	sessionID := uuid.New().String()
	consent := humbug.CreateHumbugConsent(humbug.True)
	reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NODE_BALANCER_TOKEN)
	if err != nil {
		panic(fmt.Sprintf("Invalid Humbug Crash configuration: %s", err.Error()))
	}
	// Record system information
	reporter.Publish(humbug.SystemReport())

	// Fill NodeConfigList with initial nodes from environment variables
	configs.ConfigList.InitNodeConfigList()

	// Parse nodes and set list of proxies
	for i, nodeConfig := range configs.ConfigList.Configs {
		gethUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", nodeConfig.Addr, nodeConfig.Port))
		if err != nil {
			log.Fatal(err)
		}
		statusUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT))
		if err != nil {
			log.Fatal(err)
		}

		// Each node gets two proxies, one per endpoint, both with the retrying
		// error handler installed.
		proxyToStatus := httputil.NewSingleHostReverseProxy(statusUrl)
		proxyToGeth := httputil.NewSingleHostReverseProxy(gethUrl)
		proxyErrorHandler(proxyToStatus, statusUrl)
		proxyErrorHandler(proxyToGeth, gethUrl)

		// Nodes start as alive
		blockchainPool.AddNode(&Node{
			StatusURL:          statusUrl,
			GethURL:            gethUrl,
			Alive:              true,
			StatusReverseProxy: proxyToStatus,
			GethReverseProxy:   proxyToGeth,
		}, nodeConfig.Blockchain)
		log.Printf(
			"Added new %s proxy %d with geth url: %s and status url: %s\n",
			nodeConfig.Blockchain, i, gethUrl, statusUrl)
	}

	serveMux := http.NewServeMux()
	serveMux.HandleFunc("/ping", pingRoute)
	serveMux.HandleFunc("/lb/", lbHandler)

	// Set common middlewares. The last one applied is the outermost, so the
	// panic handler wraps the access log, which wraps the routes.
	commonHandler := logMiddleware(serveMux)
	commonHandler = panicMiddleware(commonHandler)

	server := http.Server{
		Addr:         fmt.Sprintf("%s:%s", listeningAddr, listeningPort),
		Handler:      commonHandler,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	}

	// Start node health checking and current block fetching
	if enableHealthCheck {
		go initHealthCheck(enableDebug)
	}

	log.Printf("Starting server at %s:%s\n", listeningAddr, listeningPort)
	err = server.ListenAndServe()
	if err != nil {
		log.Fatal(err)
	}
}

Wyświetl plik

@ -0,0 +1,77 @@
/*
Configurations for load balancer server.
*/
package configs
import (
"log"
"os"
"strconv"
"time"
)
// BlockchainConfig holds the raw settings of one blockchain: its name, the
// addresses of its nodes and the port they share, still as a string.
type BlockchainConfig struct {
	Blockchain string
	IPs        []string
	Port       string
}

// NodeConfig is the parsed configuration of a single node.
type NodeConfig struct {
	Blockchain string
	Addr       string
	Port       uint16
}

// NodeConfigList is the list of node configurations filled by
// InitNodeConfigList.
type NodeConfigList struct {
	Configs []NodeConfig
}

// ConfigList is the package-level list of node configurations.
var ConfigList NodeConfigList

// Node addresses and ports, read from the environment variables of the same
// name when the package is initialised.
var MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_IPC_PORT = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_PORT")
var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT")
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")

// InitNodeConfigList appends to nc.Configs one NodeConfig per configured
// ethereum and polygon node address.
//
// A blockchain whose port is not a valid number in the range 0-65535 is
// skipped entirely, and addresses left empty are skipped individually; both
// cases are logged.
func (nc *NodeConfigList) InitNodeConfigList() {
	// Define available blockchain nodes
	blockchainConfigList := []BlockchainConfig{
		{
			Blockchain: "ethereum",
			IPs:        []string{MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR},
			Port:       MOONSTREAM_NODE_ETHEREUM_IPC_PORT,
		},
		{
			Blockchain: "polygon",
			IPs:        []string{MOONSTREAM_NODE_POLYGON_A_IPC_ADDR, MOONSTREAM_NODE_POLYGON_B_IPC_ADDR},
			Port:       MOONSTREAM_NODE_POLYGON_IPC_PORT,
		},
	}

	// Parse node addr, ip and blockchain
	for _, b := range blockchainConfigList {
		// The port is shared by all nodes of a blockchain, so parse it once.
		// ParseUint with 16 bits covers the whole port range and rejects
		// negative values; a signed 16-bit parse stops at 32767.
		port, err := strconv.ParseUint(b.Port, 0, 16)
		if err != nil {
			log.Printf("Unable to parse port number: %s", b.Port)
			continue
		}
		for _, nodeIP := range b.IPs {
			// An unset address would otherwise yield a node with an empty host.
			if nodeIP == "" {
				log.Printf("Skipping %s node with empty address", b.Blockchain)
				continue
			}
			nc.Configs = append(nc.Configs, NodeConfig{
				Blockchain: b.Blockchain,
				Addr:       nodeIP,
				Port:       uint16(port),
			})
		}
	}
}
// Load balancer tuning values.
// LB_CONNECTION_RETRIES is a retry count; the three values below it are
// durations (10 milliseconds, 5 seconds and 2 seconds respectively).
var LB_CONNECTION_RETRIES = 2
var LB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
var LB_HEALTH_CHECK_INTERVAL = time.Second * 5
var LB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
// Humbug config: the reporter token is read from the environment variable of
// the same name.
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")

Wyświetl plik

@ -0,0 +1,8 @@
module bugout.dev/app-node-balancer
go 1.17
require (
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205
github.com/google/uuid v1.3.0
)

Wyświetl plik

@ -0,0 +1,4 @@
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ=
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

Wyświetl plik

@ -0,0 +1,9 @@
package main
import (
"bugout.dev/app-node-balancer/cmd"
)
// main is the program entry point; all the work is done by cmd.InitServer.
func main() {
cmd.InitServer()
}

Wyświetl plik

@ -0,0 +1,15 @@
# Required environment variables for load balancer
export MOONSTREAM_NODES_SERVER_PORT="<node_status_server_port>"
export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>"
# Variables for the Ethereum nodes
export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="<node_geth_http_port>"
export MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR="<node_geth_http_ip_addr>"
export MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR="<node_geth_http_ip_addr>"
# Variables for the Polygon nodes
export MOONSTREAM_NODE_POLYGON_IPC_PORT="<node_bor_http_port>"
export MOONSTREAM_NODE_POLYGON_A_IPC_ADDR="<node_bor_http_ip_addr>"
export MOONSTREAM_NODE_POLYGON_B_IPC_ADDR="<node_bor_http_ip_addr>"