kopia lustrzana https://github.com/bugout-dev/moonstream
265 wiersze
7.9 KiB
Go
265 wiersze
7.9 KiB
Go
/*
|
|
Node load balancer API server initialization.
|
|
*/
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
humbug "github.com/bugout-dev/humbug/go/pkg"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
var (
|
|
internalUsageAccess ClientAccess
|
|
|
|
// Crash reporter
|
|
reporter *humbug.HumbugReporter
|
|
)
|
|
|
|
// initHealthCheck runs a routine for check status of the nodes every 5 seconds
|
|
func initHealthCheck(debug bool) {
|
|
healthCheckInterval, convErr := strconv.Atoi(NB_HEALTH_CHECK_INTERVAL)
|
|
if convErr != nil {
|
|
healthCheckInterval = 30
|
|
}
|
|
t := time.NewTicker(time.Second * time.Duration(healthCheckInterval))
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
blockchainPool.HealthCheck()
|
|
logStr := "Client pool healthcheck."
|
|
for b := range supportedBlockchains {
|
|
cp := clientPool[b]
|
|
clients := cp.CleanInactiveClientNodes()
|
|
logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients)
|
|
}
|
|
log.Println(logStr)
|
|
if debug {
|
|
blockchainPool.StatusLog()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
const (
|
|
Attempts int = iota
|
|
Retry
|
|
)
|
|
|
|
// GetAttemptsFromContext returns the attempts for request
|
|
func GetAttemptsFromContext(r *http.Request) int {
|
|
if attempts, ok := r.Context().Value(Attempts).(int); ok {
|
|
return attempts
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// GetRetryFromContext returns the retries for request
|
|
func GetRetryFromContext(r *http.Request) int {
|
|
if retry, ok := r.Context().Value(Retry).(int); ok {
|
|
return retry
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// Handle errors due calls to proxy endpoint
|
|
// Docs: https://pkg.go.dev/net/http/httputil#ReverseProxy
|
|
func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
|
|
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
|
|
retries := GetRetryFromContext(r)
|
|
if retries < NB_CONNECTION_RETRIES {
|
|
log.Printf(
|
|
"An error occurred while proxying to %s, number of retries: %d/%d, err: %v",
|
|
url, retries+1, NB_CONNECTION_RETRIES, e.Error(),
|
|
)
|
|
select {
|
|
case <-time.After(NB_CONNECTION_RETRIES_INTERVAL):
|
|
ctx := context.WithValue(r.Context(), Retry, retries+1)
|
|
proxy.ServeHTTP(w, r.WithContext(ctx))
|
|
}
|
|
return
|
|
}
|
|
|
|
// After 3 retries, mark this backend as down
|
|
blockchainPool.SetNodeStatus(url, false)
|
|
|
|
// Set modified path back
|
|
// TODO(kompotkot): Try r.RequestURI instead of header
|
|
r.URL.Path = r.Header.Get("X-Origin-Path")
|
|
|
|
// If the same request routing for few attempts with different nodes, increase the count
|
|
// of attempts and send request to next peer
|
|
attempts := GetAttemptsFromContext(r)
|
|
log.Printf("Attempting number: %d to fetch node %s", attempts, url)
|
|
ctx := context.WithValue(r.Context(), Attempts, attempts+1)
|
|
lbHandler(w, r.WithContext(ctx))
|
|
}
|
|
}
|
|
|
|
func Server() {
|
|
// Create Access ID cache
|
|
CreateAccessCache()
|
|
|
|
// Configure Humbug reporter to handle errors
|
|
var err error
|
|
sessionID := uuid.New().String()
|
|
consent := humbug.CreateHumbugConsent(humbug.True)
|
|
reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, HUMBUG_REPORTER_NB_TOKEN)
|
|
if err != nil {
|
|
fmt.Printf("Invalid Humbug Crash configuration, err: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
// Record system information
|
|
reporter.Publish(humbug.SystemReport())
|
|
|
|
// Fetch access id for internal usage (crawlers, infrastructure, etc)
|
|
resources, err := bugoutClient.Brood.GetResources(
|
|
NB_CONTROLLER_TOKEN,
|
|
MOONSTREAM_APPLICATION_ID,
|
|
map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID},
|
|
)
|
|
if err != nil {
|
|
fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if len(resources.Resources) == 1 {
|
|
resourceData, err := json.Marshal(resources.Resources[0].ResourceData)
|
|
if err != nil {
|
|
fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
var clientResourceData ClientResourceData
|
|
err = json.Unmarshal(resourceData, &clientResourceData)
|
|
if err != nil {
|
|
fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
internalUsageAccess = ClientAccess{
|
|
ClientResourceData: ClientResourceData{
|
|
UserID: clientResourceData.UserID,
|
|
AccessID: clientResourceData.AccessID,
|
|
Name: clientResourceData.Name,
|
|
Description: clientResourceData.Description,
|
|
BlockchainAccess: clientResourceData.BlockchainAccess,
|
|
ExtendedMethods: clientResourceData.ExtendedMethods,
|
|
},
|
|
}
|
|
log.Printf(
|
|
"Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t",
|
|
resources.Resources[0].Id, clientResourceData.BlockchainAccess, clientResourceData.ExtendedMethods,
|
|
)
|
|
|
|
} else if len(resources.Resources) == 0 {
|
|
internalUsageAccess = ClientAccess{
|
|
ClientResourceData: ClientResourceData{
|
|
UserID: "rnd-user-id",
|
|
AccessID: NB_CONTROLLER_ACCESS_ID,
|
|
Name: "rnd-name",
|
|
Description: "Randomly generated",
|
|
BlockchainAccess: true,
|
|
ExtendedMethods: true,
|
|
},
|
|
}
|
|
fmt.Printf("There are no provided NB_CONTROLLER_ACCESS_ID records in Brood resources. Using provided with environment variable or randomly generated\n")
|
|
} else {
|
|
fmt.Printf("User with provided access identifier has wrong number of resources: %d\n", len(resources.Resources))
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Fill NodeConfigList with initial nodes from environment variables
|
|
err = LoadConfig(stateCLI.configPathFlag)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(1)
|
|
}
|
|
supportedBlockchains = make(map[string]bool)
|
|
|
|
// Parse nodes and set list of proxies
|
|
for i, nodeConfig := range nodeConfigs {
|
|
endpoint, err := url.Parse(nodeConfig.Endpoint)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Append to supported blockchain set
|
|
supportedBlockchains[nodeConfig.Blockchain] = true
|
|
|
|
proxyToEndpoint := httputil.NewSingleHostReverseProxy(endpoint)
|
|
// If required detailed timeout configuration, define node.GethReverseProxy.Transport = &http.Transport{}
|
|
// as modified structure of DefaultTransport net/http/transport/DefaultTransport
|
|
director := proxyToEndpoint.Director
|
|
proxyToEndpoint.Director = func(r *http.Request) {
|
|
director(r)
|
|
// Overwrite Query and Headers to not bypass nodebalancer Query and Headers
|
|
r.URL.RawQuery = ""
|
|
r.Header.Del(strings.Title(NB_ACCESS_ID_HEADER))
|
|
r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER))
|
|
|
|
r.URL.Scheme = endpoint.Scheme
|
|
r.URL.Host = endpoint.Host
|
|
r.URL.Path = endpoint.Path
|
|
|
|
// Change r.Host from nodebalancer's to end host so TLS check will be passed
|
|
r.Host = r.URL.Host
|
|
}
|
|
proxyErrorHandler(proxyToEndpoint, endpoint)
|
|
|
|
blockchainPool.AddNode(&Node{
|
|
Endpoint: endpoint,
|
|
Alive: true,
|
|
GethReverseProxy: proxyToEndpoint,
|
|
}, nodeConfig.Blockchain)
|
|
log.Printf(
|
|
"Added new %s proxy blockchain under index %d from config file with geth url: %s://%s",
|
|
nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host)
|
|
}
|
|
|
|
// Generate map of clients
|
|
CreateClientPools()
|
|
|
|
serveMux := http.NewServeMux()
|
|
serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler)))
|
|
log.Println("Authentication middleware enabled")
|
|
serveMux.HandleFunc("/ping", pingRoute)
|
|
|
|
// Set common middlewares, from bottom to top
|
|
commonHandler := corsMiddleware(serveMux)
|
|
commonHandler = logMiddleware(commonHandler)
|
|
commonHandler = panicMiddleware(commonHandler)
|
|
|
|
server := http.Server{
|
|
Addr: fmt.Sprintf("%s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag),
|
|
Handler: commonHandler,
|
|
ReadTimeout: 40 * time.Second,
|
|
WriteTimeout: 40 * time.Second,
|
|
}
|
|
|
|
// Start node health checking and current block fetching
|
|
blockchainPool.HealthCheck()
|
|
if stateCLI.enableHealthCheckFlag {
|
|
go initHealthCheck(stateCLI.enableDebugFlag)
|
|
}
|
|
|
|
// Start access id cache cleaning
|
|
go initCacheCleaning(stateCLI.enableDebugFlag)
|
|
|
|
log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
|
|
err = server.ListenAndServe()
|
|
if err != nil {
|
|
fmt.Printf("Failed to start server listener, err: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|