From 7c90baebf1c4021cbf800a55b6ef7bf7164ca131 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 14 Mar 2022 19:26:59 +0000 Subject: [PATCH] Multithread for adding blocks with txs --- crawlers/ldb/cmd/actions.go | 79 +++++++++++++++++++++++++++------ crawlers/ldb/cmd/cli.go | 7 ++- crawlers/ldb/cmd/connections.go | 33 +++++++++----- 3 files changed, 92 insertions(+), 27 deletions(-) diff --git a/crawlers/ldb/cmd/actions.go b/crawlers/ldb/cmd/actions.go index 15ea5e4b..e70ecf62 100644 --- a/crawlers/ldb/cmd/actions.go +++ b/crawlers/ldb/cmd/actions.go @@ -27,25 +27,76 @@ func (cb *CorruptBlocks) registerCorruptBlock(number uint64, source, description } // Add new blocks with transactions to database -func add(blockchain string, blockNumbers []uint64) error { - for _, bn := range blockNumbers { - block, err := localConnections.getChainBlock(bn) - if err != nil { - description := fmt.Sprintf("Unable to get block: %d from chain, err %v", bn, err) - fmt.Println(description) - corruptBlocks.registerCorruptBlock(bn, "blockchain", description) - continue +func add(blockchain string, blockNumbers []uint64, workers int) error { + jobsCh := make(chan Job, workers) + resultCh := make(chan Result, len(blockNumbers)) + doneCh := make(chan struct{}, workers) + + // Add jobs + go func() { + for _, bn := range blockNumbers { + jobsCh <- Job{ + BlockNumber: bn, + Results: resultCh, + } } - td := localConnections.Chain.GetTd(block.Hash(), block.NumberU64()) + close(jobsCh) + }() - chainTxs := localConnections.getChainTxs(block.Hash(), bn) + for i := 0; i < workers; i++ { + // Do jobs + go func() { + for job := range jobsCh { + block, err := localConnections.getChainBlock(job.BlockNumber) + if err != nil { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Unable to get block: %d from chain, err %v", job.BlockNumber, err), + ErrorSource: "blockchain", + Number: job.BlockNumber, + } + continue + } - err = localConnections.writeDatabaseBlockTxs(blockchain, block, chainTxs, td) - if err != nil { - fmt.Printf("Error occurred due saving block %d with transactions in database: %v", bn, err) + td := localConnections.Chain.GetTd(block.Hash(), block.NumberU64()) + + chainTxs := localConnections.getChainTxs(block.Hash(), block.NumberU64()) + + err = localConnections.writeDatabaseBlockTxs(blockchain, block, chainTxs, td) + if err != nil { + job.Results <- Result{ + ErrorOutput: fmt.Sprintf("Unable to write block %d with txs in database, err %v", job.BlockNumber, err), + ErrorSource: "database", + Number: job.BlockNumber, + } + continue + } + + job.Results <- Result{ + Output: fmt.Sprintf("Processed block number: %d", job.BlockNumber), + } + } + doneCh <- struct{}{} + }() + } + + // Await completion + go func() { + for i := 0; i < workers; i++ { + <-doneCh } + close(resultCh) + }() - fmt.Printf("Processed block number: %d\r", bn) + for result := range resultCh { + if result.ErrorOutput != "" { + fmt.Println(result.ErrorOutput) + } + if result.Output != "" { + fmt.Println(result.Output) + } + if result.Output != "" && result.ErrorOutput != "" { + fmt.Printf("Unprocessable result with error: %s and output: %s", result.ErrorOutput, result.Output) + } } return nil diff --git a/crawlers/ldb/cmd/cli.go b/crawlers/ldb/cmd/cli.go index 97e5e44b..d590b74a 100644 --- a/crawlers/ldb/cmd/cli.go +++ b/crawlers/ldb/cmd/cli.go @@ -105,6 +105,10 @@ func processAddCommand(ctx *cli.Context) error { if blockchain != "ethereum" && blockchain != "polygon" { return fmt.Errorf("Unsupported blockchain provided") } + threads := ctx.GlobalInt(ThreadsFlag.Name) + if threads <= 0 { + threads = 1 + } start, end, err := startEndBlock(ctx) if err != nil { @@ -124,7 +128,7 @@ func processAddCommand(ctx *cli.Context) error { } for blocks := range BlockYield(start, end, BlockNumberStep) { - err = add(blockchain, blocks) + err = add(blockchain, blocks, threads) if err != nil { return fmt.Errorf("Error occurred due add acction: %v", err) } @@ -239,6 +243,7 @@ func LDBCLI() { BlockchainFlag, DataDirFlag, GCModeFlag, + ThreadsFlag, }, }, { diff --git a/crawlers/ldb/cmd/connections.go b/crawlers/ldb/cmd/connections.go index 6e6265bc..a8ab870b 100644 --- a/crawlers/ldb/cmd/connections.go +++ b/crawlers/ldb/cmd/connections.go @@ -227,6 +227,12 @@ func prepareTxsQuery(blockchain string, block *types.Block, txs []*types.Transac ) for i, tx := range txs { + var maxFeePerGas interface{} + maxFeePerGas = "NULL" + + var maxPriorityFeePerGas interface{} + maxPriorityFeePerGas = "NULL" + m, err := tx.AsMessage(signer, block.Number()) if err != nil { return "", fmt.Errorf("Transaction to message transformation failed: %v", err) @@ -235,15 +241,15 @@ func prepareTxsQuery(blockchain string, block *types.Block, txs []*types.Transac txsQuery += "," } txsQuery += fmt.Sprintf( - `('%s', %d, '%s', '%s', %d, %d, %d, %d, '0x%x', '0x%x', %d, %d, %d)`, + `('%s', %d, '%s', '%s', %d, %d, %v, %v, '0x%x', %d, %d, %d, %d)`, tx.Hash(), block.Number(), m.From(), tx.To(), tx.Gas(), tx.GasPrice(), - 0, //"max_fee", - 0, //"max_prior", + maxFeePerGas, + maxPriorityFeePerGas, tx.Data(), tx.Nonce(), i, @@ -274,15 +280,18 @@ func (lc *LocalConnections) writeDatabaseBlockTxs( genesisHash := common.HexToHash("0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3") chainConfig := rawdb.ReadChainConfig(lc.ChainDB, genesisHash) signer := types.MakeSigner(chainConfig, block.Number()) - txsQuery, err := prepareTxsQuery(blockchain, block, txs, signer) - if err != nil { - dbTx.Rollback() - return err - } - _, err = dbTx.Exec(txsQuery) - if err != nil { - dbTx.Rollback() - return fmt.Errorf("An error occurred during sql operation: %v", err) + + if len(txs) > 0 { + txsQuery, err := prepareTxsQuery(blockchain, block, txs, signer) + if err != nil { + dbTx.Rollback() + return err + } + _, err = dbTx.Exec(txsQuery) + if err != nil { + dbTx.Rollback() + return fmt.Errorf("An error occurred during sql operation: %v", err) + } } err = dbTx.Commit()