From 1b2f60d97c04a49dd1b4df9816508c745506c86e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 12 Jul 2022 18:48:44 +0000 Subject: [PATCH] Removed status url, config is json, instead of status call latest --- .../cmd/nodebalancer/balancer.go | 76 ++++++++++------ .../cmd/nodebalancer/blockchain.go | 52 ----------- nodes/node_balancer/cmd/nodebalancer/cli.go | 68 ++++++++------ .../node_balancer/cmd/nodebalancer/clients.go | 6 +- .../cmd/nodebalancer/clients_test.go | 10 +-- .../nodebalancer/configs.go} | 90 ++++++++++++------- nodes/node_balancer/cmd/nodebalancer/db.go | 8 +- nodes/node_balancer/cmd/nodebalancer/main.go | 2 +- .../cmd/nodebalancer/middleware.go | 18 ++-- .../node_balancer/cmd/nodebalancer/routes.go | 8 +- .../node_balancer/cmd/nodebalancer/server.go | 81 ++++++++--------- .../node_balancer/cmd/nodebalancer/version.go | 3 + nodes/node_balancer/configs/version.go | 3 - nodes/node_balancer/sample.env | 1 - 14 files changed, 210 insertions(+), 216 deletions(-) rename nodes/node_balancer/{configs/settings.go => cmd/nodebalancer/configs.go} (59%) create mode 100644 nodes/node_balancer/cmd/nodebalancer/version.go delete mode 100644 nodes/node_balancer/configs/version.go diff --git a/nodes/node_balancer/cmd/nodebalancer/balancer.go b/nodes/node_balancer/cmd/nodebalancer/balancer.go index 17b6daf9..e1d1a30c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/balancer.go +++ b/nodes/node_balancer/cmd/nodebalancer/balancer.go @@ -4,17 +4,17 @@ Load balancer, based on https://github.com/kasvith/simplelb/ package main import ( + "bytes" "encoding/json" - "fmt" "io/ioutil" "log" "net/http" "net/http/httputil" "net/url" + "strconv" + "strings" "sync" "sync/atomic" - - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) // Main variable of pool of blockchains which contains pool of nodes @@ -23,10 +23,9 @@ var blockchainPool BlockchainPool // Node structure with // StatusURL for status server at node endpoint -// GethURL for geth/bor/etc node http.server endpoint +// Endpoint for geth/bor/etc node http.server endpoint type Node struct { - StatusURL *url.URL - GethURL *url.URL + Endpoint *url.URL Alive bool CurrentBlock uint64 @@ -49,8 +48,13 @@ type BlockchainPool struct { Blockchains []*NodePool } +// Node status response struct for HealthCheck +type NodeStatusResultResponse struct { + Number string `json:"number"` +} + type NodeStatusResponse struct { - CurrentBlock uint64 `json:"current_block"` + Result NodeStatusResultResponse `json:"result"` } // AddNode to the nodes pool @@ -153,11 +157,11 @@ func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { return nil } -// SetNodeStatus changes a status of a node by StatusURL or GethURL +// SetNodeStatus modify status of the node func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { for _, b := range bpool.Blockchains { for _, n := range b.Nodes { - if n.StatusURL.String() == url.String() || n.GethURL.String() == url.String() { + if n.Endpoint.String() == url.String() { n.SetAlive(alive) break } @@ -165,55 +169,77 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { } } -// StatusLog logs nodes statuses +// StatusLog logs node status // TODO(kompotkot): Print list of alive and dead nodes func (bpool *BlockchainPool) StatusLog() { for _, b := range bpool.Blockchains { for _, n := range b.Nodes { log.Printf( "Blockchain %s node %s is alive %t. Blockchain called %d times", - b.Blockchain, n.StatusURL, n.Alive, b.Current, + b.Blockchain, n.Endpoint.Host, n.Alive, b.Current, ) } } } -// HealthCheck fetch the node status and current block server +// HealthCheck fetch the node latest block func (bpool *BlockchainPool) HealthCheck() { for _, b := range bpool.Blockchains { for _, n := range b.Nodes { - n.SetAlive(false) - n.SetCurrentBlock(0) - - // Get response from node /ping endpoint - httpClient := http.Client{Timeout: configs.NB_HEALTH_CHECK_CALL_TIMEOUT} - resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL)) + httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} + resp, err := httpClient.Post( + n.Endpoint.String(), + "application/json", + bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), + ) if err != nil { - log.Printf("Unable to reach node: %s\n", n.StatusURL) + n.SetAlive(false) + n.SetCurrentBlock(0) + log.Printf("Unable to reach node: %s", n.Endpoint.Host) continue } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("Unable to parse response from node: %s\n", n.StatusURL) + n.SetAlive(false) + n.SetCurrentBlock(0) + log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) continue } var statusResponse NodeStatusResponse err = json.Unmarshal(body, &statusResponse) if err != nil { - log.Printf("Unable to read json response from node: %s\n", n.StatusURL) + n.SetAlive(false) + n.SetCurrentBlock(0) + log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) + continue + } + + blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) + blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) + if err != nil { + n.SetAlive(false) + n.SetCurrentBlock(0) + log.Printf("Unable to parse block number from hex to string, err: %v", err) continue } // Mark node in list of nodes as alive or not and update current block - n.SetAlive(true) - if statusResponse.CurrentBlock != 0 { - n.SetCurrentBlock(statusResponse.CurrentBlock) + var alive bool + if blockNumber != 0 { + alive = true + } else { + alive = false } + n.SetAlive(alive) + n.SetCurrentBlock(blockNumber) - log.Printf("Node %s is alive: %t with current block: %d blockchain called: %d times\n", n.StatusURL, true, statusResponse.CurrentBlock, b.Current) + log.Printf( + "Node %s is alive: %t with current block: %d blockchain called: %d times", + n.Endpoint.Host, alive, blockNumber, b.Current, + ) } } } diff --git a/nodes/node_balancer/cmd/nodebalancer/blockchain.go b/nodes/node_balancer/cmd/nodebalancer/blockchain.go index 4711ca28..40ef984c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/blockchain.go +++ b/nodes/node_balancer/cmd/nodebalancer/blockchain.go @@ -1,17 +1,6 @@ package main -import ( - "io/ioutil" - "log" - "strconv" - "strings" - - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" -) - var ( - nodeConfigs NodeConfigs - ALLOWED_METHODS = map[string]bool{ "eth_blockNumber": true, "eth_call": true, @@ -57,49 +46,8 @@ type JSONRPCRequest struct { ID uint64 `json:"id"` } -// Node conf type BlockchainConfig struct { Blockchain string IPs []string Port string } - -type NodeConfig struct { - Blockchain string - Addr string - Port uint16 -} - -type NodeConfigs struct { - NodeConfigs []NodeConfig -} - -// Return list of NodeConfig structures -func (nc *NodeConfigs) InitNodeConfigList(configPath string) { - configs.CheckEnvVarSet() - - rawBytes, err := ioutil.ReadFile(configPath) - if err != nil { - log.Fatalf("Unable to read config file, %v", err) - } - text := string(rawBytes) - lines := strings.Split(text, "\n") - - // Define available blockchain nodes - for _, line := range lines { - fields := strings.Split(line, ",") - if len(fields) == 3 { - port, err := strconv.ParseInt(fields[2], 0, 16) - if err != nil { - log.Printf("Unable to parse port number, %v", err) - continue - } - - nc.NodeConfigs = append(nc.NodeConfigs, NodeConfig{ - Blockchain: fields[0], - Addr: fields[1], - Port: uint16(port), - }) - } - } -} diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index 2a5bfe4b..23ccf9fc 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -4,13 +4,12 @@ import ( "encoding/json" "flag" "fmt" + "log" "os" "strings" bugout "github.com/bugout-dev/bugout-go/pkg" "github.com/google/uuid" - - "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) var ( @@ -138,9 +137,20 @@ func (s *StateCLI) checkRequirements() { } } - config := configs.GetConfigPath(s.configPathFlag) - if !configs.CheckPathExists(config.ConfigPath) { - configs.GenerateDefaultConfig(config) + // Load configuration + config, err := GetConfigPath(s.configPathFlag) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + if !config.ConfigExists { + if err := GenerateDefaultConfig(config); err != nil { + fmt.Println(err) + os.Exit(1) + } + } else { + log.Printf("Loaded configuration from %s", config.ConfigPath) } s.configPathFlag = config.ConfigPath } @@ -183,7 +193,7 @@ func (s *StateCLI) populateCLI() { s.usersCmd.IntVar(&s.offsetFlag, "offset", 0, "Result output offset") } -func CLI() { +func cli() { stateCLI.populateCLI() if len(os.Args) < 2 { stateCLI.usage() @@ -193,7 +203,7 @@ func CLI() { // Init bugout client bc, err := bugout.ClientFromEnv() if err != nil { - fmt.Printf("Unable to initialize bugout client %v", err) + fmt.Printf("Unable to initialize bugout client, err: %v\n", err) os.Exit(1) } bugoutClient = bc @@ -213,24 +223,24 @@ func CLI() { ExtendedMethods: stateCLI.extendedMethodsFlag, } _, err := bugoutClient.Brood.FindUser( - configs.NB_CONTROLLER_TOKEN, + NB_CONTROLLER_TOKEN, map[string]string{ "user_id": proposedUserAccess.UserID, - "application_id": configs.NB_APPLICATION_ID, + "application_id": NB_APPLICATION_ID, }, ) if err != nil { - fmt.Printf("User does not exists %v\n", err) + fmt.Printf("User does not exists, err: %v\n", err) os.Exit(1) } - resource, err := bugoutClient.Brood.CreateResource(configs.NB_CONTROLLER_TOKEN, configs.NB_APPLICATION_ID, proposedUserAccess) + resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedUserAccess) if err != nil { - fmt.Printf("Unable to create user access %v\n", err) + fmt.Printf("Unable to create user access, err: %v\n", err) os.Exit(1) } resource_data, err := json.Marshal(resource.ResourceData) if err != nil { - fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) + fmt.Printf("Unable to encode resource %s data interface to json, err: %v", resource.Id, err) os.Exit(1) } fmt.Println(string(resource_data)) @@ -251,31 +261,31 @@ func CLI() { queryParameters["access_id"] = stateCLI.accessIDFlag } resources, err := bugoutClient.Brood.GetResources( - configs.NB_CONTROLLER_TOKEN, - configs.NB_APPLICATION_ID, + NB_CONTROLLER_TOKEN, + NB_APPLICATION_ID, queryParameters, ) if err != nil { - fmt.Printf("Unable to get Bugout resources %v\n", err) + fmt.Printf("Unable to get Bugout resources, err: %v\n", err) os.Exit(1) } var userAccesses []ClientResourceData for _, resource := range resources.Resources { - deletedResource, err := bugoutClient.Brood.DeleteResource(configs.NB_CONTROLLER_TOKEN, resource.Id) + deletedResource, err := bugoutClient.Brood.DeleteResource(NB_CONTROLLER_TOKEN, resource.Id) if err != nil { - fmt.Printf("Unable to delete resource %s %v\n", resource.Id, err) + fmt.Printf("Unable to delete resource %s, err: %v\n", resource.Id, err) continue } resource_data, err := json.Marshal(deletedResource.ResourceData) if err != nil { - fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) + fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) continue } var userAccess ClientResourceData err = json.Unmarshal(resource_data, &userAccess) if err != nil { - fmt.Printf("Unable to decode resource %s data json to structure %v", resource.Id, err) + fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) continue } userAccesses = append(userAccesses, userAccess) @@ -283,7 +293,7 @@ func CLI() { userAccessesJson, err := json.Marshal(userAccesses) if err != nil { - fmt.Printf("Unable to marshal user access struct %v\n", err) + fmt.Printf("Unable to marshal user access struct, err: %v\n", err) os.Exit(1) } fmt.Println(string(userAccessesJson)) @@ -292,7 +302,7 @@ func CLI() { stateCLI.serverCmd.Parse(os.Args[2:]) stateCLI.checkRequirements() - configs.CheckEnvVarSet() + CheckEnvVarSet() Server() @@ -308,12 +318,12 @@ func CLI() { queryParameters["access_id"] = stateCLI.accessIDFlag } resources, err := bugoutClient.Brood.GetResources( - configs.NB_CONTROLLER_TOKEN, - configs.NB_APPLICATION_ID, + NB_CONTROLLER_TOKEN, + NB_APPLICATION_ID, queryParameters, ) if err != nil { - fmt.Printf("Unable to get Bugout resources %v\n", err) + fmt.Printf("Unable to get Bugout resources, err: %v\n", err) os.Exit(1) } @@ -331,20 +341,20 @@ func CLI() { for _, resource := range resources.Resources[offset:limit] { resource_data, err := json.Marshal(resource.ResourceData) if err != nil { - fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) + fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err) continue } var userAccess ClientResourceData err = json.Unmarshal(resource_data, &userAccess) if err != nil { - fmt.Printf("Unable to decode resource %s data json to structure %v", resource.Id, err) + fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err) continue } userAccesses = append(userAccesses, userAccess) } userAccessesJson, err := json.Marshal(userAccesses) if err != nil { - fmt.Printf("Unable to marshal user accesses struct %v\n", err) + fmt.Printf("Unable to marshal user accesses struct, err: %v\n", err) os.Exit(1) } fmt.Println(string(userAccessesJson)) @@ -353,7 +363,7 @@ func CLI() { stateCLI.versionCmd.Parse(os.Args[2:]) stateCLI.checkRequirements() - fmt.Printf("v%s\n", configs.NB_VERSION) + fmt.Printf("v%s\n", NB_VERSION) default: stateCLI.usage() diff --git a/nodes/node_balancer/cmd/nodebalancer/clients.go b/nodes/node_balancer/cmd/nodebalancer/clients.go index 94ea3331..3eaffd08 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients.go @@ -5,8 +5,6 @@ import ( "reflect" "sync" "time" - - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) var ( @@ -104,7 +102,7 @@ func (cpool *ClientPool) AddClientNode(id string, node *Node) { func (cpool *ClientPool) GetClientNode(id string) *Node { if cpool.Client[id] != nil { lastCallTs := cpool.Client[id].GetClientLastCallDiff() - if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { + if lastCallTs < NB_CLIENT_NODE_KEEP_ALIVE { cpool.Client[id].UpdateClientLastCall() return cpool.Client[id].Node } @@ -119,7 +117,7 @@ func (cpool *ClientPool) CleanInactiveClientNodes() int { cnt := 0 for id, client := range cpool.Client { lastCallTs := client.GetClientLastCallDiff() - if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { + if lastCallTs >= NB_CLIENT_NODE_KEEP_ALIVE { delete(cpool.Client, id) } else { cnt += 1 diff --git a/nodes/node_balancer/cmd/nodebalancer/clients_test.go b/nodes/node_balancer/cmd/nodebalancer/clients_test.go index 41a4d52d..c485ebf4 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients_test.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients_test.go @@ -4,8 +4,6 @@ import ( "reflect" "testing" "time" - - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) func TestAddClientNode(t *testing.T) { @@ -40,7 +38,7 @@ func TestGetClientNode(t *testing.T) { {map[string]*Client{}, "1", nil}, {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, - {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, + {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } for _, c := range cases { CreateClientPools() @@ -63,11 +61,11 @@ func TestCleanInactiveClientNodes(t *testing.T) { clients map[string]*Client expected string }{ - {map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, + {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, {map[string]*Client{"1": {LastCallTs: ts}}, "1"}, {map[string]*Client{ - "1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, - "2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, + "1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE}, + "2": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE - 10}, "3": {LastCallTs: ts}, }, "3"}, } diff --git a/nodes/node_balancer/configs/settings.go b/nodes/node_balancer/cmd/nodebalancer/configs.go similarity index 59% rename from nodes/node_balancer/configs/settings.go rename to nodes/node_balancer/cmd/nodebalancer/configs.go index 03b39d72..dc2fdebb 100644 --- a/nodes/node_balancer/configs/settings.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -1,10 +1,12 @@ /* Configurations for load balancer server. */ -package configs +package main import ( + "encoding/json" "fmt" + "io/ioutil" "log" "os" "path/filepath" @@ -43,8 +45,6 @@ var ( MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute ) -var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT") - func CheckEnvVarSet() { if NB_ACCESS_ID_HEADER == "" { NB_ACCESS_ID_HEADER = "x-node-balancer-access-id" @@ -52,14 +52,31 @@ func CheckEnvVarSet() { if NB_DATA_SOURCE_HEADER == "" { NB_DATA_SOURCE_HEADER = "x-node-balancer-data-source" } - - if MOONSTREAM_NODES_SERVER_PORT == "" { - fmt.Println("Environment variable MOONSTREAM_NODES_SERVER_PORT not set") - os.Exit(1) - } } -type Config struct { +// Nodes configuration +type NodeConfig struct { + Blockchain string `json:"blockchain"` + Endpoint string `json:"endpoint"` + + Internal bool `json:"internal"` +} + +func LoadConfig(configPath string) (*[]NodeConfig, error) { + rawBytes, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + nodeConfigs := &[]NodeConfig{} + err = json.Unmarshal(rawBytes, nodeConfigs) + if err != nil { + return nil, err + } + + return nodeConfigs, nil +} + +type ConfigPlacement struct { ConfigDirPath string ConfigDirExists bool @@ -67,28 +84,26 @@ type Config struct { ConfigExists bool } -func CheckPathExists(path string) bool { +func CheckPathExists(path string) (bool, error) { var exists = true _, err := os.Stat(path) if err != nil { if os.IsNotExist(err) { exists = false } else { - fmt.Println(err) - os.Exit(1) + return exists, fmt.Errorf("Error due checking file path exists, err: %v", err) } } - return exists + return exists, nil } -func GetConfigPath(providedPath string) *Config { +func GetConfigPath(providedPath string) (*ConfigPlacement, error) { var configDirPath, configPath string if providedPath == "" { homeDir, err := os.UserHomeDir() if err != nil { - fmt.Printf("Unable to find user home directory, %v", err) - os.Exit(1) + return nil, fmt.Errorf("Unable to find user home directory, %v", err) } configDirPath = fmt.Sprintf("%s/.nodebalancer", homeDir) configPath = fmt.Sprintf("%s/config.txt", configDirPath) @@ -97,35 +112,48 @@ func GetConfigPath(providedPath string) *Config { configDirPath = filepath.Dir(configPath) } - defaultConfig := &Config{ - ConfigDirPath: configDirPath, - ConfigDirExists: CheckPathExists(configDirPath), - - ConfigPath: configPath, - ConfigExists: CheckPathExists(configPath), + configDirPathExists, err := CheckPathExists(configDirPath) + if err != nil { + return nil, err + } + configPathExists, err := CheckPathExists(configPath) + if err != nil { + return nil, err } - return defaultConfig + config := &ConfigPlacement{ + ConfigDirPath: configDirPath, + ConfigDirExists: configDirPathExists, + + ConfigPath: configPath, + ConfigExists: configPathExists, + } + + return config, nil } -func GenerateDefaultConfig(config *Config) string { +func GenerateDefaultConfig(config *ConfigPlacement) error { if !config.ConfigDirExists { if err := os.MkdirAll(config.ConfigDirPath, os.ModePerm); err != nil { - fmt.Printf("Unable to create directory, %v", err) - os.Exit(1) + return fmt.Errorf("Unable to create directory, %v", err) } log.Printf("Config directory created at: %s", config.ConfigDirPath) } if !config.ConfigExists { - tempConfigB := []byte("ethereum,127.0.0.1,8545") - err := os.WriteFile(config.ConfigPath, tempConfigB, 0644) + tempConfig := []NodeConfig{ + {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545", Internal: true}, + } + tempConfigJson, err := json.Marshal(tempConfig) if err != nil { - fmt.Printf("Unable to create temp config file, %v", err) - os.Exit(1) + return fmt.Errorf("Unable to marshal configuration data, err: %v", err) + } + err = ioutil.WriteFile(config.ConfigPath, tempConfigJson, os.ModePerm) + if err != nil { + return fmt.Errorf("Unable to write default config to file %s, err: %v", config.ConfigPath, err) } log.Printf("Created default configuration at %s", config.ConfigPath) } - return config.ConfigPath + return nil } diff --git a/nodes/node_balancer/cmd/nodebalancer/db.go b/nodes/node_balancer/cmd/nodebalancer/db.go index 4ce99884..28bf3fc8 100644 --- a/nodes/node_balancer/cmd/nodebalancer/db.go +++ b/nodes/node_balancer/cmd/nodebalancer/db.go @@ -4,8 +4,6 @@ import ( "database/sql" "fmt" - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" - _ "github.com/lib/pq" ) @@ -19,18 +17,18 @@ type DatabaseClient struct { // Establish connection with database func InitDatabaseClient() error { - db, err := sql.Open("postgres", configs.MOONSTREAM_DB_URI_READ_ONLY) + db, err := sql.Open("postgres", MOONSTREAM_DB_URI_READ_ONLY) if err != nil { return fmt.Errorf("DSN parse error or another database initialization error: %v", err) } // Set the maximum number of concurrently idle connections, // by default sql.DB allows a maximum of 2 idle connections. - db.SetMaxIdleConns(configs.MOONSTREAM_DB_MAX_IDLE_CONNS) + db.SetMaxIdleConns(MOONSTREAM_DB_MAX_IDLE_CONNS) // Set the maximum lifetime of a connection. // Longer lifetime increase memory usage. - db.SetConnMaxLifetime(configs.MOONSTREAM_DB_CONN_MAX_LIFETIME) + db.SetConnMaxLifetime(MOONSTREAM_DB_CONN_MAX_LIFETIME) databaseClient = DatabaseClient{ Client: db, diff --git a/nodes/node_balancer/cmd/nodebalancer/main.go b/nodes/node_balancer/cmd/nodebalancer/main.go index f39b62eb..cee85c6c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/main.go +++ b/nodes/node_balancer/cmd/nodebalancer/main.go @@ -1,5 +1,5 @@ package main func main() { - CLI() + cli() } diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 5e33bade..da93a166 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -16,8 +16,6 @@ import ( "sync" "time" - "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" - humbug "github.com/bugout-dev/humbug/go/pkg" ) @@ -89,7 +87,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) { tsNow := time.Now().Unix() ac.mux.Lock() for aId, aData := range ac.accessIds { - if tsNow-aData.LastAccessTs > configs.NB_CACHE_ACCESS_ID_LIFETIME { + if tsNow-aData.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { delete(ac.accessIds, aId) removedAccessIds++ } else { @@ -101,7 +99,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) { } func initCacheCleaning(debug bool) { - t := time.NewTicker(configs.NB_CACHE_CLEANING_INTERVAL) + t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL) for { select { case <-t.C: @@ -118,7 +116,7 @@ func initCacheCleaning(debug bool) { func extractAccessID(r *http.Request) string { var accessID string - accessIDHeaders := r.Header[strings.Title(configs.NB_ACCESS_ID_HEADER)] + accessIDHeaders := r.Header[strings.Title(NB_ACCESS_ID_HEADER)] for _, h := range accessIDHeaders { accessID = h } @@ -137,7 +135,7 @@ func extractAccessID(r *http.Request) string { func extractDataSource(r *http.Request) string { dataSource := "database" - dataSources := r.Header[strings.Title(configs.NB_DATA_SOURCE_HEADER)] + dataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)] for _, h := range dataSources { dataSource = h } @@ -203,7 +201,7 @@ func logMiddleware(next http.Handler) http.Handler { var jsonrpcRequest JSONRPCRequest err = json.Unmarshal(body, &jsonrpcRequest) if err != nil { - log.Printf("Unable to parse body %v", err) + log.Printf("Unable to parse body, err: %v", err) } logStr += fmt.Sprintf(" %s", jsonrpcRequest.Method) } @@ -236,7 +234,7 @@ func accessMiddleware(next http.Handler) http.Handler { } // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources - if accessID == configs.NB_CONTROLLER_ACCESS_ID { + if accessID == NB_CONTROLLER_ACCESS_ID { if stateCLI.enableDebugFlag { log.Printf("Access id belongs to internal crawlers") } @@ -254,8 +252,8 @@ func accessMiddleware(next http.Handler) http.Handler { log.Printf("New access id, looking at Brood resources") } resources, err := bugoutClient.Brood.GetResources( - configs.NB_CONTROLLER_TOKEN, - configs.NB_APPLICATION_ID, + NB_CONTROLLER_TOKEN, + NB_APPLICATION_ID, map[string]string{"access_id": accessID}, ) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index c1deb560..4722533e 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -12,8 +12,6 @@ import ( "net/http" "strconv" "strings" - - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" ) type PingResponse struct { @@ -42,8 +40,8 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } attempts := GetAttemptsFromContext(r) - if attempts > configs.NB_CONNECTION_RETRIES { - log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path) + if attempts > NB_CONNECTION_RETRIES { + log.Printf("Max attempts reached from %s %s, terminating", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } @@ -147,7 +145,7 @@ func lbDatabaseHandler(w http.ResponseWriter, r *http.Request, blockchain string block, err := databaseClient.GetBlock(blockchain, blockNumber) if err != nil { - fmt.Printf("Unable to get block from database %v", err) + log.Printf("Unable to get block from database, err: %v", err) http.Error(w, fmt.Sprintf("no such block %v", blockNumber), http.StatusBadRequest) return } diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index 9869d72a..50230413 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -14,8 +14,6 @@ import ( "os" "time" - configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs" - humbug "github.com/bugout-dev/humbug/go/pkg" "github.com/google/uuid" ) @@ -29,7 +27,7 @@ var ( // initHealthCheck runs a routine for check status of the nodes every 5 seconds func initHealthCheck(debug bool) { - t := time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL) + t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: @@ -37,7 +35,7 @@ func initHealthCheck(debug bool) { ethereumClients := ethereumClientPool.CleanInactiveClientNodes() polygonClients := polygonClientPool.CleanInactiveClientNodes() xdaiClients := xdaiClientPool.CleanInactiveClientNodes() - log.Printf("Active etehereum clients: %d, polygon clients: %d, xdai clients: %d\n", ethereumClients, polygonClients, xdaiClients) + log.Printf("Active ethereum clients: %d, polygon clients: %d, xdai clients: %d", ethereumClients, polygonClients, xdaiClients) if debug { blockchainPool.StatusLog() } @@ -71,13 +69,13 @@ func GetRetryFromContext(r *http.Request) int { func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { retries := GetRetryFromContext(r) - if retries < configs.NB_CONNECTION_RETRIES { + if retries < NB_CONNECTION_RETRIES { log.Printf( - "An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n", - url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(), + "An error occurred while proxying to %s, number of retries: %d/%d, err: %v", + url, retries+1, NB_CONNECTION_RETRIES, e.Error(), ) select { - case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL): + case <-time.After(NB_CONNECTION_RETRIES_INTERVAL): ctx := context.WithValue(r.Context(), Retry, retries+1) proxy.ServeHTTP(w, r.WithContext(ctx)) } @@ -94,7 +92,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { // If the same request routing for few attempts with different nodes, increase the count // of attempts and send request to next peer attempts := GetAttemptsFromContext(r) - log.Printf("Attempting number: %d to fetch node %s\n", attempts, url) + log.Printf("Attempting number: %d to fetch node %s", attempts, url) ctx := context.WithValue(r.Context(), Attempts, attempts+1) lbHandler(w, r.WithContext(ctx)) } @@ -111,36 +109,36 @@ func Server() { var err error sessionID := uuid.New().String() consent := humbug.CreateHumbugConsent(humbug.True) - reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NB_TOKEN) + reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, HUMBUG_REPORTER_NB_TOKEN) if err != nil { - fmt.Printf("Invalid Humbug Crash configuration: %v", err) + fmt.Printf("Invalid Humbug Crash configuration, err: %v\n", err) os.Exit(1) } // Record system information reporter.Publish(humbug.SystemReport()) resources, err := bugoutClient.Brood.GetResources( - configs.NB_CONTROLLER_TOKEN, - configs.NB_APPLICATION_ID, - map[string]string{"access_id": configs.NB_CONTROLLER_ACCESS_ID}, + NB_CONTROLLER_TOKEN, + NB_APPLICATION_ID, + map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID}, ) if err != nil { - fmt.Printf("Unable to get user with provided access identifier %v", err) + fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err) os.Exit(1) } if len(resources.Resources) != 1 { - fmt.Printf("User with provided access identifier has wrong number of resources %v", err) + fmt.Printf("User with provided access identifier has wrong number of resources, err: %v\n", err) os.Exit(1) } resource_data, err := json.Marshal(resources.Resources[0].ResourceData) if err != nil { - fmt.Printf("Unable to encode resource data interface to json %v", err) + fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err) os.Exit(1) } var clientAccess ClientResourceData err = json.Unmarshal(resource_data, &clientAccess) if err != nil { - fmt.Printf("Unable to decode resource data json to structure %v", err) + fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err) os.Exit(1) } internalCrawlersAccess = ClientResourceData{ @@ -158,43 +156,38 @@ func Server() { err = InitDatabaseClient() if err != nil { - log.Printf("Unable to initialize database connection %v\n", err) + log.Printf("Unable to initialize database connection, err: %v", err) } else { - log.Printf("Connection with database established\n") + log.Printf("Connection with database established") } // Fill NodeConfigList with initial nodes from environment variables - nodeConfigs.InitNodeConfigList(stateCLI.configPathFlag) + nodeConfig, err := LoadConfig(stateCLI.configPathFlag) + if err != nil { + fmt.Println(err) + os.Exit(1) + } // Parse nodes and set list of proxies - for i, nodeConfig := range nodeConfigs.NodeConfigs { - gethUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", nodeConfig.Addr, nodeConfig.Port)) + for i, nodeConfig := range *nodeConfig { + + endpoint, err := url.Parse(nodeConfig.Endpoint) if err != nil { - fmt.Printf("Unable to parse gethUrl with addr: %s and port: %d\n", nodeConfig.Addr, nodeConfig.Port) - continue - } - statusUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT)) - if err != nil { - fmt.Printf("Unable to parse statusUrl with addr: %s and port: %s\n", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT) - continue + fmt.Println(err) + os.Exit(1) } - proxyToStatus := httputil.NewSingleHostReverseProxy(statusUrl) - proxyToGeth := httputil.NewSingleHostReverseProxy(gethUrl) - - proxyErrorHandler(proxyToStatus, statusUrl) - proxyErrorHandler(proxyToGeth, gethUrl) + proxyToEndpoint := httputil.NewSingleHostReverseProxy(endpoint) + proxyErrorHandler(proxyToEndpoint, endpoint) blockchainPool.AddNode(&Node{ - StatusURL: statusUrl, - GethURL: gethUrl, - Alive: true, - StatusReverseProxy: proxyToStatus, - GethReverseProxy: proxyToGeth, + Endpoint: endpoint, + Alive: true, + GethReverseProxy: proxyToEndpoint, }, nodeConfig.Blockchain) log.Printf( - "Added new %s proxy blockchain under index %d from config file with geth url: %s and status url: %s\n", - nodeConfig.Blockchain, i, gethUrl, statusUrl) + "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s", + nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host) } serveMux := http.NewServeMux() @@ -225,10 +218,10 @@ func Server() { // Start access id cache cleaning go initCacheCleaning(stateCLI.enableDebugFlag) - log.Printf("Starting node load balancer HTTP server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) + log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) err = server.ListenAndServe() if err != nil { - fmt.Printf("Failed to start server listener %v", err) + fmt.Printf("Failed to start server listener, err: %v\n", err) os.Exit(1) } } diff --git a/nodes/node_balancer/cmd/nodebalancer/version.go b/nodes/node_balancer/cmd/nodebalancer/version.go new file mode 100644 index 00000000..50254033 --- /dev/null +++ b/nodes/node_balancer/cmd/nodebalancer/version.go @@ -0,0 +1,3 @@ +package main + +var NB_VERSION = "0.1.1" diff --git a/nodes/node_balancer/configs/version.go b/nodes/node_balancer/configs/version.go deleted file mode 100644 index 477524d8..00000000 --- a/nodes/node_balancer/configs/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package configs - -var NB_VERSION = "0.1.0" diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index ce99f7ca..d1aea3b6 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -4,7 +4,6 @@ export NB_APPLICATION_ID="" export NB_CONTROLLER_TOKEN="" export NB_CONTROLLER_ACCESS_ID="" export MOONSTREAM_DB_URI="postgresql://:@:/" -export MOONSTREAM_NODES_SERVER_PORT="" # Error humbug reporter export HUMBUG_REPORTER_NODE_BALANCER_TOKEN=""