diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index d4af9631..3a193145 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -25,6 +25,7 @@ from .settings import ( MOONSTREAM_CRAWL_WORKERS, MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI, MOONSTREAM_POLYGON_WEB3_PROVIDER_URI, + MOONSTREAM_CLIENT_ID_HEADER, ) logger = logging.getLogger(__name__) @@ -37,9 +38,17 @@ class BlockCrawlError(Exception): """ -def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None): +def connect( + blockchain_type: AvailableBlockchainType, + web3_uri: Optional[str] = None, + token: Optional[str] = None, +): web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() + request_kwargs: Any = None + if token is not None: + request_kwargs = {"headers": {MOONSTREAM_CLIENT_ID_HEADER: token}} + if web3_uri is None: if blockchain_type == AvailableBlockchainType.ETHEREUM: web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI @@ -49,7 +58,10 @@ def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = raise Exception("Wrong blockchain type provided for web3 URI") if web3_uri.startswith("http://") or web3_uri.startswith("https://"): - web3_provider = Web3.HTTPProvider(web3_uri) + web3_provider = Web3.HTTPProvider( + endpoint_uri=web3_uri, + request_kwargs=request_kwargs, + ) else: web3_provider = Web3.IPCProvider(web3_uri) web3_client = Web3(web3_provider) @@ -181,15 +193,18 @@ def add_block_transactions( def get_latest_blocks( - blockchain_type: AvailableBlockchainType, confirmations: int = 0 + blockchain_type: AvailableBlockchainType, + confirmations: int = 0, + token: Optional[str] = None, ) -> Tuple[Optional[int], int]: """ - Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method). + Retrieve the latest block from the connected node (connection is created by the + connect(AvailableBlockchainType, ClientTokenID) method). If confirmations > 0, and the latest block on the node has block number N, this returns the block with block_number (N - confirmations) """ - web3_client = connect(blockchain_type) + web3_client = connect(blockchain_type, token=token) latest_block_number: int = web3_client.eth.block_number if confirmations > 0: latest_block_number -= confirmations @@ -212,11 +227,12 @@ def crawl_blocks( blockchain_type: AvailableBlockchainType, blocks_numbers: List[int], with_transactions: bool = False, + token: Optional[str] = None, ) -> None: """ Open database and geth sessions and fetch block data from blockchain. """ - web3_client = connect(blockchain_type) + web3_client = connect(blockchain_type, token=token) with yield_db_session_ctx() as db_session: pbar = tqdm(total=len(blocks_numbers)) for block_number in blocks_numbers: @@ -256,6 +272,7 @@ def check_missing_blocks( blockchain_type: AvailableBlockchainType, blocks_numbers: List[int], notransactions=False, + token: Optional[str] = None, ) -> List[int]: """ Query block from postgres. If block does not presented in database, @@ -294,7 +311,7 @@ def check_missing_blocks( [block[0], block[1]] for block in blocks_exist_raw_query.all() ] - web3_client = connect(blockchain_type) + web3_client = connect(blockchain_type, token=token) blocks_exist_len = len(blocks_exist) pbar = tqdm(total=blocks_exist_len) @@ -336,6 +353,7 @@ def crawl_blocks_executor( block_numbers_list: List[int], with_transactions: bool = False, num_processes: int = MOONSTREAM_CRAWL_WORKERS, + token: Optional[str] = None, ) -> None: """ Execute crawler in processes. @@ -363,14 +381,16 @@ def crawl_blocks_executor( results: List[Future] = [] if num_processes == 1: logger.warning("Executing block crawler in lazy mod") - return crawl_blocks(blockchain_type, block_numbers_list, with_transactions) + return crawl_blocks( + blockchain_type, block_numbers_list, with_transactions, token + ) else: with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: for worker in worker_indices: block_chunk = worker_job_lists[worker] logger.info(f"Spawned process for {len(block_chunk)} blocks") result = executor.submit( - crawl_blocks, blockchain_type, block_chunk, with_transactions + crawl_blocks, blockchain_type, block_chunk, with_transactions, token ) result.add_done_callback(record_error) results.append(result) diff --git a/crawlers/mooncrawl/mooncrawl/contract/cli.py b/crawlers/mooncrawl/mooncrawl/contract/cli.py index 5e315071..9e5dc109 100644 --- a/crawlers/mooncrawl/mooncrawl/contract/cli.py +++ b/crawlers/mooncrawl/mooncrawl/contract/cli.py @@ -117,7 +117,7 @@ def run_crawler_desc( def handle_parser(args: argparse.Namespace): with yield_db_session_ctx() as session: - w3 = connect(AvailableBlockchainType.ETHEREUM) + w3 = connect(AvailableBlockchainType.ETHEREUM, token=args.token) if args.order == "asc": run_crawler_asc( w3=w3, @@ -153,6 +153,11 @@ def generate_parser(): """ parser = argparse.ArgumentParser(description="Moonstream Deployment Crawler") + parser.add_argument( + "--token", + type=str, + help="Client token ID", + ) parser.add_argument( "--start", "-s", type=int, default=None, help="block to start crawling from" ) diff --git a/crawlers/mooncrawl/mooncrawl/crawler.py b/crawlers/mooncrawl/mooncrawl/crawler.py index 49f05180..a94fbae9 100644 --- a/crawlers/mooncrawl/mooncrawl/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/crawler.py @@ -93,7 +93,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None: """ while True: latest_stored_block_number, latest_block_number = get_latest_blocks( - AvailableBlockchainType(args.blockchain), args.confirmations + AvailableBlockchainType(args.blockchain), args.confirmations, args.token ) if latest_stored_block_number is None: latest_stored_block_number = 0 @@ -137,6 +137,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None: block_numbers_list=blocks_numbers_list, with_transactions=True, num_processes=args.jobs, + token=args.token, ) logger.info( f"Synchronized blocks from {latest_stored_block_number} to {latest_block_number}" @@ -155,6 +156,8 @@ def crawler_blocks_add_handler(args: argparse.Namespace) -> None: blockchain_type=AvailableBlockchainType(args.blockchain), block_numbers_list=blocks_numbers_list, with_transactions=True, + num_processes=MOONSTREAM_CRAWL_WORKERS, + token=args.token, ) logger.info( @@ -177,7 +180,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None: confirmations = 150 shift = 2000 _, latest_block_number = get_latest_blocks( - AvailableBlockchainType(args.blockchain), confirmations + AvailableBlockchainType(args.blockchain), confirmations, args.token ) block_range = f"{latest_block_number-shift}-{latest_block_number}" @@ -206,6 +209,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None: block_numbers_list=missing_blocks_numbers_total, with_transactions=True, num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS, + token=args.token, ) logger.info( f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers " @@ -246,6 +250,12 @@ def main() -> None: time_now = datetime.now(timezone.utc) + parser.add_argument( + "--token", + type=str, + help="Client token ID", + ) + # Blockchain blocks parser parser_crawler_blocks = subcommands.add_parser( "blocks", description="Blockchain blocks commands" diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 5174bc0c..ec89b08f 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -54,7 +54,9 @@ def handle_crawl(args: argparse.Namespace) -> None: logger.info( "No web3 provider URL provided, using default (blockchan.py: connect())" ) - web3 = _retry_connect_web3(blockchain_type) + web3 = _retry_connect_web3( + blockchain_type=blockchain_type, token=args.token + ) else: logger.info(f"Using web3 provider URL: {args.web3}") web3 = Web3( diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 7cff8735..a45ad407 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -85,6 +85,7 @@ def _retry_connect_web3( blockchain_type: AvailableBlockchainType, retry_count: int = 10, sleep_time: float = 5, + token: Optional[str] = None, ) -> Web3: """ Retry connecting to the blockchain. @@ -92,7 +93,7 @@ def _retry_connect_web3( while retry_count > 0: retry_count -= 1 try: - web3 = connect(blockchain_type) + web3 = connect(blockchain_type, token=token) web3.eth.block_number logger.info(f"Connected to {blockchain_type}") return web3 diff --git a/crawlers/mooncrawl/mooncrawl/nft/cli.py b/crawlers/mooncrawl/mooncrawl/nft/cli.py index 2fe1c823..c93439f9 100644 --- a/crawlers/mooncrawl/mooncrawl/nft/cli.py +++ b/crawlers/mooncrawl/mooncrawl/nft/cli.py @@ -55,7 +55,9 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3: raise ValueError( "Could not find Web3 connection information in arguments or in MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable" ) - return connect(AvailableBlockchainType.ETHEREUM, web3_connection_string) + return connect( + AvailableBlockchainType.ETHEREUM, web3_connection_string, token=args.token + ) def get_latest_block_from_node(web3_client: Web3): @@ -258,6 +260,12 @@ def main() -> None: parser.set_defaults(func=lambda _: parser.print_help()) subcommands = parser.add_subparsers(description="Subcommands") + parser.add_argument( + "--token", + type=str, + help="Client token ID", + ) + parser_ethereum = subcommands.add_parser( "ethereum", description="Collect information about NFTs from Ethereum blockchains", diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 89d2751f..47c09ba9 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -26,10 +26,13 @@ ORIGINS = RAW_ORIGINS.split(",") # OpenAPI DOCS_TARGET_PATH = "docs" - # Crawler label CRAWLER_LABEL = "moonworm-alpha" +MOONSTREAM_CLIENT_ID_HEADER = os.environ.get( + "MOONSTREAM_CLIENT_ID_HEADER", "x-moonstream-client-id" +) + # Geth connection address MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get( "MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", "" diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py index 6f57dd3d..7d51abd4 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -8,7 +8,7 @@ import logging import time from datetime import datetime, timedelta from enum import Enum -from typing import Any, Callable, Dict, List +from typing import Any, Callable, Dict, List, Optional from uuid import UUID import boto3 # type: ignore @@ -337,7 +337,9 @@ def generate_list_of_names( def process_external( - abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType + abi_external_calls: List[Dict[str, Any]], + blockchain: AvailableBlockchainType, + token: Optional[str] = None, ): """ Request all required external data @@ -383,7 +385,7 @@ def process_external( logger.error(f"Error processing external call: {e}") if external_calls: - web3_client = connect(blockchain) + web3_client = connect(blockchain, token=token) for extcall in external_calls: try: @@ -434,6 +436,7 @@ def generate_web3_metrics( address: str, crawler_label: str, abi_json: Any, + token: Optional[str] = None, ) -> List[Any]: """ Generate stats for cards components @@ -446,6 +449,7 @@ def generate_web3_metrics( extention_data = process_external( abi_external_calls=abi_external_calls, blockchain=blockchain_type, + token=token, ) extention_data.append( @@ -611,6 +615,7 @@ def stats_generate_handler(args: argparse.Namespace): address=address, crawler_label=crawler_label, abi_json=abi_json, + token=args.token, ) # Generate blocks state information