From 87ccc820491ce3c777baf4c6f8b0e74e507aa627 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Fri, 18 Mar 2022 20:41:28 +0300 Subject: [PATCH] update erc20 populate --- .../mooncrawl/generic_crawler/base.py | 68 +++++++++ .../mooncrawl/generic_crawler/cli.py | 130 +++++++++++++++++- 2 files changed, 191 insertions(+), 7 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py index 6d9eb94c..6af77416 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py @@ -265,6 +265,74 @@ def _processEvent(raw_event: Dict[str, Any]): return event +def populate_with_events( + db_session: Session, + web3: Web3, + blockchain_type: AvailableBlockchainType, + label_name: str, + populate_from_label: str, + abi: List[Dict[str, Any]], + from_block: int, + to_block: int, + batch_size: int = 100, +): + db_blocks_cache = {} + current_block = from_block + + events_abi = [event for event in abi if event["type"] == "event"] + label_model = get_label_model(blockchain_type) + + pbar = tqdm(total=(to_block - from_block + 1)) + pbar.set_description(f"Populating events for blocks {from_block}-{to_block}") + + while current_block <= to_block: + batch_end = min(current_block + batch_size, to_block) + events = [] + logger.info("Fetching events") + for event_abi in events_abi: + raw_events = _fetch_events_chunk( + web3, + event_abi, + current_block, + batch_end, + ) + for raw_event in raw_events: + raw_event["blockTimestamp"] = get_block_timestamp( + db_session, + web3, + blockchain_type, + raw_event["blockNumber"], + blocks_cache=db_blocks_cache, + max_blocks_batch=1000, + ) + event = _processEvent(raw_event) + events.append(event) + logger.info(f"Fetched {len(events)} events") + txs = ( + db_session.query(label_model.transaction_hash) + .filter( + label_name == populate_from_label, + label_model.block_number >= current_block, + label_model.block_number <= batch_end, + ) + .distinct() + .all() + ) + txs_to_populate = [tx[0] for tx in txs] + + events_to_save = [] + for event in events: + if event.transaction_hash in txs_to_populate: + events_to_save.append(event) + + logger.info(f"Found {len(events_to_save)} events to save") + + add_events_to_session(db_session, events_to_save, blockchain_type, label_name) + commit_session(db_session) + pbar.update(batch_end - current_block + 1) + current_block = batch_end + 1 + + def crawl( db_session: Session, web3: Web3, diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index c813dc1f..92a156c1 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -8,7 +8,7 @@ from web3.middleware import geth_poa_middleware import json from mooncrawl.data import AvailableBlockchainType # type: ignore from ..blockchain import connect -from .base import crawl, get_checkpoint +from .base import crawl, get_checkpoint, populate_with_events logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -19,11 +19,6 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: with open("mooncrawl/generic_crawler/abis/erc721.json") as f: abi = json.load(f) - with open("mooncrawl/generic_crawler/abis/erc20.json") as f: - erc20_abi = json.load(f) - erc20_abi = [ - abi for abi in erc20_abi if abi.get("name") == "Transfer" - ] # only care about transfer event label = args.label_name from_block = args.start_block @@ -61,13 +56,65 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: blockchain_type, label, abi, - erc20_abi, + [], from_block=last_crawled_block, to_block=to_block, batch_size=args.max_blocks_batch, ) +def populate_with_erc20_transfers(args: argparse.Namespace) -> None: + logger.info(f"Starting erc20 transfer crawler") + + label = args.label_name + from_block = args.start_block + to_block = args.end_block + + with open(args.abi) as f: + erc20_abi = json.load(f) + # Taking only transfer event from erc20_abi + erc20_abi = [ + event + for event in erc20_abi + if event["type"] == "event" and event["name"] == "Transfer" + ] + + blockchain_type = AvailableBlockchainType(args.blockchain_type) + + logger.info(f"Blockchain type: {blockchain_type.value}") + with yield_db_session_ctx() as db_session: + web3: Optional[Web3] = None + if args.web3 is None: + logger.info( + "No web3 provider URL provided, using default (blockchan.py: connect())" + ) + web3 = connect(blockchain_type) + else: + logger.info(f"Using web3 provider URL: {args.web3}") + web3 = Web3( + Web3.HTTPProvider(args.web3), + ) + if args.poa: + logger.info("Using PoA middleware") + web3.middleware_onion.inject(geth_poa_middleware, layer=0) + last_crawled_block = get_checkpoint( + db_session, blockchain_type, from_block, to_block, label + ) + + logger.info(f"Starting from block: {last_crawled_block}") + populate_with_events( + db_session, + web3, + blockchain_type, + label, + args.label_to_populate, + erc20_abi, + last_crawled_block, + to_block, + batch_size=args.max_blocks_batch, + ) + + def handle_crawl(args: argparse.Namespace) -> None: logger.info(f"Starting generic crawler") @@ -238,6 +285,75 @@ def main(): nft_crawler_parser.set_defaults(func=handle_nft_crawler) + erc20_populate_parser = subparsers.add_parser( + "erc20_populate", + help="Populate erc20 labels", + ) + + erc20_populate_parser.add_argument( + "--blockchain_type", + type=str, + required=True, + choices=[ + "ethereum", + "polygon", + ], + ) + erc20_populate_parser.add_argument( + "--web3", + type=str, + default=None, + help="Web3 provider URL", + ) + + erc20_populate_parser.add_argument( + "--poa", + action="store_true", + default=False, + help="Use PoA middleware", + ) + + erc20_populate_parser.add_argument( + "--start_block", + type=int, + default=None, + ) + erc20_populate_parser.add_argument( + "--end_block", + type=int, + default=None, + ) + + erc20_populate_parser.add_argument( + "--max_blocks_batch", + type=int, + default=500, + help="Maximum number of blocks to crawl in a single crawl step", + ) + + erc20_populate_parser.add_argument( + "--label_name", + type=str, + required=True, + help="Label name", + ) + + erc20_populate_parser.add_argument( + "--label_to_populate", + type=str, + required=True, + help="Label name to populate", + ) + + erc20_populate_parser.add_argument( + "--abi", + type=str, + default=None, + help="Abi of the erc20 contract", + ) + + erc20_populate_parser.set_defaults(func=populate_with_erc20_transfers) + args = parser.parse_args() args.func(args)