kopia lustrzana https://github.com/bugout-dev/moonstream
Add queryAPI job type.
rodzic
e26e324a09
commit
67de697270
|
@ -6,6 +6,6 @@ After=network.target
|
|||
Type=oneshot
|
||||
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
|
||||
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
|
||||
CPUWeight=60
|
||||
SyslogIdentifier=mumbai-state
|
|
@ -6,6 +6,6 @@ After=network.target
|
|||
Type=oneshot
|
||||
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
|
||||
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --moonstream-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}" --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
|
||||
CPUWeight=60
|
||||
SyslogIdentifier=polygon-state
|
|
@ -1,14 +1,19 @@
|
|||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Dict, Optional, Union
|
||||
import uuid
|
||||
|
||||
|
||||
from bugout.data import (
|
||||
BugoutResources,
|
||||
)
|
||||
from bugout.exceptions import BugoutResponseException
|
||||
from moonstream.client import Moonstream, ENDPOINT_QUERIES, MoonstreamQueryResultUrl # type: ignore
|
||||
import requests # type: ignore
|
||||
from .middleware import MoonstreamHTTPException
|
||||
from .settings import bugout_client as bc
|
||||
|
||||
|
@ -101,3 +106,68 @@ def get_entity_subscription_collection_id(
|
|||
else:
|
||||
resource = resources.resources[0]
|
||||
return resource.resource_data["collection_id"]
|
||||
|
||||
|
||||
def recive_S3_data_from_query(
    client: Moonstream,
    token: Union[str, uuid.UUID],
    query_name: str,
    params: Optional[Dict[str, Any]] = None,
    time_await: int = 2,
    max_retries: int = 30,
    custom_body: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Await the query to update its data on S3 (detected via If-Modified-Since)
    and return the new data.

    :param client: Moonstream API client used to execute the named query.
    :param token: Moonstream access token (string or UUID).
    :param query_name: Name of the query whose data should be refreshed.
    :param params: Query parameters forwarded to ``client.exec_query``.
        Defaults to an empty dict (was a mutable default argument before).
    :param time_await: Seconds to sleep between polling attempts.
    :param max_retries: Maximum number of polling attempts before giving up.
    :param custom_body: If provided, POST this JSON body directly to the
        query's ``update_data`` endpoint instead of using ``client.exec_query``.
    :return: Parsed JSON payload fetched from the presigned S3 URL.
    """
    if_modified_since_datetime = datetime.utcnow()
    if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")

    time.sleep(2)
    if custom_body:
        headers = {
            "Authorization": f"Bearer {token}",
        }
        # NOTE: pass custom_body directly; binding it to a local named `json`
        # (as before) shadowed the imported json module.
        response = requests.post(
            url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
            headers=headers,
            json=custom_body,
            timeout=5,
        )
        data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
    else:
        data_url = client.exec_query(
            token=token,
            name=query_name,
            params=params if params is not None else {},
        )  # S3 presign_url

    repeat = 0
    while True:
        time.sleep(time_await)
        try:
            data_response = requests.get(
                data_url.url,
                headers={"If-Modified-Since": if_modified_since},
                timeout=5,
            )
        except Exception as e:
            logger.error(e)
            # Count failed requests toward the retry budget too; the previous
            # bare `continue` skipped the counter, so a persistently failing
            # endpoint looped forever.
            repeat += 1
            if repeat > max_retries:
                logger.info("Too many retries")
                raise
            continue

        if data_response.status_code == 200:
            break

        repeat += 1
        if repeat > max_retries:
            logger.info("Too many retries")
            break
    return data_response.json()
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
import argparse
|
||||
import csv
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
from io import StringIO
|
||||
from moonstream.client import Moonstream # type: ignore
|
||||
import time
|
||||
import requests # type: ignore
|
||||
import json
|
||||
import time
|
||||
from uuid import UUID
|
||||
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
from .queries import tokenomics_queries, cu_bank_queries, tokenomics_orange_dao_queries
|
||||
|
||||
from ..actions import recive_S3_data_from_query
|
||||
from ..settings import (
|
||||
MOONSTREAM_S3_PUBLIC_DATA_BUCKET,
|
||||
MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX,
|
||||
|
@ -34,56 +34,6 @@ addresess_erc20_721 = {
|
|||
addresess_erc1155 = ["0x99A558BDBdE247C2B2716f0D4cFb0E246DFB697D"]
|
||||
|
||||
|
||||
def recive_S3_data_from_query(
|
||||
client: Moonstream,
|
||||
token: Union[str, UUID],
|
||||
query_name: str,
|
||||
params: Dict[str, Any],
|
||||
time_await: int = 2,
|
||||
max_retries: int = 30,
|
||||
) -> Any:
|
||||
"""
|
||||
Await the query to be update data on S3 with if_modified_since and return new the data.
|
||||
"""
|
||||
|
||||
keep_going = True
|
||||
|
||||
repeat = 0
|
||||
|
||||
if_modified_since_datetime = datetime.datetime.utcnow()
|
||||
if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
data_url = client.exec_query(
|
||||
token=token,
|
||||
name=query_name,
|
||||
params=params,
|
||||
) # S3 presign_url
|
||||
|
||||
while keep_going:
|
||||
time.sleep(time_await)
|
||||
try:
|
||||
data_response = requests.get(
|
||||
data_url.url,
|
||||
headers={"If-Modified-Since": if_modified_since},
|
||||
timeout=5,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
continue
|
||||
|
||||
if data_response.status_code == 200:
|
||||
break
|
||||
|
||||
repeat += 1
|
||||
|
||||
if repeat > max_retries:
|
||||
logger.info("Too many retries")
|
||||
break
|
||||
return data_response.json()
|
||||
|
||||
|
||||
def generate_report(
|
||||
client: Moonstream,
|
||||
token: Union[str, UUID],
|
||||
|
|
|
@ -11,14 +11,17 @@ from typing import Any, Dict, List, Optional
|
|||
from uuid import UUID
|
||||
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType
|
||||
from moonstream.client import Moonstream # type: ignore
|
||||
from web3._utils.request import cache_session
|
||||
from web3.middleware import geth_poa_middleware
|
||||
|
||||
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
|
||||
|
||||
from ..actions import recive_S3_data_from_query
|
||||
from ..db import PrePing_SessionLocal
|
||||
from ..settings import (
|
||||
INFURA_PROJECT_ID,
|
||||
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
NB_CONTROLLER_ACCESS_ID,
|
||||
infura_networks,
|
||||
multicall_contracts,
|
||||
|
@ -31,6 +34,67 @@ logging.basicConfig(level=logging.INFO)
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
client = Moonstream()
|
||||
|
||||
|
||||
def execute_query(query: Dict[str, Any], token: str):
    """
    Run a queryAPI job definition and return its results as a list of tuples.

    Format of that query is:
    {
        "type": "queryAPI",
        "query_url": "template_erc721_minting",
        "blockchain": "mumbai",
        "params": {
            "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C"
        },
        "keys": [
            "token_id"
        ]
    }
    """
    # Pull the job fields; "blockchain" is optional.
    query_url = query["query_url"]
    blockchain = query.get("blockchain")
    params = query["params"]

    body: Dict[str, Any] = {"params": params}
    if blockchain:
        body["blockchain"] = blockchain

    # Execute the query template via the Moonstream query API and wait for
    # the refreshed S3 payload.
    data = recive_S3_data_from_query(
        client=client,
        token=token,
        query_name=query_url,
        custom_body=body,
    )

    # Project each returned row onto the requested keys, one tuple per row.
    keys = query["keys"]
    rows = data["data"]
    if not rows:
        return []

    return [tuple(row[key] for key in keys) for row in rows]
|
||||
|
||||
|
||||
def make_multicall(
|
||||
multicall_method: Any,
|
||||
calls: List[Any],
|
||||
|
@ -143,6 +207,8 @@ def crawl_calls_level(
|
|||
|
||||
for input in call["inputs"]:
|
||||
if type(input["value"]) in (str, int):
|
||||
print(input["value"])
|
||||
print(responces.keys())
|
||||
if input["value"] not in responces:
|
||||
parameters.append([input["value"]])
|
||||
else:
|
||||
|
@ -246,6 +312,7 @@ def parse_jobs(
|
|||
block_number: Optional[int],
|
||||
batch_size: int,
|
||||
access_id: UUID,
|
||||
moonstream_token: str,
|
||||
):
|
||||
"""
|
||||
Parse jobs from list and generate web3 interfaces for each contract.
|
||||
|
@ -254,11 +321,12 @@ def parse_jobs(
|
|||
contracts_ABIs: Dict[str, Any] = {}
|
||||
contracts_methods: Dict[str, Any] = {}
|
||||
calls: Dict[int, Any] = {0: []}
|
||||
responces: Dict[str, Any] = {}
|
||||
|
||||
if web3_provider_uri is not None:
|
||||
try:
|
||||
logger.info(
|
||||
f"Connecting to blockchain: {blockchain_type} with custom provider!"
|
||||
f"Connecting to blockchain : {blockchain_type} with custom provider!"
|
||||
)
|
||||
|
||||
web3_client = connect(web3_provider_uri)
|
||||
|
@ -297,6 +365,30 @@ def parse_jobs(
|
|||
"""
|
||||
have_subcalls = False
|
||||
|
||||
### we add queryAPI to that tree
|
||||
|
||||
if method_abi["type"] == "queryAPI":
|
||||
# make queryAPI call
|
||||
|
||||
responce = execute_query(method_abi, token=moonstream_token)
|
||||
|
||||
# generate hash for queryAPI call
|
||||
|
||||
generated_hash = hashlib.md5(
|
||||
json.dumps(
|
||||
method_abi,
|
||||
sort_keys=True,
|
||||
indent=4,
|
||||
separators=(",", ": "),
|
||||
).encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
# add responce to responces
|
||||
|
||||
responces[generated_hash] = responce
|
||||
|
||||
return generated_hash
|
||||
|
||||
abi = {
|
||||
"inputs": [],
|
||||
"outputs": method_abi["outputs"],
|
||||
|
@ -306,7 +398,10 @@ def parse_jobs(
|
|||
}
|
||||
|
||||
for input in method_abi["inputs"]:
|
||||
if type(input["value"]) in (str, int, list):
|
||||
if type(input["value"]) in (int, list):
|
||||
abi["inputs"].append(input)
|
||||
|
||||
elif type(input["value"]) == str:
|
||||
abi["inputs"].append(input)
|
||||
|
||||
elif type(input["value"]) == dict:
|
||||
|
@ -315,6 +410,9 @@ def parse_jobs(
|
|||
# replace defenition by hash pointing to the result of the recursive_unpack
|
||||
input["value"] = hash_link
|
||||
have_subcalls = True
|
||||
elif input["value"]["type"] == "queryAPI":
|
||||
input["value"] = recursive_unpack(input["value"], level + 1)
|
||||
have_subcalls = True
|
||||
abi["inputs"].append(input)
|
||||
abi["address"] = method_abi["address"]
|
||||
generated_hash = hashlib.md5(
|
||||
|
@ -368,8 +466,6 @@ def parse_jobs(
|
|||
address=web3_client.toChecksumAddress(contract_address), abi=abis
|
||||
)
|
||||
|
||||
responces: Dict[str, Any] = {}
|
||||
|
||||
# reverse call_tree
|
||||
call_tree_levels = sorted(calls.keys(), reverse=True)[:-1]
|
||||
|
||||
|
@ -447,6 +543,7 @@ def handle_crawl(args: argparse.Namespace) -> None:
|
|||
args.block_number,
|
||||
args.batch_size,
|
||||
args.access_id,
|
||||
args.moonstream_token,
|
||||
)
|
||||
|
||||
|
||||
|
@ -505,6 +602,13 @@ def main() -> None:
|
|||
"crawl-jobs",
|
||||
help="continuous crawling the view methods from job structure", # TODO(ANDREY): move tasks to journal
|
||||
)
|
||||
view_state_crawler_parser.add_argument(
|
||||
"--moonstream-token",
|
||||
"-t",
|
||||
type=str,
|
||||
help="Moonstream token",
|
||||
required=True,
|
||||
)
|
||||
view_state_crawler_parser.add_argument(
|
||||
"--blockchain",
|
||||
"-b",
|
||||
|
|
|
@ -41,17 +41,15 @@
|
|||
"name": "tokenId",
|
||||
"type": "uint256",
|
||||
"value": {
|
||||
"type": "function",
|
||||
"name": "totalSupply",
|
||||
"outputs": [
|
||||
{
|
||||
"internalType": "uint256",
|
||||
"name": "",
|
||||
"type": "uint256"
|
||||
}
|
||||
],
|
||||
"address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C",
|
||||
"inputs": []
|
||||
"type": "queryAPI",
|
||||
"query_url": "template_erc721_minting",
|
||||
"blockchain": "mumbai",
|
||||
"params": {
|
||||
"address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C"
|
||||
},
|
||||
"keys": [
|
||||
"token_id"
|
||||
]
|
||||
}
|
||||
}
|
||||
],
|
||||
|
|
Ładowanie…
Reference in New Issue