diff --git a/crawlers/deploy/mumbai-state.service b/crawlers/deploy/mumbai-state.service
index 121f8976..c27b8ca8 100644
--- a/crawlers/deploy/mumbai-state.service
+++ b/crawlers/deploy/mumbai-state.service
@@ -6,6 +6,6 @@ After=network.target
 Type=oneshot
 WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
 EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
-ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
 CPUWeight=60
 SyslogIdentifier=mumbai-state
\ No newline at end of file
diff --git a/crawlers/deploy/polygon-state.service b/crawlers/deploy/polygon-state.service
index 04ace9d0..9d307971 100644
--- a/crawlers/deploy/polygon-state.service
+++ b/crawlers/deploy/polygon-state.service
@@ -6,6 +6,6 @@ After=network.target
 Type=oneshot
 WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
 EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
-ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
 CPUWeight=60
 SyslogIdentifier=polygon-state
\ No newline at end of file
diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py
index 5211ee51..7372a547 100644
--- a/crawlers/mooncrawl/mooncrawl/actions.py
+++ b/crawlers/mooncrawl/mooncrawl/actions.py
@@ -1,14 +1,19 @@
 from collections import OrderedDict
+from datetime import datetime
 import hashlib
 import json
 import logging
+import time
 from typing import Any, Dict, Optional, Union
 import uuid
+
 from bugout.data import (
     BugoutResources,
 )
 from bugout.exceptions import BugoutResponseException
+from moonstream.client import Moonstream, ENDPOINT_QUERIES, MoonstreamQueryResultUrl  # type: ignore
+import requests  # type: ignore
 
 from .middleware import MoonstreamHTTPException
 from .settings import bugout_client as bc
@@ -101,3 +106,67 @@ def get_entity_subscription_collection_id(
     else:
         resource = resources.resources[0]
         return resource.resource_data["collection_id"]
+
+
+def recive_S3_data_from_query(
+    client: Moonstream,
+    token: Union[str, uuid.UUID],
+    query_name: str,
+    params: Dict[str, Any] = {},
+    time_await: int = 2,
+    max_retries: int = 30,
+    custom_body: Optional[Dict[str, Any]] = None,
+) -> Any:
+    """
+    Wait for the query to update its data on S3, using If-Modified-Since, and return the new data.
+    """
+
+    keep_going = True
+
+    repeat = 0
+
+    if_modified_since_datetime = datetime.utcnow()
+    if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")
+
+    time.sleep(2)
+    if custom_body:
+        headers = {
+            "Authorization": f"Bearer {token}",
+        }
+        json = custom_body
+
+        response = requests.post(
+            url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
+            headers=headers,
+            json=json,
+            timeout=5,
+        )
+        data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
+    else:
+        data_url = client.exec_query(
+            token=token,
+            name=query_name,
+            params=params,
+        )  # S3 presign_url
+
+    while keep_going:
+        time.sleep(time_await)
+        try:
+            data_response = requests.get(
+                data_url.url,
+                headers={"If-Modified-Since": if_modified_since},
+                timeout=5,
+            )
+        except Exception as e:
+            logger.error(e)
+            continue
+
+        if data_response.status_code == 200:
+            break
+
+        repeat += 1
+
+        if repeat > max_retries:
+            logger.info("Too many retries")
+            break
+    return data_response.json()
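The helper added here generalizes the one previously private to the reports crawler: with params it executes a saved query via client.exec_query, while with custom_body it POSTs directly to the Query API's update_data endpoint, then polls the presigned S3 URL with If-Modified-Since until fresh data lands. A minimal usage sketch, assuming a valid token (the token value below is a placeholder; the query name and address come from the job files in this change):

    from moonstream.client import Moonstream

    from mooncrawl.actions import recive_S3_data_from_query

    client = Moonstream()

    # Run the query template through the update_data endpoint and wait for
    # the refreshed result on S3.
    data = recive_S3_data_from_query(
        client=client,
        token="MOONSTREAM_TOKEN",  # placeholder
        query_name="template_erc721_minting",
        custom_body={
            "blockchain": "mumbai",
            "params": {"address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C"},
        },
    )
    rows = data["data"]  # list of result rows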
+ """ + + keep_going = True + + repeat = 0 + + if_modified_since_datetime = datetime.utcnow() + if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") + + time.sleep(2) + if custom_body: + headers = { + "Authorization": f"Bearer {token}", + } + json = custom_body + + response = requests.post( + url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", + headers=headers, + json=json, + timeout=5, + ) + data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) + else: + data_url = client.exec_query( + token=token, + name=query_name, + params=params, + ) # S3 presign_url + + while keep_going: + time.sleep(time_await) + try: + data_response = requests.get( + data_url.url, + headers={"If-Modified-Since": if_modified_since}, + timeout=5, + ) + except Exception as e: + logger.error(e) + continue + + if data_response.status_code == 200: + break + + repeat += 1 + + if repeat > max_retries: + logger.info("Too many retries") + break + return data_response.json() diff --git a/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py index 32566ff3..61487215 100644 --- a/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py @@ -1,19 +1,19 @@ import argparse import csv import datetime +import json import logging from io import StringIO from moonstream.client import Moonstream # type: ignore -import time import requests # type: ignore -import json +import time +from uuid import UUID from typing import Any, Dict, Union -from uuid import UUID from .queries import tokenomics_queries, cu_bank_queries, tokenomics_orange_dao_queries - +from ..actions import recive_S3_data_from_query from ..settings import ( MOONSTREAM_S3_PUBLIC_DATA_BUCKET, MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX, @@ -34,56 +34,6 @@ addresess_erc20_721 = { addresess_erc1155 = ["0x99A558BDBdE247C2B2716f0D4cFb0E246DFB697D"] -def recive_S3_data_from_query( - client: Moonstream, - token: Union[str, UUID], - query_name: str, - params: Dict[str, Any], - time_await: int = 2, - max_retries: int = 30, -) -> Any: - """ - Await the query to be update data on S3 with if_modified_since and return new the data. 
- """ - - keep_going = True - - repeat = 0 - - if_modified_since_datetime = datetime.datetime.utcnow() - if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") - - time.sleep(2) - - data_url = client.exec_query( - token=token, - name=query_name, - params=params, - ) # S3 presign_url - - while keep_going: - time.sleep(time_await) - try: - data_response = requests.get( - data_url.url, - headers={"If-Modified-Since": if_modified_since}, - timeout=5, - ) - except Exception as e: - logger.error(e) - continue - - if data_response.status_code == 200: - break - - repeat += 1 - - if repeat > max_retries: - logger.info("Too many retries") - break - return data_response.json() - - def generate_report( client: Moonstream, token: Union[str, UUID], diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 903632bc..88e8f6d5 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -11,14 +11,17 @@ from typing import Any, Dict, List, Optional from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType +from moonstream.client import Moonstream # type: ignore from web3._utils.request import cache_session from web3.middleware import geth_poa_middleware from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 +from ..actions import recive_S3_data_from_query from ..db import PrePing_SessionLocal from ..settings import ( INFURA_PROJECT_ID, + MOONSTREAM_ADMIN_ACCESS_TOKEN, NB_CONTROLLER_ACCESS_ID, infura_networks, multicall_contracts, @@ -31,6 +34,69 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +client = Moonstream() + + +def execute_query(query: Dict[str, Any], token: str): + """ + Query task example: + + { + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "mumbai", + "params": { + "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C" + }, + "keys": [ + "token_id" + ] + } + + """ + + # get the query url + query_url = query["query_url"] + + # get the blockchain + blockchain = query.get("blockchain") + + # get the parameters + params = query["params"] + + body = {"params": params} + + if blockchain: + body["blockchain"] = blockchain + + # run query template via moonstream query API + + data = recive_S3_data_from_query( + client=client, + token=token, + query_name=query_url, + custom_body=body, + ) + + # extract the keys as a list + + keys = query["keys"] + + # extract the values from the data + + data = data["data"] + + if len(data) == 0: + return [] + + result = [] + + for item in data: + result.append(tuple([item[key] for key in keys])) + + return result + + def make_multicall( multicall_method: Any, calls: List[Any], @@ -146,11 +212,10 @@ def crawl_calls_level( if input["value"] not in responces: parameters.append([input["value"]]) else: - if ( + if input["value"] in contracts_ABIs[call["address"]] and ( contracts_ABIs[call["address"]][input["value"]]["name"] == "totalSupply" ): # hack for totalSupply TODO(Andrey): need add propper support for response parsing - print(responces[input["value"]][0]) parameters.append( list(range(1, responces[input["value"]][0][0] + 1)) ) @@ -222,7 +287,7 @@ def crawl_calls_level( logger.error(f"Exception: {e}") raise (e) time.sleep(2) - print(f"retry: {retry}") + logger.info(f"Retry: {retry}") # results parsing and writing to database add_to_session_count = 0 for result in make_multicall_result: @@ -246,6 +311,7 @@ def 
diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
index a2f95956..6de0ccc9 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
@@ -41,17 +41,15 @@
                 "name": "tokenId",
                 "type": "uint256",
                 "value": {
-                    "type": "function",
-                    "name": "totalSupply",
-                    "outputs": [
-                        {
-                            "internalType": "uint256",
-                            "name": "",
-                            "type": "uint256"
-                        }
-                    ],
-                    "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C",
-                    "inputs": []
+                    "type": "queryAPI",
+                    "query_url": "template_erc721_minting",
+                    "blockchain": "mumbai",
+                    "params": {
+                        "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C"
+                    },
+                    "keys": [
+                        "token_id"
+                    ]
                 }
             }
         ],
diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
index c354a8d7..2b3bb8cd 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
@@ -41,17 +41,15 @@
                 "name": "tokenId",
                 "type": "uint256",
                 "value": {
-                    "type": "function",
-                    "name": "totalSupply",
-                    "outputs": [
-                        {
-                            "internalType": "uint256",
-                            "name": "",
-                            "type": "uint256"
-                        }
-                    ],
-                    "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8",
-                    "inputs": []
+                    "type": "queryAPI",
+                    "query_url": "template_erc721_minting",
+                    "blockchain": "polygon",
+                    "params": {
+                        "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8"
+                    },
+                    "keys": [
+                        "token_id"
+                    ]
                 }
             }
         ],
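Both job files swap the on-chain totalSupply subcall for a queryAPI node, so tokenId parameters now come from the exact token_id list returned by the template_erc721_minting query rather than from range(1, totalSupply + 1). For reference, a sketch of how recursive_unpack keys the result, mirroring the hashing in parse_jobs (the response value is an assumed example):

    import hashlib
    import json

    # A queryAPI value node as it appears in polygon-jobs.json above.
    method_abi = {
        "type": "queryAPI",
        "query_url": "template_erc721_minting",
        "blockchain": "polygon",
        "params": {"address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8"},
        "keys": ["token_id"],
    }

    # Canonical-JSON md5, identical to the hashing in parse_jobs. The hash
    # replaces the node as the parent call's input value, and the query
    # response is stored in `responces` under the same hash.
    generated_hash = hashlib.md5(
        json.dumps(
            method_abi, sort_keys=True, indent=4, separators=(",", ": ")
        ).encode("utf-8")
    ).hexdigest()

    responces = {generated_hash: [(1,), (2,), (3,)]}  # assumed execute_query output

crawl_calls_level then draws the dependent call's parameters from responces by that hash; the new "in contracts_ABIs" guard exists precisely so queryAPI hashes, which are never registered as contract methods, skip the totalSupply hack instead of raising a KeyError.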