From c6f40085f312f49740bbadb280a455ad568ccb2c Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 14 Jun 2024 19:11:29 +0300 Subject: [PATCH] Add insert for v3 and fix jobs definitions. --- .../mooncrawl/moonworm_crawler/cli.py | 28 ++-- .../moonworm_crawler/continuous_crawler.py | 1 - .../mooncrawl/moonworm_crawler/crawler.py | 44 ++++--- .../mooncrawl/moonworm_crawler/db.py | 121 ++++++++++++++---- .../moonworm_crawler/function_call_crawler.py | 1 + 5 files changed, 141 insertions(+), 54 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index f52900d1..24287908 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -6,7 +6,11 @@ from urllib.parse import urlparse, urlunparse import requests -from moonstreamdbv3.db import MoonstreamDBEngine, MoonstreamDBIndexesEngine +from moonstreamdbv3.db import ( + MoonstreamDBEngine, + MoonstreamDBIndexesEngine, + MoonstreamCustomDBEngine, +) from moonstreamtypes.blockchain import AvailableBlockchainType from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type from web3 import Web3 @@ -160,8 +164,7 @@ def get_db_connection(uuid): except Exception as e: raise Exception(f"Unhandled exception, error: {str(e)}") - connection_string = response.text.strip('"') - + connection_string = response.text try: connection_string = ensure_port_in_connection_string(connection_string) except ValueError as e: @@ -173,11 +176,18 @@ def get_db_connection(uuid): def ensure_port_in_connection_string(connection_string): + # Parse the connection string into components parsed_url = urlparse(connection_string) + + # Check if a port is specified, and if not, add the default port if parsed_url.port is None: - host = f"{parsed_url.hostname}:5432" # Assuming default port for PostgreSQL - parsed_url = parsed_url._replace(netloc=host) - connection_string = urlunparse(parsed_url) + # 
Append default port 5432 for PostgreSQL if no port specified + + connection_string = connection_string.replace( + "/" + connection_string.split("/")[-1], + f":5432" + "/" + connection_string.split("/")[-1], + ) + connection_string = connection_string.replace('"', "") return connection_string @@ -529,7 +539,9 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None: customer_connection = get_db_connection(args.customer_uuid) - filtered_function_call_jobs = [] # v1 + if customer_connection == "": + raise ValueError("No connection string found for the customer") + if args.only_events: filtered_function_call_jobs = [] logger.info(f"Removing function call crawl jobs since --only-events is set") @@ -555,7 +567,7 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None: logger.info(f"Blockchain type: {blockchain_type.value}") - customer_engine = MoonstreamDBEngine() + customer_engine = MoonstreamCustomDBEngine(customer_connection) with customer_engine.yield_db_session_ctx() as db_session: web3: Optional[Web3] = None diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 01f70f7f..c379c95a 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -135,7 +135,6 @@ def continuous_crawler( evm_state_provider = Web3StateProvider(web3) if version == 2: - evm_state_provider = MoonstreamEthereumStateProvider( web3, network, # type: ignore diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 7d350180..5bfdd4dc 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -2,7 +2,7 @@ import json import logging import re import time -from dataclasses import dataclass +from dataclasses import dataclass, field 
from datetime import datetime import binascii from enum import Enum @@ -16,7 +16,7 @@ from moonstreamdb.subscriptions import SubscriptionTypes from moonstreamtypes.subscriptions import SubscriptionTypes from moonstreamdbv3.models_indexes import AbiJobs from moonworm.deployment import find_deployment_block # type: ignore -from sqlalchemy import func +from sqlalchemy import func, cast, JSON from sqlalchemy.orm import Session from web3.main import Web3 @@ -139,6 +139,7 @@ class FunctionCallCrawlJob: contract_address: ChecksumAddress entries_tags: Dict[UUID, List[str]] created_at: int + existing_selectors: List[str] = field(default_factory=list) def get_crawl_job_entries( @@ -778,7 +779,11 @@ def get_function_call_crawl_job_records( query = ( db_session.query(AbiJobs) .filter(AbiJobs.chain == blockchain_type.value) - .filter(func.length(AbiJobs.abi_selector) == 8) + .filter(func.length(AbiJobs.abi_selector) == 10) + .filter( + cast(AbiJobs.abi, JSON).op("->>")("type") == "function", + cast(AbiJobs.abi, JSON).op("->>")("stateMutability") != "view", + ) ) if len(addresses) != 0: @@ -790,30 +795,31 @@ def get_function_call_crawl_job_records( # Iterate over each record fetched from the database for crawl_job_record in crawl_job_records: - str_address = crawl_job_record.address.hex() # Convert address to hex string + str_address = "0x" + crawl_job_record.address.hex() - if crawl_job_record.abi_selector in existing_crawl_job_records: - # If abi_selector already exists in the records, append new address if not already listed - current_job = existing_crawl_job_records[crawl_job_record.abi_selector] - if str_address not in current_job.contract_address: - # Since it should handle a single address, we ensure not to append but update if necessary. - current_job.contract_address = str_address - else: - # Create a new FunctionCallCrawlJob record if the abi_selector is not found in existing records. 
- new_crawl_job = FunctionCallCrawlJob( + if str_address not in existing_crawl_job_records: + existing_crawl_job_records[str_address] = FunctionCallCrawlJob( contract_abi=[json.loads(str(crawl_job_record.abi))], - contract_address=str_address, + contract_address=Web3.toChecksumAddress(str_address), entries_tags={ UUID(str(crawl_job_record.id)): [ str(crawl_job_record.status), str(crawl_job_record.progress), - ], + ] }, created_at=int(crawl_job_record.created_at.timestamp()), + existing_selectors=[str(crawl_job_record.abi_selector)], ) - - existing_crawl_job_records[str(crawl_job_record.abi_selector)] = ( - new_crawl_job - ) + else: + if ( + crawl_job_record.abi_selector + not in existing_crawl_job_records[str_address].existing_selectors + ): + existing_crawl_job_records[str_address].contract_abi.append( + json.loads(str(crawl_job_record.abi)) + ) + existing_crawl_job_records[str_address].existing_selectors.append( + str(crawl_job_record.abi_selector) + ) return existing_crawl_job_records diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 9dd02654..07a8236e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -91,7 +91,7 @@ def _function_call_to_label( ).replace(r"\u0000", "") ) - if isinstance(blockchain_type, AvailableBlockchainType): + if db_version == 2: label = label_model( label=label_name, @@ -102,6 +102,26 @@ def _function_call_to_label( block_timestamp=function_call.block_timestamp, ) + else: + + del sanityzed_label_data["type"] + del sanityzed_label_data["name"] + + label = label_model( + label=label_name, + label_name=function_call.function_name, + label_data=sanityzed_label_data, + address=function_call.contract_address, + block_number=function_call.block_number, + block_hash=function_call.block_hash.hex(), # type: ignore + transaction_hash=function_call.transaction_hash, + 
block_timestamp=function_call.block_timestamp, + caller_address=function_call.caller_address, + origin_address=function_call.caller_address, + ) + + print(label) + return label @@ -278,35 +298,84 @@ def add_function_calls_to_session( if len(function_calls) == 0: return - label_model = get_label_model(blockchain_type, version=db_version) + if db_version == 2: - transactions_hashes_to_save = list( - set([function_call.transaction_hash for function_call in function_calls]) - ) + label_model = get_label_model(blockchain_type, version=db_version) - # Define a CTE VALUES expression to escape big IN clause - hashes_cte = select( - values(column("transaction_hash", String), name="hashes").data( - [(hash,) for hash in transactions_hashes_to_save] + transactions_hashes_to_save = list( + set([function_call.transaction_hash for function_call in function_calls]) ) - ).cte() - # Retrieve existing transaction hashes - query = db_session.query( - label_model.transaction_hash.label("transaction_hash") - ).filter( - label_model.label == label_name, - label_model.log_index.is_(None), - exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash), - ) + # Define a CTE VALUES expression to escape big IN clause + hashes_cte = select( + values(column("transaction_hash", String), name="hashes").data( + [(hash,) for hash in transactions_hashes_to_save] + ) + ).cte() - existing_tx_hashes = [row.transaction_hash for row in query] + # Retrieve existing transaction hashes + query = db_session.query( + label_model.transaction_hash.label("transaction_hash") + ).filter( + label_model.label == label_name, + label_model.log_index.is_(None), + exists().where( + label_model.transaction_hash == hashes_cte.c.transaction_hash + ), + ) - labels_to_save = [ - _function_call_to_label(blockchain_type, function_call) - for function_call in function_calls - if function_call.transaction_hash not in existing_tx_hashes - ] + existing_tx_hashes = [row.transaction_hash for row in query] - 
logger.info(f"Saving {len(labels_to_save)} labels to session") - db_session.add_all(labels_to_save) + labels_to_save = [ + _function_call_to_label(blockchain_type, function_call) + for function_call in function_calls + if function_call.transaction_hash not in existing_tx_hashes + ] + + logger.info(f"Saving {len(labels_to_save)} labels to session") + db_session.add_all(labels_to_save) + + else: + + label_model = get_label_model(blockchain_type, version=db_version) + + # Define the table name and columns based on the blockchain type + table = label_model.__table__ + + # Create a list of dictionaries representing new records + records = [] + for function_call in function_calls: + + label_function_call = _function_call_to_label( + blockchain_type, function_call, db_version + ) + + record = { + "label": label_function_call.label, + "transaction_hash": label_function_call.transaction_hash, + "log_index": None, + "block_number": label_function_call.block_number, + "block_hash": None, + "block_timestamp": label_function_call.block_timestamp, + "caller_address": label_function_call.label_data["caller"], + "origin_address": None, + "address": label_function_call.address, + "label_name": label_function_call.label_name, + "label_type": "tx_call", + "label_data": label_function_call.label_data, + } + + records.append(record) + + # Insert records using a single batched query with an ON CONFLICT clause + statement = insert(table).values(records) + do_nothing_statement = statement.on_conflict_do_nothing( + index_elements=["transaction_hash", "log_index"], + index_where=(table.c.label == "seer") & (table.c.label_type == "tx_call"), + ) + + db_session.execute(do_nothing_statement) + + logger.info( + f"Batch inserted {len(records)} function call labels into {table.name}" + ) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py index 43f99cec..f230938e 100644 --- 
a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/function_call_crawler.py @@ -43,6 +43,7 @@ def _crawl_functions( "function_call", blockchain_type ), ) + print(f"Processing job {function_call_crawler.whitelisted_methods}") function_call_crawler.crawl( from_block, to_block,