diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash
index f8b565e4..9fd5bba7 100755
--- a/crawlers/deploy/deploy.bash
+++ b/crawlers/deploy/deploy.bash
@@ -58,6 +58,10 @@ MUMBAI_SYNCHRONIZE_SERVICE="mumbai-synchronize.service"
 MUMBAI_MISSING_SERVICE_FILE="mumbai-missing.service"
 MUMBAI_MISSING_TIMER_FILE="mumbai-missing.timer"
 MUMBAI_MOONWORM_CRAWLER_SERVICE_FILE="mumbai-moonworm-crawler.service"
+MUMBAI_STATE_SERVICE_FILE="mumbai-state.service"
+MUMBAI_STATE_TIMER_FILE="mumbai-state.timer"
+MUMBAI_STATE_CLEAN_SERVICE_FILE="mumbai-state-clean.service"
+MUMBAI_STATE_CLEAN_TIMER_FILE="mumbai-state-clean.timer"
 
 # XDai service files
 XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service"
@@ -289,3 +293,23 @@ cp "${SCRIPT_DIR}/${POLYGON_CU_REPORTS_TOKENONOMICS_SERVICE_FILE}" "/etc/systemd
 cp "${SCRIPT_DIR}/${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}" "/etc/systemd/system/${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}"
 systemctl daemon-reload
 systemctl restart --no-block "${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}"
+
+
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing MUMBAI state service and timer with: ${MUMBAI_STATE_SERVICE_FILE}, ${MUMBAI_STATE_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${MUMBAI_STATE_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_STATE_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_STATE_SERVICE_FILE}" "/etc/systemd/system/${MUMBAI_STATE_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_STATE_TIMER_FILE}" "/etc/systemd/system/${MUMBAI_STATE_TIMER_FILE}"
+systemctl daemon-reload
+systemctl restart --no-block "${MUMBAI_STATE_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing MUMBAI state clean service and timer with: ${MUMBAI_STATE_CLEAN_SERVICE_FILE}, ${MUMBAI_STATE_CLEAN_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_SERVICE_FILE}" "/etc/systemd/system/${MUMBAI_STATE_CLEAN_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_TIMER_FILE}" "/etc/systemd/system/${MUMBAI_STATE_CLEAN_TIMER_FILE}"
+systemctl daemon-reload
+systemctl restart --no-block "${MUMBAI_STATE_CLEAN_TIMER_FILE}"
\ No newline at end of file
diff --git a/crawlers/deploy/mumbai-state-clean.service b/crawlers/deploy/mumbai-state-clean.service
new file mode 100644
index 00000000..84fd553d
--- /dev/null
+++ b/crawlers/deploy/mumbai-state-clean.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Execute state clean labels crawler
+After=network.target
+
+[Service]
+Type=oneshot
+User=ubuntu
+Group=www-data
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" clean-state-labels --blockchain mumbai -N 10000
+CPUWeight=60
+SyslogIdentifier=mumbai-state-clean
diff --git a/crawlers/deploy/mumbai-state-clean.timer b/crawlers/deploy/mumbai-state-clean.timer
new file mode 100644
index 00000000..e29cd6a8
--- /dev/null
+++ b/crawlers/deploy/mumbai-state-clean.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Execute Mumbai state clean labels crawler every 25m
+
+[Timer]
+OnBootSec=50s
+OnUnitActiveSec=25m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/mumbai-state.service b/crawlers/deploy/mumbai-state.service
new file mode 100644
index 00000000..8493b385
--- /dev/null
+++ b/crawlers/deploy/mumbai-state.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Execute state crawler
+After=network.target
+
+[Service]
+Type=oneshot
+User=ubuntu
+Group=www-data
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
+CPUWeight=60
+SyslogIdentifier=mumbai-state
diff --git a/crawlers/deploy/mumbai-state.timer b/crawlers/deploy/mumbai-state.timer
new file mode 100644
index 00000000..48e17e68
--- /dev/null
+++ b/crawlers/deploy/mumbai-state.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Execute Mumbai state crawler every 10m
+
+[Timer]
+OnBootSec=15s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
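The four new units follow the oneshot-service-plus-timer pattern already used throughout this deploy script: the service holds the one-off crawl command, and the matching timer re-fires it (state crawl every 10m, label cleanup every 25m, with staggered `OnBootSec` values so the two do not start together after a reboot). A quick way to verify the schedule on a deployed host, assuming `deploy.bash` has already installed the units (standard systemd commands):

```bash
# Show the Mumbai state timers and their last/next activation times
systemctl list-timers 'mumbai-state*' --all

# Tail crawler output; the SyslogIdentifier set above makes filtering easy
journalctl -t mumbai-state -f
```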
diff --git a/crawlers/deploy/polygon-state.service b/crawlers/deploy/polygon-state.service
index 46ddd88e..1effefba 100644
--- a/crawlers/deploy/polygon-state.service
+++ b/crawlers/deploy/polygon-state.service
@@ -8,6 +8,6 @@ User=ubuntu
 Group=www-data
 WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
 EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
-ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
 CPUWeight=60
 SyslogIdentifier=polygon-state
diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py
index 66e50dd6..11e654c5 100644
--- a/crawlers/mooncrawl/mooncrawl/settings.py
+++ b/crawlers/mooncrawl/mooncrawl/settings.py
@@ -210,3 +210,24 @@ if MOONSTREAM_S3_PUBLIC_DATA_BUCKET == "":
 MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX = os.environ.get(
     "MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX", "dev"
 )
+
+
+# Infura config
+
+
+INFURA_PROJECT_ID = os.environ.get("INFURA_PROJECT_ID")
+
+infura_networks = {
+    AvailableBlockchainType.ETHEREUM: {
+        "name": "mainnet",
+        "url": f"https://mainnet.infura.io/v3/{INFURA_PROJECT_ID}",
+    },
+    AvailableBlockchainType.POLYGON: {
+        "name": "polygon",
+        "url": f"https://polygon-mainnet.infura.io/v3/{INFURA_PROJECT_ID}",
+    },
+    AvailableBlockchainType.MUMBAI: {
+        "name": "mumbai",
+        "url": f"https://polygon-mumbai.infura.io/v3/{INFURA_PROJECT_ID}",
+    },
+}
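`infura_networks` is keyed by `AvailableBlockchainType`, so callers resolve a provider URL by enum member rather than by chain-name string, and `INFURA_PROJECT_ID` is `None` whenever the environment variable is unset, which the CLI changes below check before using Infura. A minimal sketch of that lookup, mirroring the guard added in `handle_crawl` in cli.py below (`resolve_infura_url` is an illustrative helper, not part of this PR):

```python
from moonstreamdb.blockchain import AvailableBlockchainType

from mooncrawl.settings import INFURA_PROJECT_ID, infura_networks


def resolve_infura_url(blockchain_type: AvailableBlockchainType) -> str:
    # Fail loudly if Infura is not configured or the chain is unsupported
    if INFURA_PROJECT_ID is None:
        raise ValueError("INFURA_PROJECT_ID environment variable is not set")
    if blockchain_type not in infura_networks:
        raise ValueError(f"Infura is not supported for {blockchain_type}")
    return infura_networks[blockchain_type]["url"]


print(resolve_infura_url(AvailableBlockchainType.MUMBAI))
# -> https://polygon-mumbai.infura.io/v3/<INFURA_PROJECT_ID>
```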
diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
index b3b70842..a464ffe2 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
@@ -1,10 +1,14 @@
 import argparse
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import TimeoutError
 import json
 import hashlib
 import itertools
 import logging
 from typing import Dict, List, Any, Optional
 from uuid import UUID
+import time
+from pprint import pprint
 
 from moonstreamdb.blockchain import AvailableBlockchainType
 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
@@ -13,24 +17,27 @@
 from moonstreamdb.db import (
     MOONSTREAM_POOL_SIZE,
     create_moonstream_engine,
 )
+import requests
 from sqlalchemy.orm import sessionmaker
+from web3._utils.request import cache_session
+from web3 import Web3, HTTPProvider
+from web3.middleware import geth_poa_middleware
 
 from .db import view_call_to_label, commit_session, clean_labels
 from .Multicall2_interface import Contract as Multicall2
 from ..settings import (
     NB_CONTROLLER_ACCESS_ID,
     MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
+    INFURA_PROJECT_ID,
     multicall_contracts,
+    infura_networks,
 )
-from .web3_util import FunctionSignature
+from .web3_util import FunctionSignature, connect
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-Multicall2_address = "0xc8E51042792d7405184DfCa245F2d27B94D013b6"
-
-
 def make_multicall(
     multicall_method: Any,
     calls: List[Any],
@@ -38,13 +45,20 @@ def make_multicall(
     block_number: str = "latest",
 ) -> Any:
 
-    multicall_calls = [
-        (
-            call["address"],
-            call["method"].encode_data(call["inputs"]).hex(),
-        )
-        for call in calls
-    ]
+    multicall_calls = []
+
+    for call in calls:
+        try:
+            multicall_calls.append(
+                (
+                    call["address"],
+                    call["method"].encode_data(call["inputs"]).hex(),
+                )
+            )
+        except Exception as e:
+            logger.error(
+                f'Error encoding data for method {call["method"].name} call {call}: {e}'
+            )
 
     multicall_result = multicall_method(False, calls=multicall_calls).call(
         block_identifier=block_number
@@ -54,10 +68,41 @@ def make_multicall(
     results = []
     # Handle the case with not successful calls
     for index, encoded_data in enumerate(multicall_result):
-        if encoded_data[0]:
+        try:
+            if encoded_data[0]:
+                results.append(
+                    {
+                        "result": calls[index]["method"].decode_data(encoded_data[1]),
+                        "hash": calls[index]["hash"],
+                        "method": calls[index]["method"],
+                        "address": calls[index]["address"],
+                        "name": calls[index]["method"].name,
+                        "inputs": calls[index]["inputs"],
+                        "call_data": multicall_calls[index][1],
+                        "block_number": block_number,
+                        "block_timestamp": block_timestamp,
+                        "status": encoded_data[0],
+                    }
+                )
+            else:
+                results.append(
+                    {
+                        "result": calls[index]["method"].decode_data(encoded_data[1]),
+                        "hash": calls[index]["hash"],
+                        "method": calls[index]["method"],
+                        "address": calls[index]["address"],
+                        "name": calls[index]["method"].name,
+                        "inputs": calls[index]["inputs"],
+                        "call_data": multicall_calls[index][1],
+                        "block_number": block_number,
+                        "block_timestamp": block_timestamp,
+                        "status": encoded_data[0],
+                    }
+                )
+        except Exception as e:
             results.append(
                 {
-                    "result": calls[index]["method"].decode_data(encoded_data[1])[0],
+                    "result": str(encoded_data[1]),
                     "hash": calls[index]["hash"],
                     "method": calls[index]["method"],
                     "address": calls[index]["address"],
@@ -67,27 +112,21 @@ def make_multicall(
                     "block_number": block_number,
                     "block_timestamp": block_timestamp,
                     "status": encoded_data[0],
+                    "error": str(e),
                 }
             )
-        else:
-            results.append(
-                {
-                    "result": calls[index]["method"].decode_data(encoded_data[1]),
-                    "hash": calls[index]["hash"],
-                    "method": calls[index]["method"],
-                    "address": calls[index]["address"],
-                    "name": calls[index]["method"].name,
-                    "inputs": calls[index]["inputs"],
-                    "call_data": multicall_calls[index][1],
-                    "block_number": block_number,
-                    "block_timestamp": block_timestamp,
-                    "status": encoded_data[0],
-                }
+
+            logger.error(
+                f"Error decoding data for method {calls[index]['method'].name} call {calls[index]}: {e}."
             )
+            # data is not decoded, return the encoded data
+            logger.error(f"Encoded data: {encoded_data}")
+
     return results
 
 
 def crawl_calls_level(
+    web3_client,
     db_session,
     calls,
     responces,
@@ -98,13 +137,20 @@ def crawl_calls_level(
     block_number,
     blockchain_type,
     block_timestamp,
+    max_batch_size=5000,
+    min_batch_size=4,
 ):
     calls_of_level = []
 
     for call in calls:
+
+        if call["generated_hash"] in responces:
+            continue
         parameters = []
 
+        logger.info(f"Call: {json.dumps(call, indent=4)}")
+
         for input in call["inputs"]:
 
             if type(input["value"]) in (str, int):
@@ -114,9 +160,10 @@ def crawl_calls_level(
                 if (
                     contracts_ABIs[call["address"]][input["value"]]["name"]
                     == "totalSupply"
-                ):
+                ):  # hack for totalSupply TODO(Andrey): need to add proper support for response parsing
+                    logger.debug(responces[input["value"]][0])
                     parameters.append(
-                        list(range(1, responces[input["value"]][0] + 1))
+                        list(range(1, responces[input["value"]][0][0] + 1))
                     )
                 else:
                     parameters.append(responces[input["value"]])
@@ -126,6 +173,10 @@ def crawl_calls_level(
                 raise
 
     for call_parameters in itertools.product(*parameters):
+
+            # hack for tuples product
+            if len(call_parameters) == 1 and type(call_parameters[0]) == tuple:
+                call_parameters = call_parameters[0]
             calls_of_level.append(
                 {
                     "address": call["address"],
@@ -137,22 +188,55 @@ def crawl_calls_level(
                 }
             )
 
-    for call_chunk in [
-        calls_of_level[i : i + batch_size]
-        for i in range(0, len(calls_of_level), batch_size)
-    ]:
+    retry = 0
 
-        while True:
-            try:
-                make_multicall_result = make_multicall(
-                    multicall_method=multicall_method,
-                    calls=call_chunk,
-                    block_number=block_number,
-                    block_timestamp=block_timestamp,
+    while len(calls_of_level) > 0:
+
+        make_multicall_result = []
+        try:
+
+            call_chunk = calls_of_level[:batch_size]
+
+            logger.info(
+                f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}"
+            )
+
+            # 1 thread with a timeout to guard against hung multicall calls
+            with ThreadPoolExecutor(max_workers=1) as executor:
+                future = executor.submit(
+                    make_multicall,
+                    multicall_method,
+                    call_chunk,
+                    block_timestamp,
+                    block_number,
                 )
-                break
-            except ValueError:
-                continue
+                make_multicall_result = future.result(timeout=20)
+            logger.info(
+                f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}"
+            )
+            retry = 0
+            calls_of_level = calls_of_level[batch_size:]
+            logger.info(f"Tasks left in queue: {len(calls_of_level)}.")
+            batch_size = min(batch_size * 2, max_batch_size)
+        except ValueError as e:  # missing trie node
+            logger.error(f"ValueError: {e}, retrying")
+            retry += 1
+            if "missing trie node" in str(e):
+                time.sleep(4)
+            if retry > 5:
+                raise e
+            batch_size = max(batch_size // 3, min_batch_size)
+        except TimeoutError as e:  # timeout
+            logger.error(f"TimeoutError: {e}, retrying")
+            retry += 1
+            if retry > 5:
+                raise e
+            batch_size = max(batch_size // 3, min_batch_size)
+        except Exception as e:
+            logger.error(f"Exception: {e}")
+            raise e
+        time.sleep(2)
+        logger.debug(f"retry: {retry}")
 
         # results parsing and writing to database
         add_to_session_count = 0
         for result in make_multicall_result:
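The rewritten loop above replaces fixed-size chunking with an adaptive policy: each successful multicall doubles the batch size (capped at `max_batch_size`), while a `ValueError` (e.g. "missing trie node" from a pruned node) or a 20-second `future.result` timeout shrinks it to a third (floored at `min_batch_size`) and retries, giving up after five failed retries. Distilled into a pure function for clarity, as a restatement of the behaviour above rather than code from this PR:

```python
def next_batch_size(
    batch_size: int,
    succeeded: bool,
    max_batch_size: int = 5000,
    min_batch_size: int = 4,
) -> int:
    # Grow quickly while the node keeps up; back off hard when it does not.
    if succeeded:
        return min(batch_size * 2, max_batch_size)
    return max(batch_size // 3, min_batch_size)
```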
@@ -167,10 +251,13 @@ def crawl_calls_level(
             commit_session(db_session)
             logger.info(f"{add_to_session_count} labels commit to database.")
 
+    return batch_size
+
 
 def parse_jobs(
     jobs: List[Any],
     blockchain_type: AvailableBlockchainType,
+    web3_provider_uri: Optional[str],
     block_number: Optional[int],
     batch_size: int,
     access_id: UUID,
@@ -183,9 +270,26 @@ def parse_jobs(
     contracts_methods: Dict[str, Any] = {}
     calls: Dict[int, Any] = {0: []}
 
-    web3_client = _retry_connect_web3(
-        blockchain_type=blockchain_type, access_id=access_id
-    )
+    if web3_provider_uri is not None:
+        try:
+            logger.info(
+                f"Connecting to blockchain: {blockchain_type} with custom provider!"
+            )
+
+            web3_client = connect(web3_provider_uri)
+
+            if blockchain_type != AvailableBlockchainType.ETHEREUM:
+                web3_client.middleware_onion.inject(geth_poa_middleware, layer=0)
+        except Exception as e:
+            logger.error(
+                f"Web3 connection to custom provider {web3_provider_uri} failed with error: {e}"
+            )
+            raise e
+    else:
+        logger.info(f"Connecting to blockchain: {blockchain_type} with Node balancer.")
+        web3_client = _retry_connect_web3(
+            blockchain_type=blockchain_type, access_id=access_id
+        )
 
     logger.info(f"Crawler started connected to blockchain: {blockchain_type}")
 
@@ -229,7 +333,11 @@ def parse_jobs(
                 have_subcalls = True
             abi["inputs"].append(input)
         abi["address"] = method_abi["address"]
-        generated_hash = hashlib.md5(json.dumps(abi).encode("utf-8")).hexdigest()
+        generated_hash = hashlib.md5(
+            json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode(
+                "utf-8"
+            )
+        ).hexdigest()
         abi["generated_hash"] = generated_hash
 
         if have_subcalls:
@@ -297,8 +405,10 @@ def parse_jobs(
     # initial call of level 0 all call without subcalls directly moved there
     logger.info("Crawl level: 0")
     logger.info(f"Jobs amount: {len(calls[0])}")
+    logger.info(f"call_tree_levels: {call_tree_levels}")
 
-    crawl_calls_level(
+    batch_size = crawl_calls_level(
+        web3_client,
         db_session,
         calls[0],
         responces,
@@ -316,7 +426,8 @@ def parse_jobs(
         logger.info(f"Crawl level: {level}")
         logger.info(f"Jobs amount: {len(calls[level])}")
 
-        crawl_calls_level(
+        batch_size = crawl_calls_level(
+            web3_client,
             db_session,
             calls[level],
             responces,
@@ -341,38 +452,28 @@
 def handle_crawl(args: argparse.Namespace) -> None:
     """
    Read all view methods of the contracts and crawl
     """
 
-    my_job = {
-        "type": "function",
-        "stateMutability": "view",
-        "inputs": [
-            {
-                "internalType": "uint256",
-                "name": "tokenId",
-                "type": "uint256",
-                "value": {
-                    "type": "function",
-                    "name": "totalSupply",
-                    "outputs": [
-                        {
-                            "internalType": "uint256",
-                            "name": "",
-                            "type": "uint256",
-                        }
-                    ],
-                    "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
-                    "inputs": [],
-                },
-            }
-        ],
-        "name": "tokenURI",
-        "outputs": [{"internalType": "string", "name": "", "type": "string"}],
-        "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
-    }
+    with open(args.jobs_file, "r") as f:
+        jobs = json.load(f)
 
     blockchain_type = AvailableBlockchainType(args.blockchain)
 
+    custom_web3_provider = args.custom_web3_provider
+
+    if args.infura and INFURA_PROJECT_ID is not None:
+        if blockchain_type not in infura_networks:
+            raise ValueError(
+                f"Infura is not supported for {blockchain_type} blockchain type"
+            )
+        logger.info("Using Infura!")
+        custom_web3_provider = infura_networks[blockchain_type]["url"]
+
     parse_jobs(
-        [my_job], blockchain_type, args.block_number, args.batch_size, args.access_id
+        jobs,
+        blockchain_type,
+        custom_web3_provider,
+        args.block_number,
+        args.batch_size,
+        args.access_id,
     )
 
@@ -382,12 +483,12 @@ def parse_abi(args: argparse.Namespace) -> None:
     """
 
     with open(args.abi_file, "r") as f:
-        # read json and parse only stateMutability equal to view
        abi = json.load(f)
 
     output_json = []
 
     for method in abi:
+        # keep only methods with stateMutability equal to view
         if method.get("stateMutability") and method["stateMutability"] == "view":
             output_json.append(method)
 
@@ -446,9 +547,27 @@ def main() -> None:
         help="Type of blockchain which writes to database",
         required=True,
     )
+    view_state_crawler_parser.add_argument(
+        "--infura",
+        action="store_true",
+        help="Use Infura as web3 provider",
+    )
+    view_state_crawler_parser.add_argument(
+        "--custom-web3-provider",
+        "-w3",
+        type=str,
+        help="Custom web3 provider URI",
+    )
     view_state_crawler_parser.add_argument(
         "--block-number", "-N", type=str, help="Block number."
     )
+    view_state_crawler_parser.add_argument(
+        "--jobs-file",
+        "-j",
+        type=str,
+        help="Path to JSON file with jobs",
+        required=True,
+    )
     view_state_crawler_parser.add_argument(
         "--batch-size",
         "-s",
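With the new flags in place, a manual run matches what `mumbai-state.service` above executes; `--infura` overrides `--custom-web3-provider`/`-w3` whenever `INFURA_PROJECT_ID` is set, and with neither flag the crawler falls back to the node-balancer connection. Example invocation, with `NB_CONTROLLER_ACCESS_ID` and `INFURA_PROJECT_ID` assumed to be present in the environment:

```bash
python -m mooncrawl.state_crawler.cli \
    --access-id "${NB_CONTROLLER_ACCESS_ID}" \
    crawl-jobs \
    --blockchain mumbai \
    --infura \
    --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
```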
in database", required=True, ) + view_state_crawler_parser.add_argument( + "--infura", + action="store_true", + help="Use infura as web3 provider", + ) + view_state_crawler_parser.add_argument( + "--custom-web3-provider", + "-w3", + type=str, + help="Type of blovkchain wich writng in database", + ) view_state_crawler_parser.add_argument( "--block-number", "-N", type=str, help="Block number." ) + view_state_crawler_parser.add_argument( + "--jobs-file", + "-j", + type=str, + help="Path to json file with jobs", + required=True, + ) view_state_crawler_parser.add_argument( "--batch-size", "-s", diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json new file mode 100644 index 00000000..661a6833 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json @@ -0,0 +1,35 @@ +[ + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "function", + "name": "totalSupply", + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "address": "0x39858b1A4e48CfFB1019F0A15ff54899213B3f8b", + "inputs": [] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0x39858b1A4e48CfFB1019F0A15ff54899213B3f8b" + } +] \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json new file mode 100644 index 00000000..4df3a5e8 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json @@ -0,0 +1,35 @@ +[ + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "function", + "name": "totalSupply", + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f", + "inputs": [] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f" + } +] \ No newline at end of file diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 5e7e8980..74ab5f8f 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -36,4 +36,5 @@ export COINMARKETCAP_API_KEY="" # Custom crawler export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="" export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev" -export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="" \ No newline at end of file +export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="" +export INFURA_PROJECT_ID="" \ No newline at end of file diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index f0c1c219..63611b75 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -41,7 +41,7 @@ setup( "moonstream>=0.1.1", "moonworm[moonstream]>=0.5.2", "humbug", - "pydantic", + "pydantic==1.9.2", "python-dateutil", "requests", "tqdm",