Respect geth structure due work, optimised sql query to db

pull/557/head
kompotkot 2022-03-05 19:13:42 +00:00
rodzic ade2bc2c72
commit 4085d11226
6 zmienionych plików z 366 dodań i 289 usunięć

Wyświetl plik

@ -0,0 +1,117 @@
package cmd
import (
"fmt"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
"github.com/ethereum/go-ethereum/common"
_ "github.com/lib/pq"
)
var (
// Block which not found in database or have inconsistencies with blockchain
corruptBlocks CorruptBlocks
)
// Write down inconsistent state between database and blockchain
/*
- number (uint64): Block number
- source (string): Source of nonconformity [blockchain, database]
- description (string): Description of error, why block marked as malformed
*/
func (cb *CorruptBlocks) registerCorruptBlock(number uint64, source, description string) {
cb.Blocks = append(cb.Blocks, CorruptBlock{
Number: number,
Source: source,
Description: description,
})
}
// Return range of block hashes with transaction hashes from blockchain
func show(start, end uint64) error {
for i := start; i <= end; i++ {
header, err := localConnections.getChainBlock(i)
if err != nil {
fmt.Printf("Unable to get block: %d from chain, err %v\n", i, err)
continue
}
chainTxs := localConnections.getChainTxs(header.Hash(), i)
var txs []common.Hash
for _, tx := range chainTxs {
txs = append(txs, tx.Hash())
}
fmt.Printf("Block %d header with hash: %s and transactions: %s\n", header.Number, header.Hash().String(), txs)
}
return nil
}
// Run verification flow of blockchain with database data
func verify(start, end uint64) error {
var cnt uint64 // Counter until report formed and sent to Humbug
for i := start; i < end; i++ {
header, err := localConnections.getChainBlock(i)
if err != nil {
description := fmt.Sprintf("Unable to get block: %d from chain, err %v", i, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(i, "blockchain", description)
continue
}
dbBlock, err := localConnections.getDatabaseBlockTxs(header.Hash().String())
if err != nil {
description := fmt.Sprintf("Unable to get block: %d, err: %v", i, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(i, "database", description)
continue
}
if dbBlock.Number == nil {
description := fmt.Sprintf("Block %d not presented in database", i)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(i, "database", description)
continue
}
if header.Number.Uint64() != dbBlock.Number.Uint64() {
description := fmt.Sprintf("Incorrect %d block retrieved from database", i)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(i, "database", description)
continue
}
chainTxs := localConnections.getChainTxs(header.Hash(), i)
if len(chainTxs) != len(dbBlock.Transactions) {
description := fmt.Sprintf("Different number of transactions in block %d, err %v", i, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(i, "database", description)
continue
}
fmt.Printf("Processed block number: %d\r", i)
cnt++
if cnt >= configs.BLOCK_RANGE_REPORT {
err := humbugReporter.submitReport(start, end)
if err != nil {
fmt.Printf("Unable to send humbug report: %v", err)
}
cnt = 0
}
}
err := humbugReporter.submitReport(start, end)
if err != nil {
fmt.Printf("Unable to send humbug report: %v", err)
}
fmt.Println("")
return nil
}

Wyświetl plik

@ -1,267 +0,0 @@
package cmd
import (
"database/sql"
"encoding/json"
"fmt"
humbug "github.com/bugout-dev/humbug/go/pkg"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/node"
_ "github.com/lib/pq"
"gopkg.in/urfave/cli.v1"
)
type gethConfig struct {
Eth ethconfig.Config
Node node.Config
}
func defaultNodeConfig() node.Config {
cfg := node.DefaultConfig
cfg.Name = "geth"
return cfg
}
type LocalConnections struct {
Stack *node.Node
Chain *core.BlockChain
ChainDB ethdb.Database
Database *sql.DB
}
type HumbugReporter struct {
Reporter *humbug.HumbugReporter
}
func setLocalChain(ctx *cli.Context) error {
cfg := gethConfig{
Eth: ethconfig.Defaults,
Node: defaultNodeConfig(),
}
// Apply flags
utils.SetNodeConfig(ctx, &cfg.Node)
stack, err := node.New(&cfg.Node)
if err != nil {
return fmt.Errorf("Failed to create the protocol stack: %v", err)
}
localConnections.Stack = stack
utils.SetEthConfig(ctx, stack, &cfg.Eth)
chain, chainDB := utils.MakeChain(ctx, stack)
localConnections.Chain = chain
localConnections.ChainDB = chainDB
return nil
}
func setDatabase() error {
db, err := sql.Open("postgres", configs.MOONSTREAM_DB_URI)
if err != nil {
return fmt.Errorf("DSN parse error or another database initialization error: %v", err)
}
// Set the maximum number of concurrently idle connections,
// by default sql.DB allows a maximum of 2 idle connections.
db.SetMaxIdleConns(configs.MOONSTREAM_DB_MAX_IDLE_CONNS)
// Set the maximum lifetime of a connection.
// Longer lifetime increase memory usage.
db.SetConnMaxLifetime(configs.MOONSTREAM_DB_CONN_MAX_LIFETIME)
localConnections.Database = db
return nil
}
func (lc *LocalConnections) fetchFromNode(start, end uint64) error {
for i := start; i <= end; i++ {
header := lc.Chain.GetHeaderByNumber(i)
blockHeaderHash := header.Hash()
body := rawdb.ReadBody(lc.ChainDB, blockHeaderHash, i)
var txs []common.Hash
for _, tx := range body.Transactions {
txs = append(txs, tx.Hash())
}
fmt.Printf("- Block header with hash: %s and transactions: %s\n", blockHeaderHash, txs)
}
return nil
}
type Block struct {
Hash string `json:"hash"`
BlockNumber uint64 `json:"block_number"`
}
type Transactions struct {
Txs []string
Quantity int
}
// Retrive block from blockchain
func (lc *LocalConnections) getBlockFromChain(number uint64) (*types.Header, error) {
header := lc.Chain.GetHeaderByNumber(number)
if header == nil {
return nil, fmt.Errorf("Not found %d block in chain", number)
}
return header, nil
}
// Retrieve block from database
func (lc *LocalConnections) getBlockFromDB(header *types.Header) (Block, error) {
var block Block
blockRow := lc.Database.QueryRow(fmt.Sprintf("SELECT hash,block_number FROM ethereum_blocks WHERE hash = '%s';", header.Hash().String()))
err := blockRow.Scan(&block.Hash, &block.BlockNumber)
if err != nil {
if err == sql.ErrNoRows {
return block, fmt.Errorf("Not found %d block: %v", header.Number, err)
}
return block, fmt.Errorf("An error occurred during sql operation: %v", err)
}
return block, nil
}
// Retrive block transactions from blockchain
func (lc *LocalConnections) getTxsFromChain(headerHash common.Hash, number uint64) Transactions {
var transactions Transactions
body := rawdb.ReadBody(lc.ChainDB, headerHash, number)
for _, tx := range body.Transactions {
transactions.Txs = append(transactions.Txs, tx.Hash().String())
transactions.Quantity++
// set[tx.Hash().String()] = false
}
return transactions
}
// Retrive block transactions from database
func (lc *LocalConnections) getTxsFromDB(blockNumber uint64) (Transactions, error) {
var transactions Transactions
rows, err := lc.Database.Query(fmt.Sprintf("SELECT hash FROM ethereum_transactions WHERE block_number = %d;", blockNumber))
if err != nil {
return transactions, fmt.Errorf("An error occurred during sql operation: %v", err)
}
defer rows.Close()
for rows.Next() {
var txHash string
err := rows.Scan(&txHash)
if err != nil {
return transactions, fmt.Errorf("An error occurred during sql operation: %v", err)
}
transactions.Txs = append(transactions.Txs, txHash)
transactions.Quantity++
// set[transaction] = true
}
return transactions, nil
}
type CorruptBlocks struct {
Numbers []uint64
}
var corruptBlocks CorruptBlocks
// Write down inconsistent state between database and blockchain
/*
- number (uint64): Block number
- source (string): Source of nonconformity [blockchain, database]
*/
func (cb *CorruptBlocks) recordNonconformity(number uint64, source string) {
cb.Numbers = append(cb.Numbers, number)
fmt.Println(cb.Numbers)
}
func (r *HumbugReporter) submitReport(start, end uint64) error {
content, err := json.Marshal(corruptBlocks)
if err != nil {
return fmt.Errorf("Unable to marshal to json: %v", err)
}
report := humbug.Report{
Title: fmt.Sprintf("LDB verifier %d-%d", start, end),
Content: string(content),
Tags: []string{
fmt.Sprintf("start:%d", start),
fmt.Sprintf("end:%d", end),
},
}
r.Reporter.Publish(report)
fmt.Println("Error published")
return nil
}
func verify(start, end uint64) error {
var cnt uint64 // Counter until report formed and sent to Humbug
for i := start; i < end; i++ {
header, err := localConnections.getBlockFromChain(i)
if err != nil {
fmt.Printf("Unable to get block: %d from chain, err %v\n", i, err)
corruptBlocks.recordNonconformity(i, "blockchain")
continue
}
block, err := localConnections.getBlockFromDB(header)
if err != nil {
fmt.Printf("Unable to get block: %d, err: %v\n", block.BlockNumber, err)
corruptBlocks.recordNonconformity(i, "database")
continue
}
if header.Number.Uint64() != block.BlockNumber {
fmt.Printf("Incorrect %d block retrieved from database\n", block.BlockNumber)
corruptBlocks.recordNonconformity(i, "database")
continue
}
// set := make(map[string]bool)
chainTxs := localConnections.getTxsFromChain(header.Hash(), i)
dbTxs, err := localConnections.getTxsFromDB(header.Number.Uint64())
if err != nil {
fmt.Printf("Unable to get transactions: %d, err: %v\n", block.BlockNumber, err)
corruptBlocks.recordNonconformity(i, "database")
continue
}
if chainTxs.Quantity != dbTxs.Quantity {
fmt.Printf("Different number of transactions in block %d, err %v\n", block.BlockNumber, err)
corruptBlocks.recordNonconformity(i, "database")
continue
}
fmt.Printf("Processed block number: %d\r", i)
cnt++
if cnt >= configs.BLOCK_RANGE_REPORT {
err := humbugReporter.submitReport(start, end)
if err != nil {
fmt.Printf("Unable to send humbug report: %v", err)
}
cnt = 0
}
}
err := humbugReporter.submitReport(start, end)
if err != nil {
fmt.Printf("Unable to send humbug report: %v", err)
}
fmt.Println("")
return nil
}

Wyświetl plik

@ -8,18 +8,12 @@ import (
"sort"
"strconv"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
"github.com/bugout-dev/humbug/go/pkg"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/google/uuid"
"gopkg.in/urfave/cli.v1"
)
var (
localConnections *LocalConnections
humbugReporter *HumbugReporter
DataDirFlag = cli.StringFlag{
Name: "datadir",
Usage: "Data directory for the databases and keystore",
@ -32,18 +26,6 @@ var (
}
)
// Generate humbug client
func setHumbugClient(sessionID string) error {
consent := humbug.CreateHumbugConsent(humbug.True)
reporter, err := humbug.CreateHumbugReporter(consent, configs.HUMBUG_LDB_CLIENT_ID, sessionID, configs.HUMBUG_LDB_TOKEN)
if err != nil {
return fmt.Errorf("Unable to generate humbug reporter: %v", err)
}
humbugReporter.Reporter = reporter
return nil
}
// Parse start and end blocks from command line input
func startEndBlock(ctx *cli.Context) (uint64, uint64, error) {
start, err := strconv.ParseUint(ctx.Args().Get(0), 10, 32)
@ -75,7 +57,7 @@ func processAddCommand(ctx *cli.Context) error {
defer localConnections.Stack.Close()
defer localConnections.ChainDB.Close()
localConnections.fetchFromNode(start, end)
show(start, end)
localConnections.Chain.Stop()
@ -99,7 +81,7 @@ func processShowCommand(ctx *cli.Context) error {
defer localConnections.Stack.Close()
defer localConnections.ChainDB.Close()
localConnections.fetchFromNode(start, end)
show(start, end)
localConnections.Chain.Stop()
@ -184,10 +166,11 @@ func LDBCLI() {
sort.Sort(cli.FlagsByName(app.Flags))
sort.Sort(cli.CommandsByName(app.Commands))
// Initialize local connections
localConnections = &LocalConnections{}
humbugReporter = &HumbugReporter{}
// Humbug client to be able write data in Bugout journal
// Initialize humbug client to be able write data in Bugout journal
humbugReporter = &HumbugReporter{}
sessionID := uuid.New().String()
err := setHumbugClient(sessionID)
if err != nil {

Wyświetl plik

@ -0,0 +1,145 @@
package cmd
import (
"database/sql"
"fmt"
"math/big"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/node"
_ "github.com/lib/pq"
"gopkg.in/urfave/cli.v1"
)
var (
localConnections *LocalConnections
)
// Modified lightweight go-ethereum function
// Source: github.com/ethereum/go-ethereum/cmd/geth/config.go
func defaultNodeConfig() node.Config {
cfg := node.DefaultConfig
cfg.Name = "geth"
return cfg
}
// Establish connection with blockchain
func setLocalChain(ctx *cli.Context) error {
cfg := gethConfig{
Eth: ethconfig.Defaults,
Node: defaultNodeConfig(),
}
// Apply flags
utils.SetNodeConfig(ctx, &cfg.Node)
stack, err := node.New(&cfg.Node)
if err != nil {
return fmt.Errorf("Failed to create the protocol stack: %v", err)
}
localConnections.Stack = stack
utils.SetEthConfig(ctx, stack, &cfg.Eth)
chain, chainDB := utils.MakeChain(ctx, stack)
localConnections.Chain = chain
localConnections.ChainDB = chainDB
return nil
}
// Establish connection with database
func setDatabase() error {
db, err := sql.Open("postgres", configs.MOONSTREAM_DB_URI)
if err != nil {
return fmt.Errorf("DSN parse error or another database initialization error: %v", err)
}
// Set the maximum number of concurrently idle connections,
// by default sql.DB allows a maximum of 2 idle connections.
db.SetMaxIdleConns(configs.MOONSTREAM_DB_MAX_IDLE_CONNS)
// Set the maximum lifetime of a connection.
// Longer lifetime increase memory usage.
db.SetConnMaxLifetime(configs.MOONSTREAM_DB_CONN_MAX_LIFETIME)
localConnections.Database = db
return nil
}
// Retrive block from blockchain
func (lc *LocalConnections) getChainBlock(number uint64) (*types.Header, error) {
header := lc.Chain.GetHeaderByNumber(number)
if header == nil {
return nil, fmt.Errorf("Not found %d block in chain", number)
}
return header, nil
}
// Retrive block transactions from blockchain
func (lc *LocalConnections) getChainTxs(headerHash common.Hash, number uint64) []*types.Transaction {
var transactions []*types.Transaction
body := rawdb.ReadBody(lc.ChainDB, headerHash, number)
for _, tx := range body.Transactions {
transactions = append(transactions, tx)
}
return transactions
}
// Retrive block with transactions from database
func (lc *LocalConnections) getDatabaseBlockTxs(headerHash string) (LightBlock, error) {
var lBlock LightBlock
var txs []LightTransaction
query := fmt.Sprintf(
`SELECT
ethereum_blocks.hash,
ethereum_blocks.block_number,
ethereum_transactions.hash
FROM ethereum_blocks
LEFT JOIN ethereum_transactions ON ethereum_blocks.block_number = ethereum_transactions.block_number
WHERE ethereum_blocks.hash = '%s';`,
headerHash,
)
rows, err := lc.Database.Query(query)
if err != nil {
return lBlock, fmt.Errorf("An error occurred during sql operation: %v", err)
}
defer rows.Close()
for rows.Next() {
var blockHash, blockNumberStr, txHash sql.NullString
err := rows.Scan(&blockHash, &blockNumberStr, &txHash)
if err != nil {
return lBlock, fmt.Errorf("An error occurred during sql operation: %v", err)
}
var lTx LightTransaction
if txHash.Valid != false {
lTx = LightTransaction{
Hash: txHash.String,
}
txs = append(lBlock.Transactions, lTx)
}
blockNumber := new(big.Int)
blockNumber, ok := blockNumber.SetString(blockNumberStr.String, 10)
if !ok {
return lBlock, fmt.Errorf("Unable to parse block number")
}
lBlock = LightBlock{
Hash: blockHash.String,
Number: blockNumber,
Transactions: txs,
}
}
return lBlock, nil
}

Wyświetl plik

@ -0,0 +1,54 @@
package cmd
import (
"database/sql"
"math/big"
humbug "github.com/bugout-dev/humbug/go/pkg"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/node"
)
// Modified lightweight go-ethereum struct
// Source: github.com/ethereum/go-ethereum/cmd/geth/config.go
type gethConfig struct {
Eth ethconfig.Config
Node node.Config
}
// Predefined connections to blockchain and database
type LocalConnections struct {
Stack *node.Node
Chain *core.BlockChain
ChainDB ethdb.Database
Database *sql.DB
}
type HumbugReporter struct {
Reporter *humbug.HumbugReporter
}
// Lightweight transactions for database operations
type LightTransaction struct {
Hash string
}
// Lightweight block for database operations
type LightBlock struct {
Hash string
Number *big.Int
Transactions []LightTransaction
}
// Malformed block structure which will be submitted to humbug journal
type CorruptBlock struct {
Number uint64 `json:"number"`
Source string `json:"source"`
Description string `json:"description"`
}
type CorruptBlocks struct {
Blocks []CorruptBlock `json:"blocks"`
}

Wyświetl plik

@ -0,0 +1,45 @@
package cmd
import (
"encoding/json"
"fmt"
"github.com/bugout-dev/humbug/go/pkg"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
)
var (
humbugReporter *HumbugReporter
)
// Generate humbug client
func setHumbugClient(sessionID string) error {
consent := humbug.CreateHumbugConsent(humbug.True)
reporter, err := humbug.CreateHumbugReporter(consent, configs.HUMBUG_LDB_CLIENT_ID, sessionID, configs.HUMBUG_LDB_TOKEN)
if err != nil {
return fmt.Errorf("Unable to generate humbug reporter: %v", err)
}
humbugReporter.Reporter = reporter
return nil
}
func (r *HumbugReporter) submitReport(start, end uint64) error {
content, err := json.Marshal(corruptBlocks)
if err != nil {
return fmt.Errorf("Unable to marshal to json: %v", err)
}
report := humbug.Report{
Title: fmt.Sprintf("LDB verifier %d-%d", start, end),
Content: string(content),
Tags: []string{
fmt.Sprintf("start:%d", start),
fmt.Sprintf("end:%d", end),
},
}
r.Reporter.Publish(report)
fmt.Println("Error published")
return nil
}