From 9a885687d59d33c65c90e49c64e28980172fee11 Mon Sep 17 00:00:00 2001 From: Andrey Date: Sun, 5 Jan 2025 13:55:19 +0200 Subject: [PATCH 01/13] Initial update. --- .../mooncrawl/metadata_crawler/cli.py | 334 ++++++++++-------- .../mooncrawl/metadata_crawler/db.py | 235 +++++++++++- crawlers/mooncrawl/mooncrawl/settings.py | 24 +- .../mooncrawl/mooncrawl/state_crawler/cli.py | 204 ++++++++++- 4 files changed, 637 insertions(+), 160 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 166bfb5c..eb0496a3 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -3,21 +3,28 @@ import json import logging import random import urllib.request -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional from urllib.error import HTTPError from moonstreamdb.blockchain import AvailableBlockchainType +from ..actions import get_all_entries_from_search +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx +from ..data import TokenURIs from .db import ( clean_labels_from_db, get_current_metadata_for_address, get_tokens_id_wich_may_updated, get_uris_of_tokens, metadata_to_label, + get_tokens_to_crawl, + upsert_metadata_labels, ) +from ..settings import moonstream_client as mc + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -50,7 +57,6 @@ def crawl_uri(metadata_uri: str) -> Any: result = None while retry < 3: try: - if metadata_uri.startswith("ipfs://"): metadata_uri = metadata_uri.replace( "ipfs://", "https://ipfs.io/ipfs/", 1 @@ -61,10 +67,7 @@ def crawl_uri(metadata_uri: str) -> Any: response = urllib.request.urlopen(req, timeout=10) - if ( - metadata_uri.startswith("data:application/json") - or response.status == 200 - ): + if metadata_uri.startswith("data:application/json") or response.status == 200: result = json.loads(response.read()) break retry += 1 @@ -81,167 +84,213 @@ def crawl_uri(metadata_uri: str) -> Any: return result +def process_address_metadata( + address: str, + blockchain_type: AvailableBlockchainType, + batch_size: int, + max_recrawl: int, + threads: int, + job: Optional[dict], + tokens: List[TokenURIs], +) -> None: + """ + Process metadata for a single address with v3 support + """ + with yield_db_read_only_preping_session_ctx() as db_session_read_only: + try: + ## + + already_parsed = get_current_metadata_for_address( + db_session=db_session_read_only, + blockchain_type=blockchain_type, + address=address, + ) + + + ### Do we need this? + ### Can we move it to sql query? + ### Do we need to get all tokens? 
+ + + maybe_updated = get_tokens_id_wich_may_updated( + db_session=db_session_read_only, + blockchain_type=blockchain_type, + address=address, + ) + except Exception as err: + logger.warning(f"Error while getting metadata state for address {address}: {err}") + return + + with yield_db_preping_session_ctx() as db_session: + try: + logger.info(f"Starting to crawl metadata for address: {address}") + + # Determine if this is a v3 job + v3 = job.get("v3", False) if job else False + + # Determine leak rate based on job config or default behavior + leak_rate = 0.0 + update_existing = job.get("update_existing", False) if job else False + + if len(maybe_updated) > 0: + free_spots = len(maybe_updated) / max_recrawl + if free_spots > 1: + leak_rate = 0 + else: + leak_rate = 1 - ( + len(already_parsed) - max_recrawl + len(maybe_updated) + ) / len(already_parsed) + + parsed_with_leak = leak_of_crawled_uri( + already_parsed, leak_rate, maybe_updated + ) + + logger.info( + f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}" + ) + logger.info(f"Already parsed: {len(already_parsed)} for {address}") + logger.info(f"Amount of tokens to parse: {len(tokens)} for {address}") + + # Remove already parsed tokens + new_tokens = [ + token for token in tokens + if token.token_id not in parsed_with_leak + ] + + for requests_chunk in [ + new_tokens[i : i + batch_size] + for i in range(0, len(new_tokens), batch_size) + ]: + metadata_batch = [] + try: + with db_session.begin(): + # Gather all metadata in parallel + with ThreadPoolExecutor(max_workers=threads) as executor: + future_to_token = { + executor.submit(crawl_uri, token.token_uri): token + for token in requests_chunk + } + for future in as_completed(future_to_token): + token = future_to_token[future] + try: + metadata = future.result(timeout=10) + if metadata: + metadata_batch.append((token, metadata)) + except Exception as e: + logger.error(f"Error fetching metadata for token {token.token_id}: {e}") + continue + + if metadata_batch: + # Batch upsert all metadata + upsert_metadata_labels( + db_session=db_session, + blockchain_type=blockchain_type, + metadata_batch=metadata_batch, + v3=v3, + update_existing=update_existing + ) + + clean_labels_from_db( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + ) + logger.info(f"Write {len(metadata_batch)} labels for {address}") + + except Exception as err: + logger.warning(f"Error while writing labels for address {address}: {err}") + db_session.rollback() + + except Exception as err: + logger.warning(f"Error while crawling metadata for address {address}: {err}") + db_session.rollback() + + def parse_metadata( blockchain_type: AvailableBlockchainType, batch_size: int, max_recrawl: int, threads: int, + metadata_journal_id: Optional[str] = None, ): """ Parse all metadata of tokens. 
""" - logger.info("Starting metadata crawler") logger.info(f"Processing blockchain {blockchain_type.value}") - # run crawling of levels - with yield_db_read_only_preping_session_ctx() as db_session_read_only: + spire_jobs = [] + if metadata_journal_id: + # Get all jobs for this blockchain from Spire + search_query = f"#metadata-job #{blockchain_type.value}" try: - # get all tokens with uri - logger.info("Requesting all tokens with uri from database") - uris_of_tokens = get_uris_of_tokens(db_session_read_only, blockchain_type) - - tokens_uri_by_address: Dict[str, Any] = {} - - for token_uri_data in uris_of_tokens: - if token_uri_data.address not in tokens_uri_by_address: - tokens_uri_by_address[token_uri_data.address] = [] - tokens_uri_by_address[token_uri_data.address].append(token_uri_data) - + entries = get_all_entries_from_search( + journal_id=metadata_journal_id, + search_query=search_query, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + content=True, + limit=1000, + ) + + logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}") + + for entry in entries: + try: + if not entry.content: + continue + + job = json.loads(entry.content) + if job.get("blockchain") != blockchain_type.value: + logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}") + continue + spire_jobs.append(job) + except Exception as err: + id = entry.entry_url.split("/")[-1] + logger.error(f"Error parsing job from entry {id}: {err}") + continue except Exception as err: - logger.error(f"Error while requesting tokens with uri from database: {err}") + logger.error(f"Error fetching jobs from journal: {err}") return - for address in tokens_uri_by_address: - with yield_db_read_only_preping_session_ctx() as db_session_read_only: - try: - already_parsed = get_current_metadata_for_address( - db_session=db_session_read_only, - blockchain_type=blockchain_type, + # Process each job + for job in spire_jobs or [None]: # If no jobs, run once with None + try: + # Get tokens to crawl + with yield_db_read_only_preping_session_ctx() as db_session_read_only: + tokens_uri_by_address = get_tokens_to_crawl( + db_session_read_only, + blockchain_type, + job, + ) + + # Process each address + for address, tokens in tokens_uri_by_address.items(): + process_address_metadata( address=address, - ) - - maybe_updated = get_tokens_id_wich_may_updated( - db_session=db_session_read_only, blockchain_type=blockchain_type, - address=address, - ) - except Exception as err: - logger.warning(err) - logger.warning( - f"Error while requesting metadata for address: {address}" - ) - continue - - with yield_db_preping_session_ctx() as db_session: - try: - logger.info(f"Starting to crawl metadata for address: {address}") - - leak_rate = 0.0 - - if len(maybe_updated) > 0: - free_spots = len(maybe_updated) / max_recrawl - - if free_spots > 1: - leak_rate = 0 - else: - leak_rate = 1 - ( - len(already_parsed) - max_recrawl + len(maybe_updated) - ) / len(already_parsed) - - parsed_with_leak = leak_of_crawled_uri( - already_parsed, leak_rate, maybe_updated + batch_size=batch_size, + max_recrawl=max_recrawl, + threads=threads, + job=job, + tokens=tokens, ) - logger.info( - f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}" - ) - - logger.info(f"Already parsed: {len(already_parsed)} for {address}") - - logger.info( - f"Amount of state in database: {len(tokens_uri_by_address[address])} for {address}" - ) - - logger.info( - f"Amount of tokens parsed with leak: 
{len(parsed_with_leak)} for {address}" - ) - - # Remove already parsed tokens - new_tokens_uri_by_address = [ - token_uri_data - for token_uri_data in tokens_uri_by_address[address] - if token_uri_data.token_id not in parsed_with_leak - ] - - logger.info( - f"Amount of tokens to parse: {len(new_tokens_uri_by_address)} for {address}" - ) - - for requests_chunk in [ - new_tokens_uri_by_address[i : i + batch_size] - for i in range(0, len(new_tokens_uri_by_address), batch_size) - ]: - writed_labels = 0 - db_session.commit() - - try: - with db_session.begin(): - for token_uri_data in requests_chunk: - with ThreadPoolExecutor( - max_workers=threads - ) as executor: - future = executor.submit( - crawl_uri, token_uri_data.token_uri - ) - metadata = future.result(timeout=10) - db_session.add( - metadata_to_label( - blockchain_type=blockchain_type, - metadata=metadata, - token_uri_data=token_uri_data, - ) - ) - writed_labels += 1 - - if writed_labels > 0: - clean_labels_from_db( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - ) - logger.info( - f"Write {writed_labels} labels for {address}" - ) - # trasaction is commited here - except Exception as err: - logger.warning(err) - logger.warning( - f"Error while writing labels for address: {address}" - ) - db_session.rollback() - - clean_labels_from_db( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - ) - except Exception as err: - logger.warning(err) - logger.warning(f"Error while crawling metadata for address: {address}") - db_session.rollback() - continue + except Exception as err: + logger.error(f"Error processing job: {err}") + continue def handle_crawl(args: argparse.Namespace) -> None: """ Parse all metadata of tokens. """ - blockchain_type = AvailableBlockchainType(args.blockchain) - parse_metadata( - blockchain_type, args.commit_batch_size, args.max_recrawl, args.threads + blockchain_type, + args.commit_batch_size, + args.max_recrawl, + args.threads, + args.metadata_journal_id, ) @@ -259,7 +308,7 @@ def main() -> None: "--blockchain", "-b", type=str, - help="Type of blockchain wich writng in database", + help="Type of blockchain which writing in database", required=True, ) metadata_crawler_parser.add_argument( @@ -283,6 +332,11 @@ def main() -> None: default=4, help="Amount of threads for crawling", ) + metadata_crawler_parser.add_argument( + "--metadata-journal-id", + type=str, + help="Optional Spire journal ID containing metadata jobs", + ) metadata_crawler_parser.set_defaults(func=handle_crawl) args = parser.parse_args() diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index c53bdbe2..b7fbac85 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -1,13 +1,29 @@ import json import logging -from typing import Any, Dict, List, Optional +from hexbytes import HexBytes +from typing import Any, Dict, List, Optional, Tuple +###from sqlalchemy import +from sqlalchemy.dialects.postgresql import insert -from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model +from datetime import datetime + +##from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model +from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model from sqlalchemy.orm import Session from sqlalchemy.sql import text +from ..actions import recive_S3_data_from_query from ..data import TokenURIs -from ..settings import CRAWLER_LABEL, 
METADATA_CRAWLER_LABEL, VIEW_STATE_CRAWLER_LABEL +from ..settings import ( + CRAWLER_LABEL, + METADATA_CRAWLER_LABEL, + VIEW_STATE_CRAWLER_LABEL, + MOONSTREAM_ADMIN_ACCESS_TOKEN, + bugout_client as bc, + moonstream_client as mc, +) +from moonstream.client import Moonstream # type: ignore + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -18,11 +34,13 @@ def metadata_to_label( metadata: Optional[Dict[str, Any]], token_uri_data: TokenURIs, label_name=METADATA_CRAWLER_LABEL, + v3: bool = False, ): """ - Creates a label model. + Creates a label model with support for v2 and v3 database structures. """ - label_model = get_label_model(blockchain_type) + version = 3 if v3 else 2 + label_model = get_label_model(blockchain_type, version=version) sanityzed_label_data = json.loads( json.dumps( @@ -34,14 +52,35 @@ def metadata_to_label( ).replace(r"\u0000", "") ) - label = label_model( - label=label_name, - label_data=sanityzed_label_data, - address=token_uri_data.address, - block_number=token_uri_data.block_number, - transaction_hash=None, - block_timestamp=token_uri_data.block_timestamp, - ) + if v3: + # V3 structure similar to state crawler + label_data = { + "token_id": token_uri_data.token_id, + "metadata": metadata, + } + + label = label_model( + label=label_name, + label_name="metadata", # Fixed name for metadata labels + label_type="metadata", + label_data=label_data, + address=HexBytes(token_uri_data.address), + block_number=token_uri_data.block_number, + # Use a fixed tx hash for metadata since it's not from a transaction + transaction_hash="0x2653135e31407726a25dd8d304878578cdfcc7d69a2b319d1aca4a37ed66956a", + block_timestamp=token_uri_data.block_timestamp, + block_hash=token_uri_data.block_hash if hasattr(token_uri_data, 'block_hash') else None, + ) + else: + # Original v2 structure + label = label_model( + label=label_name, + label_data=sanityzed_label_data, + address=token_uri_data.address, + block_number=token_uri_data.block_number, + transaction_hash=None, + block_timestamp=token_uri_data.block_timestamp, + ) return label @@ -273,3 +312,173 @@ def clean_labels_from_db( ), {"address": address, "label": METADATA_CRAWLER_LABEL}, ) + + +def get_tokens_from_query_api( + client: Moonstream, + query_name: str, + params: dict, + token: str, +) -> List[TokenURIs]: + """ + Get token URIs from Query API results + """ + try: + data = recive_S3_data_from_query( + client=client, + token=token, + query_name=query_name, + params=params, + ) + + # Convert query results to TokenURIs format + results = [] + for item in data.get("data", []): + results.append( + TokenURIs( + token_id=str(item.get("token_id")), + address=item.get("address"), + token_uri=item.get("token_uri"), + block_number=item.get("block_number"), + block_timestamp=item.get("block_timestamp"), + ) + ) + return results + except Exception as err: + logger.error(f"Error fetching data from Query API: {err}") + return [] + +def get_tokens_to_crawl( + db_session: Session, + blockchain_type: AvailableBlockchainType, + spire_job: Optional[dict] = None, +) -> Dict[str, List[TokenURIs]]: + """` + Get tokens to crawl either from Query API (if specified in Spire job) or database + """ + tokens_uri_by_address = {} + + if spire_job and "query_api" in spire_job: + # Get tokens from Query API + query_config = spire_job["query_api"] + client = Moonstream() + + tokens = get_tokens_from_query_api( + client=client, + query_name=query_config["name"], + params=query_config["params"], + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + 
+ # Group by address + for token in tokens: + if token.address not in tokens_uri_by_address: + tokens_uri_by_address[token.address] = [] + tokens_uri_by_address[token.address].append(token) + else: + # Get tokens from database (existing logic) + uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type) + for token_uri_data in uris_of_tokens: + if token_uri_data.address not in tokens_uri_by_address: + tokens_uri_by_address[token_uri_data.address] = [] + tokens_uri_by_address[token_uri_data.address].append(token_uri_data) + + return tokens_uri_by_address + +def upsert_metadata_labels( + db_session: Session, + blockchain_type: AvailableBlockchainType, + metadata_batch: List[Tuple[TokenURIs, Optional[Dict[str, Any]]]], + v3: bool = False, + update_existing: bool = False, +) -> None: + """ + Batch upsert metadata labels - update if exists, insert if not. + """ + try: + version = 3 if v3 else 2 + label_model = get_label_model(blockchain_type, version=version) + + # Prepare batch of labels + labels_data = [] + for token_uri_data, metadata in metadata_batch: + if v3: + # V3 structure + label_data = { + "token_id": token_uri_data.token_id, + "metadata": metadata, + } + + labels_data.append({ + "label": METADATA_CRAWLER_LABEL, + "label_name": "metadata", + "label_type": "metadata", + "label_data": label_data, + "address": HexBytes(token_uri_data.address), + "block_number": token_uri_data.block_number, + "transaction_hash": "0x2653135e31407726a25dd8d304878578cdfcc7d69a2b319d1aca4a37ed66956a", + "block_timestamp": token_uri_data.block_timestamp, + "block_hash": getattr(token_uri_data, 'block_hash', None), + }) + else: + # V2 structure + label_data = { + "type": "metadata", + "token_id": token_uri_data.token_id, + "metadata": metadata, + } + + labels_data.append({ + "label": METADATA_CRAWLER_LABEL, + "label_data": label_data, + "address": token_uri_data.address, + "block_number": token_uri_data.block_number, + "transaction_hash": None, + "block_timestamp": token_uri_data.block_timestamp, + }) + + if not labels_data: + return + + # Create insert statement + insert_stmt = insert(label_model).values(labels_data) + + if v3: + # V3 upsert + result_stmt = insert_stmt.on_conflict_do_update( + index_elements=[ + label_model.label, + label_model.label_name, + label_model.address, + label_model.label_data["token_id"].astext, + ], + set_=dict( + label_data=insert_stmt.excluded.label_data, + block_number=insert_stmt.excluded.block_number, + block_timestamp=insert_stmt.excluded.block_timestamp, + block_hash=insert_stmt.excluded.block_hash, + updated_at=datetime.now(), + ), + + ) + else: + # V2 upsert + result_stmt = insert_stmt.on_conflict_do_update( + index_elements=[ + label_model.label, + label_model.address, + label_model.label_data["token_id"].astext, + ], + set_=dict( + label_data=insert_stmt.excluded.label_data, + block_number=insert_stmt.excluded.block_number, + block_timestamp=insert_stmt.excluded.block_timestamp, + updated_at=datetime.now(), + ), + ) + + db_session.execute(result_stmt) + + except Exception as err: + logger.error(f"Error batch upserting metadata labels: {err}") + raise \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 20cf9253..c44ec00c 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -3,9 +3,11 @@ from typing import Dict, Optional from uuid import UUID from bugout.app import Bugout -from moonstreamtypes.blockchain import AvailableBlockchainType 
+from moonstreamtypes.blockchain import AvailableBlockchainType # type: ignore +from moonstream.client import Moonstream # type: ignore -# Bugout +# APIs +## Bugout BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") @@ -31,6 +33,24 @@ except: HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN") + +## Moonstream +MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") + +moonstream_client = Moonstream() + + +## Moonstream Engine +MOONSTREAM_ENGINE_URL = os.environ.get( + "MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to" +) + +## Moonstream DB +MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( + "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" +) + + # Origin RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS") if RAW_ORIGINS is None: diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 13b994c6..eb7427bc 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -9,9 +9,9 @@ from concurrent.futures._base import TimeoutError from pprint import pprint from typing import Any, Dict, List, Optional from uuid import UUID +from web3 import Web3 -from moonstream.client import Moonstream # type: ignore -from moonstreamtypes.blockchain import AvailableBlockchainType +from moonstreamtypes.blockchain import AvailableBlockchainType # type: ignore from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 from ..actions import recive_S3_data_from_query, get_all_entries_from_search @@ -25,19 +25,37 @@ from ..settings import ( multicall_contracts, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, + MOONSTREAM_DB_V3_CONTROLLER_API, + moonstream_client as mc, ) from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 from .web3_util import FunctionSignature + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +def request_connection_string( + customer_id: str, + instance_id: int, + token: str, + user: str = "seer", # token with write access +) -> str: + """ + Request connection string from the Moonstream API. + """ + response = requests.get( + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", + headers={"Authorization": f"Bearer {token}"}, + ) -client = Moonstream() + response.raise_for_status() + + return response.text.replace('"', "") -def execute_query(query: Dict[str, Any], token: str): +def execute_query(query: Dict[str, Any], token: str) -> Any: """ Query task example: @@ -72,7 +90,7 @@ def execute_query(query: Dict[str, Any], token: str): # run query template via moonstream query API data = recive_S3_data_from_query( - client=client, + client=mc, token=token, query_name=query_url, custom_body=body, @@ -302,6 +320,182 @@ def crawl_calls_level( return batch_size +def connect_to_web3( + blockchain_type: Any, + web3_provider_uri: Optional[str], + web3_uri: Optional[str], +) -> Web3: + """Connects to the Web3 client.""" + if web3_provider_uri is not None: + try: + logger.info( + f"Connecting to blockchain: {blockchain_type} with custom provider!" 
+ ) + web3_client = connect( + blockchain_type=blockchain_type, web3_uri=web3_provider_uri + ) + except Exception as e: + logger.error( + f"Web3 connection to custom provider {web3_provider_uri} failed. Error: {e}" + ) + raise e + else: + logger.info(f"Connecting to blockchain: {blockchain_type} with node balancer.") + web3_client = _retry_connect_web3( + blockchain_type=blockchain_type, web3_uri=web3_uri + ) + logger.info(f"Crawler started connected to blockchain: {blockchain_type}") + return web3_client + + +def get_block_info(web3_client: Web3, block_number: Optional[int]) -> tuple: + """Retrieves block information.""" + if block_number is None: + block_number = web3_client.eth.get_block("latest").number # type: ignore + logger.info(f"Current block number: {block_number}") + block = web3_client.eth.get_block(block_number) # type: ignore + block_timestamp = block.timestamp # type: ignore + block_hash = block.hash.hex() # type: ignore + return block_number, block_timestamp, block_hash + + +def recursive_unpack( + method_abi: Any, + level: int, + calls: Dict[int, List[Any]], + contracts_methods: Dict[str, Any], + contracts_ABIs: Dict[str, Any], + responses: Dict[str, Any], + moonstream_token: str, + v3: bool, + customer_id: Optional[str] = None, + instance_id: Optional[str] = None, +) -> str: + """Recursively unpacks method ABIs to generate a tree of calls.""" + have_subcalls = False + if method_abi["type"] == "queryAPI": + # Make queryAPI call + response = execute_query(method_abi, token=moonstream_token) + # Generate hash for queryAPI call + generated_hash = hashlib.md5( + json.dumps( + method_abi, + sort_keys=True, + indent=4, + separators=(",", ": "), + ).encode("utf-8") + ).hexdigest() + # Add response to responses + responses[generated_hash] = response + return generated_hash + + abi = { + "inputs": [], + "outputs": method_abi["outputs"], + "name": method_abi["name"], + "type": "function", + "stateMutability": "view", + "v3": v3, + "customer_id": customer_id, + "instance_id": instance_id, + } + + for input in method_abi["inputs"]: + if isinstance(input["value"], (int, list, str)): + abi["inputs"].append(input) + elif isinstance(input["value"], dict): + if input["value"]["type"] in ["function", "queryAPI"]: + hash_link = recursive_unpack( + input["value"], + level + 1, + calls, + contracts_methods, + contracts_ABIs, + responses, + moonstream_token, + v3, + customer_id, + instance_id, + ) + input["value"] = hash_link + have_subcalls = True + abi["inputs"].append(input) + + abi["address"] = method_abi["address"] + ### drop instance_id and customer_id + # del abi["instance_id"] + # del abi["customer_id"] + generated_hash = hashlib.md5( + json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode( + "utf-8" + ) + ).hexdigest() + abi["generated_hash"] = generated_hash + + if have_subcalls: + level += 1 + calls.setdefault(level, []).append(abi) + else: + level = 0 + calls.setdefault(level, []).append(abi) + + contracts_methods.setdefault(method_abi["address"], []) + if abi["name"] not in contracts_methods[method_abi["address"]]: + ### lets try to deduplicate by method name + contracts_methods[method_abi["address"]].append(abi["name"]) + contracts_ABIs.setdefault(method_abi["address"], {}) + contracts_ABIs[method_abi["address"]][abi["name"]] = abi + + return generated_hash + + +def build_interfaces( + contracts_ABIs: Dict[str, Any], contracts_methods: Dict[str, Any], web3_client: Web3 +) -> Dict[str, Any]: + """Builds contract interfaces.""" + interfaces = {} + for 
contract_address in contracts_ABIs: + + abis = [ + contracts_ABIs[contract_address][method_name] + for method_name in contracts_methods[contract_address] + ] + try: + interfaces[contract_address] = web3_client.eth.contract( + address=web3_client.toChecksumAddress(contract_address), abi=abis + ) + except Exception as e: + logger.error(f"Failed to connect to contract {contract_address}: {e}") + continue + return interfaces + + +def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[str]: + """Processes the address field of a job and returns a list of addresses.""" + if isinstance(job["address"], str): + return [Web3.toChecksumAddress(job["address"])] + elif isinstance(job["address"], list): + return [ + Web3.toChecksumAddress(address) for address in job["address"] + ] # manual job multiplication + elif isinstance(job["address"], dict): + if job["address"].get("type") == "queryAPI": + # QueryAPI job multiplication + addresses = execute_query(job["address"], token=moonstream_token) + checsum_addresses = [] + for address in addresses: + try: + checsum_addresses.append(Web3.toChecksumAddress(address)) + except Exception as e: + logger.error(f"Invalid address: {address}") + continue + return checsum_addresses + else: + raise ValueError(f"Invalid address type: {type(job['address'])}") + else: + raise ValueError(f"Invalid address type: {type(job['address'])}") + + def parse_jobs( jobs: List[Any], blockchain_type: AvailableBlockchainType, From 44fa379211b07b72448cb112c7e72a64362b91b6 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 9 Jan 2025 14:07:29 +0200 Subject: [PATCH 02/13] Add spire metadata crawler. --- crawlers/mooncrawl/mooncrawl/actions.py | 24 ++ crawlers/mooncrawl/mooncrawl/data.py | 1 + .../mooncrawl/metadata_crawler/cli.py | 265 ++++++++++++------ .../mooncrawl/metadata_crawler/db.py | 60 ++-- crawlers/mooncrawl/mooncrawl/settings.py | 5 + crawlers/mooncrawl/setup.py | 4 +- 6 files changed, 233 insertions(+), 126 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index ba6cf868..c3a59443 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -226,3 +226,27 @@ def get_customer_db_uri( except Exception as e: logger.error(f"Error get customer db uri: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + + + +## DB V3 + +def request_connection_string( + customer_id: str, + instance_id: int, + token: str, + user: str = "seer", # token with write access +) -> str: + """ + Request connection string from the Moonstream API. 
+ Default user is seer with write access + """ + response = requests.get( + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", + headers={"Authorization": f"Bearer {token}"}, + ) + + response.raise_for_status() + + return response.text.replace('"', "") + diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 54ae071f..020fa7f9 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -60,6 +60,7 @@ class TokenURIs(BaseModel): block_number: str block_timestamp: str address: str + block_hash: Optional[str] = None # for v3 only class ViewTasks(BaseModel): diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index eb0496a3..0d550dec 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -4,14 +4,16 @@ import logging import random import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any, Dict, List, Optional +from sqlalchemy.orm import Session +from typing import Any, Dict, List, Optional, Tuple from urllib.error import HTTPError -from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamtypes.blockchain import AvailableBlockchainType -from ..actions import get_all_entries_from_search -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN -from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx + +from ..actions import get_all_entries_from_search, request_connection_string +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, METADATA_TASKS_JOURNAL_ID +from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx, create_moonstream_engine, sessionmaker from ..data import TokenURIs from .db import ( clean_labels_from_db, @@ -84,13 +86,12 @@ def crawl_uri(metadata_uri: str) -> Any: return result -def process_address_metadata( +def process_address_metadata_with_leak( address: str, blockchain_type: AvailableBlockchainType, batch_size: int, max_recrawl: int, threads: int, - job: Optional[dict], tokens: List[TokenURIs], ) -> None: """ @@ -98,7 +99,6 @@ def process_address_metadata( """ with yield_db_read_only_preping_session_ctx() as db_session_read_only: try: - ## already_parsed = get_current_metadata_for_address( db_session=db_session_read_only, @@ -106,12 +106,6 @@ def process_address_metadata( address=address, ) - - ### Do we need this? - ### Can we move it to sql query? - ### Do we need to get all tokens? 
- - maybe_updated = get_tokens_id_wich_may_updated( db_session=db_session_read_only, blockchain_type=blockchain_type, @@ -125,13 +119,6 @@ def process_address_metadata( try: logger.info(f"Starting to crawl metadata for address: {address}") - # Determine if this is a v3 job - v3 = job.get("v3", False) if job else False - - # Determine leak rate based on job config or default behavior - leak_rate = 0.0 - update_existing = job.get("update_existing", False) if job else False - if len(maybe_updated) > 0: free_spots = len(maybe_updated) / max_recrawl if free_spots > 1: @@ -163,39 +150,38 @@ def process_address_metadata( ]: metadata_batch = [] try: - with db_session.begin(): - # Gather all metadata in parallel - with ThreadPoolExecutor(max_workers=threads) as executor: - future_to_token = { - executor.submit(crawl_uri, token.token_uri): token - for token in requests_chunk - } - for future in as_completed(future_to_token): - token = future_to_token[future] - try: - metadata = future.result(timeout=10) - if metadata: - metadata_batch.append((token, metadata)) - except Exception as e: - logger.error(f"Error fetching metadata for token {token.token_id}: {e}") - continue - if metadata_batch: - # Batch upsert all metadata - upsert_metadata_labels( - db_session=db_session, - blockchain_type=blockchain_type, - metadata_batch=metadata_batch, - v3=v3, - update_existing=update_existing - ) - - clean_labels_from_db( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - ) - logger.info(f"Write {len(metadata_batch)} labels for {address}") + # Gather all metadata in parallel + with ThreadPoolExecutor(max_workers=threads) as executor: + future_to_token = { + executor.submit(crawl_uri, token.token_uri): token + for token in requests_chunk + } + for future in as_completed(future_to_token): + token = future_to_token[future] + try: + metadata = future.result(timeout=10) + if metadata: + metadata_batch.append((token, metadata)) + except Exception as e: + logger.error(f"Error fetching metadata for token {token.token_id}: {e}") + continue + + if metadata_batch: + # Batch upsert all metadata + upsert_metadata_labels( + db_session=db_session, + blockchain_type=blockchain_type, + metadata_batch=metadata_batch, + v3=False + ) + + clean_labels_from_db( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + ) + logger.info(f"Write {len(metadata_batch)} labels for {address}") except Exception as err: logger.warning(f"Error while writing labels for address {address}: {err}") @@ -206,12 +192,64 @@ def process_address_metadata( db_session.rollback() + +def process_address_metadata( + address: str, + blockchain_type: AvailableBlockchainType, + db_session: Session, + batch_size: int, + max_recrawl: int, + threads: int, + tokens: List[TokenURIs], +) -> None: + """ + Process metadata for a single address with v3 support + Leak logic is implemented in sql statement + """ + + + + + for requests_chunk in [ + tokens[i : i + batch_size] + for i in range(0, len(tokens), batch_size) + ]: + metadata_batch = [] + with ThreadPoolExecutor(max_workers=threads) as executor: + future_to_token = { + executor.submit(crawl_uri, token.token_uri): token + for token in requests_chunk + } + for future in as_completed(future_to_token): + token = future_to_token[future] + metadata = future.result(timeout=10) + metadata_batch.append((token, metadata)) + + + upsert_metadata_labels( + db_session=db_session, + blockchain_type=blockchain_type, + metadata_batch=metadata_batch, + v3=True + ) + + 
clean_labels_from_db( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + ) + + + + + def parse_metadata( blockchain_type: AvailableBlockchainType, batch_size: int, max_recrawl: int, threads: int, - metadata_journal_id: Optional[str] = None, + spire: bool = False, + custom_db_uri: Optional[str] = None, ): """ Parse all metadata of tokens. @@ -219,13 +257,37 @@ def parse_metadata( logger.info("Starting metadata crawler") logger.info(f"Processing blockchain {blockchain_type.value}") + # Get tokens to crawl v2 flow + with yield_db_read_only_preping_session_ctx() as db_session_read_only: + tokens_uri_by_address = get_tokens_to_crawl( + db_session_read_only, + blockchain_type, + {}, + ) + + + # Process each address + for address, tokens in tokens_uri_by_address.items(): + process_address_metadata_with_leak( + address=address, + blockchain_type=blockchain_type, + batch_size=batch_size, + max_recrawl=max_recrawl, + threads=threads, + tokens=tokens, + ) + + + + spire_jobs = [] - if metadata_journal_id: + + if spire == True: # Get all jobs for this blockchain from Spire search_query = f"#metadata-job #{blockchain_type.value}" try: entries = get_all_entries_from_search( - journal_id=metadata_journal_id, + journal_id=METADATA_TASKS_JOURNAL_ID, search_query=search_query, token=MOONSTREAM_ADMIN_ACCESS_TOKEN, content=True, @@ -253,31 +315,67 @@ def parse_metadata( return # Process each job - for job in spire_jobs or [None]: # If no jobs, run once with None - try: - # Get tokens to crawl - with yield_db_read_only_preping_session_ctx() as db_session_read_only: + + # sessions list for each customer and instance + sessions_by_customer: Dict[Tuple[str, str], Session] = {} + + # all sessions in one try block + try: + for job in spire_jobs: + try: + customer_id = job.get("customer_id") + instance_id = job.get("instance_id") + + if (customer_id, instance_id) not in sessions_by_customer: + # Create session + # Assume fetch_connection_string fetches the connection string + if custom_db_uri: + connection_string = custom_db_uri + else: + connection_string = request_connection_string( + customer_id=customer_id, + instance_id=instance_id, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + engine = create_moonstream_engine(connection_string, 2, 100000) + session = sessionmaker(bind=engine) + try: + sessions_by_customer[(customer_id, instance_id)] = session() + except Exception as e: + logger.error(f"Connection to {engine} failed: {e}") + continue + + + # Get tokens to crawl tokens_uri_by_address = get_tokens_to_crawl( - db_session_read_only, + sessions_by_customer[(customer_id, instance_id)], blockchain_type, job, - ) + ) - # Process each address - for address, tokens in tokens_uri_by_address.items(): - process_address_metadata( - address=address, - blockchain_type=blockchain_type, - batch_size=batch_size, - max_recrawl=max_recrawl, - threads=threads, - job=job, - tokens=tokens, - ) - - except Exception as err: - logger.error(f"Error processing job: {err}") - continue + for address, tokens in tokens_uri_by_address.items(): + process_address_metadata( + address=address, + blockchain_type=blockchain_type, + db_session=sessions_by_customer[(customer_id, instance_id)], + batch_size=batch_size, + max_recrawl=max_recrawl, + threads=threads, + tokens=tokens, + ) + except Exception as err: + logger.error(f"Error processing job: {err}") + continue + except Exception as err: + logger.error(f"Error processing jobs: {err}") + raise err + + finally: + for session in sessions_by_customer.values(): + try: + 
session.close() + except Exception as err: + logger.error(f"Error closing session: {err}") def handle_crawl(args: argparse.Namespace) -> None: @@ -290,7 +388,8 @@ def handle_crawl(args: argparse.Namespace) -> None: args.commit_batch_size, args.max_recrawl, args.threads, - args.metadata_journal_id, + args.spire, + args.custom_db_uri, ) @@ -333,9 +432,15 @@ def main() -> None: help="Amount of threads for crawling", ) metadata_crawler_parser.add_argument( - "--metadata-journal-id", + "--spire", + type=bool, + default=False, + help="If true, use spire jobs to crawl metadata", + ) + metadata_crawler_parser.add_argument( + "--custom-db-uri", type=str, - help="Optional Spire journal ID containing metadata jobs", + help="Custom db uri to use for crawling", ) metadata_crawler_parser.set_defaults(func=handle_crawl) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index b7fbac85..03987f55 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -67,7 +67,6 @@ def metadata_to_label( address=HexBytes(token_uri_data.address), block_number=token_uri_data.block_number, # Use a fixed tx hash for metadata since it's not from a transaction - transaction_hash="0x2653135e31407726a25dd8d304878578cdfcc7d69a2b319d1aca4a37ed66956a", block_timestamp=token_uri_data.block_timestamp, block_hash=token_uri_data.block_hash if hasattr(token_uri_data, 'block_hash') else None, ) @@ -99,13 +98,13 @@ def commit_session(db_session: Session) -> None: def get_uris_of_tokens( - db_session: Session, blockchain_type: AvailableBlockchainType + db_session: Session, blockchain_type: AvailableBlockchainType, version: int = 2 ) -> List[TokenURIs]: """ Get meatadata URIs. """ - label_model = get_label_model(blockchain_type) + label_model = get_label_model(blockchain_type, version=version) table = label_model.__tablename__ @@ -152,13 +151,13 @@ def get_uris_of_tokens( def get_current_metadata_for_address( - db_session: Session, blockchain_type: AvailableBlockchainType, address: str + db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2 ): """ Get existing metadata. """ - label_model = get_label_model(blockchain_type) + label_model = get_label_model(blockchain_type, version=version) table = label_model.__tablename__ @@ -188,7 +187,7 @@ def get_current_metadata_for_address( def get_tokens_id_wich_may_updated( - db_session: Session, blockchain_type: AvailableBlockchainType, address: str + db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2 ): """ Returns a list of tokens which may have updated information. @@ -202,7 +201,7 @@ def get_tokens_id_wich_may_updated( Required integration with entity API and opcodes crawler. 
""" - label_model = get_label_model(blockchain_type) + label_model = get_label_model(blockchain_type, version=version) table = label_model.__tablename__ @@ -358,7 +357,10 @@ def get_tokens_to_crawl( """ tokens_uri_by_address = {} - if spire_job and "query_api" in spire_job: + if spire_job: + if "query_api" not in spire_job: + raise ValueError("Query API is not specified in Spire job") + # Get tokens from Query API query_config = spire_job["query_api"] client = Moonstream() @@ -390,7 +392,8 @@ def upsert_metadata_labels( blockchain_type: AvailableBlockchainType, metadata_batch: List[Tuple[TokenURIs, Optional[Dict[str, Any]]]], v3: bool = False, - update_existing: bool = False, + db_batch_size: int = 100, + ) -> None: """ Batch upsert metadata labels - update if exists, insert if not. @@ -398,10 +401,12 @@ def upsert_metadata_labels( try: version = 3 if v3 else 2 label_model = get_label_model(blockchain_type, version=version) + # Prepare batch of labels labels_data = [] for token_uri_data, metadata in metadata_batch: + if v3: # V3 structure label_data = { @@ -416,7 +421,6 @@ def upsert_metadata_labels( "label_data": label_data, "address": HexBytes(token_uri_data.address), "block_number": token_uri_data.block_number, - "transaction_hash": "0x2653135e31407726a25dd8d304878578cdfcc7d69a2b319d1aca4a37ed66956a", "block_timestamp": token_uri_data.block_timestamp, "block_hash": getattr(token_uri_data, 'block_hash', None), }) @@ -442,40 +446,8 @@ def upsert_metadata_labels( # Create insert statement insert_stmt = insert(label_model).values(labels_data) - - if v3: - # V3 upsert - result_stmt = insert_stmt.on_conflict_do_update( - index_elements=[ - label_model.label, - label_model.label_name, - label_model.address, - label_model.label_data["token_id"].astext, - ], - set_=dict( - label_data=insert_stmt.excluded.label_data, - block_number=insert_stmt.excluded.block_number, - block_timestamp=insert_stmt.excluded.block_timestamp, - block_hash=insert_stmt.excluded.block_hash, - updated_at=datetime.now(), - ), - - ) - else: - # V2 upsert - result_stmt = insert_stmt.on_conflict_do_update( - index_elements=[ - label_model.label, - label_model.address, - label_model.label_data["token_id"].astext, - ], - set_=dict( - label_data=insert_stmt.excluded.label_data, - block_number=insert_stmt.excluded.block_number, - block_timestamp=insert_stmt.excluded.block_timestamp, - updated_at=datetime.now(), - ), - ) + result_stmt = insert_stmt.on_conflict_do_nothing( + ) db_session.execute(result_stmt) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index c44ec00c..ab2fbcac 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -510,3 +510,8 @@ MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( "MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain" ) + + +METADATA_TASKS_JOURNAL_ID = os.environ.get( + "METADATA_TASKS_JOURNAL_ID", "" +) diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index b54e9a19..8fbef953 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -38,8 +38,8 @@ setup( "chardet", "fastapi", "moonstreamdb>=0.4.6", - "moonstreamdb-v3>=0.1.3", - "moonstream-types>=0.0.10", + "moonstreamdb-v3>=0.1.4", + "moonstream-types>=0.0.11", "moonstream>=0.1.2", "moonworm[moonstream]>=0.9.3", "humbug", From 6b3ea9242ac9ba956848e9f6bdb541a69da687dc Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 9 Jan 2025 14:38:02 +0200 Subject: [PATCH 03/13] Remove 
state crawler changes. --- .../mooncrawl/mooncrawl/state_crawler/cli.py | 206 +----------------- 1 file changed, 6 insertions(+), 200 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index eb7427bc..696a5394 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -9,9 +9,9 @@ from concurrent.futures._base import TimeoutError from pprint import pprint from typing import Any, Dict, List, Optional from uuid import UUID -from web3 import Web3 -from moonstreamtypes.blockchain import AvailableBlockchainType # type: ignore +from moonstream.client import Moonstream # type: ignore +from moonstreamtypes.blockchain import AvailableBlockchainType from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 from ..actions import recive_S3_data_from_query, get_all_entries_from_search @@ -25,37 +25,19 @@ from ..settings import ( multicall_contracts, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, - MOONSTREAM_DB_V3_CONTROLLER_API, - moonstream_client as mc, ) from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 from .web3_util import FunctionSignature - logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def request_connection_string( - customer_id: str, - instance_id: int, - token: str, - user: str = "seer", # token with write access -) -> str: - """ - Request connection string from the Moonstream API. - """ - response = requests.get( - f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", - headers={"Authorization": f"Bearer {token}"}, - ) - response.raise_for_status() - - return response.text.replace('"', "") +client = Moonstream() -def execute_query(query: Dict[str, Any], token: str) -> Any: +def execute_query(query: Dict[str, Any], token: str): """ Query task example: @@ -90,7 +72,7 @@ def execute_query(query: Dict[str, Any], token: str) -> Any: # run query template via moonstream query API data = recive_S3_data_from_query( - client=mc, + client=client, token=token, query_name=query_url, custom_body=body, @@ -320,182 +302,6 @@ def crawl_calls_level( return batch_size -def connect_to_web3( - blockchain_type: Any, - web3_provider_uri: Optional[str], - web3_uri: Optional[str], -) -> Web3: - """Connects to the Web3 client.""" - if web3_provider_uri is not None: - try: - logger.info( - f"Connecting to blockchain: {blockchain_type} with custom provider!" - ) - web3_client = connect( - blockchain_type=blockchain_type, web3_uri=web3_provider_uri - ) - except Exception as e: - logger.error( - f"Web3 connection to custom provider {web3_provider_uri} failed. 
Error: {e}" - ) - raise e - else: - logger.info(f"Connecting to blockchain: {blockchain_type} with node balancer.") - web3_client = _retry_connect_web3( - blockchain_type=blockchain_type, web3_uri=web3_uri - ) - logger.info(f"Crawler started connected to blockchain: {blockchain_type}") - return web3_client - - -def get_block_info(web3_client: Web3, block_number: Optional[int]) -> tuple: - """Retrieves block information.""" - if block_number is None: - block_number = web3_client.eth.get_block("latest").number # type: ignore - logger.info(f"Current block number: {block_number}") - block = web3_client.eth.get_block(block_number) # type: ignore - block_timestamp = block.timestamp # type: ignore - block_hash = block.hash.hex() # type: ignore - return block_number, block_timestamp, block_hash - - -def recursive_unpack( - method_abi: Any, - level: int, - calls: Dict[int, List[Any]], - contracts_methods: Dict[str, Any], - contracts_ABIs: Dict[str, Any], - responses: Dict[str, Any], - moonstream_token: str, - v3: bool, - customer_id: Optional[str] = None, - instance_id: Optional[str] = None, -) -> str: - """Recursively unpacks method ABIs to generate a tree of calls.""" - have_subcalls = False - if method_abi["type"] == "queryAPI": - # Make queryAPI call - response = execute_query(method_abi, token=moonstream_token) - # Generate hash for queryAPI call - generated_hash = hashlib.md5( - json.dumps( - method_abi, - sort_keys=True, - indent=4, - separators=(",", ": "), - ).encode("utf-8") - ).hexdigest() - # Add response to responses - responses[generated_hash] = response - return generated_hash - - abi = { - "inputs": [], - "outputs": method_abi["outputs"], - "name": method_abi["name"], - "type": "function", - "stateMutability": "view", - "v3": v3, - "customer_id": customer_id, - "instance_id": instance_id, - } - - for input in method_abi["inputs"]: - if isinstance(input["value"], (int, list, str)): - abi["inputs"].append(input) - elif isinstance(input["value"], dict): - if input["value"]["type"] in ["function", "queryAPI"]: - hash_link = recursive_unpack( - input["value"], - level + 1, - calls, - contracts_methods, - contracts_ABIs, - responses, - moonstream_token, - v3, - customer_id, - instance_id, - ) - input["value"] = hash_link - have_subcalls = True - abi["inputs"].append(input) - - abi["address"] = method_abi["address"] - ### drop instance_id and customer_id - # del abi["instance_id"] - # del abi["customer_id"] - generated_hash = hashlib.md5( - json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode( - "utf-8" - ) - ).hexdigest() - abi["generated_hash"] = generated_hash - - if have_subcalls: - level += 1 - calls.setdefault(level, []).append(abi) - else: - level = 0 - calls.setdefault(level, []).append(abi) - - contracts_methods.setdefault(method_abi["address"], []) - if abi["name"] not in contracts_methods[method_abi["address"]]: - ### lets try to deduplicate by method name - contracts_methods[method_abi["address"]].append(abi["name"]) - contracts_ABIs.setdefault(method_abi["address"], {}) - contracts_ABIs[method_abi["address"]][abi["name"]] = abi - - return generated_hash - - -def build_interfaces( - contracts_ABIs: Dict[str, Any], contracts_methods: Dict[str, Any], web3_client: Web3 -) -> Dict[str, Any]: - """Builds contract interfaces.""" - interfaces = {} - for contract_address in contracts_ABIs: - - abis = [ - contracts_ABIs[contract_address][method_name] - for method_name in contracts_methods[contract_address] - ] - try: - interfaces[contract_address] = 
web3_client.eth.contract( - address=web3_client.toChecksumAddress(contract_address), abi=abis - ) - except Exception as e: - logger.error(f"Failed to connect to contract {contract_address}: {e}") - continue - return interfaces - - -def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[str]: - """Processes the address field of a job and returns a list of addresses.""" - if isinstance(job["address"], str): - return [Web3.toChecksumAddress(job["address"])] - elif isinstance(job["address"], list): - return [ - Web3.toChecksumAddress(address) for address in job["address"] - ] # manual job multiplication - elif isinstance(job["address"], dict): - if job["address"].get("type") == "queryAPI": - # QueryAPI job multiplication - addresses = execute_query(job["address"], token=moonstream_token) - checsum_addresses = [] - for address in addresses: - try: - checsum_addresses.append(Web3.toChecksumAddress(address)) - except Exception as e: - logger.error(f"Invalid address: {address}") - continue - return checsum_addresses - else: - raise ValueError(f"Invalid address type: {type(job['address'])}") - else: - raise ValueError(f"Invalid address type: {type(job['address'])}") - - def parse_jobs( jobs: List[Any], blockchain_type: AvailableBlockchainType, @@ -1018,4 +824,4 @@ def main() -> None: if __name__ == "__main__": - main() + main() \ No newline at end of file From 34e13c151985d24057271cff44ac64417a15a1c9 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 9 Jan 2025 14:39:49 +0200 Subject: [PATCH 04/13] Bump version. --- crawlers/mooncrawl/mooncrawl/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 3592a458..69fad2b6 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.5.1" +MOONCRAWL_VERSION = "0.5.2" From 8e0ce97989c674cc24bdd44a424b6f852807955a Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 9 Jan 2025 14:47:57 +0200 Subject: [PATCH 05/13] Add readme. --- .../mooncrawl/metadata_crawler/Readme.md | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 crawlers/mooncrawl/mooncrawl/metadata_crawler/Readme.md diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/Readme.md b/crawlers/mooncrawl/mooncrawl/metadata_crawler/Readme.md new file mode 100644 index 00000000..c260ba6e --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/Readme.md @@ -0,0 +1,180 @@ +# Metadata Crawler Architecture + +## Overview +The metadata crawler is designed to fetch and store metadata for NFTs (Non-Fungible Tokens) from various blockchains. It supports both traditional database TokenURI view methods queries and Spire journal-based job configurations, with the ability to handle both v2 and v3 database structures. + + +## Core Components + +### 1. Update Strategies + +#### Leak-Based Strategy (Legacy v2) +- Uses probabilistic approach to determine which tokens to update +- Controlled by `max_recrawl` parameter +- Suitable for large collections with infrequent updates + +#### SQL-Based Strategy (v3) +- Uses SQL queries to determine which tokens need updates +- More precise tracking of token updates +- Better suited for active collections + +### 2. 
Database Connections + +The crawler supports multiple database connection strategies: +- Default Moonstream database connection +- Custom database URI via `--custom-db-uri` +- Per-customer instance connections (v3) + ```json + { + "customer_id": "...", + "instance_id": "...", + "blockchain": "ethereum", + "v3": true + } + ``` + +### 3. Job Configuration +Jobs can be configured in two ways: +- Through Spire journal entries with tags `#metadata-job #{blockchain}` +- Direct database queries (legacy mode) using TokenURI view method +Example Spire journal entry: +```json +{ + "type": "metadata-job", + "query_api": { + "name": "new_tokens_to_crawl", + "params": { + "address": "0x...", + "blockchain": "ethereum" + } + }, + "contract_address": "0x...", + "blockchain": "ethereum", + "update_existing": false, + "v3": true, + "customer_id": "...", // Optional, for custom database + "instance_id": "..." // Optional, for custom database +} +``` + +### 2. Data Flow +1. **Token Discovery** + - Query API integration for dynamic token discovery + - Database queries for existing tokens + - Support for multiple addresses per job + +2. **Metadata Fetching** + - Parallel processing with ThreadPoolExecutor + - IPFS gateway support + - Automatic retry mechanism + - Rate limiting and batch processing + +3. **Storage** + - Supports both v2 and v3 database structures + - Batch upsert operations + - Efficient cleaning of old labels + +### 3. Database Structures + +v2: +```python +{ + "label": METADATA_CRAWLER_LABEL, + "label_data": { + "type": "metadata", + "token_id": "...", + "metadata": {...} + }, + "block_number": 1234567890 + "block_timestamp": 456 +} +``` + +v3: +```python +{ + "label": METADATA_CRAWLER_LABEL, + "label_type": "metadata", + "label_data": { + "token_id": "...", + "metadata": {...} + }, + "address": "0x...", + "block_number": 123, + "block_timestamp": 456, + "block_hash": "0x..." +} + +``` + +## Key Features + +1. **Flexible Token Selection** + - Query API integration + - Support for multiple addresses + - Configurable update strategies + +2. **Efficient Processing** + - Batch processing + - Parallel metadata fetching + - Optimized database operations + +3. **Error Handling** + - Retry mechanism for failed requests + - Transaction management + - Detailed logging + +4. **Database Management** + - Efficient upsert operations + - Label cleaning + - Version compatibility (v2/v3) + +## Usage + +### CLI Options + +```bash +metadata-crawler crawl \ +--blockchain ethereum \ +--commit-batch-size 50 \ +--max-recrawl 300 \ +--threads 4 \ +--spire true \ +--custom-db-uri "postgresql://..." # Optional +``` +### Environment Variables +- `MOONSTREAM_ADMIN_ACCESS_TOKEN`: Required for API access +- `METADATA_CRAWLER_LABEL`: Label for database entries +- `METADATA_TASKS_JOURNAL_ID`: Journal ID for metadata tasks + + +### Database Modes + +1. **Legacy Mode (v2)** + - Uses leak-based update strategy + - Single database connection + - Simple metadata structure + +2. **Modern Mode (v3)** + - SQL-based update tracking + - Support for multiple database instances + - Enhanced metadata structure + - Per-customer database isolation + + +## Best Practices + +1. **Job Configuration** + - Use descriptive job names + - Group related addresses + - Set appropriate update intervals + +2. **Performance Optimization** + - Adjust batch sizes based on network conditions + - Monitor thread count vs. performance + - Use appropriate IPFS gateways + +3. 
**Maintenance** + - Regular cleaning of old labels + - Monitor database size + - Check for failed metadata fetches From ba95f3c88d88980f6a17bf576132ac5ee2a3c272 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 13 Jan 2025 22:31:42 +0200 Subject: [PATCH 06/13] Upgrade query proccessing. --- crawlers/mooncrawl/mooncrawl/actions.py | 4 + crawlers/mooncrawl/mooncrawl/api.py | 39 ++--- .../mooncrawl/metadata_crawler/cli.py | 133 +++++++++--------- .../mooncrawl/metadata_crawler/db.py | 35 ++++- crawlers/mooncrawl/mooncrawl/settings.py | 15 +- moonstreamapi/moonstreamapi/routes/queries.py | 2 +- 6 files changed, 135 insertions(+), 93 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index c3a59443..36f1a5e1 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -116,6 +116,7 @@ def recive_S3_data_from_query( client: Moonstream, token: Union[str, uuid.UUID], query_name: str, + query_params: Dict[str, Any] = {}, params: Dict[str, Any] = {}, time_await: int = 2, max_retries: int = 30, @@ -136,15 +137,18 @@ def recive_S3_data_from_query( if custom_body: headers = { "Authorization": f"Bearer {token}", + "Content-Type": "application/json", } json = custom_body response = requests.post( url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", headers=headers, + params=query_params, json=json, timeout=5, ) + data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) else: data_url = client.exec_query( diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 615d2805..5db35304 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -13,12 +13,7 @@ import boto3 # type: ignore from bugout.data import BugoutJournalEntity, BugoutResource from fastapi import BackgroundTasks, FastAPI from fastapi.middleware.cors import CORSMiddleware -from moonstreamdb.blockchain import ( - AvailableBlockchainType, - get_block_model, - get_label_model, - get_transaction_model, -) +from moonstreamtypes.blockchain import AvailableBlockchainType, get_block_model, get_label_model, get_transaction_model from sqlalchemy import text from . 
import data @@ -232,6 +227,11 @@ async def queries_data_update_handler( requested_query = request_data.query + version = 2 + + if request_data.customer_id and request_data.instance_id: + version = 3 + blockchain_table = "polygon_labels" if request_data.blockchain: if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: @@ -240,22 +240,23 @@ async def queries_data_update_handler( blockchain = AvailableBlockchainType(request_data.blockchain) - requested_query = ( - requested_query.replace( - "__transactions_table__", - get_transaction_model(blockchain).__tablename__, - ) - .replace( - "__blocks_table__", - get_block_model(blockchain).__tablename__, - ) - .replace( + blockchain_table = get_label_model(blockchain, version).__tablename__ + requested_query = requested_query.replace( "__labels_table__", - get_label_model(blockchain).__tablename__, - ) + blockchain_table ) + if version == 2: + ( + requested_query.replace( + "__transactions_table__", + get_transaction_model(blockchain).__tablename__, + ) + .replace( + "__blocks_table__", + get_block_model(blockchain).__tablename__, + ) - blockchain_table = get_label_model(blockchain).__tablename__ + ) # Check if it can transform to TextClause try: diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 0d550dec..7ecda650 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -8,11 +8,13 @@ from sqlalchemy.orm import Session from typing import Any, Dict, List, Optional, Tuple from urllib.error import HTTPError +from bugout.exceptions import BugoutResponseException from moonstreamtypes.blockchain import AvailableBlockchainType +from moonstreamdb.blockchain import AvailableBlockchainType as AvailableBlockchainTypeV2 from ..actions import get_all_entries_from_search, request_connection_string -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, METADATA_TASKS_JOURNAL_ID +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_METADATA_TASKS_JOURNAL, MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx, create_moonstream_engine, sessionmaker from ..data import TokenURIs from .db import ( @@ -77,6 +79,8 @@ def crawl_uri(metadata_uri: str) -> Any: except HTTPError as error: logger.error(f"request end with error statuscode: {error.code}") retry += 1 + if error.code == 404: + return None continue except Exception as err: logger.error(err) @@ -233,12 +237,17 @@ def process_address_metadata( v3=True ) + db_session.commit() + clean_labels_from_db( db_session=db_session, blockchain_type=blockchain_type, address=address, + version=3 ) + db_session.commit() + @@ -248,7 +257,6 @@ def parse_metadata( batch_size: int, max_recrawl: int, threads: int, - spire: bool = False, custom_db_uri: Optional[str] = None, ): """ @@ -257,62 +265,66 @@ def parse_metadata( logger.info("Starting metadata crawler") logger.info(f"Processing blockchain {blockchain_type.value}") - # Get tokens to crawl v2 flow - with yield_db_read_only_preping_session_ctx() as db_session_read_only: - tokens_uri_by_address = get_tokens_to_crawl( - db_session_read_only, - blockchain_type, - {}, - ) - - - # Process each address - for address, tokens in tokens_uri_by_address.items(): - process_address_metadata_with_leak( - address=address, - blockchain_type=blockchain_type, - batch_size=batch_size, - max_recrawl=max_recrawl, - threads=threads, - tokens=tokens, - ) - 
- - - - spire_jobs = [] - - if spire == True: - # Get all jobs for this blockchain from Spire - search_query = f"#metadata-job #{blockchain_type.value}" + # Check if blockchain exists in v2 package + if blockchain_type.value in [chain.value for chain in AvailableBlockchainTypeV2]: try: - entries = get_all_entries_from_search( - journal_id=METADATA_TASKS_JOURNAL_ID, - search_query=search_query, - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - content=True, - limit=1000, - ) - - logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}") - - for entry in entries: - try: - if not entry.content: - continue - - job = json.loads(entry.content) - if job.get("blockchain") != blockchain_type.value: - logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}") - continue - spire_jobs.append(job) - except Exception as err: - id = entry.entry_url.split("/")[-1] - logger.error(f"Error parsing job from entry {id}: {err}") - continue + logger.info(f"Processing v2 blockchain: {blockchain_type.value}") + # Get tokens to crawl v2 flow + with yield_db_read_only_preping_session_ctx() as db_session_read_only: + tokens_uri_by_address = get_tokens_to_crawl( + db_session_read_only, + blockchain_type, + {}, + ) + + # Process each address + for address, tokens in tokens_uri_by_address.items(): + process_address_metadata_with_leak( + address=address, + blockchain_type=blockchain_type, + batch_size=batch_size, + max_recrawl=max_recrawl, + threads=threads, + tokens=tokens, + ) except Exception as err: - logger.error(f"Error fetching jobs from journal: {err}") - return + logger.error(f"V2 flow failed: {err}, continuing with Spire flow") + + # Continue with Spire flow regardless of v2 result + spire_jobs = [] + + # Get all jobs for this blockchain from Spire + search_query = f"#metadata-job #{blockchain_type.value}" + try: + entries = get_all_entries_from_search( + journal_id=MOONSTREAM_METADATA_TASKS_JOURNAL, + search_query=search_query, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + content=True, + limit=1000, + ) + + logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}") + + for entry in entries: + try: + if not entry.content: + continue + + job = json.loads(entry.content) + if job.get("blockchain") != blockchain_type.value: + logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}") + continue + spire_jobs.append(job) + except Exception as err: + id = entry.entry_url.split("/")[-1] + logger.error(f"Error parsing job from entry {id}: {err}") + continue + except BugoutResponseException as err: + logger.error(f"Bugout error fetching jobs from journal: {err.detail}") + except Exception as err: + logger.error(f"Error fetching jobs from journal: {err}") + return # Process each job @@ -369,7 +381,7 @@ def parse_metadata( except Exception as err: logger.error(f"Error processing jobs: {err}") raise err - + finally: for session in sessions_by_customer.values(): try: @@ -388,7 +400,6 @@ def handle_crawl(args: argparse.Namespace) -> None: args.commit_batch_size, args.max_recrawl, args.threads, - args.spire, args.custom_db_uri, ) @@ -431,12 +442,6 @@ def main() -> None: default=4, help="Amount of threads for crawling", ) - metadata_crawler_parser.add_argument( - "--spire", - type=bool, - default=False, - help="If true, use spire jobs to crawl metadata", - ) metadata_crawler_parser.add_argument( "--custom-db-uri", type=str, diff --git 
a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index 03987f55..2d3785a3 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -7,7 +7,6 @@ from sqlalchemy.dialects.postgresql import insert from datetime import datetime -##from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model from sqlalchemy.orm import Session from sqlalchemy.sql import text @@ -19,6 +18,7 @@ from ..settings import ( METADATA_CRAWLER_LABEL, VIEW_STATE_CRAWLER_LABEL, MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN, bugout_client as bc, moonstream_client as mc, ) @@ -271,14 +271,14 @@ def get_tokens_id_wich_may_updated( def clean_labels_from_db( - db_session: Session, blockchain_type: AvailableBlockchainType, address: str + db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2 ): """ Remove existing labels. But keep the latest one for each token. """ - label_model = get_label_model(blockchain_type) + label_model = get_label_model(blockchain_type, version=version) table = label_model.__tablename__ @@ -315,19 +315,34 @@ def clean_labels_from_db( def get_tokens_from_query_api( client: Moonstream, + blockchain_type: AvailableBlockchainType, query_name: str, params: dict, token: str, + customer_id: Optional[str] = None, + instance_id: Optional[str] = None, ) -> List[TokenURIs]: """ Get token URIs from Query API results """ + + query_params = {} + + if customer_id and instance_id: + query_params["customer_id"] = customer_id + query_params["instance_id"] = instance_id + try: data = recive_S3_data_from_query( client=client, token=token, query_name=query_name, - params=params, + params={}, + query_params=query_params, + custom_body={ + "blockchain": blockchain_type.value, + "params": params, + } ) # Convert query results to TokenURIs format @@ -367,9 +382,12 @@ def get_tokens_to_crawl( tokens = get_tokens_from_query_api( client=client, + blockchain_type=blockchain_type, query_name=query_config["name"], params=query_config["params"], - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + token=MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN, + customer_id=spire_job["customer_id"], + instance_id=spire_job["instance_id"], ) # Group by address @@ -401,7 +419,6 @@ def upsert_metadata_labels( try: version = 3 if v3 else 2 label_model = get_label_model(blockchain_type, version=version) - # Prepare batch of labels labels_data = [] @@ -449,7 +466,11 @@ def upsert_metadata_labels( result_stmt = insert_stmt.on_conflict_do_nothing( ) - db_session.execute(result_stmt) + result = db_session.execute(result_stmt) + engine = db_session.get_bind() + logger.info(f"Database URL: {engine.engine.url}") + + db_session.commit() except Exception as err: logger.error(f"Error batch upserting metadata labels: {err}") diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index ab2fbcac..8d14e22f 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -512,6 +512,17 @@ MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( ) -METADATA_TASKS_JOURNAL_ID = os.environ.get( - "METADATA_TASKS_JOURNAL_ID", "" +MOONSTREAM_METADATA_TASKS_JOURNAL = os.environ.get( + "MOONSTREAM_METADATA_TASKS_JOURNAL", "" ) + + +### MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN + +MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN = os.environ.get( + 
"MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN", "" +) +if MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN == "": + raise ValueError( + "MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN environment variable must be set" + ) diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index c1388beb..e9f05216 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -16,7 +16,7 @@ from bugout.data import ( ) from bugout.exceptions import BugoutResponseException from fastapi import APIRouter, Body, Path, Query, Request -from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamtypes.blockchain import AvailableBlockchainType from sqlalchemy import text from .. import data From 943c6a46abea2f0c8ed7aed38bf89b8b2cfaac23 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 13 Jan 2025 22:41:45 +0200 Subject: [PATCH 07/13] Bump version and fix comment. --- crawlers/mooncrawl/mooncrawl/actions.py | 2 +- crawlers/mooncrawl/mooncrawl/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 36f1a5e1..c9728e93 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -242,7 +242,7 @@ def request_connection_string( user: str = "seer", # token with write access ) -> str: """ - Request connection string from the Moonstream API. + Request connection string from the Moonstream DB V3 Controller API. Default user is seer with write access """ response = requests.get( diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 69fad2b6..1a4a3d66 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. 
""" -MOONCRAWL_VERSION = "0.5.2" +MOONCRAWL_VERSION = "0.5.3" From c013e544fdd42909044d14236e1f7c75eae40e16 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 12:33:58 +0200 Subject: [PATCH 08/13] MOONSTREAM_PUBLIC_QUERIES_ACCESS_TOKEN -> MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN --- crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py | 4 ++-- crawlers/mooncrawl/mooncrawl/settings.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 7ecda650..a55c2ffa 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -14,7 +14,7 @@ from moonstreamdb.blockchain import AvailableBlockchainType as AvailableBlockcha from ..actions import get_all_entries_from_search, request_connection_string -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_METADATA_TASKS_JOURNAL, MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_METADATA_TASKS_JOURNAL, MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx, create_moonstream_engine, sessionmaker from ..data import TokenURIs from .db import ( diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index 2d3785a3..bf550ea6 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -18,7 +18,7 @@ from ..settings import ( METADATA_CRAWLER_LABEL, VIEW_STATE_CRAWLER_LABEL, MOONSTREAM_ADMIN_ACCESS_TOKEN, - MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN, + MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, bugout_client as bc, moonstream_client as mc, ) @@ -385,7 +385,7 @@ def get_tokens_to_crawl( blockchain_type=blockchain_type, query_name=query_config["name"], params=query_config["params"], - token=MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN, + token=MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, customer_id=spire_job["customer_id"], instance_id=spire_job["instance_id"], ) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 8d14e22f..0cef6bab 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -519,10 +519,10 @@ MOONSTREAM_METADATA_TASKS_JOURNAL = os.environ.get( ### MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN -MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN = os.environ.get( - "MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN", "" +MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN = os.environ.get( + "MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN", "" ) -if MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN == "": +if MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN == "": raise ValueError( - "MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN environment variable must be set" + "MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN environment variable must be set" ) From 41f3645a8f043ad3a437adc64b85b3e28d1ad74f Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 18:10:11 +0200 Subject: [PATCH 09/13] Add deploy. 
--- crawlers/deploy/deploy-state.bash | 29 ++++++++++++++++++- crawlers/deploy/deploy.bash | 29 +++++++++++++++++++ crawlers/deploy/game7-metadata.service | 11 +++++++ crawlers/deploy/game7-metadata.timer | 9 ++++++ .../deploy/game7-testnet-metadata.service | 11 +++++++ crawlers/deploy/game7-testnet-metadata.timer | 9 ++++++ 6 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 crawlers/deploy/game7-metadata.service create mode 100644 crawlers/deploy/game7-metadata.timer create mode 100644 crawlers/deploy/game7-testnet-metadata.service create mode 100644 crawlers/deploy/game7-testnet-metadata.timer diff --git a/crawlers/deploy/deploy-state.bash b/crawlers/deploy/deploy-state.bash index 78818e25..c88f326a 100755 --- a/crawlers/deploy/deploy-state.bash +++ b/crawlers/deploy/deploy-state.bash @@ -63,6 +63,15 @@ XAI_SEPOLIA_STATE_CLEAN_TIMER_FILE="xai-sepolia-state-clean.timer" XAI_SEPOLIA_METADATA_SERVICE_FILE="xai-sepolia-metadata.service" XAI_SEPOLIA_METADATA_TIMER_FILE="xai-sepolia-metadata.timer" +# Game7 +GAME7_METADATA_SERVICE_FILE="game7-metadata.service" +GAME7_METADATA_TIMER_FILE="game7-metadata.timer" + +# Game7 testnet +GAME7_TESTNET_METADATA_SERVICE_FILE="game7-testnet-metadata.service" +GAME7_TESTNET_METADATA_TIMER_FILE="game7-testnet-metadata.timer" + + set -eu echo @@ -229,4 +238,22 @@ chmod 644 "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${ cp "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XAI_SEPOLIA_METADATA_SERVICE_FILE}" cp "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XAI_SEPOLIA_METADATA_TIMER_FILE}" XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload -XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XAI_SEPOLIA_METADATA_TIMER_FILE}" \ No newline at end of file +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XAI_SEPOLIA_METADATA_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Game7 metadata service and timer with: ${GAME7_METADATA_SERVICE_FILE}, ${GAME7_METADATA_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" +cp "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_METADATA_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Game7 testnet metadata service and timer with: ${GAME7_TESTNET_METADATA_SERVICE_FILE}, ${GAME7_TESTNET_METADATA_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" +cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_TESTNET_METADATA_TIMER_FILE}" diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash index 17c315c9..63679f1e 100755 --- a/crawlers/deploy/deploy.bash +++ b/crawlers/deploy/deploy.bash @@ -217,6 
+217,14 @@ MANTLE_SEPOLIA_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="mantle-sepolia-historical-cra MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="mantle-sepolia-historical-crawl-transactions.service" MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="mantle-sepolia-historical-crawl-transactions.timer" +# Game7 +GAME7_METADATA_SERVICE_FILE="game7-metadata.service" +GAME7_METADATA_TIMER_FILE="game7-metadata.timer" + +# Game7 testnet +GAME7_TESTNET_METADATA_SERVICE_FILE="game7-testnet-metadata.service" +GAME7_TESTNET_METADATA_TIMER_FILE="game7-testnet-metadata.timer" + set -eu echo @@ -1109,3 +1117,24 @@ cp "${SCRIPT_DIR}/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" cp "${SCRIPT_DIR}/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" + + +# Game7 +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Game7 metadata service and timer with: ${GAME7_METADATA_SERVICE_FILE}, ${GAME7_METADATA_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" +cp "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_METADATA_TIMER_FILE}" + +# Game7 testnet +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Game7 testnet metadata service and timer with: ${GAME7_TESTNET_METADATA_SERVICE_FILE}, ${GAME7_TESTNET_METADATA_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" +cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_TESTNET_METADATA_TIMER_FILE}" diff --git a/crawlers/deploy/game7-metadata.service b/crawlers/deploy/game7-metadata.service new file mode 100644 index 00000000..96478ec3 --- /dev/null +++ b/crawlers/deploy/game7-metadata.service @@ -0,0 +1,11 @@ +[Unit] +Description=Execute metadata crawler +After=network.target + +[Service] +Type=oneshot +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.metadata_crawler.cli crawl --blockchain game7 +CPUWeight=60 +SyslogIdentifier=game7-metadata diff --git a/crawlers/deploy/game7-metadata.timer b/crawlers/deploy/game7-metadata.timer new file mode 100644 index 00000000..0ae0ee8f --- /dev/null +++ b/crawlers/deploy/game7-metadata.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Execute Game7 metadata crawler each 10m + +[Timer] +OnBootSec=20s +OnUnitActiveSec=60m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/game7-testnet-metadata.service b/crawlers/deploy/game7-testnet-metadata.service new file mode 
100644 index 00000000..6d3bdc37 --- /dev/null +++ b/crawlers/deploy/game7-testnet-metadata.service @@ -0,0 +1,11 @@ +[Unit] +Description=Execute metadata crawler +After=network.target + +[Service] +Type=oneshot +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.metadata_crawler.cli crawl --blockchain game7_testnet +CPUWeight=60 +SyslogIdentifier=game7-testnet-metadata diff --git a/crawlers/deploy/game7-testnet-metadata.timer b/crawlers/deploy/game7-testnet-metadata.timer new file mode 100644 index 00000000..66950fd2 --- /dev/null +++ b/crawlers/deploy/game7-testnet-metadata.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Execute Game7 testnet metadata crawler each 10m + +[Timer] +OnBootSec=20s +OnUnitActiveSec=60m + +[Install] +WantedBy=timers.target From 00fa04ae259714f38bddede0f370bfb944e9e160 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 18:41:19 +0200 Subject: [PATCH 10/13] fix leak rate. --- crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index a55c2ffa..7946a201 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -123,11 +123,10 @@ def process_address_metadata_with_leak( try: logger.info(f"Starting to crawl metadata for address: {address}") + leak_rate = 0 if len(maybe_updated) > 0: free_spots = len(maybe_updated) / max_recrawl - if free_spots > 1: - leak_rate = 0 - else: + if free_spots < 1: leak_rate = 1 - ( len(already_parsed) - max_recrawl + len(maybe_updated) ) / len(already_parsed) From 28a49673e876031d7dfdc188ad14532c3aea03ca Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 19:04:38 +0200 Subject: [PATCH 11/13] Update token proccessing. 
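A minimal sketch of the recrawl-budget calculation introduced in the hunk below, using made-up token counts; the `leak_of_crawled_uri` body here is only a guess at its behavior, inferred from how the crawler calls it (its real implementation is not part of this patch):

```python
import random
from typing import List, Set


def calculate_leak_rate(already_parsed: int, maybe_updated: int, max_recrawl: int) -> float:
    """Fraction of already-parsed tokens to re-crawl so that
    maybe_updated + leaked stays within the max_recrawl budget."""
    free_spots = max(0, max_recrawl - maybe_updated)
    if already_parsed > 0 and free_spots > 0:
        return free_spots / already_parsed
    return 0.0


def leak_of_crawled_uri(already_parsed: Set[str], leak_rate: float, maybe_updated: List[str]) -> Set[str]:
    # Guessed behavior: keep a token marked as "already parsed" (i.e. skip it)
    # unless it is flagged as maybe updated or it loses the leak_rate coin flip.
    return {
        token_id
        for token_id in already_parsed
        if token_id not in maybe_updated and random.random() > leak_rate
    }


if __name__ == "__main__":
    # Example numbers: 1000 tokens already parsed, 50 flagged as maybe updated,
    # recrawl budget of 300 -> leak_rate = (300 - 50) / 1000 = 0.25,
    # so roughly 250 previously parsed tokens get re-crawled on top of the 50.
    print(calculate_leak_rate(already_parsed=1000, maybe_updated=50, max_recrawl=300))
```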
--- .../mooncrawl/metadata_crawler/cli.py | 25 +++++++++++-------- .../mooncrawl/metadata_crawler/db.py | 4 +-- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 7946a201..3d90461f 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -122,22 +122,27 @@ def process_address_metadata_with_leak( with yield_db_preping_session_ctx() as db_session: try: logger.info(f"Starting to crawl metadata for address: {address}") + logger.info(f"Maybe updated: {len(maybe_updated)}") - leak_rate = 0 - if len(maybe_updated) > 0: - free_spots = len(maybe_updated) / max_recrawl - if free_spots < 1: - leak_rate = 1 - ( - len(already_parsed) - max_recrawl + len(maybe_updated) - ) / len(already_parsed) + # Calculate how many tokens we can 'leak' so total recrawled (maybe_updated + leaked) <= max_recrawl + num_already_parsed = len(already_parsed) + num_maybe_updated = len(maybe_updated) + free_spots = max(0, max_recrawl - num_maybe_updated) - parsed_with_leak = leak_of_crawled_uri( - already_parsed, leak_rate, maybe_updated - ) + if num_already_parsed > 0 and free_spots > 0: + leak_rate = free_spots / num_already_parsed + else: + leak_rate = 0 logger.info( f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}" ) + + # TODO: Fully random leak is not correct, we should leak based on created_at + parsed_with_leak = leak_of_crawled_uri( + already_parsed, leak_rate, maybe_updated + ) + logger.info(f"Already parsed: {len(already_parsed)} for {address}") logger.info(f"Amount of tokens to parse: {len(tokens)} for {address}") diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index bf550ea6..4b72803d 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -466,9 +466,7 @@ def upsert_metadata_labels( result_stmt = insert_stmt.on_conflict_do_nothing( ) - result = db_session.execute(result_stmt) - engine = db_session.get_bind() - logger.info(f"Database URL: {engine.engine.url}") + db_session.execute(result_stmt) db_session.commit() From f36836247e8ba904c4c979d728869436453f71c9 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 19:14:37 +0200 Subject: [PATCH 12/13] Delete repeated variable. --- crawlers/mooncrawl/mooncrawl/settings.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 0cef6bab..ceba9fc1 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -13,13 +13,6 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev" bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) - -MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") -MOONSTREAM_ENGINE_URL = os.environ.get( - "MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to" -) - - BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get( "MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30 ) From 4e7c270a8e7f773dc84af19c9f356bfa59e6bb49 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 Jan 2025 19:25:45 +0200 Subject: [PATCH 13/13] Add more logging. 
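A self-contained sketch of the chunked processing and progress logging added in the diff below; the token list and batch size are example values, not taken from the crawler:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def iterate_in_chunks(tokens: list, batch_size: int):
    """Yield (chunk_index, total_chunks, chunk); total_chunks uses ceiling division."""
    total_chunks = (len(tokens) + batch_size - 1) // batch_size  # ceil(len(tokens) / batch_size)
    for chunk_index, start in enumerate(range(0, len(tokens), batch_size)):
        yield chunk_index, total_chunks, tokens[start : start + batch_size]


if __name__ == "__main__":
    tokens = list(range(23))  # hypothetical token ids
    for chunk_index, total_chunks, chunk in iterate_in_chunks(tokens, batch_size=10):
        logger.info(
            "Processing chunk %d/%d (%d tokens)", chunk_index + 1, total_chunks, len(chunk)
        )
```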
--- .../mooncrawl/metadata_crawler/cli.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 3d90461f..a9780602 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -216,12 +216,21 @@ def process_address_metadata( """ + logger.info(f"Processing address {address} with {len(tokens)} tokens") + total_tokens = len(tokens) + total_chunks = (total_tokens + batch_size - 1) // batch_size - for requests_chunk in [ + for chunk_index, requests_chunk in enumerate([ tokens[i : i + batch_size] for i in range(0, len(tokens), batch_size) - ]: + ]): + logger.info( + f"Processing chunk {chunk_index + 1}/{total_chunks} " + f"({len(requests_chunk)} tokens) for address {address}" + ) + + metadata_batch = [] with ThreadPoolExecutor(max_workers=threads) as executor: future_to_token = { @@ -241,6 +250,8 @@ def process_address_metadata( v3=True ) + logger.info(f"Wrote {len(metadata_batch)} labels for {address}") + db_session.commit() clean_labels_from_db( @@ -251,9 +262,6 @@ def process_address_metadata( ) db_session.commit() - - - def parse_metadata(