Remove state crawler changes.

pull/1163/head
Andrey 2025-01-09 14:38:02 +02:00
rodzic 44fa379211
commit 6b3ea9242a
1 zmienionych plików z 6 dodań i 200 usunięć

Wyświetl plik

@ -9,9 +9,9 @@ from concurrent.futures._base import TimeoutError
from pprint import pprint from pprint import pprint
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from uuid import UUID from uuid import UUID
from web3 import Web3
from moonstreamtypes.blockchain import AvailableBlockchainType # type: ignore from moonstream.client import Moonstream # type: ignore
from moonstreamtypes.blockchain import AvailableBlockchainType
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
from ..actions import recive_S3_data_from_query, get_all_entries_from_search from ..actions import recive_S3_data_from_query, get_all_entries_from_search
@ -25,37 +25,19 @@ from ..settings import (
multicall_contracts, multicall_contracts,
MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
MOONSTREAM_DB_V3_CONTROLLER_API,
moonstream_client as mc,
) )
from .db import clean_labels, commit_session, view_call_to_label from .db import clean_labels, commit_session, view_call_to_label
from .Multicall2_interface import Contract as Multicall2 from .Multicall2_interface import Contract as Multicall2
from .web3_util import FunctionSignature from .web3_util import FunctionSignature
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def request_connection_string(
customer_id: str,
instance_id: int,
token: str,
user: str = "seer", # token with write access
) -> str:
"""
Request connection string from the Moonstream API.
"""
response = requests.get(
f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status() client = Moonstream()
return response.text.replace('"', "")
def execute_query(query: Dict[str, Any], token: str) -> Any: def execute_query(query: Dict[str, Any], token: str):
""" """
Query task example: Query task example:
@ -90,7 +72,7 @@ def execute_query(query: Dict[str, Any], token: str) -> Any:
# run query template via moonstream query API # run query template via moonstream query API
data = recive_S3_data_from_query( data = recive_S3_data_from_query(
client=mc, client=client,
token=token, token=token,
query_name=query_url, query_name=query_url,
custom_body=body, custom_body=body,
@ -320,182 +302,6 @@ def crawl_calls_level(
return batch_size return batch_size
def connect_to_web3(
blockchain_type: Any,
web3_provider_uri: Optional[str],
web3_uri: Optional[str],
) -> Web3:
"""Connects to the Web3 client."""
if web3_provider_uri is not None:
try:
logger.info(
f"Connecting to blockchain: {blockchain_type} with custom provider!"
)
web3_client = connect(
blockchain_type=blockchain_type, web3_uri=web3_provider_uri
)
except Exception as e:
logger.error(
f"Web3 connection to custom provider {web3_provider_uri} failed. Error: {e}"
)
raise e
else:
logger.info(f"Connecting to blockchain: {blockchain_type} with node balancer.")
web3_client = _retry_connect_web3(
blockchain_type=blockchain_type, web3_uri=web3_uri
)
logger.info(f"Crawler started connected to blockchain: {blockchain_type}")
return web3_client
def get_block_info(web3_client: Web3, block_number: Optional[int]) -> tuple:
"""Retrieves block information."""
if block_number is None:
block_number = web3_client.eth.get_block("latest").number # type: ignore
logger.info(f"Current block number: {block_number}")
block = web3_client.eth.get_block(block_number) # type: ignore
block_timestamp = block.timestamp # type: ignore
block_hash = block.hash.hex() # type: ignore
return block_number, block_timestamp, block_hash
def recursive_unpack(
method_abi: Any,
level: int,
calls: Dict[int, List[Any]],
contracts_methods: Dict[str, Any],
contracts_ABIs: Dict[str, Any],
responses: Dict[str, Any],
moonstream_token: str,
v3: bool,
customer_id: Optional[str] = None,
instance_id: Optional[str] = None,
) -> str:
"""Recursively unpacks method ABIs to generate a tree of calls."""
have_subcalls = False
if method_abi["type"] == "queryAPI":
# Make queryAPI call
response = execute_query(method_abi, token=moonstream_token)
# Generate hash for queryAPI call
generated_hash = hashlib.md5(
json.dumps(
method_abi,
sort_keys=True,
indent=4,
separators=(",", ": "),
).encode("utf-8")
).hexdigest()
# Add response to responses
responses[generated_hash] = response
return generated_hash
abi = {
"inputs": [],
"outputs": method_abi["outputs"],
"name": method_abi["name"],
"type": "function",
"stateMutability": "view",
"v3": v3,
"customer_id": customer_id,
"instance_id": instance_id,
}
for input in method_abi["inputs"]:
if isinstance(input["value"], (int, list, str)):
abi["inputs"].append(input)
elif isinstance(input["value"], dict):
if input["value"]["type"] in ["function", "queryAPI"]:
hash_link = recursive_unpack(
input["value"],
level + 1,
calls,
contracts_methods,
contracts_ABIs,
responses,
moonstream_token,
v3,
customer_id,
instance_id,
)
input["value"] = hash_link
have_subcalls = True
abi["inputs"].append(input)
abi["address"] = method_abi["address"]
### drop instance_id and customer_id
# del abi["instance_id"]
# del abi["customer_id"]
generated_hash = hashlib.md5(
json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode(
"utf-8"
)
).hexdigest()
abi["generated_hash"] = generated_hash
if have_subcalls:
level += 1
calls.setdefault(level, []).append(abi)
else:
level = 0
calls.setdefault(level, []).append(abi)
contracts_methods.setdefault(method_abi["address"], [])
if abi["name"] not in contracts_methods[method_abi["address"]]:
### lets try to deduplicate by method name
contracts_methods[method_abi["address"]].append(abi["name"])
contracts_ABIs.setdefault(method_abi["address"], {})
contracts_ABIs[method_abi["address"]][abi["name"]] = abi
return generated_hash
def build_interfaces(
contracts_ABIs: Dict[str, Any], contracts_methods: Dict[str, Any], web3_client: Web3
) -> Dict[str, Any]:
"""Builds contract interfaces."""
interfaces = {}
for contract_address in contracts_ABIs:
abis = [
contracts_ABIs[contract_address][method_name]
for method_name in contracts_methods[contract_address]
]
try:
interfaces[contract_address] = web3_client.eth.contract(
address=web3_client.toChecksumAddress(contract_address), abi=abis
)
except Exception as e:
logger.error(f"Failed to connect to contract {contract_address}: {e}")
continue
return interfaces
def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[str]:
"""Processes the address field of a job and returns a list of addresses."""
if isinstance(job["address"], str):
return [Web3.toChecksumAddress(job["address"])]
elif isinstance(job["address"], list):
return [
Web3.toChecksumAddress(address) for address in job["address"]
] # manual job multiplication
elif isinstance(job["address"], dict):
if job["address"].get("type") == "queryAPI":
# QueryAPI job multiplication
addresses = execute_query(job["address"], token=moonstream_token)
checsum_addresses = []
for address in addresses:
try:
checsum_addresses.append(Web3.toChecksumAddress(address))
except Exception as e:
logger.error(f"Invalid address: {address}")
continue
return checsum_addresses
else:
raise ValueError(f"Invalid address type: {type(job['address'])}")
else:
raise ValueError(f"Invalid address type: {type(job['address'])}")
def parse_jobs( def parse_jobs(
jobs: List[Any], jobs: List[Any],
blockchain_type: AvailableBlockchainType, blockchain_type: AvailableBlockchainType,
@ -1018,4 +824,4 @@ def main() -> None:
if __name__ == "__main__": if __name__ == "__main__":
main() main()