diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py index c2f777b9..ee6f3d0a 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -8,10 +8,11 @@ import logging import time from datetime import datetime, timedelta from enum import Enum -from typing import Any, Callable, Dict, List +from typing import Any, Callable, Dict, List, Union +from uuid import UUID import boto3 # type: ignore -from bugout.data import BugoutResources +from bugout.data import BugoutResource, BugoutResources from moonstreamdb.db import yield_db_session_ctx from sqlalchemy import Column, Date, and_, func, text from sqlalchemy.orm import Query, Session @@ -31,7 +32,7 @@ from ..settings import ( ) from ..settings import bugout_client as bc -from web3 import Web3 +from web3 import Web3, method logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -78,11 +79,11 @@ def push_statistics( subscription: Any, timescale: str, bucket: str, - hash: str, + dashboard_id: UUID, ) -> None: result_bytes = json.dumps(statistics_data).encode("utf-8") - result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' + result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json' s3 = boto3.client("s3") s3.put_object( @@ -405,6 +406,104 @@ def get_unique_address( ) +def generate_list_of_names( + type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any +): + + """ + Generate list of names for select from database by name field + """ + if read_abi: + names = [item["name"] for item in abi_json if item["type"] == type] + else: + names = [item["name"] for item in subscription_filters[type]] + + return names + + +def process_external(abi_external_calls, blockchain): + """ + Request all required external data + TODO:(Andrey) Check posibility do it via AsyncHttpProvider(not supported for some of middlewares). + """ + + extention_data = [] + + external_calls = [] + + for external_call in abi_external_calls: + try: + func_input_abi = [] + input_args = [] + for func_input in external_call["inputs"]: + func_input_abi.append( + {"name": func_input["name"], "type": func_input["type"]} + ) + input_args.append( + cast_to_python_type(func_input["type"])(func_input["value"]) + ) + + func_abi = [ + { + "name": external_call["name"], + "inputs": func_input_abi, + "outputs": external_call["outputs"], + "type": "function", + "stateMutability": "view", + } + ] + + external_calls.append( + { + "display_name": external_call["display_name"], + "address": Web3.toChecksumAddress(external_call["address"]), + "name": external_call["name"], + "abi": func_abi, + "input_args": input_args, + } + ) + except Exception as e: + print(f"Error processing external call: {e}") + + web3_client = connect(blockchain) + # { + # "type": "external_call" + # "display_name": "Total weth earned" + # "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a", + # "name": "balanceOf", + # "inputs": [ + # { + # "name": "owner", + # "type": "address" + # "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88" + # } + # ], + # "outputs": [ + # { + # "internalType": "uint256", + # "name": "", + # "type": "uint256" + # } + # } + + for extcall in external_calls: + try: + contract = web3_client.eth.contract( + address=extcall["address"], abi=extcall["abi"] + ) + response = contract.functions[extcall["name"]]( + *extcall["input_args"] + ).call() + + extention_data.append( + {"display_name": extcall["display_name"], "value": response} + ) + except Exception as e: + print(f"Failed to call {extcall['name']} error: {e}") + + return extention_data + + def stats_generate_handler(args: argparse.Namespace): """ Start crawler with generate. @@ -413,192 +512,162 @@ def stats_generate_handler(args: argparse.Namespace): with yield_db_session_ctx() as db_session: # read all subscriptions - required_subscriptions: BugoutResources = bc.list_resources( + + # ethereum_blockchain + + blockchain_type = AvailableBlockchainType(args.blockchain) + + # polygon_blockchain + dashboard_resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD}, + timeout=10, + ) + + # Create subscriptions dict for get subscriptions by id. + blockchain_subscriptions: BugoutResources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params={ "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, - "abi": "true", "subscription_type_id": subscription_id_by_blockchain[args.blockchain], }, timeout=10, ) - print(f"Subscriptions for processing: {len(required_subscriptions.resources)}") + subscription_by_id = { + blockchain_subscription.id: blockchain_subscription + for blockchain_subscription in blockchain_subscriptions.resources + } + + # print(f"Subscriptions for processing: {len(required_subscriptions.resources)}") s3_client = boto3.client("s3") - # Already processed + # # Already processed already_processed = [] - for subscription in required_subscriptions.resources: - bucket = subscription.resource_data["bucket"] - key = subscription.resource_data["s3_path"] - address = subscription.resource_data["address"] + for dashboard in dashboard_resources.resources: - print(f"Expected bucket: s3://{bucket}/{key}") + for dashboard_subscription_filters in dashboard.resource_data[ + "dashboard_subscriptions" + ]: - abi = s3_client.get_object( - Bucket=bucket, - Key=key, - ) - abi_json = json.loads(abi["Body"].read()) + subscription_id = dashboard_subscription_filters["subscription_id"] - abi_string = json.dumps(abi_json, sort_keys=True, indent=2) + if subscription_id not in subscription_by_id: + # Meen it's are different blockchain type + continue - hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest() + s3_data_object = {} - if f"{address}/{hash}" in already_processed: - continue + extention_data = [] - s3_data_object = {} + address = subscription_by_id[subscription_id].resource_data["address"] - abi_functions = [item for item in abi_json if item["type"] == "function"] - abi_events = [item for item in abi_json if item["type"] == "event"] + generic = dashboard_subscription_filters["generic"] - abi_external_calls = [ - item for item in abi_json if item["type"] == "external_call" - ] + if not subscription_by_id[subscription_id].resource_data["abi"]: - external_calls = [] + methods = [] + events = [] - for external_call in abi_external_calls: - try: - func_input_abi = [] - input_args = [] - for func_input in external_call["inputs"]: - func_input_abi.append( - {"name": func_input["name"], "type": func_input["type"]} - ) - input_args.append( - cast_to_python_type(func_input["type"])(func_input["value"]) - ) + else: - func_abi = [ - { - "name": external_call["name"], - "inputs": func_input_abi, - "outputs": external_call["outputs"], - "type": "function", - "stateMutability": "view", - } + bucket = subscription_by_id[subscription_id].resource_data["bucket"] + key = subscription_by_id[subscription_id].resource_data["s3_path"] + + abi = s3_client.get_object( + Bucket=bucket, + Key=key, + ) + abi_json = json.loads(abi["Body"].read()) + + methods = generate_list_of_names( + type="methods", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters.all_methods, + abi_json=abi_json, + ) + + events = generate_list_of_names( + type="events", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters.all_methods, + abi_json=abi_json, + ) + + abi_external_calls = [ + item for item in abi_json if item["type"] == "external_call" ] - external_calls.append( - { - "display_name": external_call["display_name"], - "address": Web3.toChecksumAddress(external_call["address"]), - "name": external_call["name"], - "abi": func_abi, - "input_args": input_args, - } + extention_data = process_external( + abi_external_calls=abi_external_calls, + blockchain=blockchain_type, ) - except Exception as e: - print(f"Error processing external call: {e}") - web3_client = connect(blockchain_type) - # { - # "type": "external_call" - # "display_name": "Total weth earned" - # "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a", - # "name": "balanceOf", - # "inputs": [ - # { - # "name": "owner", - # "type": "address" - # "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88" - # } - # ], - # "outputs": [ - # { - # "internalType": "uint256", - # "name": "", - # "type": "uint256" - # } - # } + extention_data.append( + { + "display_name": "Overall unique token owners.", + "value": get_unique_address( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + ), + } + ) - extention_data = [] - for extcall in external_calls: - try: - contract = web3_client.eth.contract( - address=extcall["address"], abi=extcall["abi"] + for timescale in [timescale.value for timescale in TimeScale]: + + start_date = ( + datetime.utcnow() - timescales_delta[timescale]["timedelta"] ) - response = contract.functions[extcall["name"]]( - *extcall["input_args"] - ).call() - extention_data.append( - {"display_name": extcall["display_name"], "value": response} - ) - except Exception as e: - print(f"Failed to call {extcall['name']} error: {e}") + print(f"Timescale: {timescale}") - extention_data.append( - { - "display_name": "Overall unique token owners.", - "value": get_unique_address( + s3_data_object["web3_metric"] = extention_data + + functions_calls_data = generate_data( db_session=db_session, blockchain_type=blockchain_type, address=address, - ), - } - ) + timescale=timescale, + functions=methods, + start=start_date, + metric_type="tx_call", + ) - for timescale in [timescale.value for timescale in TimeScale]: + s3_data_object["functions"] = functions_calls_data + # generate data - start_date = ( - datetime.utcnow() - timescales_delta[timescale]["timedelta"] - ) + events_data = generate_data( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + functions=events, + start=start_date, + metric_type="event", + ) - print(f"Timescale: {timescale}") + s3_data_object["events"] = events_data - s3_data_object["web3_metric"] = extention_data + s3_data_object["generic"] = generate_metrics( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + metrics=generic, + start=start_date, + ) - abi_functions_names = [item["name"] for item in abi_functions] - - functions_calls_data = generate_data( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - functions=abi_functions_names, - start=start_date, - metric_type="tx_call", - ) - - s3_data_object["functions"] = functions_calls_data - # generate data - - abi_events_names = [item["name"] for item in abi_events] - - events_data = generate_data( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - functions=abi_events_names, - start=start_date, - metric_type="event", - ) - - s3_data_object["events"] = events_data - - s3_data_object["generic"] = generate_metrics( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - metrics=abi_events_names, - start=start_date, - ) - - push_statistics( - statistics_data=s3_data_object, - subscription=subscription, - timescale=timescale, - bucket=bucket, - hash=hash, - ) - already_processed.append(f"{address}/{hash}") + push_statistics( + statistics_data=s3_data_object, + subscription=subscription_by_id[subscription_id], + timescale=timescale, + bucket=bucket, + dashboard_id=dashboard.id, + ) + already_processed.append(f"{address}/{hash}") def main() -> None: