diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawle_erc721.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_historical_crawler.py
similarity index 75%
rename from crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawle_erc721.py
rename to crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_historical_crawler.py
index 8ff13993..2e495b81 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawle_erc721.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_historical_crawler.py
@@ -4,15 +4,21 @@
 import logging
 import json
 import time
 import traceback
+import os
 from datetime import datetime, timedelta
 from typing import Dict, List, Optional, Any
+from contextlib import contextmanager
 
+from moonstreamdb.db import yield_db_session_ctx
 from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
     MoonstreamEthereumStateProvider,
 )
 from moonworm.crawler.networks import Network # type: ignore
 from sqlalchemy.orm.session import Session
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy import MetaData, create_engine
 from web3 import Web3
+from web3.middleware import geth_poa_middleware
 
 from ..blockchain import connect
@@ -65,6 +71,7 @@ def continuous_crawler(
     db_session: Session,
     blockchain_type: AvailableBlockchainType,
     web3: Optional[Web3],
+    addresses: List[str],
     abi: List[Dict[str, Any]],
     start_block: int,
     end_block: int,
@@ -76,7 +83,8 @@ def continuous_crawler(
     new_jobs_refetch_interval: float = 120,
     use_traker: bool = True,
 ):
-    crawler_type = "ERC721_crawler"
+    crawler_type = "historical_crawler"
+    print(min_blocks_batch, max_blocks_batch)
     assert (
         min_blocks_batch < max_blocks_batch
     ), "min_blocks_batch must be less than max_blocks_batch"
@@ -91,14 +99,25 @@ def continuous_crawler(
 
 
     # Create tables if not exists works good for sqlite
-    from db.moonstreamdb.models import PolygonLabel
-    from db.moonstreamdb.db import engine
-    from sqlalchemy.ext.declarative import declarative_base
+    # from moonstreamdb.db import engine
 
-    Base = declarative_base()
+    # # from sqlalchemy.ext.declarative import declarative_base
 
-    Base.metadata.create_all(engine)
-    db_session.commit()
+    # # Base = declarative_base()
+
+    # # Base.metadata.create_all(bind=engine)
+    # # db_session.commit()
+
+    # META_DATA = MetaData(bind=engine)
+
+    # print(META_DATA.tables)
+
+    # polygon_table = META_DATA.tables["polygon_labels"]
+
+    # print(polygon_table)
+    # raise
+
+    # polygon_table.create_all(event_engine, checkfirst=True)
 
     crawl_start_time = datetime.utcnow()
 
@@ -129,14 +148,17 @@ def continuous_crawler(
 
     # Create events jobs
 
-    events = [event for event in abi if event["type"]]
+    events = [event for event in abi if event["type"] == "event"]
 
     event_crawl_jobs = []
 
     for event in events:
         event_crawl_jobs.append(
             EventCrawlJob(
-                event_abi_hash="", event_abi=event, contracts=[], created_at=0
+                event_abi_hash="",
+                event_abi=event,
+                contracts=[address for address in addresses],
+                created_at=0,
             )
         )
 
@@ -176,29 +198,34 @@ def continuous_crawler(
                     end_block=end_block,
                 )
 
+                internal_end_block = start_block + max_blocks_batch
+                if internal_end_block > end_block:
+                    internal_end_block = end_block
+
+                if start_block > end_block:
+                    break
+
                 min_sleep_time = max(min_sleep_time, min_sleep_time / 2)
 
-                logger.info(f"Crawling events from {start_block} to {end_block}")
+                logger.info(
+                    f"Crawling events from {start_block} to {internal_end_block}"
+                )
                 all_events = _crawl_events(
                     db_session=db_session,
                     blockchain_type=blockchain_type,
                     web3=web3,
                     jobs=event_crawl_jobs,
                     from_block=start_block,
-                    to_block=end_block,
+                    to_block=internal_end_block,
                     blocks_cache=blocks_cache,
                     db_block_query_batch=min_blocks_batch * 2,
                 )
 
                 logger.info(
-                    f"Crawled {len(all_events)} events from {start_block} to {end_block}."
+                    f"Crawled {len(all_events)} events from {start_block} to {internal_end_block}."
                 )
 
                 add_events_to_session(db_session, all_events, blockchain_type)
 
-                logger.info(
-                    f"Crawling function calls from {start_block} to {end_block}"
-                )
-
                 current_time = datetime.utcnow()
                 if current_time - jobs_refetchet_time > timedelta(
@@ -219,7 +246,7 @@ def continuous_crawler(
                 commit_session(db_session)
 
                 # Update heartbeat
-                heartbeat_template["last_block"] = end_block
+                heartbeat_template["last_block"] = internal_end_block
                 heartbeat_template["current_time"] = _date_to_str(current_time)
                 heartbeat_template["current_event_jobs_length"] = len(
                     event_crawl_jobs
@@ -241,7 +268,7 @@ def continuous_crawler(
                     logger.info("Sending heartbeat.", heartbeat_template)
                     last_heartbeat_time = datetime.utcnow()
 
-                start_block = end_block + 1
+                start_block = internal_end_block + 1
                 failed_count = 0
             except Exception as e:
                 logger.error(f"Internal error: {e}")
@@ -288,6 +315,59 @@ def continuous_crawler(
         raise e
 
 
+def handle_crawl(args: argparse.Namespace) -> None:
+
+    # Convert the CLI string argument into the AvailableBlockchainType enum
+    # expected by continuous_crawler
+    blockchain_type = AvailableBlockchainType(args.blockchain_type)
+    logger.info(f"Blockchain type: {blockchain_type.value}")
+
+    with yield_db_session_ctx() as db_session:
+        web3: Optional[Web3] = None
+        if args.web3 is None:
+            logger.info(
+                "No web3 provider URL provided, using default (blockchain.py: connect())"
+            )
+            web3 = _retry_connect_web3(blockchain_type)
+        else:
+            logger.info(f"Using web3 provider URL: {args.web3}")
+            web3 = Web3(
+                Web3.HTTPProvider(
+                    args.web3,
+                )
+            )
+            if args.poa:
+                logger.info("Using PoA middleware")
+                web3.middleware_onion.inject(geth_poa_middleware, layer=0)
+        with open(args.abi_file, "r") as abi_file:
+            abi = json.load(abi_file)
+
+        start_block = args.start
+        if start_block is None:
+            logger.error("No start block provided")
+            raise ValueError("--start block is required")
+        else:
+            logger.info(f"Using start block: {start_block}")
+
+        max_blocks_batch: int = args.max_blocks_batch
+        min_blocks_batch: int = args.min_blocks_batch
+
+        continuous_crawler(
+            db_session,
+            blockchain_type,
+            web3,
+            args.addresses,
+            abi,
+            start_block,
+            args.end,
+            max_blocks_batch,
+            min_blocks_batch,
+            args.confirmations,
+            args.min_sleep_time,
+            args.heartbeat_interval,
+        )
+
+
 def main() -> None:
     parser = argparse.ArgumentParser()
     subparsers = parser.add_subparsers()
@@ -300,6 +380,12 @@ def main() -> None:
         type=int,
         default=None,
     )
+    crawl_parser.add_argument(
+        "--end",
+        "-e",
+        type=int,
+        default=None,
+    )
     crawl_parser.add_argument(
         "--blockchain-type",
         "-b",
@@ -338,6 +424,20 @@ def main() -> None:
         default=10,
         help="Minimum number of blocks to crawl in a single batch",
     )
+    crawl_parser.add_argument(
+        "--abi-file",
+        "-f",
+        type=str,
+        help="Abi file with events.",
+    )
+
+    crawl_parser.add_argument(
+        "--addresses",
+        "-a",
+        type=str,
+        nargs="*",
+        help="List of addresses.",
+    )
 
     crawl_parser.add_argument(
         "--confirmations",
@@ -363,14 +463,6 @@ def main() -> None:
         help="Heartbeat interval in seconds",
    )
 
-    crawl_parser.add_argument(
-        "--new-jobs-refetch-interval",
-        "-r",
-        type=float,
-        default=120,
-        help="Time to wait before refetching new jobs",
-    )
-
     crawl_parser.add_argument(
         "--force",
         action="store_true",
@@ -382,3 +474,7 @@ def main() -> None:
 
     args = parser.parse_args()
     args.func(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 6f5eee30..6b8638e9 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -338,7 +338,7 @@ def get_crawler_point(
         token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
         journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
         title=f"{crawler_type} crawler - {blockchain_type.value}",
-        tags=[crawler_type, "crawler", blockchain_type.value, abi_hash],
+        tags=[crawler_type, "crawpoint", blockchain_type.value, abi_hash],
         content=f'{{"start_block":{start_block}, "end_block": {end_block} }}',
     )
     return start_block, end_block, entry.id
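
Example invocation of the renamed historical crawler, given the `if __name__ == "__main__"` guard and the argparse flags added above. This is only a sketch: the `crawl` subcommand name, the `polygon` blockchain-type value, and the `--web3`/`--max-blocks-batch` flag names are inferred from the `args` attributes used in `handle_crawl` and are not shown verbatim in this diff; the web3 URL, ABI path, address, and block numbers are placeholders. It assumes the mooncrawl package is installed on the Python path.

    python -m mooncrawl.moonworm_crawler.continuous_historical_crawler crawl \
        --blockchain-type polygon \
        --web3 https://polygon-rpc.example \
        --abi-file ./MyContract.abi.json \
        --addresses 0x0000000000000000000000000000000000000000 \
        --start 20000000 \
        --end 20100000 \
        --max-blocks-batch 100 \
        --min-blocks-batch 10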