From 67de69727053433c3fff5565896470230868a789 Mon Sep 17 00:00:00 2001 From: Andrey Date: Sat, 1 Jul 2023 14:28:35 +0300 Subject: [PATCH 1/6] Add queryAPI job type. --- crawlers/deploy/mumbai-state.service | 2 +- crawlers/deploy/polygon-state.service | 2 +- crawlers/mooncrawl/mooncrawl/actions.py | 70 +++++++++++ .../mooncrawl/reports_crawler/cli.py | 58 +-------- .../mooncrawl/mooncrawl/state_crawler/cli.py | 112 +++++++++++++++++- .../state_crawler/jobs/mumbai-jobs.json | 20 ++-- 6 files changed, 193 insertions(+), 71 deletions(-) diff --git a/crawlers/deploy/mumbai-state.service b/crawlers/deploy/mumbai-state.service index 121f8976..c27b8ca8 100644 --- a/crawlers/deploy/mumbai-state.service +++ b/crawlers/deploy/mumbai-state.service @@ -6,6 +6,6 @@ After=network.target Type=oneshot WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json CPUWeight=60 SyslogIdentifier=mumbai-state \ No newline at end of file diff --git a/crawlers/deploy/polygon-state.service b/crawlers/deploy/polygon-state.service index 04ace9d0..9d307971 100644 --- a/crawlers/deploy/polygon-state.service +++ b/crawlers/deploy/polygon-state.service @@ -6,6 +6,6 @@ After=network.target Type=oneshot WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json CPUWeight=60 SyslogIdentifier=polygon-state \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 5211ee51..7f987654 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -1,14 +1,19 @@ from collections import OrderedDict +from datetime import datetime import hashlib import json import logging +import time from typing import Any, Dict, Optional, Union import uuid + from bugout.data import ( BugoutResources, ) from bugout.exceptions import BugoutResponseException +from moonstream.client import Moonstream, ENDPOINT_QUERIES, MoonstreamQueryResultUrl # type: ignore +import requests # type: ignore from .middleware import MoonstreamHTTPException from .settings import bugout_client as bc @@ -101,3 +106,68 @@ def get_entity_subscription_collection_id( else: resource = resources.resources[0] return resource.resource_data["collection_id"] + + +def recive_S3_data_from_query( + client: Moonstream, + token: Union[str, uuid.UUID], + query_name: str, + params: Dict[str, Any] = {}, + time_await: int = 2, + max_retries: int = 30, + custom_body: Optional[Dict[str, Any]] = None, +) -> Any: + """ + Await the query to be update data on S3 with if_modified_since and return new the data. + """ + + keep_going = True + + repeat = 0 + + if_modified_since_datetime = datetime.utcnow() + if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") + + time.sleep(2) + if custom_body: + headers = { + "Authorization": f"Bearer {token}", + } + json = custom_body + + response = requests.post( + url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", + headers=headers, + json=json, + timeout=5, + ) + data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) + print(data_url) + else: + data_url = client.exec_query( + token=token, + name=query_name, + params=params, + ) # S3 presign_url + + while keep_going: + time.sleep(time_await) + try: + data_response = requests.get( + data_url.url, + headers={"If-Modified-Since": if_modified_since}, + timeout=5, + ) + except Exception as e: + logger.error(e) + continue + + if data_response.status_code == 200: + break + + repeat += 1 + + if repeat > max_retries: + logger.info("Too many retries") + break + return data_response.json() diff --git a/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py index 32566ff3..61487215 100644 --- a/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/reports_crawler/cli.py @@ -1,19 +1,19 @@ import argparse import csv import datetime +import json import logging from io import StringIO from moonstream.client import Moonstream # type: ignore -import time import requests # type: ignore -import json +import time +from uuid import UUID from typing import Any, Dict, Union -from uuid import UUID from .queries import tokenomics_queries, cu_bank_queries, tokenomics_orange_dao_queries - +from ..actions import recive_S3_data_from_query from ..settings import ( MOONSTREAM_S3_PUBLIC_DATA_BUCKET, MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX, @@ -34,56 +34,6 @@ addresess_erc20_721 = { addresess_erc1155 = ["0x99A558BDBdE247C2B2716f0D4cFb0E246DFB697D"] -def recive_S3_data_from_query( - client: Moonstream, - token: Union[str, UUID], - query_name: str, - params: Dict[str, Any], - time_await: int = 2, - max_retries: int = 30, -) -> Any: - """ - Await the query to be update data on S3 with if_modified_since and return new the data. - """ - - keep_going = True - - repeat = 0 - - if_modified_since_datetime = datetime.datetime.utcnow() - if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") - - time.sleep(2) - - data_url = client.exec_query( - token=token, - name=query_name, - params=params, - ) # S3 presign_url - - while keep_going: - time.sleep(time_await) - try: - data_response = requests.get( - data_url.url, - headers={"If-Modified-Since": if_modified_since}, - timeout=5, - ) - except Exception as e: - logger.error(e) - continue - - if data_response.status_code == 200: - break - - repeat += 1 - - if repeat > max_retries: - logger.info("Too many retries") - break - return data_response.json() - - def generate_report( client: Moonstream, token: Union[str, UUID], diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 903632bc..5c231fce 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -11,14 +11,17 @@ from typing import Any, Dict, List, Optional from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType +from moonstream.client import Moonstream # type: ignore from web3._utils.request import cache_session from web3.middleware import geth_poa_middleware from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 +from ..actions import recive_S3_data_from_query from ..db import PrePing_SessionLocal from ..settings import ( INFURA_PROJECT_ID, + MOONSTREAM_ADMIN_ACCESS_TOKEN, NB_CONTROLLER_ACCESS_ID, infura_networks, multicall_contracts, @@ -31,6 +34,67 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +client = Moonstream() + + +def execute_query(query: Dict[str, Any], token: str): + """ + Format of that query is: + { + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "mumbai", + "params": { + "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C" + }, + "keys": [ + "token_id" + ] + } + """ + + # get the query url + query_url = query["query_url"] + + # get the blockchain + blockchain = query.get("blockchain") + + # get the parameters + params = query["params"] + + body = {"params": params} + + if blockchain: + body["blockchain"] = blockchain + + # run query template via moonstream query API + + data = recive_S3_data_from_query( + client=client, + token=token, + query_name=query_url, + custom_body=body, + ) + + # extract the keys as a list + + keys = query["keys"] + + # extract the values from the data + + data = data["data"] + + if len(data) == 0: + return [] + + result = [] + + for item in data: + result.append(tuple([item[key] for key in keys])) + + return result + + def make_multicall( multicall_method: Any, calls: List[Any], @@ -143,6 +207,8 @@ def crawl_calls_level( for input in call["inputs"]: if type(input["value"]) in (str, int): + print(input["value"]) + print(responces.keys()) if input["value"] not in responces: parameters.append([input["value"]]) else: @@ -246,6 +312,7 @@ def parse_jobs( block_number: Optional[int], batch_size: int, access_id: UUID, + moonstream_token: str, ): """ Parse jobs from list and generate web3 interfaces for each contract. @@ -254,11 +321,12 @@ def parse_jobs( contracts_ABIs: Dict[str, Any] = {} contracts_methods: Dict[str, Any] = {} calls: Dict[int, Any] = {0: []} + responces: Dict[str, Any] = {} if web3_provider_uri is not None: try: logger.info( - f"Connecting to blockchain: {blockchain_type} with custom provider!" + f"Connecting to blockchain : {blockchain_type} with custom provider!" ) web3_client = connect(web3_provider_uri) @@ -297,6 +365,30 @@ def parse_jobs( """ have_subcalls = False + ### we add queryAPI to that tree + + if method_abi["type"] == "queryAPI": + # make queryAPI call + + responce = execute_query(method_abi, token=moonstream_token) + + # generate hash for queryAPI call + + generated_hash = hashlib.md5( + json.dumps( + method_abi, + sort_keys=True, + indent=4, + separators=(",", ": "), + ).encode("utf-8") + ).hexdigest() + + # add responce to responces + + responces[generated_hash] = responce + + return generated_hash + abi = { "inputs": [], "outputs": method_abi["outputs"], @@ -306,7 +398,10 @@ def parse_jobs( } for input in method_abi["inputs"]: - if type(input["value"]) in (str, int, list): + if type(input["value"]) in (int, list): + abi["inputs"].append(input) + + elif type(input["value"]) == str: abi["inputs"].append(input) elif type(input["value"]) == dict: @@ -315,6 +410,9 @@ def parse_jobs( # replace defenition by hash pointing to the result of the recursive_unpack input["value"] = hash_link have_subcalls = True + elif input["value"]["type"] == "queryAPI": + input["value"] = recursive_unpack(input["value"], level + 1) + have_subcalls = True abi["inputs"].append(input) abi["address"] = method_abi["address"] generated_hash = hashlib.md5( @@ -368,8 +466,6 @@ def parse_jobs( address=web3_client.toChecksumAddress(contract_address), abi=abis ) - responces: Dict[str, Any] = {} - # reverse call_tree call_tree_levels = sorted(calls.keys(), reverse=True)[:-1] @@ -447,6 +543,7 @@ def handle_crawl(args: argparse.Namespace) -> None: args.block_number, args.batch_size, args.access_id, + args.moonstream_token, ) @@ -505,6 +602,13 @@ def main() -> None: "crawl-jobs", help="continuous crawling the view methods from job structure", # TODO(ANDREY): move tasks to journal ) + view_state_crawler_parser.add_argument( + "--moonstream-token", + "-t", + type=str, + help="Moonstream token", + required=True, + ) view_state_crawler_parser.add_argument( "--blockchain", "-b", diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json index a2f95956..6de0ccc9 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json @@ -41,17 +41,15 @@ "name": "tokenId", "type": "uint256", "value": { - "type": "function", - "name": "totalSupply", - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C", - "inputs": [] + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "mumbai", + "params": { + "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C" + }, + "keys": [ + "token_id" + ] } } ], From fc168a619e23f71fbd98fc3227a4052289f19f35 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jul 2023 14:32:44 +0300 Subject: [PATCH 2/6] Add changes. --- crawlers/mooncrawl/mooncrawl/actions.py | 2 ++ crawlers/mooncrawl/mooncrawl/state_crawler/cli.py | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 7f987654..d5e89413 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -133,6 +133,7 @@ def recive_S3_data_from_query( headers = { "Authorization": f"Bearer {token}", } + print(headers) json = custom_body response = requests.post( @@ -141,6 +142,7 @@ def recive_S3_data_from_query( json=json, timeout=5, ) + print(response.text) data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) print(data_url) else: diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 5c231fce..e213067c 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -30,6 +30,8 @@ from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 from .web3_util import FunctionSignature, connect +import traceback + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -112,6 +114,8 @@ def make_multicall( ) ) except Exception as e: + traceback.print_exc() + breakpoint() logger.error( f'Error encoding data for method {call["method"].name} call: {call}' ) @@ -123,6 +127,7 @@ def make_multicall( results = [] # Handle the case with not successful calls + print("multicall_result") for index, encoded_data in enumerate(multicall_result): try: if encoded_data[0]: @@ -156,6 +161,8 @@ def make_multicall( } ) except Exception as e: + traceback.print_exc() + breakpoint() results.append( { "result": str(encoded_data[1]), @@ -212,7 +219,7 @@ def crawl_calls_level( if input["value"] not in responces: parameters.append([input["value"]]) else: - if ( + if input["value"] in contracts_ABIs[call["address"]] and ( contracts_ABIs[call["address"]][input["value"]]["name"] == "totalSupply" ): # hack for totalSupply TODO(Andrey): need add propper support for response parsing From 41ef7eaa61eb8d38325e1cbc68e300a35e3e6319 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jul 2023 14:38:03 +0300 Subject: [PATCH 3/6] Remove prints and breackpoints. --- crawlers/mooncrawl/mooncrawl/state_crawler/cli.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index e213067c..855fcf77 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -41,7 +41,8 @@ client = Moonstream() def execute_query(query: Dict[str, Any], token: str): """ - Format of that query is: + Query task example: + { "type": "queryAPI", "query_url": "template_erc721_minting", @@ -53,6 +54,7 @@ def execute_query(query: Dict[str, Any], token: str): "token_id" ] } + """ # get the query url @@ -114,8 +116,6 @@ def make_multicall( ) ) except Exception as e: - traceback.print_exc() - breakpoint() logger.error( f'Error encoding data for method {call["method"].name} call: {call}' ) @@ -127,7 +127,6 @@ def make_multicall( results = [] # Handle the case with not successful calls - print("multicall_result") for index, encoded_data in enumerate(multicall_result): try: if encoded_data[0]: @@ -161,8 +160,6 @@ def make_multicall( } ) except Exception as e: - traceback.print_exc() - breakpoint() results.append( { "result": str(encoded_data[1]), @@ -214,8 +211,6 @@ def crawl_calls_level( for input in call["inputs"]: if type(input["value"]) in (str, int): - print(input["value"]) - print(responces.keys()) if input["value"] not in responces: parameters.append([input["value"]]) else: @@ -223,7 +218,6 @@ def crawl_calls_level( contracts_ABIs[call["address"]][input["value"]]["name"] == "totalSupply" ): # hack for totalSupply TODO(Andrey): need add propper support for response parsing - print(responces[input["value"]][0]) parameters.append( list(range(1, responces[input["value"]][0][0] + 1)) ) @@ -295,7 +289,7 @@ def crawl_calls_level( logger.error(f"Exception: {e}") raise (e) time.sleep(2) - print(f"retry: {retry}") + logger.info(f"Retry: {retry}") # results parsing and writing to database add_to_session_count = 0 for result in make_multicall_result: From 93a620dd675789dc7f25b685c5ff71d2cc57f608 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jul 2023 14:42:53 +0300 Subject: [PATCH 4/6] Remove import. --- crawlers/mooncrawl/mooncrawl/state_crawler/cli.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 855fcf77..88e8f6d5 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -30,8 +30,6 @@ from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 from .web3_util import FunctionSignature, connect -import traceback - logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) From e329a807b6080b661e75a431c9847cf0b4e8591f Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jul 2023 14:48:41 +0300 Subject: [PATCH 5/6] Remove prints. --- crawlers/mooncrawl/mooncrawl/actions.py | 3 --- .../state_crawler/jobs/polygon-jobs.json | 20 +++++++++---------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index d5e89413..7372a547 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -133,7 +133,6 @@ def recive_S3_data_from_query( headers = { "Authorization": f"Bearer {token}", } - print(headers) json = custom_body response = requests.post( @@ -142,9 +141,7 @@ def recive_S3_data_from_query( json=json, timeout=5, ) - print(response.text) data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) - print(data_url) else: data_url = client.exec_query( token=token, diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json index c354a8d7..b17bd9ac 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json @@ -41,17 +41,15 @@ "name": "tokenId", "type": "uint256", "value": { - "type": "function", - "name": "totalSupply", - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8", - "inputs": [] + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "mumbai", + "params": { + "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" + }, + "keys": [ + "token_id" + ] } } ], From f7aec850deb256b79978202a0d44cdc37dc67a69 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jul 2023 14:59:46 +0300 Subject: [PATCH 6/6] Add changes. --- .../mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json index b17bd9ac..2b3bb8cd 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json @@ -43,7 +43,7 @@ "value": { "type": "queryAPI", "query_url": "template_erc721_minting", - "blockchain": "mumbai", + "blockchain": "polygon", "params": { "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" },