diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index a2852daf..9f0d3749 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -20,6 +20,7 @@ from .crawler import (
     make_event_crawl_jobs,
     make_function_call_crawl_jobs,
     find_all_deployed_blocks,
+    update_job_state_with_filters,
 )
 from .db import get_first_labeled_block_number, get_last_labeled_block_number
 from .historical_crawler import historical_crawler
@@ -37,23 +38,45 @@ def handle_crawl(args: argparse.Namespace) -> None:
             subscription_type,
             "event",
             MOONSTREAM_MOONWORM_TASKS_JOURNAL,
-        ),
-        moonworm=True,
+        )
     )
     logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")

+    if len(initial_event_jobs) > 0:
+        initial_event_jobs = update_job_state_with_filters(  # type: ignore
+            events=initial_event_jobs,
+            address_filter=[],
+            required_tags=[
+                "historical_crawl_status:pending",
+                "moonworm_task_pikedup:False",
+            ],
+            tags_to_add=["moonworm_task_pikedup:True"],
+            tags_to_delete=["moonworm_task_pikedup:False"],
+        )
+
     initial_function_call_jobs = make_function_call_crawl_jobs(
         get_crawl_job_entries(
             subscription_type,
             "function",
             MOONSTREAM_MOONWORM_TASKS_JOURNAL,
-        ),
-        moonworm=True,
+        )
     )
     logger.info(
         f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
     )

+    if len(initial_function_call_jobs) > 0:
+        initial_function_call_jobs = update_job_state_with_filters(  # type: ignore
+            events=initial_function_call_jobs,
+            address_filter=[],
+            required_tags=[
+                "historical_crawl_status:pending",
+                "moonworm_task_pikedup:False",
+            ],
+            tags_to_add=["moonworm_task_pikedup:True"],
+            tags_to_delete=["moonworm_task_pikedup:False"],
+        )
+
     logger.info(f"Blockchain type: {blockchain_type.value}")
     with yield_db_session_ctx() as db_session:
         web3: Optional[Web3] = None
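The two blocks above replace the per-entry `bugout_client.update_tags` calls that used to live inside `make_event_crawl_jobs` and `make_function_call_crawl_jobs` (removed further down in this diff): the CLI now marks pending tasks as picked up in one batched pass. The second block operates on `initial_function_call_jobs`; passing function call jobs through the `events=` parameter is intentional, since the helper dispatches on `isinstance` internally. A minimal sketch of the selection rule it applies, with illustrative tag values ("pikedup" is the spelling the journal already uses):

    required_tags = ["historical_crawl_status:pending", "moonworm_task_pikedup:False"]
    entry_tags = [
        "address:0x00000000219ab540356cBB839Cbe05303d7705Fa",  # hypothetical entry
        "historical_crawl_status:pending",
        "moonworm_task_pikedup:False",
    ]
    # Same check update_job_state_with_filters performs before queueing an update:
    should_update = set(required_tags).issubset(set(entry_tags))  # True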
@@ -143,7 +166,6 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
             [
                 "moonworm_task_pikedup:True",
                 "historical_crawl_status:pending",
-                "progress:0",
             ]
         )

@@ -190,16 +212,46 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
         filtered_function_call_jobs = []
         logger.info(f"Removing function call crawl jobs since --only-events is set")

+    if args.only_functions:
+        filtered_event_jobs = []
+        logger.info(
+            f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
+        )
+
+    if args.only_events and args.only_functions:
+        raise ValueError(
+            "--only-events and --only-functions cannot be set at the same time"
+        )
+
+    if args.tasks_journal:
+        if len(filtered_event_jobs) > 0:
+            filtered_event_jobs = update_job_state_with_filters(  # type: ignore
+                events=filtered_event_jobs,
+                address_filter=[],
+                required_tags=[
+                    "historical_crawl_status:pending",
+                    "moonworm_task_pikedup:True",
+                ],
+                tags_to_add=["historical_crawl_status:in_progress"],
+                tags_to_delete=["historical_crawl_status:pending"],
+            )
+
+        if len(filtered_function_call_jobs) > 0:
+            filtered_function_call_jobs = update_job_state_with_filters(  # type: ignore
+                events=filtered_function_call_jobs,
+                address_filter=[],
+                required_tags=[
+                    "historical_crawl_status:pending",
+                    "moonworm_task_pikedup:True",
+                ],
+                tags_to_add=["historical_crawl_status:in_progress"],
+                tags_to_delete=["historical_crawl_status:pending"],
+            )
+
     logger.info(
         f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
     )

-    addresses_set = set()
-    for job in filtered_event_jobs:
-        addresses_set.update(job.contracts)
-    for function_job in filtered_function_call_jobs:
-        addresses_set.add(function_job.contract_address)
-
     logger.info(f"Blockchain type: {blockchain_type.value}")
     with yield_db_session_ctx() as db_session:
         web3: Optional[Web3] = None
@@ -224,14 +276,23 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
         )
         logger.info(f"Last labeled block: {last_labeled_block}")

-        if args.tasks_journal:
-            start_block = int(web3.eth.blockNumber) - 1
-            end_block = min(
-                find_all_deployed_blocks(blockchain_type, list(addresses_set))
+        addresses_deployment_blocks = None
+
+        # get set of addresses from event jobs and function call jobs
+        if args.find_deployed_blocks:
+            addresses_set = set()
+            for job in filtered_event_jobs:
+                addresses_set.update(job.contracts)
+            for function_job in filtered_function_call_jobs:
+                addresses_set.add(function_job.contract_address)
+
+            if args.start is None:
+                start_block = web3.eth.blockNumber - 1
+
+            addresses_deployment_blocks = find_all_deployed_blocks(
+                blockchain_type, list(addresses_set)
             )
-        else:
-            start_block = args.start
-            end_block = args.end
+            end_block = min(addresses_deployment_blocks.values())

         if start_block is None:
             logger.info("No start block provided")
@@ -276,6 +337,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
             args.max_blocks_batch,
             args.min_sleep_time,
             access_id=args.access_id,
+            addresses_deployment_blocks=addresses_deployment_blocks,
         )


@@ -454,6 +516,18 @@ def main() -> None:
         default=False,
         help="Only crawl events",
     )
+    historical_crawl_parser.add_argument(
+        "--only-functions",
+        action="store_true",
+        default=False,
+        help="Only crawl function calls",
+    )
+    historical_crawl_parser.add_argument(
+        "--find-deployed-blocks",
+        action="store_true",
+        default=False,
+        help="Find the deployment block for each crawled contract address",
+    )
     historical_crawl_parser.add_argument(
         "--tasks-journal",
         action="store_true",
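With `--find-deployed-blocks`, the crawl range is now derived from the contracts themselves: `find_all_deployed_blocks` returns a mapping of address to deployment block, and the end block becomes the earliest deployment block across all jobs. A hedged sketch of that derivation, with hypothetical truncated addresses and made-up block numbers:

    addresses_deployment_blocks = {
        "0xA0b8...": 16_100_000,  # hypothetical
        "0xB1c9...": 15_750_000,  # hypothetical
    }
    end_block = min(addresses_deployment_blocks.values())  # 15_750_000

    # The historical crawler then walks backwards from start_block down to
    # end_block. Caveat: min() raises ValueError on an empty mapping, i.e. if
    # no deployment block could be found for any address.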
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
index d33c8b35..8657754e 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
@@ -220,6 +220,30 @@ def continuous_crawler(
                 event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
                     event_crawl_jobs, function_call_crawl_jobs, blockchain_type
                 )
+                if len(event_crawl_jobs) > 0:
+                    event_crawl_jobs = update_job_state_with_filters(  # type: ignore
+                        events=event_crawl_jobs,
+                        address_filter=[],
+                        required_tags=[
+                            "historical_crawl_status:pending",
+                            "moonworm_task_pikedup:False",
+                        ],
+                        tags_to_add=["moonworm_task_pikedup:True"],
+                        tags_to_delete=["moonworm_task_pikedup:False"],
+                    )
+
+                if len(function_call_crawl_jobs) > 0:
+                    function_call_crawl_jobs = update_job_state_with_filters(  # type: ignore
+                        events=function_call_crawl_jobs,
+                        address_filter=[],
+                        required_tags=[
+                            "historical_crawl_status:pending",
+                            "moonworm_task_pikedup:False",
+                        ],
+                        tags_to_add=["moonworm_task_pikedup:True"],
+                        tags_to_delete=["moonworm_task_pikedup:False"],
+                    )
+
                 jobs_refetchet_time = current_time

             if current_time - last_heartbeat_time > timedelta(
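The refetch path now marks newly discovered tasks as picked up in the same pass, using the same required-tags filter as the CLI. This assumes `update_job_state_with_filters` is importable in `continuous_crawler.py`; the import does not appear in the hunk above, so it would presumably be (sketch):

    from .crawler import update_job_state_with_filters

As in `handle_crawl`, function call jobs go through the `events=` parameter and are told apart by `isinstance` inside the helper.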
""" @@ -253,27 +254,23 @@ def make_event_crawl_jobs( abi_hash = _get_tag(entry, "abi_method_hash") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) - # if entry.tags not contain moonworm_task_pikedup:True - if "moonworm_task_pikedup:True" not in entry.tags and moonworm: - # Update the tag to pickedup - bugout_client.update_tags( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entry_id=entry.entry_url.split("/")[-1], - tags=["moonworm_task_pikedup:True"], - timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, - ) + entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji existing_crawl_job = crawl_job_by_hash.get(abi_hash) if existing_crawl_job is not None: if contract_address not in existing_crawl_job.contracts: existing_crawl_job.contracts.append(contract_address) + existing_crawl_job.entries_ids[contract_address] = { + entry_id: entry.tags + } + else: abi = cast(str, entry.content) new_crawl_job = EventCrawlJob( event_abi_hash=abi_hash, event_abi=json.loads(abi), contracts=[contract_address], + entries_ids={contract_address: {entry_id: entry.tags}}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) crawl_job_by_hash[abi_hash] = new_crawl_job @@ -283,7 +280,6 @@ def make_event_crawl_jobs( def make_function_call_crawl_jobs( entries: List[BugoutSearchResult], - moonworm: bool = False, ) -> List[FunctionCallCrawlJob]: """ Create FunctionCallCrawlJob objects from bugout entries. @@ -293,26 +289,18 @@ def make_function_call_crawl_jobs( method_signature_by_address: Dict[str, List[str]] = {} for entry in entries: + entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) abi = json.loads(cast(str, entry.content)) method_signature = encode_function_signature(abi) if method_signature is None: raise ValueError(f"{abi} is not a function ABI") - if "moonworm_task_pikedup:True" not in entry.tags and moonworm: - # Update the tag to pickedup - bugout_client.update_tags( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entry_id=entry.entry_url.split("/")[-1], - tags=["moonworm_task_pikedup:True"], - timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, - ) - if contract_address not in crawl_job_by_address: crawl_job_by_address[contract_address] = FunctionCallCrawlJob( contract_abi=[abi], contract_address=contract_address, + entries_tags={entry_id: entry.tags}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) method_signature_by_address[contract_address] = [method_signature] @@ -321,6 +309,9 @@ def make_function_call_crawl_jobs( if method_signature not in method_signature_by_address[contract_address]: crawl_job_by_address[contract_address].contract_abi.append(abi) method_signature_by_address[contract_address].append(method_signature) + crawl_job_by_address[contract_address].entries_tags[ + entry_id + ] = entry.tags return [crawl_job for crawl_job in crawl_job_by_address.values()] @@ -449,3 +440,212 @@ def heartbeat( tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"], timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) + + +def bugout_state_update( + entries_tags_add: List[Dict[str, Any]], + entries_tags_delete: List[Dict[str, Any]], +) -> Any: + if len(entries_tags_add) > 0: + new_entreis_state = bugout_client.update_entries_tags( # type: ignore + entries_tags=entries_tags_add, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + if len(entries_tags_delete) > 0: + new_entreis_state = 
@@ -449,3 +440,213 @@ def heartbeat(
         tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
         timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
     )
+
+
+def bugout_state_update(
+    entries_tags_add: List[Dict[str, Any]],
+    entries_tags_delete: List[Dict[str, Any]],
+) -> Any:
+    new_entries_state: Any = []
+    if len(entries_tags_add) > 0:
+        new_entries_state = bugout_client.update_entries_tags(  # type: ignore
+            entries_tags=entries_tags_add,
+            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+        )
+
+    if len(entries_tags_delete) > 0:
+        new_entries_state = bugout_client.delete_entries_tags(  # type: ignore
+            entries_tags=entries_tags_delete,
+            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+        )
+
+    return new_entries_state
+
+
+def update_job_tags(
+    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
+    new_entries_state: Any,
+):
+    for entry in new_entries_state:
+        for event in events:
+            if isinstance(event, EventCrawlJob):
+                for contract_address, entries_ids in event.entries_ids.items():
+                    for entry_id, tags in entries_ids.items():
+                        if entry_id == entry["journal_entry_id"]:
+                            event.entries_ids[contract_address][entry_id] = entry["tags"]
+
+            if isinstance(event, FunctionCallCrawlJob):
+                for entry_id, tags in event.entries_tags.items():
+                    if entry_id == entry["journal_entry_id"]:
+                        event.entries_tags[entry_id] = entry["tags"]
+
+    return events
+
+
+def update_job_state_with_filters(
+    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
+    address_filter: List[ChecksumAddress],
+    required_tags: List[str],
+    tags_to_add: List[str] = [],
+    tags_to_delete: List[str] = [],
+) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
+    """
+    Function that updates the state of the job in bugout.
+    """
+
+    entries_ids_to_update: List[UUID] = []
+
+    ### TODO: refactor this function
+
+    if len(tags_to_add) == 0 and len(tags_to_delete) == 0:
+        return events
+
+    for event in events:
+        # events
+        if isinstance(event, EventCrawlJob):
+            for contract_address, entries_ids in event.entries_ids.items():
+                if address_filter and contract_address not in address_filter:
+                    continue
+                for entry_id, tags in entries_ids.items():
+                    if set(required_tags).issubset(set(tags)):
+                        entries_ids_to_update.append(entry_id)

+                        event.entries_ids[contract_address][entry_id].extend(
+                            tags_to_add
+                        )
+
+        # function calls
+        if isinstance(event, FunctionCallCrawlJob):
+            if address_filter and event.contract_address not in address_filter:
+                continue
+            for entry_id, tags in event.entries_tags.items():
+                if set(required_tags).issubset(set(tags)):
+                    entries_ids_to_update.append(entry_id)
+
+    if len(entries_ids_to_update) == 0:
+        return events
+
+    new_entries_state = bugout_state_update(
+        entries_tags_add=[
+            {"journal_entry_id": entry_id, "tags": tags_to_add}
+            for entry_id in entries_ids_to_update
+        ],
+        entries_tags_delete=[
+            {"journal_entry_id": entry_id, "tags": tags_to_delete}
+            for entry_id in entries_ids_to_update
+        ],
+    )
+
+    events = update_job_tags(events, new_entries_state)
+
+    return events
+
+
+def update_entries_status_and_progress(
+    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
+    progress_map: Dict[ChecksumAddress, float],
+) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
+    """
+    Update entry status and progress tags in the mooncrawl bugout journal.
+    """
+
+    entries_tags_delete = []
+
+    entries_tags_add = []
+
+    for event in events:
+        if isinstance(event, EventCrawlJob):
+            for contract_address, entries_ids in event.entries_ids.items():
+                progress = int(progress_map.get(contract_address, 0) * 100)
+
+                for entry_id, tags in entries_ids.items():
+                    # progress
+
+                    if (
+                        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
+                        in tags
+                    ):
+                        continue
+
+                    entries_tags_delete.append(
+                        {
+                            "journal_entry_id": entry_id,
+                            "tags": [
+                                tag
+                                for tag in tags
+                                if tag.startswith(
+                                    f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
+                                )
+                            ],
+                        }
+                    )
+
+                    entries_tags_add.append(
+                        {
+                            "journal_entry_id": entry_id,
+                            "tags": [
+                                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
+                            ],
+                        }
+                    )
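Both `update_job_state_with_filters` and `update_entries_status_and_progress` funnel into `bugout_state_update`, which issues one batched add request and one batched delete request instead of a call per entry; `update_job_tags` then folds the returned entry state back into the in-memory jobs. The payload shape, mirroring the dicts built above (entry id hypothetical):

    from uuid import uuid4

    entry_id = uuid4()
    entries_tags_add = [
        {"journal_entry_id": entry_id, "tags": ["historical_crawl_status:in_progress"]}
    ]
    entries_tags_delete = [
        {"journal_entry_id": entry_id, "tags": ["historical_crawl_status:pending"]}
    ]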
} + ) + + if proggress >= 100: + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + ], + } + ) + + if isinstance(event, FunctionCallCrawlJob): + proggress = int(progess_map.get(event.contract_address, 0)) * 100 + + for entry_id, tags in event.entries_tags.items(): + if ( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + in tags + ): + continue + + # proggress + entries_tags_delete.append( + { + "journal_entry_id": entry_id, + "tags": [ + tag + for tag in tags + if tag.startswith( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}" + ) + ], + } + ) + + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}" + ], + } + ) + + if proggress >= 100: + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + ], + } + ) + + new_entries_state = bugout_state_update( + entries_tags_add=entries_tags_add, + entries_tags_delete=entries_tags_delete, + ) + + events = update_job_tags(events, new_entries_state) + + return events diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 564f1dab..aeca1659 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -3,6 +3,7 @@ import time from typing import Dict, List, Optional, Tuple from uuid import UUID +from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, @@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network # type: ignore from sqlalchemy.orm.session import Session from web3 import Web3 -from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3 +from .crawler import ( + EventCrawlJob, + FunctionCallCrawlJob, + _retry_connect_web3, + update_entries_status_and_proggress, +) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events, _autoscale_crawl_events from .function_call_crawler import _crawl_functions @@ -31,6 +37,7 @@ def historical_crawler( max_blocks_batch: int = 100, min_sleep_time: float = 0.1, access_id: Optional[UUID] = None, + addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None, ): assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" assert min_sleep_time > 0, "min_sleep_time must be greater than 0" @@ -60,6 +67,10 @@ def historical_crawler( blocks_cache: Dict[int, int] = {} failed_count = 0 + original_start_block = start_block + + progess_map: Dict[ChecksumAddress, float] = {} + while start_block >= end_block: try: time.sleep(min_sleep_time) @@ -119,6 +130,27 @@ def historical_crawler( db_session, all_function_calls, blockchain_type ) + if addresses_deployment_blocks: + for address, deployment_block in addresses_deployment_blocks.items(): + current_position = end_block + + progess = original_start_block - current_position / ( + original_start_block - deployment_block + ) + progess_map[address] = progess + + if 
 len(function_call_crawl_jobs) > 0:
+                    function_call_crawl_jobs = update_entries_status_and_progress(  # type: ignore
+                        events=function_call_crawl_jobs,
+                        progress_map=progress_map,
+                    )
+
+                if len(event_crawl_jobs) > 0:
+                    event_crawl_jobs = update_entries_status_and_progress(  # type: ignore
+                        events=event_crawl_jobs,
+                        progress_map=progress_map,
+                    )
+
             # Commiting to db
             commit_session(db_session)
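diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py
index cd697b0f..d9a24f70 100644
--- a/crawlers/mooncrawl/mooncrawl/settings.py
+++ b/crawlers/mooncrawl/mooncrawl/settings.py
@@ -279,3 +279,26 @@ infura_networks = {
 BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
 BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
 BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
+
+
+# Historical crawler status config
+
+HISTORICAL_CRAWLER_STATUSES = {
+    "pending": "pending",
+    "running": "running",
+    "finished": "finished",
+}
+
+# Historical crawler moonworm status config
+
+HISTORICAL_CRAWLER_MOONWORM_STATUSES = {
+    "pickedup": True,
+}
+
+# Status tag prefixes
+
+HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
+    "moonworm_status": "moonworm_task_pikedup",  # matches the (misspelled) tag already stored in the journal
+    "historical_crawl_status": "historical_crawl_status",
+    "progress_status": "progress",
+}

For reference, the crawler composes full journal tags from these maps as `f"{prefix}:{value}"`:

    finished_tag = (
        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}"
        f":{HISTORICAL_CRAWLER_STATUSES['finished']}"
    )  # -> "historical_crawl_status:finished"

    progress_tag = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:25"
    # -> "progress:25"

One caveat worth flagging: `handle_historical_crawl` writes the literal tag `historical_crawl_status:in_progress`, which has no counterpart key in `HISTORICAL_CRAWLER_STATUSES` (the map only knows "running"), and `HISTORICAL_CRAWLER_MOONWORM_STATUSES` is not yet referenced anywhere in this diff.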