From 73da2e430aea774f5735ed5a29e6e4d359e5c96e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 29 Jul 2021 00:43:35 +0300 Subject: [PATCH] Sync latest blocks and optimized blocks list gen --- crawlers/moonstreamcrawlers/cli.py | 122 ++++++++++++++---------- crawlers/moonstreamcrawlers/ethereum.py | 70 +++++++++----- 2 files changed, 116 insertions(+), 76 deletions(-) diff --git a/crawlers/moonstreamcrawlers/cli.py b/crawlers/moonstreamcrawlers/cli.py index 1b14d75f..c90223ea 100644 --- a/crawlers/moonstreamcrawlers/cli.py +++ b/crawlers/moonstreamcrawlers/cli.py @@ -4,81 +4,85 @@ Moonstream crawlers CLI. import argparse from distutils.util import strtobool import time -from typing import List -from .ethereum import crawl, check_missing_blocks +from .ethereum import crawl, check_missing_blocks, synchronize_latest_blocks from .settings import MOONSTREAM_CRAWL_WORKERS -def get_blocks_numbers_lists( - bottom_block_number: int, top_block_number: int -) -> List[List[int]]: +def yield_blocks_numbers_lists(blocks_range_str: str) -> None: """ Generate list of blocks. + Block steps used to prevent long executor tasks and data loss possibility. """ block_step = 1000 - blocks_numbers_list_raw = list(range(top_block_number, bottom_block_number - 1, -1)) - blocks_numbers_list_raw_len = len(blocks_numbers_list_raw) - # Block steps used to prevent long executor tasks and data loss possibility - # Block step 2 convert [1,2,3] -> [[1,2],[3]] - if blocks_numbers_list_raw_len / block_step > 1: - blocks_numbers_lists = [ - blocks_numbers_list_raw[i : i + block_step] - for i in range(0, blocks_numbers_list_raw_len, block_step) - ] - else: - blocks_numbers_lists = [blocks_numbers_list_raw] - return blocks_numbers_lists, blocks_numbers_list_raw_len + try: + blocks_start_end = blocks_range_str.split("-") + bottom_block_number = int(blocks_start_end[0]) + top_block_number = int(blocks_start_end[1]) + required_blocks_len = top_block_number - bottom_block_number + 1 + except Exception: + print( + "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" + ) + return + + print(f"Required {required_blocks_len} blocks to process") + + while not top_block_number < bottom_block_number: + temp_bottom_block_number = top_block_number - block_step + if temp_bottom_block_number < bottom_block_number: + temp_bottom_block_number = bottom_block_number - 1 + blocks_numbers_list = list( + range(top_block_number, temp_bottom_block_number, -1) + ) + + yield blocks_numbers_list + + top_block_number -= block_step + + +def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None: + """ + Synchronize latest Ethereum blocks with database. + """ + while True: + bottom_block_number, top_block_number = synchronize_latest_blocks( + bool(strtobool(args.transactions)) + ) + for blocks_numbers_list in yield_blocks_numbers_lists( + f"{bottom_block_number}-{top_block_number}" + ): + crawl( + block_numbers_list=blocks_numbers_list, + with_transactions=bool(strtobool(args.transactions)), + ) + print(f"Synchronized blocks from {bottom_block_number} to {top_block_number}") + time.sleep(10) def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None: """ Add blocks to moonstream database. """ - try: - blocks_start_end = args.blocks.split("-") - bottom_block_number = int(blocks_start_end[0]) - top_block_number = int(blocks_start_end[1]) - except Exception: - print( - "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" - ) - return - - blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists( - bottom_block_number, top_block_number - ) - startTime = time.time() - for blocks_numbers_list in blocks_numbers_lists: + + for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks): crawl( block_numbers_list=blocks_numbers_list, with_transactions=bool(strtobool(args.transactions)), ) + print( - f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} " - f"blocks with {MOONSTREAM_CRAWL_WORKERS} workers" + f"Required {time.time() - startTime} " + f"with {MOONSTREAM_CRAWL_WORKERS} workers" ) def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None: - try: - blocks_start_end = args.blocks.split("-") - bottom_block_number = int(blocks_start_end[0]) - top_block_number = int(blocks_start_end[1]) - except Exception: - print( - "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" - ) - return - - blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists( - bottom_block_number, top_block_number - ) startTime = time.time() missing_blocks_numbers_total = [] - for blocks_numbers_list in blocks_numbers_lists: + for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks): print( f"Check missing blocks from {blocks_numbers_list[0]} to {blocks_numbers_list[-1]}" ) @@ -96,9 +100,9 @@ def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None: with_transactions=bool(strtobool(args.transactions)), ) print( - f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} " - f"blocks with {MOONSTREAM_CRAWL_WORKERS} workers" - f" with {len(missing_blocks_numbers_total)} missing blocks" + f"Required {time.time() - startTime} " + f"with {MOONSTREAM_CRAWL_WORKERS} workers " + f"for {len(missing_blocks_numbers_total)} missing blocks" ) @@ -126,6 +130,18 @@ def main() -> None: description="Ethereum blocks commands" ) + parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser( + "synchronize", description="Synchronize to latest ethereum block commands" + ) + parser_ethcrawler_blocks_sync.add_argument( + "-t", + "--transactions", + choices=["True", "False"], + default="True", + help="Add or not block transactions", + ) + parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler) + parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser( "add", description="Add ethereum blocks commands" ) @@ -139,7 +155,7 @@ def main() -> None: "-t", "--transactions", choices=["True", "False"], - default="False", + default="True", help="Add or not block transactions", ) parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler) @@ -157,7 +173,7 @@ def main() -> None: "-t", "--transactions", choices=["True", "False"], - default="False", + default="True", help="Add or not block transactions", ) parser_ethcrawler_blocks_missing.set_defaults( diff --git a/crawlers/moonstreamcrawlers/ethereum.py b/crawlers/moonstreamcrawlers/ethereum.py index fe17e876..c487a153 100644 --- a/crawlers/moonstreamcrawlers/ethereum.py +++ b/crawlers/moonstreamcrawlers/ethereum.py @@ -14,7 +14,7 @@ def connect(ipc_path: Optional[str] = MOONSTREAM_IPC_PATH): return web3_client -def add_block(db_session, block: BlockData, block_number: int) -> None: +def add_block(db_session, block: BlockData) -> None: """ Add block if doesn't presented in database. """ @@ -38,29 +38,48 @@ def add_block(db_session, block: BlockData, block_number: int) -> None: transactions_root=block.transactionsRoot.hex(), ) db_session.add(block_obj) - print(f"Added new block: {block_number}") + print(f"Added new block: {block.number}") -def add_block_transaction(db_session, block_number: int, tx) -> None: +def add_block_transactions(db_session, block: BlockData) -> None: """ - Add block transaction if doesn't presented in database. + Add block transactions. """ - tx_obj = EthereumTransaction( - hash=tx.hash.hex(), - block_number=block_number, - from_address=tx["from"], - to_address=tx.to, - gas=tx.gas, - gas_price=tx.gasPrice, - input=tx.input, - nonce=tx.nonce, - transaction_index=tx.transactionIndex, - value=tx.value, + transactions_pack = [] + for tx in block.transactions: + transactions_pack.append( + EthereumTransaction( + hash=tx.hash.hex(), + block_number=block.number, + from_address=tx["from"], + to_address=tx.to, + gas=tx.gas, + gas_price=tx.gasPrice, + input=tx.input, + nonce=tx.nonce, + transaction_index=tx.transactionIndex, + value=tx.value, + ) + ) + db_session.bulk_save_objects(transactions_pack) + + +def get_latest_blocks(with_transactions: bool = False) -> None: + web3_client = connect() + block_latest: BlockData = web3_client.eth.get_block( + "latest", full_transactions=with_transactions ) - db_session.add(tx_obj) + with yield_db_session_ctx() as db_session: + block_latest_exist = ( + db_session.query(EthereumBlock) + .order_by(EthereumBlock.block_number.desc()) + .first() + ) + + return block_latest_exist.block_number, block_latest.number -def process_blocks(blocks_numbers: List[int], with_transactions: bool = False): +def crawl_blocks(blocks_numbers: List[int], with_transactions: bool = False) -> None: """ Open database and geth sessions and fetch block data from blockchain. """ @@ -70,14 +89,19 @@ def process_blocks(blocks_numbers: List[int], with_transactions: bool = False): block: BlockData = web3_client.eth.get_block( block_number, full_transactions=with_transactions ) - add_block(db_session, block, block_number) + add_block(db_session, block) + if with_transactions: - for tx in block.transactions: - add_block_transaction(db_session, block.number, tx) + add_block_transactions(db_session, block) + db_session.commit() -def check_missing_blocks(blocks_numbers: List[int]): +def check_missing_blocks(blocks_numbers: List[int]) -> List[int]: + """ + Query block from postgres. If block does not presented in database, + add to missing blocks numbers list. + """ missing_blocks_numbers = [] for block_number in blocks_numbers: with yield_db_session_ctx() as db_session: @@ -93,7 +117,7 @@ def check_missing_blocks(blocks_numbers: List[int]): def crawl(block_numbers_list: List[int], with_transactions: bool = False) -> None: """ - Execute crawler. + Execute crawler in processes. """ with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: for worker in range(1, MOONSTREAM_CRAWL_WORKERS + 1): @@ -101,7 +125,7 @@ def crawl(block_numbers_list: List[int], with_transactions: bool = False) -> Non f"Added executor for list of blocks with len: {len(block_numbers_list[worker-1::MOONSTREAM_CRAWL_WORKERS])}" ) executor.submit( - process_blocks, + crawl_blocks, block_numbers_list[worker - 1 :: MOONSTREAM_CRAWL_WORKERS], with_transactions, )