From 0791c26e9727a37df40be382548415911f4e192a Mon Sep 17 00:00:00 2001 From: Sergei Sumarokov <7047457+kompotkot@users.noreply.github.com> Date: Mon, 17 Jun 2024 17:26:42 +0300 Subject: [PATCH 1/2] Revert "Add moonworm v3 db crawler" --- crawlers/mooncrawl/mooncrawl/blockchain.py | 71 +-- .../mooncrawl/moonworm_crawler/cli.py | 539 +----------------- .../moonworm_crawler/continuous_crawler.py | 26 +- .../mooncrawl/moonworm_crawler/crawler.py | 136 +---- .../mooncrawl/moonworm_crawler/db.py | 348 ++++------- .../moonworm_crawler/event_crawler.py | 48 +- .../moonworm_crawler/function_call_crawler.py | 11 +- .../moonworm_crawler/historical_crawler.py | 36 +- crawlers/mooncrawl/mooncrawl/settings.py | 44 +- crawlers/mooncrawl/mooncrawl/version.py | 2 +- crawlers/mooncrawl/sample.env | 16 +- crawlers/mooncrawl/setup.py | 4 +- 12 files changed, 188 insertions(+), 1093 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index 858d02dd..a8c5c724 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -3,7 +3,7 @@ from concurrent.futures import Future, ThreadPoolExecutor, wait from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from uuid import UUID -from moonstreamtypes.blockchain import ( +from moonstreamdb.blockchain import ( AvailableBlockchainType, get_block_model, get_transaction_model, @@ -39,11 +39,6 @@ from .settings import ( MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI, MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI, MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI, - MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI, - MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI, - MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI, - MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI, - MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI, WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS, ) @@ -57,36 +52,9 @@ class BlockCrawlError(Exception): """ -default_uri_mapping = { - AvailableBlockchainType.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI, - AvailableBlockchainType.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI, - AvailableBlockchainType.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI, - AvailableBlockchainType.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI, - AvailableBlockchainType.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI, - AvailableBlockchainType.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI, - AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI, - AvailableBlockchainType.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI, - AvailableBlockchainType.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI, - AvailableBlockchainType.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI, - AvailableBlockchainType.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI, - AvailableBlockchainType.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI, - AvailableBlockchainType.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI, - AvailableBlockchainType.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI, - AvailableBlockchainType.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI, - AvailableBlockchainType.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI, - AvailableBlockchainType.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI, - AvailableBlockchainType.STARKNET: MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI, - AvailableBlockchainType.STARKNET_SEPOLIA: MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI, - 
AvailableBlockchainType.MANTLE: MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI, - AvailableBlockchainType.MANTLE_SEPOLIA: MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI, - AvailableBlockchainType.GAME7_ORBIT_ARBITRUM_SEPOLIA: MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI, -} - - def connect( blockchain_type: Optional[AvailableBlockchainType] = None, web3_uri: Optional[str] = None, - version: int = 2, ) -> Web3: if blockchain_type is None and web3_uri is None: raise Exception("Both blockchain_type and web3_uri could not be None") @@ -95,8 +63,41 @@ def connect( request_kwargs: Dict[str, Any] = {"headers": {"Content-Type": "application/json"}} if web3_uri is None: - web3_uri = default_uri_mapping.get(blockchain_type) # type: ignore - if web3_uri is None: + if blockchain_type == AvailableBlockchainType.ETHEREUM: + web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.POLYGON: + web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.MUMBAI: + web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.AMOY: + web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.XDAI: + web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA: + web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: + web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE: + web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA: + web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA: + web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.XAI: + web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA: + web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.AVALANCHE: + web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI: + web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.BLAST: + web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA: + web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI + elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX: + web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI + else: raise Exception("Wrong blockchain type provided for web3 URI") if web3_uri.startswith("http://") or web3_uri.startswith("https://"): diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 24287908..b4b1b424 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -2,17 +2,9 @@ import argparse import logging from typing import Optional from uuid import UUID -from urllib.parse import urlparse, urlunparse - -import requests -from moonstreamdbv3.db import ( - MoonstreamDBEngine, - MoonstreamDBIndexesEngine, - MoonstreamCustomDBEngine, -) -from moonstreamtypes.blockchain import AvailableBlockchainType -from moonstreamtypes.subscriptions import 
blockchain_type_to_subscription_type +from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamdb.subscriptions import blockchain_type_to_subscription_type from web3 import Web3 from web3.middleware import geth_poa_middleware @@ -21,8 +13,6 @@ from ..settings import ( HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES, HISTORICAL_CRAWLER_STATUSES, MOONSTREAM_MOONWORM_TASKS_JOURNAL, - MOONSTREAM_DB_V3_CONTROLLER_API, - MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN, ) from .continuous_crawler import _retry_connect_web3, continuous_crawler from .crawler import ( @@ -32,8 +22,6 @@ from .crawler import ( make_function_call_crawl_jobs, moonworm_crawler_update_job_as_pickedup, update_job_state_with_filters, - get_event_crawl_job_records, - get_function_call_crawl_job_records, ) from .db import get_first_labeled_block_number, get_last_labeled_block_number from .historical_crawler import historical_crawler @@ -147,154 +135,6 @@ def handle_crawl(args: argparse.Namespace) -> None: ) -def get_db_connection(uuid): - url = ( - f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{uuid}/instances/1/creds/seer/url" - ) - headers = { - "Authorization": f"Bearer {MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN}" - } - - try: - response = requests.get(url, headers=headers) - response.raise_for_status() # Raises HTTPError for bad requests (4xx or 5xx) - except requests.RequestException as e: - logging.error(f"Network-related error for UUID {uuid}: {str(e)}") - raise ValueError(f"Network-related error for UUID {uuid}: {str(e)}") - except Exception as e: - raise Exception(f"Unhandled exception, error: {str(e)}") - - connection_string = response.text - try: - connection_string = ensure_port_in_connection_string(connection_string) - except ValueError as e: - error_msg = f"Invalid connection string for UUID {uuid}: {str(e)}" - logging.error(error_msg) - raise ValueError(error_msg) - - return connection_string - - -def ensure_port_in_connection_string(connection_string): - # Parse the connection string into components - parsed_url = urlparse(connection_string) - - # Check if a port is specified, and if not, add the default port - if parsed_url.port is None: - # Append default port 5432 for PostgreSQL if no port specified - - connection_string = connection_string.replace( - "/" + connection_string.split("/")[-1], - f":5432" + "/" + connection_string.split("/")[-1], - ) - connection_string = connection_string.replace('"', "") - return connection_string - - -def handle_crawl_v3(args: argparse.Namespace) -> None: - blockchain_type = AvailableBlockchainType(args.blockchain_type) - subscription_type = blockchain_type_to_subscription_type(blockchain_type) - - index_engine = MoonstreamDBIndexesEngine() - - with index_engine.yield_db_session_ctx() as index_db_session: - - initial_event_jobs = get_event_crawl_job_records( - index_db_session, - blockchain_type, - [], - {}, - ) - - logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}") - - initial_function_call_jobs = get_function_call_crawl_job_records( - index_db_session, - blockchain_type, - [], - {}, - ) - - logger.info( - f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}" - ) - - logger.info(f"Blockchain type: {blockchain_type.value}") - with yield_db_session_ctx() as db_session: - web3: Optional[Web3] = None - if args.web3 is None: - logger.info( - "No web3 provider URL provided, using default (blockchan.py: connect())" - ) - web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri) - else: - logger.info(f"Using web3 
provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider( - args.web3, - ) - ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) - - last_labeled_block = get_last_labeled_block_number(db_session, blockchain_type) - logger.info(f"Last labeled block: {last_labeled_block}") - - start_block = args.start - if start_block is None: - logger.info("No start block provided") - if last_labeled_block is not None: - start_block = last_labeled_block - 1 - logger.info(f"Using last labeled block as start: {start_block}") - else: - logger.info( - "No last labeled block found, using start block (web3.eth.blockNumber - 300)" - ) - start_block = web3.eth.blockNumber - 10000 - logger.info(f"Starting from block: {start_block}") - elif last_labeled_block is not None: - if start_block < last_labeled_block and not args.force: - logger.info( - f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}" - ) - logger.info( - f"Use --force to override this and start from the start block: {start_block}" - ) - - start_block = last_labeled_block - else: - logger.info(f"Using start block: {start_block}") - else: - logger.info(f"Using start block: {start_block}") - - confirmations = args.confirmations - - if not args.no_confirmations: - assert confirmations > 0, "confirmations must be greater than 0" - else: - confirmations = 0 - - craw_event_jobs = list(initial_event_jobs.values()) - initial_function_call_jobs = list(initial_function_call_jobs.values()) - - continuous_crawler( - db_session, - blockchain_type, - web3, - craw_event_jobs, - initial_function_call_jobs, - start_block, - args.max_blocks_batch, - args.min_blocks_batch, - confirmations, - args.min_sleep_time, - args.heartbeat_interval, - args.new_jobs_refetch_interval, - web3_uri=args.web3_uri, - ) - - def handle_historical_crawl(args: argparse.Namespace) -> None: blockchain_type = AvailableBlockchainType(args.blockchain_type) subscription_type = blockchain_type_to_subscription_type(blockchain_type) @@ -499,178 +339,6 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: ) -def handle_historical_crawl_v3(args: argparse.Namespace) -> None: - """ - Historical crawl for MoonstreamDB v3 - """ - - blockchain_type = AvailableBlockchainType(args.blockchain_type) - ##subscription_type = blockchain_type_to_subscription_type(blockchain_type) - - addresses_filter = [] - if args.address is not None: - ## 40 hexadecimal characters format - - addresses_filter.append(args.address[2:]) - - index_engine = MoonstreamDBIndexesEngine() - - with index_engine.yield_db_session_ctx() as index_db_session: - - initial_event_jobs = get_event_crawl_job_records( - index_db_session, - blockchain_type, - addresses_filter, - {}, - ) - - logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}") - - initial_function_call_jobs = get_function_call_crawl_job_records( - index_db_session, - blockchain_type, - addresses_filter, - {}, - ) - - logger.info( - f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}" - ) - - customer_connection = get_db_connection(args.customer_uuid) - - if customer_connection == "": - raise ValueError("No connection string found for the customer") - - if args.only_events: - filtered_function_call_jobs = [] - logger.info(f"Removing function call crawl jobs since --only-events is set") - else: - filtered_function_call_jobs = initial_function_call_jobs.values() - - if args.only_functions: - filtered_event_jobs = [] - 
logger.info( - f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}" - ) - else: - filtered_event_jobs = initial_event_jobs.values() - - if args.only_events and args.only_functions: - raise ValueError( - "--only-events and --only-functions cannot be set at the same time" - ) - - logger.info( - f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" - ) - - logger.info(f"Blockchain type: {blockchain_type.value}") - - customer_engine = MoonstreamCustomDBEngine(customer_connection) - - with customer_engine.yield_db_session_ctx() as db_session: - web3: Optional[Web3] = None - if args.web3 is None: - logger.info( - "No web3 provider URL provided, using default (blockchan.py: connect())" - ) - web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri) - else: - logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider( - args.web3, - ) - ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) - - last_labeled_block = get_first_labeled_block_number( - db_session, - blockchain_type, - args.address, - only_events=args.only_events, - db_version=3, - ) - logger.info(f"Last labeled block: {last_labeled_block}") - - addresses_deployment_blocks = None - - end_block = args.end - - start_block = args.start - - # get set of addresses from event jobs and function call jobs - if args.find_deployed_blocks: - addresses_set = set() - for job in filtered_event_jobs: - addresses_set.update(job.contracts) - for function_job in filtered_function_call_jobs: - addresses_set.add(function_job.contract_address) - - if args.start is None: - start_block = web3.eth.blockNumber - 1 - - addresses_deployment_blocks = find_all_deployed_blocks( - web3, list(addresses_set) - ) - if len(addresses_deployment_blocks) == 0: - logger.error( - "No addresses found in the blockchain. Please check your addresses and try again" - ) - return - end_block = min(addresses_deployment_blocks.values()) - - if start_block is None: - logger.info("No start block provided") - if last_labeled_block is not None: - start_block = last_labeled_block - logger.info(f"Using last labeled block as start: {start_block}") - else: - logger.info( - "No last labeled block found, using start block (web3.eth.blockNumber - 300)" - ) - raise ValueError( - "No start block provided and no last labeled block found" - ) - elif last_labeled_block is not None: - if start_block > last_labeled_block and not args.force: - logger.info( - f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}" - ) - logger.info( - f"Use --force to override this and start from the start block: {start_block}" - ) - - start_block = last_labeled_block - else: - logger.info(f"Using start block: {start_block}") - else: - logger.info(f"Using start block: {start_block}") - - if start_block < end_block: - raise ValueError( - f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction." 
- ) - - historical_crawler( - db_session, - blockchain_type, - web3, - filtered_event_jobs, # type: ignore - filtered_function_call_jobs, # type: ignore - start_block, - end_block, - args.max_blocks_batch, - args.min_sleep_time, - web3_uri=args.web3_uri, - addresses_deployment_blocks=addresses_deployment_blocks, - version=3, - ) - - def main() -> None: parser = argparse.ArgumentParser() parser.set_defaults(func=lambda _: parser.print_help()) @@ -776,103 +444,6 @@ def main() -> None: crawl_parser.set_defaults(func=handle_crawl) - crawl_parser_v3 = subparsers.add_parser( - "crawl-v3", - help="continuous crawling the event/function call jobs from bugout journal", - ) - - crawl_parser_v3.add_argument( - "--start", - "-s", - type=int, - default=None, - ) - - crawl_parser_v3.add_argument( - "--blockchain-type", - "-b", - type=str, - help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", - ) - - crawl_parser_v3.add_argument( - "--web3", - type=str, - default=None, - help="Web3 provider URL", - ) - - crawl_parser_v3.add_argument( - "--poa", - action="store_true", - default=False, - help="Use PoA middleware", - ) - - crawl_parser_v3.add_argument( - "--max-blocks-batch", - "-m", - type=int, - default=80, - help="Maximum number of blocks to crawl in a single batch", - ) - - crawl_parser_v3.add_argument( - "--min-blocks-batch", - "-n", - type=int, - default=20, - help="Minimum number of blocks to crawl in a single batch", - ) - - crawl_parser_v3.add_argument( - "--confirmations", - "-c", - type=int, - default=175, - help="Number of confirmations to wait for", - ) - - crawl_parser_v3.add_argument( - "--no-confirmations", - action="store_true", - default=False, - help="Do not wait for confirmations explicitly set confirmations to 0", - ) - - crawl_parser_v3.add_argument( - "--min-sleep-time", - "-t", - type=float, - default=0.1, - help="Minimum time to sleep between crawl step", - ) - - crawl_parser_v3.add_argument( - "--heartbeat-interval", - "-i", - type=float, - default=60, - help="Heartbeat interval in seconds", - ) - - crawl_parser_v3.add_argument( - "--new-jobs-refetch-interval", - "-r", - type=float, - default=180, - help="Time to wait before refetching new jobs", - ) - - crawl_parser_v3.add_argument( - "--force", - action="store_true", - default=False, - help="Force start from the start block", - ) - - crawl_parser_v3.set_defaults(func=handle_crawl_v3) - historical_crawl_parser = subparsers.add_parser( "historical-crawl", help="Crawl historical data" ) @@ -961,112 +532,6 @@ def main() -> None: ) historical_crawl_parser.set_defaults(func=handle_historical_crawl) - historical_crawl_parser_v3 = subparsers.add_parser( - "historical-crawl-v3", help="Crawl historical data" - ) - - historical_crawl_parser_v3.add_argument( - "--address", - "-a", - required=False, - type=str, - ) - - historical_crawl_parser_v3.add_argument( - "--start", - "-s", - type=int, - default=None, - ) - - historical_crawl_parser_v3.add_argument( - "--end", - "-e", - type=int, - required=False, - ) - - historical_crawl_parser_v3.add_argument( - "--blockchain-type", - "-b", - type=str, - help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", - ) - - historical_crawl_parser_v3.add_argument( - "--web3", - type=str, - default=None, - help="Web3 provider URL", - ) - - historical_crawl_parser_v3.add_argument( - "--poa", - action="store_true", - default=False, - help="Use PoA middleware", - ) - - historical_crawl_parser_v3.add_argument( - "--max-blocks-batch", - 
"-m", - type=int, - default=80, - help="Maximum number of blocks to crawl in a single batch", - ) - - historical_crawl_parser_v3.add_argument( - "--min-sleep-time", - "-t", - type=float, - default=0.1, - help="Minimum time to sleep between crawl step", - ) - - historical_crawl_parser_v3.add_argument( - "--force", - action="store_true", - default=False, - help="Force start from the start block", - ) - - historical_crawl_parser_v3.add_argument( - "--only-events", - action="store_true", - default=False, - help="Only crawl events", - ) - - historical_crawl_parser_v3.add_argument( - "--only-functions", - action="store_true", - default=False, - help="Only crawl function calls", - ) - - historical_crawl_parser_v3.add_argument( - "--find-deployed-blocks", - action="store_true", - default=False, - help="Find all deployed blocks", - ) - - historical_crawl_parser_v3.add_argument( - "--customer-uuid", - type=UUID, - required=True, - help="Customer UUID", - ) - - historical_crawl_parser_v3.add_argument( - "--user-uuid", - type=UUID, - required=False, - help="User UUID", - ) - - historical_crawl_parser_v3.set_defaults(func=handle_historical_crawl_v3) - args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index c379c95a..c23a8602 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -5,14 +5,12 @@ from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from uuid import UUID - -from moonstreamtypes.blockchain import AvailableBlockchainType -from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type -from moonstreamtypes.networks import blockchain_type_to_network_type +from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamdb.networks import blockchain_type_to_network_type +from moonstreamdb.subscriptions import blockchain_type_to_subscription_type from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, ) -from moonworm.crawler.ethereum_state_provider import Web3StateProvider from sqlalchemy.orm.session import Session from web3 import Web3 @@ -107,7 +105,6 @@ def continuous_crawler( new_jobs_refetch_interval: float = 120, web3_uri: Optional[str] = None, max_insert_batch: int = 10000, - version: int = 2, ): crawler_type = "continuous" assert ( @@ -132,14 +129,11 @@ def continuous_crawler( except Exception as e: raise Exception(e) - evm_state_provider = Web3StateProvider(web3) - - if version == 2: - evm_state_provider = MoonstreamEthereumStateProvider( - web3, - network, # type: ignore - db_session, - ) + ethereum_state_provider = MoonstreamEthereumStateProvider( + web3, + network, + db_session, + ) heartbeat_template = { "status": "crawling", @@ -212,7 +206,7 @@ def continuous_crawler( ) all_function_calls = _crawl_functions( blockchain_type, - evm_state_provider, + ethereum_state_provider, function_call_crawl_jobs, start_block, end_block, @@ -274,7 +268,7 @@ def continuous_crawler( function_call_crawl_jobs ) heartbeat_template["function_call metrics"] = ( - evm_state_provider.metrics + ethereum_state_provider.metrics ) heartbeat( crawler_type=crawler_type, diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 79a4d58e..62ad4c65 100644 --- 
a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -2,22 +2,17 @@ import json import logging import re import time -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime -import binascii from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast from uuid import UUID from bugout.data import BugoutJournalEntries, BugoutSearchResult from eth_typing.evm import ChecksumAddress -from moonstreamtypes.blockchain import AvailableBlockchainType +from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.subscriptions import SubscriptionTypes -from moonstreamtypes.subscriptions import SubscriptionTypes -from moonstreamdbv3.models_indexes import AbiJobs from moonworm.deployment import find_deployment_block # type: ignore -from sqlalchemy import func, cast as sqlcast, JSON -from sqlalchemy.orm import Session from web3.main import Web3 from ..blockchain import connect @@ -139,7 +134,6 @@ class FunctionCallCrawlJob: contract_address: ChecksumAddress entries_tags: Dict[UUID, List[str]] created_at: int - existing_selectors: List[str] = field(default_factory=list) def get_crawl_job_entries( @@ -697,129 +691,3 @@ def add_progress_to_tags( ) return entries_tags_delete, entries_tags_add - - -def get_event_crawl_job_records( - db_session: Session, - blockchain_type: AvailableBlockchainType, - addresses: List[str], - existing_crawl_job_records: Dict[str, EventCrawlJob], -): - """ - Retrieve and update the event crawl job records from the database. - """ - - query = ( - db_session.query(AbiJobs) - .filter(AbiJobs.chain == blockchain_type.value) - .filter(func.length(AbiJobs.abi_selector) > 10) - ) - - if len(addresses) != 0: - query = query.filter( - AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses]) - ) - - crawl_job_records = query.all() - - if len(crawl_job_records) == 0: - return existing_crawl_job_records - - for crawl_job_record in crawl_job_records: - - str_address = "0x" + crawl_job_record.address.hex() - - checksummed_address = Web3.toChecksumAddress(str_address) - - if crawl_job_record.abi_selector in existing_crawl_job_records: - - if ( - checksummed_address - not in existing_crawl_job_records[ - crawl_job_record.abi_selector - ].contracts - ): - existing_crawl_job_records[ - crawl_job_record.abi_selector - ].contracts.append(checksummed_address) - - else: - new_crawl_job = EventCrawlJob( - event_abi_hash=str(crawl_job_record.abi_selector), - event_abi=json.loads(str(crawl_job_record.abi)), - contracts=[checksummed_address], - address_entries={ - crawl_job_record.address.hex(): { - UUID(str(crawl_job_record.id)): [ - str(crawl_job_record.status), - str(crawl_job_record.progress), - ] - } - }, - created_at=int(crawl_job_record.created_at.timestamp()), - ) - existing_crawl_job_records[str(crawl_job_record.abi_selector)] = ( - new_crawl_job - ) - - return existing_crawl_job_records - - -def get_function_call_crawl_job_records( - db_session: Session, - blockchain_type: AvailableBlockchainType, - addresses: List[str], - existing_crawl_job_records: Dict[str, FunctionCallCrawlJob], -): - """ - Retrieve and update the function call crawl job records from the database. - """ - - # Query AbiJobs where the abi_selector is exactly 8 characters long. 
- query = ( - db_session.query(AbiJobs) - .filter(AbiJobs.chain == blockchain_type.value) - .filter(func.length(AbiJobs.abi_selector) == 10) - .filter( - sqlcast(AbiJobs.abi, JSON).op("->>")("type") == "function", - sqlcast(AbiJobs.abi, JSON).op("->>")("stateMutability") != "view", - ) - ) - - if len(addresses) != 0: - query = query.filter( - AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses]) - ) - - crawl_job_records = query.all() - - # Iterate over each record fetched from the database - for crawl_job_record in crawl_job_records: - str_address = "0x" + crawl_job_record.address.hex() - - if str_address not in existing_crawl_job_records: - existing_crawl_job_records[str_address] = FunctionCallCrawlJob( - contract_abi=[json.loads(str(crawl_job_record.abi))], - contract_address=Web3.toChecksumAddress(str_address), - entries_tags={ - UUID(str(crawl_job_record.id)): [ - str(crawl_job_record.status), - str(crawl_job_record.progress), - ] - }, - created_at=int(crawl_job_record.created_at.timestamp()), - existing_selectors=[str(crawl_job_record.abi_selector)], - ) - else: - if ( - crawl_job_record.abi_selector - not in existing_crawl_job_records[str_address].existing_selectors - ): - existing_crawl_job_records[str_address].contract_abi.append( - json.loads(str(crawl_job_record.abi)) - ) - existing_crawl_job_records[str_address].existing_selectors.append( - str(crawl_job_record.abi_selector) - ) - - return existing_crawl_job_records diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 876847dd..8b5001ec 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -1,14 +1,12 @@ import json import logging -from typing import Dict, List, Optional, Union, Any +from typing import Dict, List, Optional +from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.models import Base -from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy import Integer, String, column, exists, func, select, text, values from sqlalchemy.orm import Session -from sqlalchemy.dialects.postgresql import insert - from ..settings import CRAWLER_LABEL from .event_crawler import Event @@ -18,15 +16,12 @@ logger = logging.getLogger(__name__) def _event_to_label( - blockchain_type: AvailableBlockchainType, - event: Event, - label_name=CRAWLER_LABEL, - db_version: int = 2, + blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL ) -> Base: """ Creates a label model. 
""" - label_model = get_label_model(blockchain_type, version=db_version) + label_model = get_label_model(blockchain_type) sanityzed_label_data = json.loads( json.dumps( { @@ -37,46 +32,27 @@ def _event_to_label( ).replace(r"\u0000", "") ) - if db_version == 2: - label = label_model( - label=label_name, - label_data=sanityzed_label_data, - address=event.address, - block_number=event.block_number, - block_timestamp=event.block_timestamp, - transaction_hash=event.transaction_hash, - log_index=event.log_index, - ) - else: - - del sanityzed_label_data["type"] - del sanityzed_label_data["name"] - - label = label_model( - label=label_name, - label_name=event.event_name, - label_data=sanityzed_label_data, - address=event.address, - block_number=event.block_number, - block_timestamp=event.block_timestamp, - transaction_hash=event.transaction_hash, - log_index=event.log_index, - block_hash=event.block_hash.hex(), # type: ignore - ) - + label = label_model( + label=label_name, + label_data=sanityzed_label_data, + address=event.address, + block_number=event.block_number, + block_timestamp=event.block_timestamp, + transaction_hash=event.transaction_hash, + log_index=event.log_index, + ) return label def _function_call_to_label( blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall, - db_version: int = 2, label_name=CRAWLER_LABEL, ) -> Base: """ Creates a label model. """ - label_model = get_label_model(blockchain_type, version=db_version) + label_model = get_label_model(blockchain_type) sanityzed_label_data = json.loads( json.dumps( @@ -91,34 +67,14 @@ def _function_call_to_label( ).replace(r"\u0000", "") ) - if db_version == 2: - - label = label_model( - label=label_name, - label_data=sanityzed_label_data, - address=function_call.contract_address, - block_number=function_call.block_number, - transaction_hash=function_call.transaction_hash, - block_timestamp=function_call.block_timestamp, - ) - - else: - - del sanityzed_label_data["type"] - del sanityzed_label_data["name"] - - label = label_model( - label=label_name, - label_name=function_call.function_name, - label_data=sanityzed_label_data, - address=function_call.contract_address, - block_number=function_call.block_number, - block_hash=function_call.block_hash.hex(), # type: ignore - transaction_hash=function_call.transaction_hash, - block_timestamp=function_call.block_timestamp, - caller_address=function_call.caller_address, - origin_address=function_call.caller_address, - ) + label = label_model( + label=label_name, + label_data=sanityzed_label_data, + address=function_call.contract_address, + block_number=function_call.block_number, + transaction_hash=function_call.transaction_hash, + block_timestamp=function_call.block_timestamp, + ) return label @@ -127,9 +83,8 @@ def get_last_labeled_block_number( db_session: Session, blockchain_type: AvailableBlockchainType, label_name=CRAWLER_LABEL, - db_version: int = 2, ) -> Optional[int]: - label_model = get_label_model(blockchain_type, version=db_version) + label_model = get_label_model(blockchain_type) block_number = ( db_session.query(label_model.block_number) .filter(label_model.label == label_name) @@ -145,34 +100,38 @@ def get_first_labeled_block_number( db_session: Session, blockchain_type: AvailableBlockchainType, address: str, - label_name: str = CRAWLER_LABEL, + label_name=CRAWLER_LABEL, only_events: bool = False, - db_version: int = 2, ) -> Optional[int]: - - label_model = get_label_model(blockchain_type, version=db_version) - - base_query = ( + label_model = 
get_label_model(blockchain_type) + block_number_query = ( db_session.query(label_model.block_number) - .filter(label_model.label == label_name, label_model.address == address) + .filter(label_model.label == label_name) + .filter(label_model.address == address) + ) + + function_call_block_numbers = ( + block_number_query.filter(label_model.log_index == None) .order_by(label_model.block_number) + .limit(50) + .all() + ) + event_block_numbers = ( + block_number_query.filter(label_model.log_index != None) + .order_by(label_model.block_number) + .limit(50) + .all() ) - event_blocks = base_query.filter(label_model.log_index != None).first() - function_blocks = ( - None - if only_events - else base_query.filter(label_model.log_index == None).first() - ) - - if event_blocks and function_blocks: - result = max(event_blocks, function_blocks) - elif event_blocks or function_blocks: - result = event_blocks if event_blocks else function_blocks + if only_events: + return event_block_numbers[0][0] if event_block_numbers else None else: - result = None - - return result[0] if result else None + event_block_number = event_block_numbers[0][0] if event_block_numbers else -1 + function_call_block_number = ( + function_call_block_numbers[0][0] if function_call_block_numbers else -1 + ) + max_block_number = max(event_block_number, function_call_block_number) + return max_block_number if max_block_number != -1 else None def commit_session(db_session: Session) -> None: @@ -192,188 +151,93 @@ def add_events_to_session( db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType, - db_version: int = 2, label_name=CRAWLER_LABEL, ) -> None: if len(events) == 0: return - label_model = get_label_model(blockchain_type, version=db_version) + label_model = get_label_model(blockchain_type) - if db_version == 2: + events_hashes_to_save = set([event.transaction_hash for event in events]) - events_hashes_to_save = set([event.transaction_hash for event in events]) - - # Define a CTE VALUES expression to escape big IN clause - hashes_cte = select( - values(column("transaction_hash", String), name="hashes").data( - [(hash,) for hash in events_hashes_to_save] - ) - ).cte() - - # Retrieve existing transaction hashes and registered log indexes - query = ( - db_session.query( - label_model.transaction_hash.label("transaction_hash"), - func.array_agg(label_model.log_index).label("log_indexes"), - ) - .filter( - label_model.label == label_name, - label_model.log_index.isnot(None), - exists().where( - label_model.transaction_hash == hashes_cte.c.transaction_hash - ), - ) - .group_by(label_model.transaction_hash) + # Define a CTE VALUES expression to escape big IN clause + hashes_cte = select( + values(column("transaction_hash", String), name="hashes").data( + [(hash,) for hash in events_hashes_to_save] ) + ).cte() - existing_log_index_by_tx_hash = { - row.transaction_hash: row.log_indexes for row in query - } - - labels_to_save = [ - _event_to_label(blockchain_type, event, label_name) - for event in events - if event.transaction_hash not in existing_log_index_by_tx_hash - or event.log_index - not in existing_log_index_by_tx_hash[event.transaction_hash] - ] - - logger.info(f"Saving {len(labels_to_save)} event labels to session") - db_session.add_all(labels_to_save) - - else: - - # Define the table name and columns based on the blockchain type - table = label_model.__table__ - - # Create a list of dictionaries representing new records - records = [] - for event in events: - - label_event = _event_to_label( - 
blockchain_type, event, label_name, db_version - ) - - record = { - "label": label_event.label, - "transaction_hash": label_event.transaction_hash, - "log_index": label_event.log_index, - "block_number": label_event.block_number, - "block_hash": label_event.block_hash, - "block_timestamp": label_event.block_timestamp, - "caller_address": None, - "origin_address": None, - "address": label_event.address, - "label_name": label_event.label_name, - "label_type": "event", - "label_data": label_event.label_data, - } - - records.append(record) - - # Insert records using a single batched query with an ON CONFLICT clause - statement = insert(table).values(records) - do_nothing_statement = statement.on_conflict_do_nothing( - index_elements=["transaction_hash", "log_index"], - index_where=(table.c.label == "seer") & (table.c.label_type == "event"), + # Retrieve existing transaction hashes and registered log indexes + query = ( + db_session.query( + label_model.transaction_hash.label("transaction_hash"), + func.array_agg(label_model.log_index).label("log_indexes"), ) + .filter( + label_model.label == label_name, + label_model.log_index.isnot(None), + exists().where( + label_model.transaction_hash == hashes_cte.c.transaction_hash + ), + ) + .group_by(label_model.transaction_hash) + ) - db_session.execute(do_nothing_statement) + existing_log_index_by_tx_hash = { + row.transaction_hash: row.log_indexes for row in query + } - logger.info(f"Batch inserted {len(records)} event labels into {table.name}") + labels_to_save = [ + _event_to_label(blockchain_type, event, label_name) + for event in events + if event.transaction_hash not in existing_log_index_by_tx_hash + or event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] + ] + + logger.info(f"Saving {len(labels_to_save)} event labels to session") + db_session.add_all(labels_to_save) def add_function_calls_to_session( db_session: Session, function_calls: List[ContractFunctionCall], blockchain_type: AvailableBlockchainType, - db_version: int = 2, label_name=CRAWLER_LABEL, ) -> None: if len(function_calls) == 0: return - if db_version == 2: + label_model = get_label_model(blockchain_type) - label_model = get_label_model(blockchain_type, version=db_version) + transactions_hashes_to_save = list( + set([function_call.transaction_hash for function_call in function_calls]) + ) - transactions_hashes_to_save = list( - set([function_call.transaction_hash for function_call in function_calls]) + # Define a CTE VALUES expression to escape big IN clause + hashes_cte = select( + values(column("transaction_hash", String), name="hashes").data( + [(hash,) for hash in transactions_hashes_to_save] ) + ).cte() - # Define a CTE VALUES expression to escape big IN clause - hashes_cte = select( - values(column("transaction_hash", String), name="hashes").data( - [(hash,) for hash in transactions_hashes_to_save] - ) - ).cte() + # Retrieve existing transaction hashes + query = db_session.query( + label_model.transaction_hash.label("transaction_hash") + ).filter( + label_model.label == label_name, + label_model.log_index.is_(None), + exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash), + ) - # Retrieve existing transaction hashes - query = db_session.query( - label_model.transaction_hash.label("transaction_hash") - ).filter( - label_model.label == label_name, - label_model.log_index.is_(None), - exists().where( - label_model.transaction_hash == hashes_cte.c.transaction_hash - ), - ) + existing_tx_hashes = [row.transaction_hash for row in 
query] - existing_tx_hashes = [row.transaction_hash for row in query] + labels_to_save = [ + _function_call_to_label(blockchain_type, function_call) + for function_call in function_calls + if function_call.transaction_hash not in existing_tx_hashes + ] - labels_to_save = [ - _function_call_to_label(blockchain_type, function_call) - for function_call in function_calls - if function_call.transaction_hash not in existing_tx_hashes - ] - - logger.info(f"Saving {len(labels_to_save)} labels to session") - db_session.add_all(labels_to_save) - - else: - - label_model = get_label_model(blockchain_type, version=db_version) - - # Define the table name and columns based on the blockchain type - table = label_model.__table__ - - # Create a list of dictionaries representing new records - records = [] - for function_call in function_calls: - - label_function_call = _function_call_to_label( - blockchain_type, function_call, db_version - ) - - record = { - "label": label_function_call.label, - "transaction_hash": label_function_call.transaction_hash, - "log_index": None, - "block_number": label_function_call.block_number, - "block_hash": label_function_call.block_hash, - "block_timestamp": label_function_call.block_timestamp, - "caller_address": label_function_call.caller_address, - "origin_address": label_function_call.caller_address, - "address": label_function_call.address, - "label_name": label_function_call.label_name, - "label_type": "tx_call", - "label_data": label_function_call.label_data, - } - - records.append(record) - - # Insert records using a single batched query with an ON CONFLICT clause - statement = insert(table).values(records) - do_nothing_statement = statement.on_conflict_do_nothing( - index_elements=["transaction_hash"], - index_where=(table.c.label == "seer") & (table.c.label_type == "tx_call"), - ) - - db_session.execute(do_nothing_statement) - - logger.info( - f"Batch inserted {len(records)} function call labels into {table.name}" - ) + logger.info(f"Saving {len(labels_to_save)} labels to session") + db_session.add_all(labels_to_save) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index e19ee260..8a0b094e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -2,12 +2,7 @@ import logging from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple - -from moonstreamtypes.blockchain import ( - AvailableBlockchainType, - get_label_model, - get_block_model, -) +from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model from moonworm.crawler.log_scanner import ( _crawl_events as moonworm_autoscale_crawl_events, # type: ignore ) @@ -31,7 +26,6 @@ class Event: block_timestamp: int transaction_hash: str log_index: int - block_hash: Optional[str] = None def _get_block_timestamp_from_web3( @@ -54,7 +48,6 @@ def get_block_timestamp( block_number: int, blocks_cache: Dict[int, int], max_blocks_batch: int = 30, - version: int = 2, ) -> int: """ Get the timestamp of a block. @@ -70,14 +63,6 @@ def get_block_timestamp( :param blocks_cache: The cache of blocks. :return: The timestamp of the block. 
""" - - if version != 2: - if block_number in blocks_cache: - return blocks_cache[block_number] - target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number) - blocks_cache[block_number] = target_block_timestamp - return target_block_timestamp - assert max_blocks_batch > 0 if block_number in blocks_cache: @@ -86,9 +71,7 @@ def get_block_timestamp( block_model = get_block_model(blockchain_type) blocks = ( - db_session.query( - block_model.block_number, block_model.timestamp, block_model.hash - ) + db_session.query(block_model.block_number, block_model.timestamp) .filter( and_( block_model.block_number >= block_number - max_blocks_batch - 1, @@ -125,7 +108,6 @@ def _crawl_events( to_block: int, blocks_cache: Dict[int, int] = {}, db_block_query_batch=10, - version: int = 2, ) -> List[Event]: all_events = [] for job in jobs: @@ -147,7 +129,6 @@ def _crawl_events( raw_event["blockNumber"], blocks_cache, db_block_query_batch, - version, ) event = Event( event_name=raw_event["event"], @@ -157,7 +138,6 @@ def _crawl_events( block_timestamp=raw_event["blockTimestamp"], transaction_hash=raw_event["transactionHash"], log_index=raw_event["logIndex"], - block_hash=raw_event.get("blockHash"), ) all_events.append(event) @@ -174,25 +154,21 @@ def _autoscale_crawl_events( blocks_cache: Dict[int, int] = {}, batch_size: int = 1000, db_block_query_batch=10, - version: int = 2, ) -> Tuple[List[Event], int]: """ Crawl events with auto regulated batch_size. """ all_events = [] for job in jobs: - try: - raw_events, batch_size = moonworm_autoscale_crawl_events( - web3=web3, - event_abi=job.event_abi, - from_block=from_block, - to_block=to_block, - batch_size=batch_size, - contract_address=job.contracts[0], - max_blocks_batch=3000, - ) - except Exception as e: - breakpoint() + raw_events, batch_size = moonworm_autoscale_crawl_events( + web3=web3, + event_abi=job.event_abi, + from_block=from_block, + to_block=to_block, + batch_size=batch_size, + contract_address=job.contracts[0], + max_blocks_batch=3000, + ) for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( db_session, @@ -201,7 +177,6 @@ def _autoscale_crawl_events( raw_event["blockNumber"], blocks_cache, db_block_query_batch, - version, ) event = Event( event_name=raw_event["event"], @@ -211,7 +186,6 @@ def _autoscale_crawl_events( block_timestamp=raw_event["blockTimestamp"], transaction_hash=raw_event["transactionHash"], log_index=raw_event["logIndex"], - block_hash=raw_event.get("blockHash"), ) all_events.append(event) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py index 43f99cec..e3631790 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py @@ -1,8 +1,7 @@ import logging -from typing import List, Union +from typing import List from moonstreamdb.blockchain import AvailableBlockchainType -from moonstreamtypes.blockchain import AvailableBlockchainType from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore from moonworm.crawler.function_call_crawler import ( # type: ignore ContractFunctionCall, @@ -11,7 +10,6 @@ from moonworm.crawler.function_call_crawler import ( # type: ignore from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, ) -from moonworm.crawler.ethereum_state_provider import Web3StateProvider from 
moonworm.watch import MockState # type: ignore from sqlalchemy.orm import Session from web3 import Web3 @@ -24,7 +22,7 @@ logger = logging.getLogger(__name__) def _crawl_functions( blockchain_type: AvailableBlockchainType, - ethereum_state_provider: Union[MoonstreamEthereumStateProvider, Web3StateProvider], + ethereum_state_provider: MoonstreamEthereumStateProvider, jobs: List[FunctionCallCrawlJob], from_block: int, to_block: int, @@ -59,12 +57,9 @@ def function_call_crawler( start_block: int, end_block: int, batch_size: int, - version: int = 2, ): - if version != 2: - raise ValueError("Only version 2 is supported") try: - network = blockchain_type_to_network_type(blockchain_type=blockchain_type) # type: ignore + network = blockchain_type_to_network_type(blockchain_type=blockchain_type) except Exception as e: raise Exception(e) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index a1013671..028f9e71 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -1,17 +1,14 @@ import logging import time -from typing import Dict, List, Optional, Union, Any +from typing import Dict, List, Optional from uuid import UUID from eth_typing.evm import ChecksumAddress - -from moonstreamtypes.blockchain import AvailableBlockchainType -from moonstreamtypes.networks import blockchain_type_to_network_type +from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, - Network, ) -from moonworm.crawler.ethereum_state_provider import Web3StateProvider from sqlalchemy.orm.session import Session from web3 import Web3 @@ -31,7 +28,7 @@ logger = logging.getLogger(__name__) def historical_crawler( db_session: Session, - blockchain_type: AvailableBlockchainType, # AvailableBlockchainType, + blockchain_type: AvailableBlockchainType, web3: Optional[Web3], event_crawl_jobs: List[EventCrawlJob], function_call_crawl_jobs: List[FunctionCallCrawlJob], @@ -42,7 +39,6 @@ def historical_crawler( web3_uri: Optional[str] = None, addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None, max_insert_batch: int = 10000, - version: int = 2, ): assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" assert min_sleep_time > 0, "min_sleep_time must be greater than 0" @@ -61,15 +57,11 @@ def historical_crawler( except Exception as e: raise Exception(e) - evm_state_provider = Web3StateProvider(web3) - - if version == 2: - ### Moonstream state provider use the V2 db to get the block - evm_state_provider = MoonstreamEthereumStateProvider( - web3, - network, # type: ignore - db_session, - ) + ethereum_state_provider = MoonstreamEthereumStateProvider( + web3, + network, + db_session, + ) logger.info(f"Starting historical event crawler start_block={start_block}") @@ -101,7 +93,6 @@ def historical_crawler( to_block=start_block, blocks_cache=blocks_cache, db_block_query_batch=max_blocks_batch, - version=version, ) else: @@ -114,7 +105,6 @@ def historical_crawler( to_block=start_block, blocks_cache=blocks_cache, db_block_query_batch=max_blocks_batch, - version=version, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}." 
@@ -127,12 +117,11 @@ def historical_crawler( db_session, all_events[i : i + max_insert_batch], blockchain_type, - version, ) else: - add_events_to_session(db_session, all_events, blockchain_type, version) + add_events_to_session(db_session, all_events, blockchain_type) if function_call_crawl_jobs: logger.info( @@ -140,7 +129,7 @@ def historical_crawler( ) all_function_calls = _crawl_functions( blockchain_type, - evm_state_provider, + ethereum_state_provider, function_call_crawl_jobs, batch_end_block, start_block, @@ -156,12 +145,11 @@ def historical_crawler( db_session, all_function_calls[i : i + max_insert_batch], blockchain_type, - version, ) else: add_function_calls_to_session( - db_session, all_function_calls, blockchain_type, version + db_session, all_function_calls, blockchain_type ) if addresses_deployment_blocks: diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index e93bab56..2cc24339 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -44,7 +44,7 @@ DOCS_TARGET_PATH = "docs" # Crawler label -CRAWLER_LABEL = "seer" +CRAWLER_LABEL = "moonworm-alpha" VIEW_STATE_CRAWLER_LABEL = "view-state-alpha" METADATA_CRAWLER_LABEL = "metadata-crawler" @@ -196,42 +196,6 @@ if MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI == "": "MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI env variable is not set" ) -MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI = os.environ.get( - "MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI", "" -) -if MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI == "": - raise Exception("MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI env variable is not set") - -MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI = os.environ.get( - "MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI", "" -) -if MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI == "": - raise Exception( - "MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI env variable is not set" - ) - -MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI = os.environ.get( - "MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI", "" -) -if MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI == "": - raise Exception("MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI env variable is not set") - -MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI = os.environ.get( - "MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI", "" -) -if MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI == "": - raise Exception( - "MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI env variable is not set" - ) - -MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI = os.environ.get( - "MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI", "" -) -if MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI == "": - raise Exception( - "MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI env variable is not set" - ) - MOONSTREAM_CRAWL_WORKERS = 4 MOONSTREAM_CRAWL_WORKERS_RAW = os.environ.get("MOONSTREAM_CRAWL_WORKERS") @@ -429,9 +393,3 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "": MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60 -MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( - "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" -) -MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN = os.environ.get( - "MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN", "" -) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 4984cb78..240b2333 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ 
Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.4.6" +MOONCRAWL_VERSION = "0.4.5" diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 81057787..28cb624b 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -3,6 +3,7 @@ export BUGOUT_BROOD_URL="https://auth.bugout.dev" export BUGOUT_SPIRE_URL="https://spire.bugout.dev" export HUMBUG_REPORTER_CRAWLERS_TOKEN="" + # Engine environment variables export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to" @@ -18,8 +19,6 @@ export MOONSTREAM_DATA_JOURNAL_ID="" export MOONSTREAM_MOONWORM_TASKS_JOURNAL="" export MOONSTREAM_ADMIN_ACCESS_TOKEN="" export NFT_HUMBUG_TOKEN="" -export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="" -export MOONSTREAM_ADMIN_ACCESS_TOKEN="" # Blockchain nodes environment variables export MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI="https://" @@ -39,12 +38,6 @@ export MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI="https://" export COINMARKETCAP_API_KEY="" + # Custom crawler export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="" export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev" @@ -64,8 +58,4 @@ export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN=" \ No newline at end of file diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index 57a4d5ee..2e5911ab 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -38,10 +38,8 @@ setup( "chardet", "fastapi", "moonstreamdb>=0.4.4", - "moonstreamdb-v3>=0.0.10", - "moonstream-types>=0.0.3", "moonstream>=0.1.1", - "moonworm[moonstream]>=0.9.1", + "moonworm[moonstream]>=0.6.2", "humbug", "pydantic==1.9.2", "python-dateutil", From 9964b21b04222a7a8b93bc900654b5cbff354411 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 17 Jun 2024 14:29:34 +0000 Subject: [PATCH 2/2] Moonworm backup hardcode version --- crawlers/mooncrawl/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index 2e5911ab..59099b6b 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -39,7 +39,7 @@ setup( "fastapi", "moonstreamdb>=0.4.4", "moonstream>=0.1.1", - "moonworm[moonstream]>=0.6.2", + "moonworm[moonstream]==0.9.0", "humbug", "pydantic==1.9.2", "python-dateutil",
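The first commit swaps the table-driven `default_uri_mapping` lookup in `blockchain.py` back to an explicit if/elif chain over `AvailableBlockchainType`. For reference, a minimal sketch of the dict-based pattern being removed, with a stand-in enum and placeholder URIs instead of the real `MOONSTREAM_NODE_*_EXTERNAL_URI` settings:

from enum import Enum
from typing import Dict, Optional


class AvailableBlockchainType(Enum):
    # Stand-in for moonstreamtypes.blockchain.AvailableBlockchainType
    ETHEREUM = "ethereum"
    POLYGON = "polygon"


# Placeholder URIs; the real mapping reads environment-backed settings values
default_uri_mapping: Dict[AvailableBlockchainType, str] = {
    AvailableBlockchainType.ETHEREUM: "https://ethereum-node.example",
    AvailableBlockchainType.POLYGON: "https://polygon-node.example",
}


def resolve_web3_uri(
    blockchain_type: Optional[AvailableBlockchainType] = None,
    web3_uri: Optional[str] = None,
) -> str:
    if blockchain_type is None and web3_uri is None:
        raise Exception("Both blockchain_type and web3_uri could not be None")
    if web3_uri is None:
        # One lookup replaces the long if/elif ladder the revert restores
        web3_uri = default_uri_mapping.get(blockchain_type)  # type: ignore
        if web3_uri is None:
            raise Exception("Wrong blockchain type provided for web3 URI")
    return web3_uri

The trade-off is purely structural: both forms raise the same exception for an unknown chain, but the mapping keeps connect() constant-size as networks are added.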
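The revert also drops the v3 customer-database helpers from `cli.py`, including `ensure_port_in_connection_string`, which spliced a default PostgreSQL port into a DSN by string replacement. A sketch of the same behavior done with `urllib.parse` instead of string surgery; the 5432 default and quote stripping mirror the removed code, everything else is an illustrative assumption:

from urllib.parse import urlparse, urlunparse


def ensure_port_in_connection_string(
    connection_string: str, default_port: int = 5432
) -> str:
    # The removed helper assumed PostgreSQL (hence 5432) and stripped stray
    # double quotes from the controller API response
    connection_string = connection_string.strip().strip('"')
    parsed = urlparse(connection_string)
    if parsed.port is not None:
        return connection_string
    # netloc keeps any user:password@ credentials; only the port is appended
    return urlunparse(parsed._replace(netloc=f"{parsed.netloc}:{default_port}"))


# ensure_port_in_connection_string("postgresql://u:p@db.example/customer")
# -> "postgresql://u:p@db.example:5432/customer"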
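Finally, the reverted `db.py` wrote v3 labels with a single batched INSERT ... ON CONFLICT DO NOTHING, while the code the revert restores queries existing transaction hashes first and then calls `add_all`. A self-contained sketch of the batched, idempotent pattern the v3 code used, against a simplified stand-in table rather than the real label model (the real statement also scoped the conflict target to a partial index via index_where; this sketch uses a plain composite key):

from typing import Dict, List

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

metadata = MetaData()

# Simplified stand-in for the real blockchain label table
labels = Table(
    "labels",
    metadata,
    Column("transaction_hash", String, primary_key=True),
    Column("log_index", Integer, primary_key=True),
    Column("label_data", String),
)


def add_labels_idempotent(db_session: Session, records: List[Dict]) -> None:
    if len(records) == 0:
        return
    # One batched statement; rows colliding on (transaction_hash, log_index)
    # are silently skipped instead of raising IntegrityError
    statement = insert(labels).values(records).on_conflict_do_nothing(
        index_elements=["transaction_hash", "log_index"],
    )
    db_session.execute(statement)

Pushing the uniqueness check into the database closes the race between the existence query and the insert that the query-then-add_all path leaves open when several crawlers write the same range concurrently.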