Verification command in goroutines

pull/557/head
kompotkot 2022-03-06 22:59:47 +00:00
rodzic 39c391dfa0
commit 838c75ae29
3 zmienionych plików z 116 dodań i 51 usunięć

Wyświetl plik

@ -73,50 +73,117 @@ func show(blockNumbers []uint64) error {
return nil
}
// TODO(kompotkot): Find way to remove Number
type Result struct {
ErrorOutput string
ErrorSource string
Number uint64
Output string
}
type Job struct {
BlockNumber uint64
Results chan<- Result
}
// Run verification flow of blockchain with database data
func verify(blockchain string, blockNumbers []uint64) error {
for _, bn := range blockNumbers {
chainBlock, err := localConnections.getChainBlock(bn)
if err != nil {
description := fmt.Sprintf("Unable to get block: %d from chain, err %v", bn, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(bn, "blockchain", description)
continue
func verify(blockchain string, blockNumbers []uint64, workers int) error {
jobsCh := make(chan Job, workers)
resultCh := make(chan Result, len(blockNumbers))
doneCh := make(chan struct{}, workers)
// Add jobs
go func() {
for _, bn := range blockNumbers {
jobsCh <- Job{
BlockNumber: bn,
Results: resultCh,
}
}
close(jobsCh)
}()
dbBlock, err := localConnections.getDatabaseBlockTxs(blockchain, chainBlock.Hash().String())
for i := 0; i < workers; i++ {
// Do jobs
go func() {
for job := range jobsCh {
chainBlock, err := localConnections.getChainBlock(job.BlockNumber)
if err != nil {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Unable to get block: %d from chain, err %v", job.BlockNumber, err),
ErrorSource: "blockchain",
Number: job.BlockNumber,
}
continue
}
if err != nil {
description := fmt.Sprintf("Unable to get block: %d, err: %v", bn, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(bn, "database", description)
continue
dbBlock, err := localConnections.getDatabaseBlockTxs(blockchain, chainBlock.Hash().String())
if err != nil {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Unable to get block: %d, err: %v", job.BlockNumber, err),
ErrorSource: "database",
Number: job.BlockNumber,
}
continue
}
if dbBlock.Number == nil {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Block %d not presented in database", job.BlockNumber),
ErrorSource: "database",
Number: job.BlockNumber,
}
continue
}
if chainBlock.NumberU64() != dbBlock.Number.Uint64() {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Incorrect %d block retrieved from database", job.BlockNumber),
ErrorSource: "database",
Number: job.BlockNumber,
}
continue
}
chainTxs := localConnections.getChainTxs(chainBlock.Hash(), job.BlockNumber)
if len(chainTxs) != len(dbBlock.Transactions) {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Different number of transactions in block %d, err %v", job.BlockNumber, err),
ErrorSource: "database",
Number: job.BlockNumber,
}
continue
}
job.Results <- Result{
Output: fmt.Sprintf("Processed block number: %d", job.BlockNumber),
}
}
doneCh <- struct{}{}
}()
}
// Await completion
go func() {
for i := 0; i < workers; i++ {
<-doneCh
}
close(resultCh)
}()
if dbBlock.Number == nil {
description := fmt.Sprintf("Block %d not presented in database", bn)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(bn, "database", description)
continue
for result := range resultCh {
if result.ErrorOutput != "" {
fmt.Println(result.ErrorOutput)
corruptBlocks.registerCorruptBlock(result.Number, result.ErrorSource, result.ErrorOutput)
}
if chainBlock.NumberU64() != dbBlock.Number.Uint64() {
description := fmt.Sprintf("Incorrect %d block retrieved from database", bn)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(bn, "database", description)
continue
if result.Output != "" {
fmt.Println(result.Output)
}
chainTxs := localConnections.getChainTxs(chainBlock.Hash(), bn)
if len(chainTxs) != len(dbBlock.Transactions) {
description := fmt.Sprintf("Different number of transactions in block %d, err %v", bn, err)
fmt.Println(description)
corruptBlocks.registerCorruptBlock(bn, "database", description)
continue
if result.Output != "" && result.ErrorOutput != "" {
fmt.Printf("Unprocessable result with error: %s and output: %s", result.ErrorOutput, result.Output)
}
fmt.Printf("Processed block number: %d\r", bn)
}
return nil

Wyświetl plik

@ -8,7 +8,7 @@ import (
"sort"
"strconv"
"github.com/bugout-dev/moonstream/crawlers/ldb/configs"
// "github.com/bugout-dev/moonstream/crawlers/ldb/configs"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/google/uuid"
@ -33,9 +33,15 @@ var (
Usage: `Blockchain garbage collection mode ("full", "archive")`,
Value: "full",
}
ThreadsFlag = cli.IntFlag{
Name: "threads",
Usage: "Number of threads to use",
Value: 2,
}
)
// Block step generator, yield list of blocks with length equal blockStep
// TODO(kompotkot): Not-safe method with slices in channel, re-write this function
func BlockYield(start, end, blockStep uint64) chan []uint64 {
ch := make(chan []uint64)
@ -173,6 +179,10 @@ func processVerifyCommand(ctx *cli.Context) error {
if blockchain != "ethereum" && blockchain != "polygon" {
return fmt.Errorf("Unsupported blockchain provided")
}
threads := ctx.GlobalInt(ThreadsFlag.Name)
if threads <= 0 {
threads = 1
}
start, end, err := startEndBlock(ctx)
if err != nil {
@ -191,23 +201,11 @@ func processVerifyCommand(ctx *cli.Context) error {
return fmt.Errorf("Unable to set database connection: %v", err)
}
cnt := uint64(0)
reportStart := uint64(start)
for blocks := range BlockYield(start, end, BlockNumberStep) {
err = verify(blockchain, blocks)
err = verify(blockchain, blocks, threads)
if err != nil {
return fmt.Errorf("Error occurred due verify acction: %v", err)
}
cnt += BlockNumberStep
if cnt >= configs.BLOCK_RANGE_REPORT {
err := humbugReporter.submitReport(reportStart, blocks[len(blocks)-1]+1, "")
if err != nil {
return fmt.Errorf("Unable to send humbug report: %v", err)
}
reportStart = blocks[len(blocks)-1] + 1
cnt = 0
}
}
err = humbugReporter.submitReport(start, end, "Total ")
@ -230,6 +228,7 @@ func LDBCLI() {
BlockchainFlag,
DataDirFlag,
GCModeFlag,
ThreadsFlag,
}
app.Commands = []cli.Command{
@ -267,6 +266,7 @@ func LDBCLI() {
BlockchainFlag,
DataDirFlag,
GCModeFlag,
ThreadsFlag,
},
},
}

Wyświetl plik

@ -5,8 +5,6 @@ import (
"time"
)
var BLOCK_RANGE_REPORT uint64 = 100000
// Database configs
var MOONSTREAM_DB_MAX_IDLE_CONNS int = 30
var MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute