kopia lustrzana https://github.com/bugout-dev/moonstream
Working example with explicit check of internal crawlers access id
rodzic
a68cc186dc
commit
e760416d3f
|
@ -4,7 +4,6 @@ import (
|
|||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
|
@ -15,6 +14,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// Storing CLI definitions at server startup
|
||||
stateCLI StateCLI
|
||||
|
||||
bugoutClient bugout.BugoutClient
|
||||
|
@ -52,9 +52,8 @@ type StateCLI struct {
|
|||
extendedMethodsFlag bool
|
||||
|
||||
// Server flags
|
||||
listeningAddrFlag string
|
||||
listeningPortFlag string
|
||||
|
||||
listeningAddrFlag string
|
||||
listeningPortFlag string
|
||||
enableHealthCheckFlag bool
|
||||
enableDebugFlag bool
|
||||
|
||||
|
@ -63,10 +62,6 @@ type StateCLI struct {
|
|||
offsetFlag int
|
||||
}
|
||||
|
||||
type PingResponse struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type UserAccess struct {
|
||||
UserID string `json:"user_id"`
|
||||
AccessID string `json:"access_id"`
|
||||
|
@ -90,6 +85,7 @@ subcommands:
|
|||
`, s.addAccessCmd.Name(), s.deleteAccessCmd.Name(), s.serverCmd.Name(), s.usersCmd.Name(), s.versionCmd.Name())
|
||||
}
|
||||
|
||||
// Check if required flags are set
|
||||
func (s *StateCLI) checkRequirements() {
|
||||
if s.helpFlag {
|
||||
switch {
|
||||
|
@ -146,31 +142,11 @@ func (s *StateCLI) checkRequirements() {
|
|||
s.usersCmd.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
if s.configPathFlag == "" {
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to find user home directory, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
configDirPath := fmt.Sprintf("%s/.nodebalancer", homeDir)
|
||||
configPath := fmt.Sprintf("%s/config.txt", configDirPath)
|
||||
|
||||
err = os.MkdirAll(configDirPath, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to create directory, %v", err)
|
||||
}
|
||||
|
||||
_, err = os.Stat(configPath)
|
||||
if err != nil {
|
||||
tempConfigB := []byte("ethereum,http://127.0.0.1,8545")
|
||||
err = os.WriteFile(configPath, tempConfigB, 0644)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to write config, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.configPathFlag = configPath
|
||||
}
|
||||
if s.configPathFlag == "" {
|
||||
configPath := configs.GenerateDefaultConfig()
|
||||
s.configPathFlag = configPath
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,13 +173,13 @@ func (s *StateCLI) populateCLI() {
|
|||
// Add user access subcommand flag pointers
|
||||
s.addAccessCmd.StringVar(&s.accessNameFlag, "name", "", "Name of access")
|
||||
s.addAccessCmd.StringVar(&s.accessDescriptionFlag, "description", "", "Description of access")
|
||||
s.addAccessCmd.BoolVar(&s.blockchainAccessFlag, "blockchain-access", false, "Provide if allow to access blockchain nodes")
|
||||
s.addAccessCmd.BoolVar(&s.blockchainAccessFlag, "blockchain-access", false, "Provide if allow direct access to blockchain nodes")
|
||||
s.addAccessCmd.BoolVar(&s.extendedMethodsFlag, "extended-methods", false, "Provide to be able to execute not whitelisted methods")
|
||||
|
||||
// Server subcommand flag pointers
|
||||
s.serverCmd.StringVar(&s.listeningAddrFlag, "host", "127.0.0.1", "Server listening address")
|
||||
s.serverCmd.StringVar(&s.listeningPortFlag, "port", "8544", "Server listening port")
|
||||
s.serverCmd.BoolVar(&s.enableHealthCheckFlag, "healthcheck", false, "To enable healthcheck ser healthcheck flag")
|
||||
s.serverCmd.BoolVar(&s.enableHealthCheckFlag, "healthcheck", false, "To enable healthcheck set healthcheck flag")
|
||||
s.serverCmd.BoolVar(&s.enableDebugFlag, "debug", false, "To enable debug mode with extended log set debug flag")
|
||||
|
||||
// Users list subcommand flag pointers
|
||||
|
@ -218,6 +194,14 @@ func CLI() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Init bugout client
|
||||
bc, err := bugout.ClientFromEnv()
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to initialize bugout client %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
bugoutClient = bc
|
||||
|
||||
// Parse subcommands and appropriate FlagSet
|
||||
switch os.Args[1] {
|
||||
case "add-access":
|
||||
|
@ -308,6 +292,8 @@ func CLI() {
|
|||
stateCLI.serverCmd.Parse(os.Args[2:])
|
||||
stateCLI.checkRequirements()
|
||||
|
||||
configs.CheckEnvVarSet()
|
||||
|
||||
Server()
|
||||
|
||||
case "users":
|
||||
|
@ -374,15 +360,3 @@ func CLI() {
|
|||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
configs.CheckEnvVarSet()
|
||||
|
||||
// Init bugout client
|
||||
bc, err := bugout.ClientFromEnv()
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to initialize bugout client %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
bugoutClient = bc
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
func extractAccessID(r *http.Request) string {
|
||||
var accessID string
|
||||
|
||||
accessIDHeaders := r.Header[configs.NB_ACCESS_ID_HEADER]
|
||||
accessIDHeaders := r.Header[strings.Title(configs.NB_ACCESS_ID_HEADER)]
|
||||
for _, h := range accessIDHeaders {
|
||||
accessID = h
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ func extractAccessID(r *http.Request) string {
|
|||
func extractDataSource(r *http.Request) string {
|
||||
dataSource := "database"
|
||||
|
||||
dataSources := r.Header[configs.NB_DATA_SOURCE_HEADER]
|
||||
dataSources := r.Header[strings.Title(configs.NB_DATA_SOURCE_HEADER)]
|
||||
for _, h := range dataSources {
|
||||
dataSource = h
|
||||
}
|
||||
|
@ -134,9 +134,9 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
return
|
||||
}
|
||||
|
||||
// If access id does not belong to controller, then find it in Bugout resources
|
||||
// If access id does not belong to internal crawlers, then find it in Bugout resources
|
||||
if accessID == configs.NB_CONTROLLER_ACCESS_ID {
|
||||
currentUserAccess = controllerUserAccess
|
||||
currentUserAccess = internalCrawlersAccess
|
||||
currentUserAccess.dataSource = dataSource
|
||||
} else {
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
|
|
|
@ -16,6 +16,10 @@ import (
|
|||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
type PingResponse struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// pingRoute response with status of load balancer server itself
|
||||
func pingRoute(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
|
@ -20,7 +21,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
controllerUserAccess UserAccess
|
||||
internalCrawlersAccess UserAccess
|
||||
|
||||
// Crash reporter
|
||||
reporter *humbug.HumbugReporter
|
||||
|
@ -109,12 +110,12 @@ func Server() {
|
|||
consent := humbug.CreateHumbugConsent(humbug.True)
|
||||
reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NB_TOKEN)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Invalid Humbug Crash configuration: %s", err.Error()))
|
||||
fmt.Printf("Invalid Humbug Crash configuration: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Record system information
|
||||
reporter.Publish(humbug.SystemReport())
|
||||
|
||||
// TODO(kompotkot): Remove, make it work without brood for internal crawlers
|
||||
resources, err := bugoutClient.Brood.GetResources(
|
||||
configs.NB_CONTROLLER_TOKEN,
|
||||
configs.NB_APPLICATION_ID,
|
||||
|
@ -122,20 +123,24 @@ func Server() {
|
|||
)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to get user with provided access identifier %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if len(resources.Resources) == 0 {
|
||||
fmt.Printf("User with provided access identifier not found %v", err)
|
||||
if len(resources.Resources) != 1 {
|
||||
fmt.Printf("User with provided access identifier has wrong number of resources %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
resource_data, err := json.Marshal(resources.Resources[0].ResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to encode resource data interface to json %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var userAccess UserAccess
|
||||
err = json.Unmarshal(resource_data, &userAccess)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource data json to structure %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
controllerUserAccess = UserAccess{
|
||||
internalCrawlersAccess = UserAccess{
|
||||
UserID: userAccess.UserID,
|
||||
AccessID: userAccess.AccessID,
|
||||
Name: userAccess.Name,
|
||||
|
@ -143,10 +148,16 @@ func Server() {
|
|||
BlockchainAccess: userAccess.BlockchainAccess,
|
||||
ExtendedMethods: userAccess.ExtendedMethods,
|
||||
}
|
||||
log.Printf(
|
||||
"Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t",
|
||||
resources.Resources[0].Id, userAccess.BlockchainAccess, userAccess.ExtendedMethods,
|
||||
)
|
||||
|
||||
err = InitDatabaseClient()
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to initialize database connection %v", err)
|
||||
log.Printf("Unable to initialize database connection %v\n", err)
|
||||
} else {
|
||||
log.Printf("Connection with database established\n")
|
||||
}
|
||||
|
||||
// Fill NodeConfigList with initial nodes from environment variables
|
||||
|
@ -156,11 +167,13 @@ func Server() {
|
|||
for i, nodeConfig := range nodeConfigs.NodeConfigs {
|
||||
gethUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", nodeConfig.Addr, nodeConfig.Port))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
fmt.Printf("Unable to parse gethUrl with addr: %s and port: %d\n", nodeConfig.Addr, nodeConfig.Port)
|
||||
continue
|
||||
}
|
||||
statusUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
fmt.Printf("Unable to parse statusUrl with addr: %s and port: %s\n", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT)
|
||||
continue
|
||||
}
|
||||
|
||||
proxyToStatus := httputil.NewSingleHostReverseProxy(statusUrl)
|
||||
|
@ -177,12 +190,13 @@ func Server() {
|
|||
GethReverseProxy: proxyToGeth,
|
||||
}, nodeConfig.Blockchain)
|
||||
log.Printf(
|
||||
"Added new %s proxy %d with geth url: %s and status url: %s\n",
|
||||
"Added new %s proxy blockchain under index %d from config file with geth url: %s and status url: %s\n",
|
||||
nodeConfig.Blockchain, i, gethUrl, statusUrl)
|
||||
}
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler)))
|
||||
log.Println("Authentication middleware enabled")
|
||||
serveMux.HandleFunc("/ping", pingRoute)
|
||||
|
||||
// Set common middlewares, from bottom to top
|
||||
|
@ -202,9 +216,10 @@ func Server() {
|
|||
go initHealthCheck(stateCLI.enableDebugFlag)
|
||||
}
|
||||
|
||||
log.Printf("Starting server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
|
||||
log.Printf("Starting node load balancer HTTP server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
|
||||
err = server.ListenAndServe()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
fmt.Printf("Failed to start server listener %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ Configurations for load balancer server.
|
|||
package configs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
@ -38,14 +39,47 @@ var (
|
|||
)
|
||||
|
||||
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
|
||||
var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER")
|
||||
|
||||
func CheckEnvVarSet() {
|
||||
if MOONSTREAM_CLIENT_ID_HEADER == "" {
|
||||
MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id"
|
||||
if NB_ACCESS_ID_HEADER == "" {
|
||||
NB_ACCESS_ID_HEADER = "x-node-balancer-access-id"
|
||||
}
|
||||
if NB_DATA_SOURCE_HEADER == "" {
|
||||
NB_DATA_SOURCE_HEADER = "x-node-balancer-data-source"
|
||||
}
|
||||
|
||||
if MOONSTREAM_NODES_SERVER_PORT == "" {
|
||||
log.Fatal("Environment variable MOONSTREAM_NODES_SERVER_PORT not set")
|
||||
fmt.Println("Environment variable MOONSTREAM_NODES_SERVER_PORT not set")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func GenerateDefaultConfig() string {
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to find user home directory, %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
configDirPath := fmt.Sprintf("%s/.nodebalancer", homeDir)
|
||||
configPath := fmt.Sprintf("%s/config.txt", configDirPath)
|
||||
|
||||
err = os.MkdirAll(configDirPath, os.ModePerm)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to create directory, %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
_, err = os.Stat(configPath)
|
||||
if err != nil {
|
||||
tempConfigB := []byte("ethereum,127.0.0.1,8545")
|
||||
err = os.WriteFile(configPath, tempConfigB, 0644)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to create directory, %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Printf("Config directory were not found, created default configuration at %s", configPath)
|
||||
}
|
||||
|
||||
return configPath
|
||||
}
|
||||
|
|
|
@ -3,18 +3,8 @@ export BUGOUT_AUTH_URL="https://auth.bugout.dev"
|
|||
export NB_APPLICATION_ID="<application_id_to_controll_access>"
|
||||
export NB_CONTROLLER_TOKEN="<token_of_controller_user>"
|
||||
export NB_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_crawlers>"
|
||||
|
||||
# Database variables
|
||||
export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"
|
||||
|
||||
# Nodes
|
||||
export MOONSTREAM_NODES_SERVER_PORT="<node_status_server_port>"
|
||||
# Ethereum nodes depends variables
|
||||
export MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR="127.0.0.1"
|
||||
export MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR="127.0.0.2"
|
||||
# Polygon nodes depends variables
|
||||
export MOONSTREAM_NODE_POLYGON_A_IPC_ADDR="127.0.0.1"
|
||||
export MOONSTREAM_NODE_POLYGON_B_IPC_ADDR="127.0.0.2"
|
||||
|
||||
# Error humbug reporter
|
||||
export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>"
|
||||
|
|
Ładowanie…
Reference in New Issue