From 249224df1decf29ae99f8c8de0dd5fb63b0c1762 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Tue, 25 Jun 2024 17:09:39 +0300
Subject: [PATCH] Add v3 crawler.

---
 .../mooncrawl/moonworm_crawler/cli.py         | 13 ++-
 .../moonworm_crawler/continuous_crawler.py    | 93 +++++++++++++++----
 .../mooncrawl/moonworm_crawler/crawler.py     | 10 +-
 3 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index a16bbfc0..8bab81db 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -197,7 +197,14 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
 
     index_engine = MoonstreamDBIndexesEngine()
 
-    with index_engine.yield_db_session_ctx() as index_db_session:
+    logger.info(f"Blockchain type: {blockchain_type.value}")
+    customer_connection = get_db_connection(args.customer_uuid)
+
+    customer_engine = MoonstreamCustomDBEngine(customer_connection)
+
+    index_engine = MoonstreamDBIndexesEngine()
+
+    with customer_engine.yield_db_session_ctx() as db_session, index_engine.yield_db_session_ctx() as index_db_session:
 
         initial_event_jobs = get_event_crawl_job_records(
             index_db_session,
@@ -219,8 +226,6 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
             f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
         )
 
-    logger.info(f"Blockchain type: {blockchain_type.value}")
-    with yield_db_session_ctx() as db_session:
         web3: Optional[Web3] = None
         if args.web3 is None:
             logger.info(
@@ -292,6 +297,8 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
             args.heartbeat_interval,
             args.new_jobs_refetch_interval,
             web3_uri=args.web3_uri,
+            version=3,
+            index_db_session=index_db_session,
         )
 
 
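Note: the change above replaces the single index-DB session with a combined `with` statement that holds a customer DB session and an index DB session open for the whole crawl. A minimal sketch of that wiring, with a stand-in for the engines' `yield_db_session_ctx()` (the real `MoonstreamCustomDBEngine` / `MoonstreamDBIndexesEngine` yield SQLAlchemy sessions; this stand-in only mimics the shape):

    from contextlib import contextmanager

    @contextmanager
    def yield_db_session_ctx(dsn: str):
        # Stand-in: a real engine would open a SQLAlchemy Session here.
        session = f"session({dsn})"
        try:
            yield session
        finally:
            pass  # a real engine would close the session here

    # Mirrors the combined `with customer_engine..., index_engine...:` above:
    # both sessions stay open for the duration of the crawl.
    with yield_db_session_ctx("customer-db") as db_session, yield_db_session_ctx("index-db") as index_db_session:
        print(db_session, index_db_session)
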
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
index c379c95a..dd504b33 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
@@ -3,7 +3,6 @@ import time
 import traceback
 from datetime import datetime, timedelta
 from typing import Dict, List, Optional, Tuple
-from uuid import UUID
 
 from moonstreamtypes.blockchain import AvailableBlockchainType
 
@@ -16,10 +15,6 @@ from moonworm.crawler.ethereum_state_provider import Web3StateProvider
 from sqlalchemy.orm.session import Session
 from web3 import Web3
 
-from ..settings import (
-    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
-    HISTORICAL_CRAWLER_STATUSES,
-)
 from .crawler import (
     EventCrawlJob,
     FunctionCallCrawlJob,
@@ -31,10 +26,13 @@ from .crawler import (
     merge_event_crawl_jobs,
     merge_function_call_crawl_jobs,
     moonworm_crawler_update_job_as_pickedup,
+    get_event_crawl_job_records,
+    get_function_call_crawl_job_records,
 )
 from .db import add_events_to_session, add_function_calls_to_session, commit_session
 from .event_crawler import _crawl_events
 from .function_call_crawler import _crawl_functions
+from ..settings import CRAWLER_LABEL, SEER_CRAWLER_LABEL
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -108,8 +106,16 @@ def continuous_crawler(
     web3_uri: Optional[str] = None,
     max_insert_batch: int = 10000,
     version: int = 2,
+    index_db_session: Optional[Session] = None,
+    customer_id: Optional[str] = None,
 ):
     crawler_type = "continuous"
+    if version == 3:
+        crawler_type = "continuous_v3"
+
+    label_name = CRAWLER_LABEL
+    if version == 3:
+        label_name = SEER_CRAWLER_LABEL
     assert (
         min_blocks_batch < max_blocks_batch
     ), "min_blocks_batch must be less than max_blocks_batch"
@@ -191,6 +197,7 @@ def continuous_crawler(
                 to_block=end_block,
                 blocks_cache=blocks_cache,
                 db_block_query_batch=min_blocks_batch * 2,
+                version=version,
             )
             logger.info(
                 f"Crawled {len(all_events)} events from {start_block} to {end_block}."
@@ -203,9 +210,13 @@
                         db_session,
                         all_events[i : i + max_insert_batch],
                         blockchain_type,
+                        version,
+                        label_name,
                     )
             else:
-                add_events_to_session(db_session, all_events, blockchain_type)
+                add_events_to_session(
+                    db_session, all_events, blockchain_type, version, label_name
+                )
 
             logger.info(
                 f"Crawling function calls from {start_block} to {end_block}"
@@ -228,10 +239,16 @@
                         db_session,
                         all_function_calls[i : i + max_insert_batch],
                         blockchain_type,
+                        db_version=version,
+                        label_name=label_name,
                     )
             else:
                 add_function_calls_to_session(
-                    db_session, all_function_calls, blockchain_type
+                    db_session,
+                    all_function_calls,
+                    blockchain_type,
+                    db_version=version,
+                    label_name=label_name,
                 )
 
             current_time = datetime.utcnow()
@@ -239,20 +256,56 @@
             if current_time - jobs_refetchet_time > timedelta(
                 seconds=new_jobs_refetch_interval
             ):
-                logger.info(
-                    f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
-                )
-                event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
-                    event_crawl_jobs, function_call_crawl_jobs, blockchain_type
-                )
+                if version == 2:
+                    ## Refetch new jobs from bugout journal
+                    logger.info(
+                        f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
+                    )
+                    event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
+                        event_crawl_jobs, function_call_crawl_jobs, blockchain_type
+                    )
 
-                (
-                    event_crawl_jobs,
-                    function_call_crawl_jobs,
-                ) = moonworm_crawler_update_job_as_pickedup(
-                    event_crawl_jobs=event_crawl_jobs,
-                    function_call_crawl_jobs=function_call_crawl_jobs,
-                )
+                    (
+                        event_crawl_jobs,
+                        function_call_crawl_jobs,
+                    ) = moonworm_crawler_update_job_as_pickedup(
+                        event_crawl_jobs=event_crawl_jobs,
+                        function_call_crawl_jobs=function_call_crawl_jobs,
+                    )
+                elif version == 3 and index_db_session is not None:
+                    ## Refetch new jobs from index db
+
+                    updated_event_crawl_jobs = get_event_crawl_job_records(
+                        index_db_session,
+                        blockchain_type,
+                        [],
+                        {event.event_abi_hash: event for event in event_crawl_jobs},
+                        customer_id=customer_id,
+                    )
+
+                    event_crawl_jobs = [
+                        event for event in updated_event_crawl_jobs.values()
+                    ]
+
+                    updated_function_call_crawl_jobs = (
+                        get_function_call_crawl_job_records(
+                            index_db_session,
+                            blockchain_type,
+                            [],
+                            {
+                                function_call.contract_address: function_call
+                                for function_call in function_call_crawl_jobs
+                            },
+                            customer_id=customer_id,
+                        )
+                    )
+
+                    function_call_crawl_jobs = [
+                        function_call
+                        for function_call in updated_function_call_crawl_jobs.values()
+                    ]
+                else:
+                    raise ValueError("Invalid version")
 
                 jobs_refetchet_time = current_time
 
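Note: the v3 branch above refetches jobs by handing the loop's current job list to the index-DB helpers as a dict (keyed by ABI hash for events, contract address for function calls), letting the helper merge in new rows, then flattening the values back into the list the crawl loop iterates. A minimal sketch of that round-trip, with simplified stand-ins for EventCrawlJob and get_event_crawl_job_records rather than the real definitions from crawler.py:

    from dataclasses import dataclass, field
    from typing import Dict, List

    @dataclass
    class EventCrawlJob:  # stand-in for the real crawl job record
        event_abi_hash: str
        contract_addresses: List[str] = field(default_factory=list)

    def get_event_crawl_job_records(
        existing: Dict[str, EventCrawlJob]
    ) -> Dict[str, EventCrawlJob]:
        # The real helper queries AbiJobs and merges new addresses into the
        # existing jobs; here we only pretend one new job appeared.
        existing.setdefault("0xnewhash", EventCrawlJob("0xnewhash", ["0xcontract"]))
        return existing

    event_crawl_jobs = [EventCrawlJob("0xoldhash", ["0xaddress"])]
    updated = get_event_crawl_job_records(
        {job.event_abi_hash: job for job in event_crawl_jobs}
    )
    event_crawl_jobs = list(updated.values())  # back to the list the loop uses
    print([job.event_abi_hash for job in event_crawl_jobs])
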
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 79a4d58e..807e2ef3 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -704,6 +704,7 @@ def get_event_crawl_job_records(
     blockchain_type: AvailableBlockchainType,
     addresses: List[str],
     existing_crawl_job_records: Dict[str, EventCrawlJob],
+    customer_id: Optional[str] = None,
 ):
     """
     Retrieve and update the event crawl job records from the database.
@@ -712,9 +713,12 @@ def get_event_crawl_job_records(
     query = (
         db_session.query(AbiJobs)
         .filter(AbiJobs.chain == blockchain_type.value)
-        .filter(func.length(AbiJobs.abi_selector) > 10)
+        .filter(func.cast(AbiJobs.abi_selector, JSON).op("->>")("type") == "event")
     )
 
+    if customer_id is not None:
+        query = query.filter(AbiJobs.customer_id == customer_id)
+
     if len(addresses) != 0:
         query = query.filter(
             AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
@@ -770,6 +774,7 @@ def get_function_call_crawl_job_records(
     blockchain_type: AvailableBlockchainType,
     addresses: List[str],
     existing_crawl_job_records: Dict[str, FunctionCallCrawlJob],
+    customer_id: Optional[str] = None,
 ):
     """
     Retrieve and update the function call crawl job records from the database.
@@ -786,6 +791,9 @@
     )
 
+    if customer_id is not None:
+        query = query.filter(AbiJobs.customer_id == customer_id)
+
     if len(addresses) != 0:
         query = query.filter(
             AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
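
Note: both query helpers gain the same optional customer_id filter, following the usual SQLAlchemy pattern of chaining .filter() onto a query only when the argument is set. A runnable sketch of that pattern against a throwaway SQLite model (AbiJobs here is a stand-in with only the columns the filter needs, not the real Moonstream model):

    from typing import Optional

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class AbiJobs(Base):  # stand-in, not the real Moonstream model
        __tablename__ = "abi_jobs"
        id = Column(Integer, primary_key=True)
        chain = Column(String)
        customer_id = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    def select_jobs(db_session: Session, chain: str, customer_id: Optional[str] = None):
        query = db_session.query(AbiJobs).filter(AbiJobs.chain == chain)
        if customer_id is not None:
            # Narrow to a single customer's jobs only when explicitly requested.
            query = query.filter(AbiJobs.customer_id == customer_id)
        return query.all()

    with Session(engine) as session:
        print(select_jobs(session, "ethereum", customer_id="c-1"))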