Merge pull request #605 from bugout-dev/nodebalancer-add-nodes-cli

Cli config file for node list setup
pull/607/head
Sergei Sumarokov 2022-05-11 14:22:07 +03:00 zatwierdzone przez GitHub
commit a62f41af00
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
12 zmienionych plików z 275 dodań i 73 usunięć

62
nodes/.gitignore vendored
Wyświetl plik

@ -1,5 +1,65 @@
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,go
### Go ###
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
### Go Patch ###
/vendor/
/Godeps/
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# Support for Project snippet scope
.vscode/*.code-snippets
# Ignore code-workspaces
*.code-workspace
# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Custom
.secrets/*
dev.env
prod.env
test.env
test.env
nodebalancer

Wyświetl plik

@ -24,6 +24,8 @@ PARAMETERS_ENV_PATH="${SECRETS_DIR}/app.env"
AWS_SSM_PARAMETER_PATH="${AWS_SSM_PARAMETER_PATH:-/moonstream/prod}"
SCRIPT_DIR="$(realpath $(dirname $0))"
PARAMETERS_SCRIPT="${SCRIPT_DIR}/parameters.py"
NODE_BALANCER_CONFIG_PATH="${NODE_BALANCER_CONFIG_PATH:-/home/ubuntu/.nodebalancer}"
NODE_BALANCER_CONFIG_SOURCE_FILE="node-balancer-config.txt"
# Service file
NODE_BALANCER_SERVICE_FILE="node-balancer.service"
@ -59,6 +61,15 @@ cd "${APP_NODES_DIR}/node_balancer"
HOME=/root /usr/local/go/bin/go build -o "${APP_NODES_DIR}/node_balancer/nodebalancer" "${APP_NODES_DIR}/node_balancer/main.go"
cd "${EXEC_DIR}"
echo
echo
echo -e "${PREFIX_INFO} Update nodebalancer configuration file"
if [ ! -d "$NODE_BALANCER_CONFIG_PATH" ]; then
mkdir "$NODE_BALANCER_CONFIG_PATH"
echo -e "${PREFIX_WARN} Created new node balancer config directory"
fi
cp "${SCRIPT_DIR}/${NODE_BALANCER_CONFIG_SOURCE_FILE}" "${NODE_BALANCER_CONFIG_PATH}/config.txt"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing load balancer for nodes service definition with ${NODE_BALANCER_SERVICE_FILE}"

Wyświetl plik

@ -0,0 +1,5 @@
ethereum,a.ethereum.moonstream.internal,8545
ethereum,b.ethereum.moonstream.internal,8545
polygon,a.polygon.moonstream.internal,8545
polygon,b.polygon.moonstream.internal,8545
xdai,a.xdai.moonstream.internal,8545

Wyświetl plik

@ -11,7 +11,11 @@ WorkingDirectory=/home/ubuntu/moonstream/nodes/node_balancer
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream/nodes/node_balancer/nodebalancer -host "${AWS_LOCAL_IPV4}" -port 8544 -healthcheck
ExecStart=/home/ubuntu/moonstream/nodes/node_balancer/nodebalancer server \
-host "${AWS_LOCAL_IPV4}" \
-port 8544 \
-healthcheck \
-config /home/ubuntu/.nodebalancer/config.txt
SyslogIdentifier=node-balancer
[Install]

Wyświetl plik

@ -0,0 +1,143 @@
package cmd
import (
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
var (
stateCLI StateCLI
)
type flagSlice []string
func (i *flagSlice) String() string {
return strings.Join(*i, ", ")
}
func (i *flagSlice) Set(value string) error {
*i = append(*i, value)
return nil
}
// Command Line Interface state
type StateCLI struct {
serverCmd *flag.FlagSet
versionCmd *flag.FlagSet
// Common flags
configPathFlag string
helpFlag bool
// Server flags
listeningAddrFlag string
listeningPortFlag string
nodesFlag flagSlice
enableHealthCheckFlag bool
enableDebugFlag bool
}
func (s *StateCLI) usage() {
fmt.Printf(`usage: nodebalancer [-h] {%[1]s,%[2]s} ...
Moonstream node balancer CLI
optional arguments:
-h, --help show this help message and exit
subcommands:
{%[1]s,%[2]s}
`, s.serverCmd.Name(), s.versionCmd.Name())
}
func (s *StateCLI) checkRequirements() {
if s.helpFlag {
switch {
case s.serverCmd.Parsed():
fmt.Printf("Start nodebalancer server\n\n")
s.serverCmd.PrintDefaults()
os.Exit(0)
case s.versionCmd.Parsed():
fmt.Printf("Show version\n\n")
s.versionCmd.PrintDefaults()
os.Exit(0)
default:
s.usage()
os.Exit(0)
}
}
if s.configPathFlag == "" {
homeDir, err := os.UserHomeDir()
if err != nil {
log.Fatalf("Unable to find user home directory, %v", err)
}
configDirPath := fmt.Sprintf("%s/.nodebalancer", homeDir)
configPath := fmt.Sprintf("%s/config.txt", configDirPath)
err = os.MkdirAll(configDirPath, os.ModePerm)
if err != nil {
log.Fatalf("Unable to create directory, %v", err)
}
_, err = os.Stat(configPath)
if err != nil {
tempConfigB := []byte("ethereum,http://127.0.0.1,8545")
err = os.WriteFile(configPath, tempConfigB, 0644)
if err != nil {
log.Fatalf("Unable to write config, %v", err)
}
}
s.configPathFlag = configPath
}
}
func (s *StateCLI) populateCLI() {
// Subcommands setup
s.serverCmd = flag.NewFlagSet("server", flag.ExitOnError)
s.versionCmd = flag.NewFlagSet("version", flag.ExitOnError)
// Common flag pointers
for _, fs := range []*flag.FlagSet{s.serverCmd, s.versionCmd} {
fs.BoolVar(&s.helpFlag, "help", false, "Show help message")
fs.StringVar(&s.configPathFlag, "config", "", "Path to configuration file (default: ~/.nodebalancer/config.txt)")
}
// Server subcommand flag pointers
s.serverCmd.StringVar(&s.listeningAddrFlag, "host", "127.0.0.1", "Server listening address")
s.serverCmd.StringVar(&s.listeningPortFlag, "port", "8544", "Server listening port")
s.serverCmd.BoolVar(&s.enableHealthCheckFlag, "healthcheck", false, "To enable healthcheck ser healthcheck flag")
s.serverCmd.BoolVar(&s.enableDebugFlag, "debug", false, "To enable debug mode with extended log set debug flag")
}
func CLI() {
stateCLI.populateCLI()
if len(os.Args) < 2 {
stateCLI.usage()
os.Exit(1)
}
// Parse subcommands and appropriate FlagSet
switch os.Args[1] {
case "server":
stateCLI.serverCmd.Parse(os.Args[2:])
stateCLI.checkRequirements()
Server()
case "version":
stateCLI.versionCmd.Parse(os.Args[2:])
stateCLI.checkRequirements()
fmt.Printf("v%s\n", configs.NB_VERSION)
default:
stateCLI.usage()
os.Exit(1)
}
}

Wyświetl plik

@ -8,13 +8,17 @@ import (
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
var ethereumClientPool ClientPool
var polygonClientPool ClientPool
var (
ethereumClientPool ClientPool
polygonClientPool ClientPool
xdaiClientPool ClientPool
)
// Generate client pools for different blockchains
func CreateClientPools() {
ethereumClientPool.Client = make(map[string]*Client)
polygonClientPool.Client = make(map[string]*Client)
xdaiClientPool.Client = make(map[string]*Client)
}
// Return client pool correspongin to blockchain
@ -24,6 +28,8 @@ func GetClientPool(blockchain string) (*ClientPool, error) {
cpool = &ethereumClientPool
} else if blockchain == "polygon" {
cpool = &polygonClientPool
} else if blockchain == "xdai" {
cpool = &xdaiClientPool
} else {
return nil, errors.New("Unexisting blockchain provided")
}

Wyświetl plik

@ -35,6 +35,8 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
blockchain = "ethereum"
case strings.HasPrefix(r.URL.Path, "/nb/polygon"):
blockchain = "polygon"
case strings.HasPrefix(r.URL.Path, "/nb/xdai"):
blockchain = "xdai"
default:
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
return

Wyświetl plik

@ -5,7 +5,6 @@ package cmd
import (
"context"
"flag"
"fmt"
"log"
"net/http"
@ -29,7 +28,8 @@ func initHealthCheck(debug bool) {
blockchainPool.HealthCheck()
ethereumClients := ethereumClientPool.CleanInactiveClientNodes()
polygonClients := polygonClientPool.CleanInactiveClientNodes()
log.Printf("Active etehereum clients: %d, polygon clients: %d\n", ethereumClients, polygonClients)
xdaiClients := xdaiClientPool.CleanInactiveClientNodes()
log.Printf("Active etehereum clients: %d, polygon clients: %d, xdai clients: %d\n", ethereumClients, polygonClients, xdaiClients)
if debug {
blockchainPool.StatusLog()
}
@ -92,24 +92,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
}
}
func InitServer() {
var listeningAddr string
var listeningPort string
var enableHealthCheck bool
var enableDebug bool
var showVersion bool
flag.StringVar(&listeningAddr, "host", "127.0.0.1", "Server listening address")
flag.StringVar(&listeningPort, "port", "8544", "Server listening port")
flag.BoolVar(&enableHealthCheck, "healthcheck", false, "To enable healthcheck ser healthcheck flag")
flag.BoolVar(&enableDebug, "debug", false, "To enable debug mode with extended log set debug flag")
flag.BoolVar(&showVersion, "version", false, "Print version")
flag.Parse()
if showVersion {
fmt.Printf("Node balancer version: v%s\n", configs.NODE_BALANCER_VERSION)
return
}
func Server() {
// Generate map of clients
CreateClientPools()
@ -125,7 +108,7 @@ func InitServer() {
reporter.Publish(humbug.SystemReport())
// Fill NodeConfigList with initial nodes from environment variables
configs.ConfigList.InitNodeConfigList()
configs.ConfigList.InitNodeConfigList(stateCLI.configPathFlag)
// Parse nodes and set list of proxies
for i, nodeConfig := range configs.ConfigList.Configs {
@ -165,18 +148,18 @@ func InitServer() {
commonHandler = panicMiddleware(commonHandler)
server := http.Server{
Addr: fmt.Sprintf("%s:%s", listeningAddr, listeningPort),
Addr: fmt.Sprintf("%s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag),
Handler: commonHandler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
// Start node health checking and current block fetching
if enableHealthCheck {
go initHealthCheck(enableDebug)
if stateCLI.enableHealthCheckFlag {
go initHealthCheck(stateCLI.enableDebugFlag)
}
log.Printf("Starting server at %s:%s\n", listeningAddr, listeningPort)
log.Printf("Starting server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
err = server.ListenAndServe()
if err != nil {
log.Fatal(err)

Wyświetl plik

@ -4,9 +4,11 @@ Configurations for load balancer server.
package configs
import (
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"
)
@ -28,67 +30,43 @@ type NodeConfigList struct {
var ConfigList NodeConfigList
var MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_IPC_PORT = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_PORT")
var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT")
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER")
func checkEnvVarSet() {
if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" {
MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = "a.ethereum.moonstream.internal"
}
if MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR == "" {
MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = "b.ethereum.moonstream.internal"
}
if MOONSTREAM_NODE_POLYGON_A_IPC_ADDR == "" {
MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = "a.polygon.moonstream.internal"
}
if MOONSTREAM_NODE_POLYGON_B_IPC_ADDR == "" {
MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal"
}
if MOONSTREAM_CLIENT_ID_HEADER == "" {
MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id"
}
if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" {
log.Fatal("Some of environment variables not set")
if MOONSTREAM_NODES_SERVER_PORT == "" {
log.Fatal("Environment variable MOONSTREAM_NODES_SERVER_PORT not set")
}
}
// Return list of NodeConfig structures
func (nc *NodeConfigList) InitNodeConfigList() {
func (nc *NodeConfigList) InitNodeConfigList(configPath string) {
checkEnvVarSet()
// Define available blockchain nodes
blockchainConfigList := make([]BlockchainConfig, 0, 2)
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
Blockchain: "ethereum",
IPs: []string{MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR},
Port: MOONSTREAM_NODE_ETHEREUM_IPC_PORT,
})
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
Blockchain: "polygon",
IPs: []string{MOONSTREAM_NODE_POLYGON_A_IPC_ADDR, MOONSTREAM_NODE_POLYGON_B_IPC_ADDR},
Port: MOONSTREAM_NODE_POLYGON_IPC_PORT,
})
rawBytes, err := ioutil.ReadFile(configPath)
if err != nil {
log.Fatalf("Unable to read config file, %v", err)
}
text := string(rawBytes)
lines := strings.Split(text, "\n")
// Parse node addr, ip and blockchain
for _, b := range blockchainConfigList {
for _, nodeIP := range b.IPs {
port, err := strconv.ParseInt(b.Port, 0, 16)
// Define available blockchain nodes
for _, line := range lines {
fields := strings.Split(line, ",")
if len(fields) == 3 {
port, err := strconv.ParseInt(fields[2], 0, 16)
if err != nil {
log.Printf("Unable to parse port number: %s", b.Port)
log.Printf("Unable to parse port number, %v", err)
continue
}
nc.Configs = append(nc.Configs, NodeConfig{
Blockchain: b.Blockchain,
Addr: nodeIP,
Blockchain: fields[0],
Addr: fields[1],
Port: uint16(port),
})
}

Wyświetl plik

@ -1,3 +1,3 @@
package configs
var NODE_BALANCER_VERSION = "0.0.1"
var NB_VERSION = "0.0.2"

Wyświetl plik

@ -0,0 +1,10 @@
#!/usr/bin/env sh
# Compile application and run with provided arguments
set -e
PROGRAM_NAME="nodebalancer"
go build -o "$PROGRAM_NAME" .
./"$PROGRAM_NAME" "$@"

Wyświetl plik

@ -5,5 +5,5 @@ import (
)
func main() {
cmd.InitServer()
cmd.CLI()
}