From be4442966e477bf8e88c99c44e0286a15d9678f2 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Thu, 5 Aug 2021 17:10:48 -0700 Subject: [PATCH] Make crawl_blocks also handle interrupts --- crawlers/mooncrawl/ethereum.py | 48 +++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/crawlers/mooncrawl/ethereum.py b/crawlers/mooncrawl/ethereum.py index 700a3418..a4f94c4a 100644 --- a/crawlers/mooncrawl/ethereum.py +++ b/crawlers/mooncrawl/ethereum.py @@ -14,6 +14,12 @@ from moonstreamdb.models import ( ) +class EthereumBlockCrawlError(Exception): + """ + Raised when there is a problem crawling Ethereum blocks. + """ + + def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH): web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() if web3_uri is not None: @@ -103,20 +109,31 @@ def crawl_blocks( Open database and geth sessions and fetch block data from blockchain. """ web3_client = connect() - for block_number in blocks_numbers: - with yield_db_session_ctx() as db_session: - block: BlockData = web3_client.eth.get_block( - block_number, full_transactions=with_transactions - ) - add_block(db_session, block) + with yield_db_session_ctx() as db_session: + for block_number in blocks_numbers: + try: + block: BlockData = web3_client.eth.get_block( + block_number, full_transactions=with_transactions + ) + add_block(db_session, block) - if with_transactions: - add_block_transactions(db_session, block) + if with_transactions: + add_block_transactions(db_session, block) - db_session.commit() + db_session.commit() + except Exception as err: + db_session.rollback() + message = f"Error adding block (number={block_number}) to database:\n{repr(err)}" + raise EthereumBlockCrawlError(message) + except: + db_session.rollback() + print( + f"Interrupted while adding block (number={block_number}) to database." 
+ ) + raise if verbose: - print(f"Added {block_number} block") + print(f"Added block: {block_number}") def check_missing_blocks(blocks_numbers: List[int]) -> List[int]: @@ -154,6 +171,9 @@ def crawl_blocks_executor( with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table. verbose - Print logs to stdout? num_processes - Number of processes to use to feed blocks into database. + + Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlockCrawlError. + The error message is a list of all the things that went wrong in the crawl. """ errors: List[Exception] = [] @@ -184,12 +204,10 @@ def crawl_blocks_executor( results.append(result) wait(results) - # TODO(kompotkot): Return list of errors and colors responsible for - # handling errors if len(errors) > 0: - print("Errors:") - for error in errors: - print(f"- {error}") + error_messages = "\n".join([f"- {error}" for error in errors]) + message = f"Error processing blocks in list:\n{error_messages}" + raise EthereumBlockCrawlError(message) def process_contract_deployments() -> List[Tuple[str, str]]: