kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #645 from bugout-dev/nb-blockchains-from-config
Generate pool of available blockchains from configpull/646/head
commit
4b47250849
|
@ -1,16 +1,13 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ethereumClientPool ClientPool
|
||||
polygonClientPool ClientPool
|
||||
xdaiClientPool ClientPool
|
||||
clientPool map[string]ClientPool
|
||||
)
|
||||
|
||||
// Structure to define user access according with Brood resources
|
||||
|
@ -43,24 +40,27 @@ type ClientPool struct {
|
|||
|
||||
// Generate pools for clients for different blockchains
|
||||
func CreateClientPools() {
|
||||
ethereumClientPool.Client = make(map[string]*Client)
|
||||
polygonClientPool.Client = make(map[string]*Client)
|
||||
xdaiClientPool.Client = make(map[string]*Client)
|
||||
clientPool = make(map[string]ClientPool)
|
||||
|
||||
for b := range configBlockchains {
|
||||
clientPool[b] = ClientPool{}
|
||||
if cp, ok := clientPool[b]; ok {
|
||||
cp.Client = make(map[string]*Client)
|
||||
clientPool[b] = cp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return client pool corresponding to provided blockchain
|
||||
func GetClientPool(blockchain string) (*ClientPool, error) {
|
||||
func GetClientPool(blockchain string) *ClientPool {
|
||||
var cpool *ClientPool
|
||||
if blockchain == "ethereum" {
|
||||
cpool = ðereumClientPool
|
||||
} else if blockchain == "polygon" {
|
||||
cpool = &polygonClientPool
|
||||
} else if blockchain == "xdai" {
|
||||
cpool = &xdaiClientPool
|
||||
} else {
|
||||
return nil, errors.New("Unsupported blockchain provided")
|
||||
for b := range configBlockchains {
|
||||
if b == blockchain {
|
||||
c := clientPool[blockchain]
|
||||
cpool = &c
|
||||
}
|
||||
}
|
||||
return cpool, nil
|
||||
return cpool
|
||||
}
|
||||
|
||||
// Updates client last appeal to node
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
// TODO(kompotkot): Re-write tests for client
|
||||
package main
|
||||
|
||||
import (
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
nodeConfigs []NodeConfig
|
||||
|
||||
// Bugout and application configuration
|
||||
BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL")
|
||||
BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5
|
||||
|
@ -62,18 +64,18 @@ type NodeConfig struct {
|
|||
Endpoint string `json:"endpoint"`
|
||||
}
|
||||
|
||||
func LoadConfig(configPath string) (*[]NodeConfig, error) {
|
||||
func LoadConfig(configPath string) error {
|
||||
rawBytes, err := ioutil.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
nodeConfigs := &[]NodeConfig{}
|
||||
err = json.Unmarshal(rawBytes, nodeConfigs)
|
||||
nodeConfigsTemp := &[]NodeConfig{}
|
||||
err = json.Unmarshal(rawBytes, nodeConfigsTemp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
return nodeConfigs, nil
|
||||
nodeConfigs = *nodeConfigsTemp
|
||||
return nil
|
||||
}
|
||||
|
||||
type ConfigPlacement struct {
|
||||
|
|
|
@ -25,11 +25,6 @@ func pingRoute(w http.ResponseWriter, r *http.Request) {
|
|||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
func debugRoute(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("Clients: %v", ethereumClientPool)
|
||||
return
|
||||
}
|
||||
|
||||
// lbHandler load balances the incoming requests to nodes
|
||||
func lbHandler(w http.ResponseWriter, r *http.Request) {
|
||||
currentClientAccessRaw := r.Context().Value("currentClientAccess")
|
||||
|
@ -47,25 +42,20 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
var blockchain string
|
||||
switch {
|
||||
case strings.HasPrefix(r.URL.Path, "/nb/ethereum"):
|
||||
blockchain = "ethereum"
|
||||
case strings.HasPrefix(r.URL.Path, "/nb/polygon"):
|
||||
blockchain = "polygon"
|
||||
case strings.HasPrefix(r.URL.Path, "/nb/xdai"):
|
||||
blockchain = "xdai"
|
||||
default:
|
||||
for b := range configBlockchains {
|
||||
if strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/", b)) {
|
||||
blockchain = b
|
||||
break
|
||||
}
|
||||
}
|
||||
if blockchain == "" {
|
||||
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Chose one node
|
||||
var node *Node
|
||||
cpool, err := GetClientPool(blockchain)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
cpool := GetClientPool(blockchain)
|
||||
node = cpool.GetClientNode(currentClientAccess.AccessID)
|
||||
if node == nil {
|
||||
node = blockchainPool.GetNextNode(blockchain)
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
var (
|
||||
internalCrawlersAccess ClientResourceData
|
||||
|
||||
configBlockchains map[string]bool
|
||||
|
||||
// Crash reporter
|
||||
reporter *humbug.HumbugReporter
|
||||
)
|
||||
|
@ -33,10 +35,13 @@ func initHealthCheck(debug bool) {
|
|||
select {
|
||||
case <-t.C:
|
||||
blockchainPool.HealthCheck()
|
||||
ethereumClients := ethereumClientPool.CleanInactiveClientNodes()
|
||||
polygonClients := polygonClientPool.CleanInactiveClientNodes()
|
||||
xdaiClients := xdaiClientPool.CleanInactiveClientNodes()
|
||||
log.Printf("Active ethereum clients: %d, polygon clients: %d, xdai clients: %d", ethereumClients, polygonClients, xdaiClients)
|
||||
logStr := "Client pool healthcheck."
|
||||
for b := range configBlockchains {
|
||||
cp := clientPool[b]
|
||||
clients := cp.CleanInactiveClientNodes()
|
||||
logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients)
|
||||
}
|
||||
log.Println(logStr)
|
||||
if debug {
|
||||
blockchainPool.StatusLog()
|
||||
}
|
||||
|
@ -100,9 +105,6 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
|
|||
}
|
||||
|
||||
func Server() {
|
||||
// Generate map of clients
|
||||
CreateClientPools()
|
||||
|
||||
// Create Access ID cache
|
||||
CreateAccessCache()
|
||||
|
||||
|
@ -163,20 +165,24 @@ func Server() {
|
|||
}
|
||||
|
||||
// Fill NodeConfigList with initial nodes from environment variables
|
||||
nodeConfig, err := LoadConfig(stateCLI.configPathFlag)
|
||||
err = LoadConfig(stateCLI.configPathFlag)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
configBlockchains = make(map[string]bool)
|
||||
|
||||
// Parse nodes and set list of proxies
|
||||
for i, nodeConfig := range *nodeConfig {
|
||||
for i, nodeConfig := range nodeConfigs {
|
||||
endpoint, err := url.Parse(nodeConfig.Endpoint)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Append to supported blockchain set
|
||||
configBlockchains[nodeConfig.Blockchain] = true
|
||||
|
||||
proxyToEndpoint := httputil.NewSingleHostReverseProxy(endpoint)
|
||||
// If required detailed timeout configuration, define node.GethReverseProxy.Transport = &http.Transport{}
|
||||
// as modified structure of DefaultTransport net/http/transport/DefaultTransport
|
||||
|
@ -202,12 +208,12 @@ func Server() {
|
|||
nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host)
|
||||
}
|
||||
|
||||
// Generate map of clients
|
||||
CreateClientPools()
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler)))
|
||||
log.Println("Authentication middleware enabled")
|
||||
if stateCLI.enableDebugFlag {
|
||||
serveMux.HandleFunc("/debug", debugRoute)
|
||||
}
|
||||
serveMux.HandleFunc("/ping", pingRoute)
|
||||
|
||||
// Set common middlewares, from bottom to top
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
package main
|
||||
|
||||
var NB_VERSION = "0.1.1"
|
||||
var NB_VERSION = "0.2.1"
|
||||
|
|
Ładowanie…
Reference in New Issue