Get block and uth workflow

pull/559/head
kompotkot 2022-03-16 14:49:39 +00:00
rodzic 06d16457a0
commit 7bd293d11e
11 zmienionych plików z 420 dodań i 48 usunięć

Wyświetl plik

@ -0,0 +1,92 @@
package cmd
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
var (
bugoutClient BugoutClient
)
type BugoutClient struct {
Client http.Client
AuthURL string
}
// Initialize Bugout http client
func InitBugoutClient() {
client := http.Client{Timeout: configs.BUGOUT_AUTH_CALL_TIMEOUT}
bugoutClient = BugoutClient{
Client: client,
AuthURL: configs.BUGOUT_AUTH_URL,
}
}
// Get Bugout user
func (bc *BugoutClient) GetUser(token string) (BugoutUserResponse, error) {
url := fmt.Sprintf("%s/user", configs.BUGOUT_AUTH_URL)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return BugoutUserResponse{}, err
}
req.Header = http.Header{
"Authorization": []string{fmt.Sprintf("Bearer %s", token)},
}
resp, err := bc.Client.Do(req)
if err != nil {
return BugoutUserResponse{}, err
}
defer resp.Body.Close()
// Parse response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return BugoutUserResponse{}, err
}
var userResponse BugoutUserResponse
err = json.Unmarshal(body, &userResponse)
if err != nil {
return BugoutUserResponse{}, err
}
return userResponse, nil
}
// Get Bugout resources
func (bc *BugoutClient) GetResources(token string, userID string) (BugoutResourcesResponse, error) {
url := fmt.Sprintf("%s/resources?application_id=%s", configs.BUGOUT_AUTH_URL, configs.BUGOUT_NODE_BALANCER_APPLICATION_ID)
if userID != "" {
url += fmt.Sprintf("&user_id=%s", userID)
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return BugoutResourcesResponse{}, err
}
req.Header = http.Header{
"Authorization": []string{fmt.Sprintf("Bearer %s", token)},
}
resp, err := bc.Client.Do(req)
if err != nil {
return BugoutResourcesResponse{}, err
}
defer resp.Body.Close()
// Parse response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return BugoutResourcesResponse{}, err
}
var resourcesResponse BugoutResourcesResponse
err = json.Unmarshal(body, &resourcesResponse)
if err != nil {
return BugoutResourcesResponse{}, err
}
return resourcesResponse, nil
}

Wyświetl plik

@ -0,0 +1,78 @@
package cmd
import (
"encoding/json"
"flag"
"fmt"
"os"
"github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
var (
stateCLI StateCLI
)
// Command Line Interface state
type StateCLI struct {
serverCmd *flag.FlagSet
clientsCmd *flag.FlagSet
// Common flags
showVersion bool
// Server flags
listeningAddr string
listeningPort string
enableHealthCheck bool
enableDebug bool
}
func (s *StateCLI) populateCLI() {
// Subcommands setup
s.serverCmd = flag.NewFlagSet("server", flag.ExitOnError)
s.clientsCmd = flag.NewFlagSet("clients", flag.ExitOnError)
// Server subcommand flag pointers
s.serverCmd.StringVar(&s.listeningAddr, "host", "127.0.0.1", "Server listening address")
s.serverCmd.StringVar(&s.listeningPort, "port", "8544", "Server listening port")
s.serverCmd.BoolVar(&s.enableHealthCheck, "healthcheck", false, "To enable healthcheck ser healthcheck flag")
s.serverCmd.BoolVar(&s.enableDebug, "debug", false, "To enable debug mode with extended log set debug flag")
}
func init() {
InitBugoutClient()
}
func CLI() {
stateCLI.populateCLI()
if len(os.Args) < 2 {
fmt.Println("Command: server or version is required")
os.Exit(1)
}
// Parse subcommands and appropriate FlagSet
switch os.Args[1] {
case "server":
stateCLI.serverCmd.Parse(os.Args[2:])
Server()
case "clients":
stateCLI.clientsCmd.Parse(os.Args[2:])
resources, err := bugoutClient.GetResources(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN, "")
if err != nil {
fmt.Printf("Unable to get resources %v", err)
return
}
resourcesJson, err := json.Marshal(resources)
if err != nil {
fmt.Printf("Unable to marshal resources %v", err)
return
}
fmt.Println(string(resourcesJson))
case "version":
fmt.Printf("v%s\n", configs.NODE_BALANCER_VERSION)
default:
flag.PrintDefaults()
os.Exit(1)
}
}

Wyświetl plik

@ -17,11 +17,26 @@ type NodeStatusResponse struct {
CurrentBlock uint64 `json:"current_block"`
}
// Bugout responses
type BugoutUserResponse struct {
ID string `json:"user_id"`
ID string `json:"user_id"`
ApplicationID string `json:"application_id"`
}
type BugoutResourceDataResponse struct {
UserID string `json:"user_id"`
BlockchainAccess bool `json:"blockchain_access"`
}
type BugoutResourceResponse struct {
ID string `json:"id"`
ResourceData BugoutResourceDataResponse `json:"resource_data"`
}
type BugoutResourcesResponse struct {
Resources []BugoutResourceResponse `json:"resources"`
}
// Node - which one node client worked with
// LastCallTs - timestamp from last call
type Client struct {

Wyświetl plik

@ -0,0 +1,108 @@
package cmd
import (
"database/sql"
"fmt"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
_ "github.com/lib/pq"
)
var (
databaseClient DatabaseClient
)
type DatabaseClient struct {
Client *sql.DB
}
// Establish connection with database
func InitDatabaseClient() error {
db, err := sql.Open("postgres", configs.MOONSTREAM_DB_URI_READ_ONLY)
if err != nil {
return fmt.Errorf("DSN parse error or another database initialization error: %v", err)
}
// Set the maximum number of concurrently idle connections,
// by default sql.DB allows a maximum of 2 idle connections.
db.SetMaxIdleConns(configs.MOONSTREAM_DB_MAX_IDLE_CONNS)
// Set the maximum lifetime of a connection.
// Longer lifetime increase memory usage.
db.SetConnMaxLifetime(configs.MOONSTREAM_DB_CONN_MAX_LIFETIME)
databaseClient = DatabaseClient{
Client: db,
}
return nil
}
type Block struct {
BlockNumber uint64 `json:"block_number"`
Difficulty uint64 `json:"difficulty"`
ExtraData string `json:"extra_data"`
GasLimit uint64 `json:"gas_limit"`
GasUsed uint64 `json:"gas_used"`
BaseFeePerGas interface{} `json:"base_fee_per_gas"`
Hash string `json:"hash"`
LogsBloom string `json:"logs_bloom"`
Miner string `json:"miner"`
Nonce string `json:"nonce"`
ParentHash string `json:"parent_hash"`
ReceiptRoot string `json:"receipt_root"`
Uncles string `json:"uncles"`
Size float64 `json:"size"`
StateRoot string `json:"state_root"`
Timestamp uint64 `json:"timestamp"`
TotalDifficulty string `json:"total_difficulty"`
TransactionsRoot string `json:"transactions_root"`
IndexedAt string `json:"indexed_at"`
}
// Get block from database
func (dbc *DatabaseClient) GetBlock(blockchain string, blockNumber uint64) (Block, error) {
var block Block
// var tableName string
// if blockchain == "ethereum" {
// tableName = "ethereum_blocks"
// } else if blockchain == "polygon" {
// tableName = "polygon_blocks"
// } else {
// return block, fmt.Errorf("Unsupported blockchain")
// }
row := dbc.Client.QueryRow(
"SELECT block_number,difficulty,extra_data,gas_limit,gas_used,base_fee_per_gas,hash,logs_bloom,miner,nonce,parent_hash,receipt_root,uncles,size,state_root,timestamp,total_difficulty,transactions_root,indexed_at FROM ethereum_blocks WHERE block_number = $1",
// tableName,
blockNumber,
)
if err := row.Scan(
&block.BlockNumber,
&block.Difficulty,
&block.ExtraData,
&block.GasLimit,
&block.GasUsed,
&block.BaseFeePerGas,
&block.Hash,
&block.LogsBloom,
&block.Miner,
&block.Nonce,
&block.ParentHash,
&block.ReceiptRoot,
&block.Uncles,
&block.Size,
&block.StateRoot,
&block.Timestamp,
&block.TotalDifficulty,
&block.TransactionsRoot,
&block.IndexedAt,
); err != nil {
return block, err
}
return block, nil
}

Wyświetl plik

@ -6,8 +6,10 @@ package cmd
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
@ -47,13 +49,6 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
return
}
clientId := w.Header().Get(configs.MOONSTREAM_CLIENT_ID_HEADER)
if clientId == "" {
// TODO(kompotkot): After all internal crawlers and services start
// providing client id header, then replace to http.Error
clientId = "none"
}
// Chose one node
var node *Node
cpool, err := GetClientPool(blockchain)
@ -61,14 +56,14 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
return
}
node = cpool.GetClientNode(clientId)
node = cpool.GetClientNode(user.ID)
if node == nil {
node = blockchainPool.GetNextNode(blockchain)
if node == nil {
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
return
}
cpool.AddClientNode(clientId, node)
cpool.AddClientNode(user.ID, node)
}
// Save origin path, to use in proxyErrorHandler if node will not response
@ -80,15 +75,95 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
node.StatusReverseProxy.ServeHTTP(w, r)
return
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
if user.ID == configs.BUGOUT_INTERNAL_CRAWLERS_USER_ID {
r.URL.Path = "/"
node.GethReverseProxy.ServeHTTP(w, r)
} else {
fmt.Println("Fetch from db")
}
lbJSONRPCHandler(w, r, blockchain, node, user)
return
default:
http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
return
}
}
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, user BugoutUserResponse) {
var dataSource string
dataSources := r.Header[configs.MOONSTREAM_DATA_SOURCE_HEADER]
// TODO(kompotkot): Re-write it, to be able to work without database
if len(dataSources) == 0 {
dataSource = "database"
} else {
dataSource = dataSources[0]
}
switch {
case dataSource == "blockchain":
if user.ID != controllerUserID {
resources, err := bugoutClient.GetResources(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN, user.ID)
if err != nil {
http.Error(w, fmt.Sprintf("not allowed %s", dataSource), http.StatusBadRequest)
return
}
blockchainAccess := false
for _, resource := range resources.Resources {
if resource.ResourceData.BlockchainAccess == true {
blockchainAccess = true
}
}
if blockchainAccess == false {
http.Error(w, fmt.Sprintf("not allowed %s", dataSource), http.StatusBadRequest)
return
}
}
fmt.Println("proxied to node")
// r.URL.Path = "/"
// node.GethReverseProxy.ServeHTTP(w, r)
return
case dataSource == "database":
lbDatabaseHandler(w, r, blockchain)
return
default:
http.Error(w, fmt.Sprintf("Unacceptable data source %s", dataSource), http.StatusBadRequest)
return
}
}
type JSONRPCRequest struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
}
// var ALLOWED_ETH_ENDPOINTS = []string{"eth_getBlockByNumber"}
func lbDatabaseHandler(w http.ResponseWriter, r *http.Request, blockchain string) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Println(err)
return
}
var jsonrpcRequest JSONRPCRequest
err = json.Unmarshal(body, &jsonrpcRequest)
if err != nil {
fmt.Println(err)
return
}
switch {
case jsonrpcRequest.Method == "eth_getBlockByNumber":
var blockNumber uint64
blockNumber, _ = strconv.ParseUint(jsonrpcRequest.Params[0].(string), 10, 32)
block, err := databaseClient.GetBlock(blockchain, blockNumber)
if err != nil {
fmt.Printf("Unable to get block from database %v", err)
http.Error(w, fmt.Sprintf("no such block %v", blockNumber), http.StatusBadRequest)
return
}
fmt.Println(block)
default:
http.Error(w, fmt.Sprintf("Unsupported method %s by database, please use blockchain as data source", jsonrpcRequest.Method), http.StatusBadRequest)
return
}
}

Wyświetl plik

@ -5,7 +5,6 @@ package cmd
import (
"context"
"flag"
"fmt"
"log"
"net/http"
@ -18,7 +17,13 @@ import (
"github.com/google/uuid"
)
var reporter *humbug.HumbugReporter
var (
// User id to controll access to blockchain nodes
controllerUserID string
// Crash reporter
reporter *humbug.HumbugReporter
)
// initHealthCheck runs a routine for check status of the nodes every 5 seconds
func initHealthCheck(debug bool) {
@ -92,24 +97,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
}
}
func InitServer() {
var listeningAddr string
var listeningPort string
var enableHealthCheck bool
var enableDebug bool
var showVersion bool
flag.StringVar(&listeningAddr, "host", "127.0.0.1", "Server listening address")
flag.StringVar(&listeningPort, "port", "8544", "Server listening port")
flag.BoolVar(&enableHealthCheck, "healthcheck", false, "To enable healthcheck ser healthcheck flag")
flag.BoolVar(&enableDebug, "debug", false, "To enable debug mode with extended log set debug flag")
flag.BoolVar(&showVersion, "version", false, "Print version")
flag.Parse()
if showVersion {
fmt.Printf("Node balancer version: v%s\n", configs.NODE_BALANCER_VERSION)
return
}
func Server() {
// Generate map of clients
CreateClientPools()
@ -124,6 +112,17 @@ func InitServer() {
// Record system information
reporter.Publish(humbug.SystemReport())
user, err := bugoutClient.GetUser(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN)
if err != nil {
fmt.Printf("Unable to access Bugout authentication server %v", err)
}
controllerUserID = user.ID
err = InitDatabaseClient()
if err != nil {
fmt.Printf("Unable to initialize database connection %v", err)
}
// Fill NodeConfigList with initial nodes from environment variables
configs.ConfigList.InitNodeConfigList()
@ -165,18 +164,18 @@ func InitServer() {
commonHandler = panicMiddleware(commonHandler)
server := http.Server{
Addr: fmt.Sprintf("%s:%s", listeningAddr, listeningPort),
Addr: fmt.Sprintf("%s:%s", stateCLI.listeningAddr, stateCLI.listeningPort),
Handler: commonHandler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
// Start node health checking and current block fetching
if enableHealthCheck {
go initHealthCheck(enableDebug)
if stateCLI.enableHealthCheck {
go initHealthCheck(stateCLI.enableDebug)
}
log.Printf("Starting server at %s:%s\n", listeningAddr, listeningPort)
log.Printf("Starting server at %s:%s\n", stateCLI.listeningAddr, stateCLI.listeningPort)
err = server.ListenAndServe()
if err != nil {
log.Fatal(err)

Wyświetl plik

@ -13,8 +13,8 @@ import (
// Bugout config
var BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL")
var BUGOUT_NODE_BALANCER_APPLICATION_ID = os.Getenv("BUGOUT_NODE_BALANCER_APPLICATION_ID")
var BUGOUT_INTERNAL_CRAWLERS_USER_ID = os.Getenv("BUGOUT_INTERNAL_CRAWLERS_USER_ID")
var BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 1
var BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN = os.Getenv("BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN")
var BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5
// Node config
type BlockchainConfig struct {
@ -40,7 +40,7 @@ var MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_
var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER")
var MOONSTREAM_DATA_SOURCE_HEADER = os.Getenv("MOONSTREAM_DATA_SOURCE_HEADER")
func checkEnvVarSet() {
if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" {
@ -57,8 +57,8 @@ func checkEnvVarSet() {
MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal"
}
if MOONSTREAM_CLIENT_ID_HEADER == "" {
MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id"
if MOONSTREAM_DATA_SOURCE_HEADER == "" {
MOONSTREAM_DATA_SOURCE_HEADER = "X-Moonstream-Data-Source"
}
if MOONSTREAM_NODES_SERVER_PORT == "" {
@ -112,4 +112,6 @@ var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list f
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
// Database config
var MOONSTREAM_DB_URI = os.Getenv("MOONSTREAM_DB_URI")
var MOONSTREAM_DB_URI_READ_ONLY = os.Getenv("MOONSTREAM_DB_URI_READ_ONLY")
var MOONSTREAM_DB_MAX_IDLE_CONNS int = 30
var MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute

Wyświetl plik

@ -1,3 +1,3 @@
package configs
var NODE_BALANCER_VERSION = "0.0.1"
var NODE_BALANCER_VERSION = "0.0.2"

Wyświetl plik

@ -5,4 +5,5 @@ go 1.17
require (
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205
github.com/google/uuid v1.3.0
github.com/lib/pq v1.10.4
)

Wyświetl plik

@ -2,3 +2,5 @@ github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOV
github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk=
github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

Wyświetl plik

@ -5,5 +5,5 @@ import (
)
func main() {
cmd.InitServer()
cmd.CLI()
}