From 83eaeebe1af31c94dc33a945c9db50418ef0e05d Mon Sep 17 00:00:00 2001
From: yhtiyar 
Date: Thu, 16 Dec 2021 22:43:32 +0300
Subject: [PATCH] [WIP] crawler

---
 .../mooncrawl/moonworm_crawler/cli.py         | 145 ++++++++++--
 .../moonworm_crawler/continuous_crawler.py    | 213 ++++++++++--------
 .../mooncrawl/moonworm_crawler/crawler.py     |  28 +--
 .../mooncrawl/moonworm_crawler/db.py          | 172 ++++++++++++++
 .../moonworm_crawler/event_crawler.py         |  26 +--
 .../moonworm_crawler/function_call_crawler.py |  34 +--
 crawlers/mooncrawl/mooncrawl/settings.py      |   2 +-
 7 files changed, 431 insertions(+), 189 deletions(-)
 create mode 100644 crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index 82c7708c..df6554a6 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -1,14 +1,26 @@
-import web3
+import argparse
+import logging
+
+from web3 import Web3
 from moonstreamdb.db import yield_db_session_ctx
 from web3.middleware import geth_poa_middleware
 
 from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
-from .crawler import *
-from .event_crawler import continuous_event_crawler
-from .function_call_crawler import function_call_crawler
+from .crawler import (
+    make_event_crawl_jobs,
+    make_function_call_crawl_jobs,
+    get_crawl_job_entries,
+    SubscriptionTypes,
+)
+from ..blockchain import AvailableBlockchainType
+from .continuous_crawler import continuous_crawler
 
 
-def crawl_events():
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def handle_crawl(args: argparse.Namespace) -> None:
 
     initial_event_jobs = make_event_crawl_jobs(
         get_crawl_job_entries(
@@ -32,28 +44,115 @@ def crawl_events():
     print(initial_function_call_jobs)
 
     with yield_db_session_ctx() as db_session:
-        web3 = Web3(
-            Web3.HTTPProvider(
-                "https://polygon-mainnet.infura.io/v3/0492b7dd00bb4ad8a3346b3a0d780140"
+        web3 = None
+        if args.web3 is not None:
+            web3 = Web3(
+                Web3.HTTPProvider(
+                    args.web3,
+                )
             )
-        )
-        web3.middleware_onion.inject(geth_poa_middleware, layer=0)
-        function_call_crawler(
+            if args.poa:
+                web3.middleware_onion.inject(geth_poa_middleware, layer=0)
+
+        start_block = args.start
+        continuous_crawler(
             db_session,
-            AvailableBlockchainType.POLYGON,
+            AvailableBlockchainType(args.blockchain_type),
             web3,
+            initial_event_jobs,
             initial_function_call_jobs,
-            start_block=21418707,
-            end_block=web3.eth.blockNumber,
-            batch_size=100,
+            start_block,
+            args.max_blocks_batch,
+            args.min_blocks_batch,
+            args.confirmations,
+            args.min_sleep_time,
+            args.heartbeat_interval,
+            args.new_jobs_refetch_interval,
         )
-        # continuous_event_crawler(
-        #     db_session=session,
-        #     blockchain_type=AvailableBlockchainType.POLYGON,
-        #     web3=web3,
-        #     crawl_jobs=initial_event_jobs,
-        #     start_block=web3.eth.blockNumber - 120000,
-        # )
 
 
-crawl_events()
+def generate_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser()
+    subparsers = parser.add_subparsers()
+
+    crawl_parser = subparsers.add_parser("crawl")
+
+    crawl_parser.add_argument(
+        "--start",
+        "-s",
+        type=int,
+        default=None,
+    )
+    crawl_parser.add_argument(
+        "--blockchain-type",
+        "-b",
+        type=str,
+        choices=[
+            AvailableBlockchainType.ETHEREUM.value,
+            AvailableBlockchainType.POLYGON.value,
+        ],
+        required=True,
+    )
+    crawl_parser.add_argument(
+        "--web3",
+        type=str,
+        default=None,
+        help="Web3 provider URL",
+    )
+    crawl_parser.add_argument(
+        "--poa",
+        action="store_true",
+        default=False,
+        help="Use PoA middleware",
+    )
+
+    crawl_parser.add_argument(
+        "--max-blocks-batch",
+        "-m",
+        type=int,
+        default=100,
+        help="Maximum number of blocks to crawl in a single batch",
+    )
+
+    crawl_parser.add_argument(
+        "--min-blocks-batch",
+        "-n",
+        type=int,
+        default=10,
+        help="Minimum number of blocks to crawl in a single batch",
+    )
+
+    crawl_parser.add_argument(
+        "--confirmations",
+        "-c",
+        type=int,
+        default=100,
+        help="Number of confirmations to wait for",
+    )
+
+    crawl_parser.add_argument(
+        "--min-sleep-time",
+        "-t",
+        type=float,
+        default=0.01,
+        help="Minimum time in seconds to sleep between crawl steps",
+    )
+
+    crawl_parser.add_argument(
+        "--heartbeat-interval",
+        "-i",
+        type=float,
+        default=60,
+        help="Heartbeat interval in seconds",
+    )
+
+    crawl_parser.add_argument(
+        "--new-jobs-refetch-interval",
+        "-r",
+        type=float,
+        default=120,
+        help="Time in seconds to wait before refetching new jobs",
+    )
+
+    crawl_parser.set_defaults(func=handle_crawl)
+    return parser
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
index 94bb4e7e..be6ec4f4 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
@@ -2,15 +2,16 @@ import logging
 import time
 import traceback
 from datetime import datetime, timedelta
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
 
 from moonworm.crawler.moonstream_ethereum_state_provider import (
     MoonstreamEthereumStateProvider,
 )
+from moonworm.crawler.networks import Network
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import and_
 from web3 import Web3
-
+from ..blockchain import connect
 from ..data import AvailableBlockchainType
 from .crawler import (
     EventCrawlJob,
@@ -19,14 +20,14 @@ from .crawler import (
     get_crawl_job_entries,
     heartbeat,
     make_event_crawl_jobs,
-    merge_event_crawl_jobs,
-    save_labels,
-    merge_function_call_crawl_jobs,
     make_function_call_crawl_jobs,
+    merge_event_crawl_jobs,
+    merge_function_call_crawl_jobs,
 )
-from .event_crawler import _event_to_label, _crawl_events
-from .function_call_crawler import _crawl_functions, _function_call_to_label
-from moonworm.crawler.networks import Network
+
+from .event_crawler import _crawl_events
+from .function_call_crawler import _crawl_functions
+from .db import save_events, save_function_calls
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -81,10 +82,37 @@
     return event_crawl_jobs, function_call_crawl_jobs
 
 
+def _retry_connect_web3(
+    blockchain_type: AvailableBlockchainType,
+    retry_count: int = 10,
+    sleep_time: float = 5,
+) -> Web3:
+    """
+    Retry connecting to the blockchain.
+    """
+    while retry_count > 0:
+        retry_count -= 1
+        try:
+            web3 = connect(blockchain_type)
+            web3.eth.block_number  # force a request to verify the connection
+            logger.info(f"Connected to {blockchain_type}")
+            return web3
+        except Exception as e:
+            if retry_count == 0:
+                error = e
+                break
+            logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
+            logger.info(f"Retrying in {sleep_time} seconds")
+            time.sleep(sleep_time)
+    raise Exception(
+        f"Failed to connect to {blockchain_type} blockchain: {error}"
+    )
+
+
 def continuous_crawler(
     db_session: Session,
     blockchain_type: AvailableBlockchainType,
-    web3: Web3,
+    web3: Optional[Web3],
     event_crawl_jobs: List[EventCrawlJob],
     function_call_crawl_jobs: List[FunctionCallCrawlJob],
     start_block: int,
@@ -111,6 +139,8 @@
     crawl_start_time = datetime.utcnow()
     jobs_refetchet_time = crawl_start_time
 
+    if web3 is None:
+        web3 = _retry_connect_web3(blockchain_type)
 
     network = (
         Network.ethereum
@@ -146,92 +176,99 @@
     try:
         while True:
-            # query db with limit 1, to avoid session closing
-            db_session.execute("SELECT 1")
-            time.sleep(min_sleep_time)
+            try:
+                # query db with limit 1, to avoid session closing
+                db_session.execute("SELECT 1")
+                time.sleep(min_sleep_time)
 
-            end_block = min(
-                web3.eth.blockNumber - confirmations,
-                start_block + max_blocks_batch,
-            )
-
-            if start_block + min_blocks_batch > end_block:
-                min_sleep_time *= 2
-                continue
-            min_sleep_time /= 2
-
-            logger.info(f"Crawling events from {start_block} to {end_block}")
-            all_events = _crawl_events(
-                db_session=db_session,
-                blockchain_type=blockchain_type,
-                web3=web3,
-                jobs=event_crawl_jobs,
-                from_block=start_block,
-                to_block=end_block,
-                blocks_cache=blocks_cache,
-                db_block_query_batch=min_blocks_batch * 2,
-            )
-            logger.info(
-                f"Crawled {len(all_events)} events from {start_block} to {end_block}."
-            )
-
-            if all_events:
-                save_labels(
-                    db_session,
-                    [_event_to_label(blockchain_type, event) for event in all_events],
+                end_block = min(
+                    web3.eth.blockNumber - confirmations,
+                    start_block + max_blocks_batch,
                 )
 
-            logger.info(f"Crawling function calls from {start_block} to {end_block}")
-            all_function_calls = _crawl_functions(
-                blockchain_type,
-                ethereum_state_provider,
-                function_call_crawl_jobs,
-                start_block,
-                end_block,
-            )
-            logger.info(
-                f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
-            )
 
+                if start_block + min_blocks_batch > end_block:
+                    min_sleep_time *= 2
+                    logger.info(
+                        f"Sleeping for {min_sleep_time} seconds because of low block count"
+                    )
+                    continue
+                min_sleep_time /= 2  # speed back up once enough blocks are available
 
-            if all_function_calls:
-                save_labels(
-                    db_session,
-                    [
-                        _function_call_to_label(blockchain_type, function_call)
-                        for function_call in all_function_calls
-                    ],
-                )
-
-            current_time = datetime.utcnow()
-
-            if current_time - jobs_refetchet_time > timedelta(
-                seconds=new_jobs_refetch_interval
-            ):
-                logger.info(
-                    f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
-                )
-                event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
-                    event_crawl_jobs, function_call_crawl_jobs, blockchain_type
-                )
-                jobs_refetchet_time = current_time
-
-            if current_time - last_heartbeat_time > timedelta(
-                seconds=heartbeat_interval
-            ):
-                # Update heartbeat and send to humbug
-                heartbeat_template["last_block"] = end_block
-                heartbeat_template["current_time"] = current_time
-                heartbeat_template["current_event_jobs_length"] = len(event_crawl_jobs)
-                heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
-                heartbeat(
-                    crawler_type="event",
+                logger.info(f"Crawling events from {start_block} to {end_block}")
+                all_events = _crawl_events(
+                    db_session=db_session,
                     blockchain_type=blockchain_type,
-                    crawler_status=heartbeat_template,
+                    web3=web3,
+                    jobs=event_crawl_jobs,
+                    from_block=start_block,
+                    to_block=end_block,
+                    blocks_cache=blocks_cache,
+                    db_block_query_batch=min_blocks_batch * 2,
+                )
+                logger.info(
+                    f"Crawled {len(all_events)} events from {start_block} to {end_block}."
                 )
-                logger.info("Sending heartbeat to humbug.", heartbeat_template)
-                last_heartbeat_time = datetime.utcnow()
 
-            start_block = end_block + 1
+                save_events(db_session, all_events, blockchain_type)
+
+                logger.info(
+                    f"Crawling function calls from {start_block} to {end_block}"
+                )
+                all_function_calls = _crawl_functions(
+                    blockchain_type,
+                    ethereum_state_provider,
+                    function_call_crawl_jobs,
+                    start_block,
+                    end_block,
+                )
+                logger.info(
+                    f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
+                )
+
+                save_function_calls(db_session, all_function_calls, blockchain_type)
+
+                current_time = datetime.utcnow()
+
+                if current_time - jobs_refetchet_time > timedelta(
+                    seconds=new_jobs_refetch_interval
+                ):
+                    logger.info(
+                        f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
+                    )
+                    event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
+                        event_crawl_jobs, function_call_crawl_jobs, blockchain_type
+                    )
+                    jobs_refetchet_time = current_time
+
+                if current_time - last_heartbeat_time > timedelta(
+                    seconds=heartbeat_interval
+                ):
+                    # Update heartbeat and send to humbug
+                    heartbeat_template["last_block"] = end_block
+                    heartbeat_template["current_time"] = current_time
+                    heartbeat_template["current_event_jobs_length"] = len(
+                        event_crawl_jobs
+                    )
+                    heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
+                    heartbeat(
+                        crawler_type="event",
+                        blockchain_type=blockchain_type,
+                        crawler_status=heartbeat_template,
+                    )
+                    logger.info(f"Sending heartbeat to humbug: {heartbeat_template}")
+                    last_heartbeat_time = datetime.utcnow()
+
+                start_block = end_block + 1
+            except Exception as e:
+                logger.error(f"Internal error: {e}")
+                logger.exception(e)
+                try:
+                    web3 = _retry_connect_web3(blockchain_type)
+                except Exception as err:
+                    logger.error(f"Failed to reconnect: {err}")
+                    logger.exception(err)
+                    raise err
+
     except BaseException as e:
         logger.error(f"!!!!Crawler Died!!!!")
         heartbeat_template["status"] = "dead"
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 5b0f12d2..1a1d1043 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -315,29 +315,9 @@ def heartbeat(
     )
 
 
-def get_last_labeled_block_number(
-    db_session: Session, blockchain_type: AvailableBlockchainType
-) -> Optional[int]:
-    label_model = get_label_model(blockchain_type)
-    block_number = (
-        db_session.query(label_model.block_number)
-        .filter(label_model.label == CRAWLER_LABEL)
-        .order_by(label_model.block_number.desc())
-        .limit(1)
-        .one_or_none()
-    )
-
-    return block_number[0] if block_number else None
-def save_labels(db_session: Session, labels: List[Base]) -> None:
-    """
-    Save labels in the database.
- """ - try: - db_session.add_all(labels) - db_session.commit() - except Exception as e: - logger.error(f"Failed to save labels: {e}") - db_session.rollback() - raise e + + + + diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py new file mode 100644 index 00000000..5e2e9b5d --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -0,0 +1,172 @@ +import logging +from typing import Any, Dict, List, Optional, Union + +from eth_typing.evm import ChecksumAddress +from hexbytes.main import HexBytes +from moonstreamdb.db import yield_db_session_ctx +from moonstreamdb.models import ( + Base, + EthereumLabel, + EthereumTransaction, + PolygonLabel, + PolygonTransaction, +) +from moonworm.crawler.function_call_crawler import ( + ContractFunctionCall, +) + +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import label + +from ..blockchain import connect, get_block_model, get_label_model +from ..data import AvailableBlockchainType +from ..settings import CRAWLER_LABEL +from .crawler import FunctionCallCrawlJob, _generate_reporter_callback +from .event_crawler import Event + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base: + """ + Creates a label model. + """ + label_model = get_label_model(blockchain_type) + label = label_model( + label=CRAWLER_LABEL, + label_data={ + "type": "event", + "name": event.event_name, + "args": event.args, + }, + address=event.address, + block_number=event.block_number, + block_timestamp=event.block_timestamp, + transaction_hash=event.transaction_hash, + log_index=event.log_index, + ) + return label + + +def _function_call_to_label( + blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall +) -> Base: + """ + Creates a label model. + """ + label_model = get_label_model(blockchain_type) + label = label_model( + label=CRAWLER_LABEL, + label_data={ + "type": "tx_call", + "name": function_call.function_name, + "caller": function_call.caller_address, + "args": function_call.function_args, + "status": function_call.status, + "gasUsed": function_call.gas_used, + }, + address=function_call.contract_address, + block_number=function_call.block_number, + transaction_hash=function_call.transaction_hash, + block_timestamp=function_call.block_timestamp, + ) + + return label + + +def get_last_labeled_block_number( + db_session: Session, blockchain_type: AvailableBlockchainType +) -> Optional[int]: + label_model = get_label_model(blockchain_type) + block_number = ( + db_session.query(label_model.block_number) + .filter(label_model.label == CRAWLER_LABEL) + .order_by(label_model.block_number.desc()) + .limit(1) + .one_or_none() + ) + + return block_number[0] if block_number else None + + +def save_labels(db_session: Session, labels: List[Base]) -> None: + """ + Save labels in the database. 
+ """ + try: + db_session.add_all(labels) + db_session.commit() + except Exception as e: + logger.error(f"Failed to save labels: {e}") + db_session.rollback() + raise e + + +def save_events( + db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType +) -> None: + label_model = get_label_model(blockchain_type) + + events_hashes_to_save = [event.transaction_hash for event in events] + + existing_labels = ( + db_session.query(label_model.transaction_hash, label_model.log_index) + .filter( + label_model.label == CRAWLER_LABEL, + label_model.log_index != None, + label_model.transaction_hash.in_(events_hashes_to_save), + ) + .all() + ) + + existing_labels_transactions = [] + existing_log_index_by_tx_hash: Dict[str, List[int]] = {} + for label in existing_labels: + if label[0] not in existing_labels_transactions: + existing_labels_transactions.append(label[0]) + existing_log_index_by_tx_hash[label[0]] = [] + existing_log_index_by_tx_hash[label[0]].append(label[1]) + + labels_to_save = [] + for event in events: + if event.transaction_hash not in existing_labels_transactions: + labels_to_save.append(_event_to_label(blockchain_type, event)) + elif ( + event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] + ): + labels_to_save.append(_event_to_label(blockchain_type, event)) + + save_labels(db_session, labels_to_save) + + +def save_function_calls( + db_session: Session, + function_calls: List[ContractFunctionCall], + blockchain_type: AvailableBlockchainType, +) -> None: + + label_model = get_label_model(blockchain_type) + transactions_hashes_to_save = [ + function_call.transaction_hash for function_call in function_calls + ] + + existing_labels = ( + db_session.query(label_model.transaction_hash) + .filter( + label_model.label == CRAWLER_LABEL, + label_model.log_index == None, + label_model.transaction_hash.in_(transactions_hashes_to_save), + ) + .all() + ) + + existing_labels_transactions = [label[0] for label in existing_labels] + + labels_to_save = [ + _function_call_to_label(blockchain_type, function_call) + for function_call in function_calls + if function_call.transaction_hash not in existing_labels_transactions + ] + + save_labels(db_session, labels_to_save) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index 8ca20ca9..e398ffcd 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -1,12 +1,10 @@ import logging - -from dataclasses import dataclass - import traceback +from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union, cast -from moonstreamdb.models import Base from eth_typing.evm import ChecksumAddress +from moonstreamdb.models import Base from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import and_ @@ -32,25 +30,7 @@ class Event: log_index: int -def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base: - """ - Creates a label model. 
- """ - label_model = get_label_model(blockchain_type) - label = label_model( - label=CRAWLER_LABEL, - label_data={ - "type": "event", - "name": event.event_name, - "args": event.args, - }, - address=event.address, - block_number=event.block_number, - block_timestamp=event.block_timestamp, - transaction_hash=event.transaction_hash, - log_index=event.log_index, - ) - return label + def _get_block_timestamp_from_web3( diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py index c49fd325..926586ad 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py @@ -5,6 +5,7 @@ from eth_typing.evm import ChecksumAddress from hexbytes.main import HexBytes from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.models import ( + Base, EthereumLabel, EthereumTransaction, PolygonLabel, @@ -17,47 +18,20 @@ from moonworm.crawler.function_call_crawler import ( from moonworm.crawler.moonstream_ethereum_state_provider import ( MoonstreamEthereumStateProvider, ) -from moonworm.cu_watch import MockState from moonworm.crawler.networks import Network +from moonworm.cu_watch import MockState from sqlalchemy.orm import Session from web3 import Web3 -from moonstreamdb.models import Base -from ..data import AvailableBlockchainType -from .crawler import FunctionCallCrawlJob, _generate_reporter_callback from ..blockchain import connect, get_block_model, get_label_model +from ..data import AvailableBlockchainType from ..settings import CRAWLER_LABEL +from .crawler import FunctionCallCrawlJob, _generate_reporter_callback logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def _function_call_to_label( - blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall -) -> Base: - """ - Creates a label model. - """ - label_model = get_label_model(blockchain_type) - label = label_model( - label=CRAWLER_LABEL, - label_data={ - "type": "tx_call", - "name": function_call.function_name, - "caller": function_call.caller_address, - "args": function_call.function_args, - "status": function_call.status, - "gasUsed": function_call.gas_used, - }, - address=function_call.contract_address, - block_number=function_call.block_number, - transaction_hash=function_call.transaction_hash, - block_timestamp=function_call.block_timestamp, - ) - - return label - - def _crawl_functions( blockchain_type: AvailableBlockchainType, ethereum_state_provider: MoonstreamEthereumStateProvider, diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 86990fa4..4484f993 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -23,7 +23,7 @@ DOCS_TARGET_PATH = "docs" # Crawler label -CRAWLER_LABEL = "moonworm" +CRAWLER_LABEL = "moonworm-alpha" # Geth connection address MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.environ.get(
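
Editor's note (a sketch, not part of the patch): the new CLI registers
handle_crawl via crawl_parser.set_defaults(func=handle_crawl), but this diff
does not show an entry point that actually calls generate_parser(). Below is a
minimal, hypothetical driver under the assumption that a main() is added to
cli.py; the module path and the blockchain-type choice strings are inferred
from the file layout and from AvailableBlockchainType.*.value, and are not
confirmed by this diff:

    # Hypothetical entry point for cli.py -- not part of this patch.
    def main() -> None:
        parser = generate_parser()
        args = parser.parse_args()
        # Dispatch to the subcommand handler registered via set_defaults().
        args.func(args)

    if __name__ == "__main__":
        main()

    # Example invocation (assumed module path and enum values; the start block
    # 21418707 is the value previously hardcoded in cli.py):
    #   python -m mooncrawl.moonworm_crawler.cli crawl \
    #       --blockchain-type polygon \
    #       --web3 https://polygon-rpc.example.com \
    #       --poa \
    #       --start 21418707

When --web3 is omitted, continuous_crawler() falls back to
_retry_connect_web3(), which retries connect() with a fixed sleep between
attempts before giving up.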