diff --git a/crawlers/ldb/cmd/actions.go b/crawlers/ldb/cmd/actions.go index 85c89389..5e462306 100644 --- a/crawlers/ldb/cmd/actions.go +++ b/crawlers/ldb/cmd/actions.go @@ -73,50 +73,117 @@ func show(blockNumbers []uint64) error { return nil } +// TODO(kompotkot): Find way to remove Number +type Result struct { + ErrorOutput string + ErrorSource string + Number uint64 + Output string +} + +type Job struct { + BlockNumber uint64 + Results chan<- Result +} + // Run verification flow of blockchain with database data -func verify(blockchain string, blockNumbers []uint64) error { - for _, bn := range blockNumbers { - chainBlock, err := localConnections.getChainBlock(bn) - if err != nil { - description := fmt.Sprintf("Unable to get block: %d from chain, err %v", bn, err) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "blockchain", description) - continue +func verify(blockchain string, blockNumbers []uint64, workers int) error { + jobsCh := make(chan Job, workers) + resultCh := make(chan Result, len(blockNumbers)) + doneCh := make(chan struct{}, workers) + + // Add jobs + go func() { + for _, bn := range blockNumbers { + jobsCh <- Job{ + BlockNumber: bn, + Results: resultCh, + } } + close(jobsCh) + }() - dbBlock, err := localConnections.getDatabaseBlockTxs(blockchain, chainBlock.Hash().String()) + for i := 0; i < workers; i++ { + // Do jobs + go func() { + for job := range jobsCh { + chainBlock, err := localConnections.getChainBlock(job.BlockNumber) + if err != nil { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Unable to get block: %d from chain, err %v", job.BlockNumber, err), + ErrorSource: "blockchain", + Number: job.BlockNumber, + } + continue + } - if err != nil { - description := fmt.Sprintf("Unable to get block: %d, err: %v", bn, err) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "database", description) - continue + dbBlock, err := localConnections.getDatabaseBlockTxs(blockchain, chainBlock.Hash().String()) + + if err != nil { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Unable to get block: %d, err: %v", job.BlockNumber, err), + ErrorSource: "database", + Number: job.BlockNumber, + } + continue + } + + if dbBlock.Number == nil { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Block %d not presented in database", job.BlockNumber), + ErrorSource: "database", + Number: job.BlockNumber, + } + continue + } + + if chainBlock.NumberU64() != dbBlock.Number.Uint64() { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Incorrect %d block retrieved from database", job.BlockNumber), + ErrorSource: "database", + Number: job.BlockNumber, + } + continue + } + + chainTxs := localConnections.getChainTxs(chainBlock.Hash(), job.BlockNumber) + + if len(chainTxs) != len(dbBlock.Transactions) { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Different number of transactions in block %d, err %v", job.BlockNumber, err), + ErrorSource: "database", + Number: job.BlockNumber, + } + continue + } + + job.Results <- Result{ + Output: fmt.Sprintf("Processed block number: %d", job.BlockNumber), + } + } + doneCh <- struct{}{} + }() + } + + // Await completion + go func() { + for i := 0; i < workers; i++ { + <-doneCh } + close(resultCh) + }() - if dbBlock.Number == nil { - description := fmt.Sprintf("Block %d not presented in database", bn) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "database", description) - continue + for result := range resultCh { + if result.ErrorOutput != "" { + fmt.Println(result.ErrorOutput) + corruptBlocks.registerCorruptBlock(result.Number, result.ErrorSource, result.ErrorOutput) } - - if chainBlock.NumberU64() != dbBlock.Number.Uint64() { - description := fmt.Sprintf("Incorrect %d block retrieved from database", bn) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "database", description) - continue + if result.Output != "" { + fmt.Println(result.Output) } - - chainTxs := localConnections.getChainTxs(chainBlock.Hash(), bn) - - if len(chainTxs) != len(dbBlock.Transactions) { - description := fmt.Sprintf("Different number of transactions in block %d, err %v", bn, err) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "database", description) - continue + if result.Output != "" && result.ErrorOutput != "" { + fmt.Printf("Unprocessable result with error: %s and output: %s", result.ErrorOutput, result.Output) } - - fmt.Printf("Processed block number: %d\r", bn) } return nil diff --git a/crawlers/ldb/cmd/cli.go b/crawlers/ldb/cmd/cli.go index 8f43b267..4eae0478 100644 --- a/crawlers/ldb/cmd/cli.go +++ b/crawlers/ldb/cmd/cli.go @@ -8,7 +8,7 @@ import ( "sort" "strconv" - "github.com/bugout-dev/moonstream/crawlers/ldb/configs" + // "github.com/bugout-dev/moonstream/crawlers/ldb/configs" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/google/uuid" @@ -33,9 +33,15 @@ var ( Usage: `Blockchain garbage collection mode ("full", "archive")`, Value: "full", } + ThreadsFlag = cli.IntFlag{ + Name: "threads", + Usage: "Number of threads to use", + Value: 2, + } ) // Block step generator, yield list of blocks with length equal blockStep +// TODO(kompotkot): Not-safe method with slices in channel, re-write this function func BlockYield(start, end, blockStep uint64) chan []uint64 { ch := make(chan []uint64) @@ -173,6 +179,10 @@ func processVerifyCommand(ctx *cli.Context) error { if blockchain != "ethereum" && blockchain != "polygon" { return fmt.Errorf("Unsupported blockchain provided") } + threads := ctx.GlobalInt(ThreadsFlag.Name) + if threads <= 0 { + threads = 1 + } start, end, err := startEndBlock(ctx) if err != nil { @@ -191,23 +201,11 @@ func processVerifyCommand(ctx *cli.Context) error { return fmt.Errorf("Unable to set database connection: %v", err) } - cnt := uint64(0) - reportStart := uint64(start) for blocks := range BlockYield(start, end, BlockNumberStep) { - err = verify(blockchain, blocks) + err = verify(blockchain, blocks, threads) if err != nil { return fmt.Errorf("Error occurred due verify acction: %v", err) } - - cnt += BlockNumberStep - if cnt >= configs.BLOCK_RANGE_REPORT { - err := humbugReporter.submitReport(reportStart, blocks[len(blocks)-1]+1, "") - if err != nil { - return fmt.Errorf("Unable to send humbug report: %v", err) - } - reportStart = blocks[len(blocks)-1] + 1 - cnt = 0 - } } err = humbugReporter.submitReport(start, end, "Total ") @@ -230,6 +228,7 @@ func LDBCLI() { BlockchainFlag, DataDirFlag, GCModeFlag, + ThreadsFlag, } app.Commands = []cli.Command{ @@ -267,6 +266,7 @@ func LDBCLI() { BlockchainFlag, DataDirFlag, GCModeFlag, + ThreadsFlag, }, }, } diff --git a/crawlers/ldb/configs/settings.go b/crawlers/ldb/configs/settings.go index 19914539..3145f01f 100644 --- a/crawlers/ldb/configs/settings.go +++ b/crawlers/ldb/configs/settings.go @@ -5,8 +5,6 @@ import ( "time" ) -var BLOCK_RANGE_REPORT uint64 = 100000 - // Database configs var MOONSTREAM_DB_MAX_IDLE_CONNS int = 30 var MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute