Merge branch 'main' into metadata-crawler-improvements

pull/739/head
Andrey 2022-12-15 20:14:31 +02:00
commit 2691409022
12 changed files with 360 additions and 81 deletions

View file

@@ -58,6 +58,10 @@ MUMBAI_SYNCHRONIZE_SERVICE="mumbai-synchronize.service"
MUMBAI_MISSING_SERVICE_FILE="mumbai-missing.service"
MUMBAI_MISSING_TIMER_FILE="mumbai-missing.timer"
MUMBAI_MOONWORM_CRAWLER_SERVICE_FILE="mumbai-moonworm-crawler.service"
MUMBAI_STATE_SERVICE_FILE="mumbai-state.service"
MUMBAI_STATE_TIMER_FILE="mumbai-state.timer"
MUMBAI_STATE_CLEAN_SERVICE_FILE="mumbai-state-clean.service"
MUMBAI_STATE_CLEAN_TIMER_FILE="mumbai-state-clean.timer"
# XDai service files
XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service"
@@ -289,3 +293,23 @@ cp "${SCRIPT_DIR}/${POLYGON_CU_REPORTS_TOKENONOMICS_SERVICE_FILE}" "/etc/systemd
cp "${SCRIPT_DIR}/${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}" "/etc/systemd/system/${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}"
systemctl daemon-reload
systemctl restart --no-block "${POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing MUMBAI state service and timer with: ${MUMBAI_STATE_SERVICE_FILE}, ${MUMBAI_STATE_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${MUMBAI_STATE_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_STATE_TIMER_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_STATE_SERVICE_FILE}" "/etc/systemd/system/${MUMBAI_STATE_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_STATE_TIMER_FILE}" "/etc/systemd/system/${MUMBAI_STATE_TIMER_FILE}"
systemctl daemon-reload
systemctl restart --no-block "${MUMBAI_STATE_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing MUMBAI state clean service and timer with: ${MUMBAI_STATE_CLEAN_SERVICE_FILE}, ${MUMBAI_STATE_CLEAN_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_TIMER_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_SERVICE_FILE}" "/etc/systemd/system/${MUMBAI_STATE_CLEAN_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_STATE_CLEAN_TIMER_FILE}" "/etc/systemd/system/${MUMBAI_STATE_CLEAN_TIMER_FILE}"
systemctl daemon-reload
systemctl restart --no-block "${MUMBAI_STATE_CLEAN_TIMER_FILE}"

View file

@@ -0,0 +1,13 @@
[Unit]
Description=Execute clean state labels crawler
After=network.target
[Service]
Type=oneshot
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" clean-state-labels --blockchain mumbai -N 10000
CPUWeight=60
SyslogIdentifier=mumbai-state-clean

View file

@@ -0,0 +1,9 @@
[Unit]
Description=Execute Mumbai clean state labels crawler every 25m
[Timer]
OnBootSec=50s
OnUnitActiveSec=25m
[Install]
WantedBy=timers.target

View file

@@ -0,0 +1,13 @@
[Unit]
Description=Execute state crawler
After=network.target
[Service]
Type=oneshot
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json
CPUWeight=60
SyslogIdentifier=mumbai-state

View file

@@ -0,0 +1,9 @@
[Unit]
Description=Execute Mumbai state crawler every 10m
[Timer]
OnBootSec=15s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View file

@@ -8,6 +8,6 @@ User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json
CPUWeight=60
SyslogIdentifier=polygon-state

View file

@@ -210,3 +210,24 @@ if MOONSTREAM_S3_PUBLIC_DATA_BUCKET == "":
MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX = os.environ.get(
"MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX", "dev"
)
# infura config
INFURA_PROJECT_ID = os.environ.get("INFURA_PROJECT_ID")
infura_networks = {
AvailableBlockchainType.ETHEREUM: {
"name": "mainnet",
"url": f"https://mainnet.infura.io/v3/{INFURA_PROJECT_ID}",
},
AvailableBlockchainType.POLYGON: {
"name": "polygon",
"url": f"https://polygon-mainnet.infura.io/v3/{INFURA_PROJECT_ID}",
},
AvailableBlockchainType.MUMBAI: {
"name": "mumbai",
"url": f"https://polygon-mumbai.infura.io/v3/{INFURA_PROJECT_ID}",
},
}
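
For context, the CLI changes later in this diff pick a provider URL out of this mapping when --infura is passed, and otherwise fall back to the node balancer. A minimal sketch of that lookup (the helper name resolve_infura_url is illustrative, not part of this commit):

from moonstreamdb.blockchain import AvailableBlockchainType

from mooncrawl.settings import INFURA_PROJECT_ID, infura_networks


def resolve_infura_url(blockchain_type: AvailableBlockchainType) -> str:
    # Mirrors the guards in handle_crawl below: Infura must be configured
    # and the blockchain type must be one of the supported networks.
    if INFURA_PROJECT_ID is None:
        raise ValueError("INFURA_PROJECT_ID is not set")
    if blockchain_type not in infura_networks:
        raise ValueError(f"Infura is not supported for {blockchain_type} blockchain type")
    return infura_networks[blockchain_type]["url"]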

View file

@@ -1,10 +1,14 @@
import argparse
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import json
import hashlib
import itertools
import logging
from typing import Dict, List, Any, Optional
from uuid import UUID
import time
from pprint import pprint
from moonstreamdb.blockchain import AvailableBlockchainType
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
@@ -13,24 +17,27 @@ from moonstreamdb.db import (
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
import requests
from sqlalchemy.orm import sessionmaker
from web3._utils.request import cache_session
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from .db import view_call_to_label, commit_session, clean_labels
from .Multicall2_interface import Contract as Multicall2
from ..settings import (
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
INFURA_PROJECT_ID,
multicall_contracts,
infura_networks,
)
from .web3_util import FunctionSignature
from .web3_util import FunctionSignature, connect
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Multicall2_address = "0xc8E51042792d7405184DfCa245F2d27B94D013b6"
def make_multicall(
multicall_method: Any,
calls: List[Any],
@@ -38,13 +45,20 @@ def make_multicall(
block_number: str = "latest",
) -> Any:
multicall_calls = [
(
call["address"],
call["method"].encode_data(call["inputs"]).hex(),
)
for call in calls
]
    multicall_calls = []
    encodable_calls = []
    for call in calls:
        try:
            multicall_calls.append(
                (
                    call["address"],
                    call["method"].encode_data(call["inputs"]).hex(),
                )
            )
            encodable_calls.append(call)
        except Exception as e:
            logger.error(
                f'Error encoding data for method {call["method"].name} call {call}: {e}'
            )
    # Drop calls that failed to encode so the index-based lookups over the
    # multicall result below stay aligned with multicall_calls.
    calls = encodable_calls
multicall_result = multicall_method(False, calls=multicall_calls).call(
block_identifier=block_number
@@ -54,10 +68,41 @@
    # Handle the case of unsuccessful calls as well
for index, encoded_data in enumerate(multicall_result):
if encoded_data[0]:
try:
if encoded_data[0]:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
)
else:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
)
except Exception as e:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1])[0],
"result": str(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
@@ -67,27 +112,21 @@ "block_number": block_number,
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
"error": str(e),
}
)
else:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
            logger.error(
                f"Error decoding data for method {calls[index]['method'].name} call {calls[index]}: {e}."
            )
            # Data could not be decoded; the raw encoded bytes are kept as the result.
            logger.error(f"Encoded data: {encoded_data}")
return results
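
For reference, each entry appended to results above has the shape below; on a decode failure the raw encoded bytes are stored as a string and an "error" key is added. All values here are illustrative placeholders, not output from a real run:

# Illustrative shape of a single make_multicall result entry:
result_entry = {
    "result": ["https://example.com/token/1"],  # decoded output, or str(raw bytes) on decode failure
    "hash": "md5 generated_hash of the job ABI",
    "method": "a FunctionSignature instance in practice",
    "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
    "name": "tokenURI",
    "inputs": [1],
    "call_data": "hex-encoded calldata sent to Multicall2",
    "block_number": "latest",
    "block_timestamp": 1671120000,
    "status": True,  # encoded_data[0] returned by Multicall2
    # "error": "<exception text>"  # present only when decoding failed
}
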
def crawl_calls_level(
web3_client,
db_session,
calls,
responces,
@@ -98,13 +137,20 @@
block_number,
blockchain_type,
block_timestamp,
max_batch_size=5000,
min_batch_size=4,
):
calls_of_level = []
for call in calls:
if call["generated_hash"] in responces:
continue
parameters = []
logger.info(f"Call: {json.dumps(call, indent=4)}")
for input in call["inputs"]:
if type(input["value"]) in (str, int):
@@ -114,9 +160,10 @@
if (
contracts_ABIs[call["address"]][input["value"]]["name"]
== "totalSupply"
):
                ):  # hack for totalSupply; TODO(Andrey): add proper support for response parsing
                    logger.debug(responces[input["value"]][0])
parameters.append(
list(range(1, responces[input["value"]][0] + 1))
list(range(1, responces[input["value"]][0][0] + 1))
)
else:
parameters.append(responces[input["value"]])
@@ -126,6 +173,10 @@
raise
for call_parameters in itertools.product(*parameters):
            # hack for tuple products: unwrap a single tuple parameter
            if len(call_parameters) == 1 and isinstance(call_parameters[0], tuple):
                call_parameters = call_parameters[0]
calls_of_level.append(
{
"address": call["address"],
@@ -137,22 +188,55 @@
}
)
for call_chunk in [
calls_of_level[i : i + batch_size]
for i in range(0, len(calls_of_level), batch_size)
]:
retry = 0
while True:
try:
make_multicall_result = make_multicall(
multicall_method=multicall_method,
calls=call_chunk,
block_number=block_number,
block_timestamp=block_timestamp,
while len(calls_of_level) > 0:
make_multicall_result = []
try:
call_chunk = calls_of_level[:batch_size]
logger.info(
f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}"
)
            # 1 thread with a timeout, to guard against hung multicall calls
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
make_multicall,
multicall_method,
call_chunk,
block_timestamp,
block_number,
)
break
except ValueError:
continue
make_multicall_result = future.result(timeout=20)
logger.info(
f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}"
)
retry = 0
calls_of_level = calls_of_level[batch_size:]
            logger.info(f"Length of tasks left: {len(calls_of_level)}.")
batch_size = min(batch_size * 2, max_batch_size)
except ValueError as e: # missing trie node
logger.error(f"ValueError: {e}, retrying")
retry += 1
if "missing trie node" in str(e):
time.sleep(4)
if retry > 5:
                raise e
batch_size = max(batch_size // 3, min_batch_size)
except TimeoutError as e: # timeout
logger.error(f"TimeoutError: {e}, retrying")
retry += 1
if retry > 5:
                raise e
batch_size = max(batch_size // 3, min_batch_size)
except Exception as e:
logger.error(f"Exception: {e}")
            raise e
time.sleep(2)
        logger.debug(f"Retry count: {retry}")
# results parsing and writing to database
add_to_session_count = 0
for result in make_multicall_result:
@@ -167,10 +251,13 @@
commit_session(db_session)
    logger.info(f"{add_to_session_count} labels committed to database.")
return batch_size
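
Since crawl_calls_level now returns batch_size and parse_jobs (below) feeds it back in for the next level, the chunk size adapts over the run: it doubles after each successful chunk, capped at max_batch_size, and shrinks to a third after a ValueError or TimeoutError, floored at min_batch_size. A worked example of that arithmetic (the starting size is hypothetical):

batch_size, max_batch_size, min_batch_size = 500, 5000, 4
batch_size = min(batch_size * 2, max_batch_size)   # after a successful chunk: 500 -> 1000
batch_size = max(batch_size // 3, min_batch_size)  # after a ValueError/TimeoutError: 1000 -> 333
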
def parse_jobs(
jobs: List[Any],
blockchain_type: AvailableBlockchainType,
web3_provider_uri: Optional[str],
block_number: Optional[int],
batch_size: int,
access_id: UUID,
@@ -183,9 +270,26 @@
contracts_methods: Dict[str, Any] = {}
calls: Dict[int, Any] = {0: []}
web3_client = _retry_connect_web3(
blockchain_type=blockchain_type, access_id=access_id
)
if web3_provider_uri is not None:
try:
logger.info(
f"Connecting to blockchain: {blockchain_type} with custom provider!"
)
web3_client = connect(web3_provider_uri)
if blockchain_type != AvailableBlockchainType.ETHEREUM:
web3_client.middleware_onion.inject(geth_poa_middleware, layer=0)
except Exception as e:
        logger.error(
            f"Web3 connection to custom provider {web3_provider_uri} failed with error: {e}"
        )
        raise e
else:
logger.info(f"Connecting to blockchain: {blockchain_type} with Node balancer.")
web3_client = _retry_connect_web3(
blockchain_type=blockchain_type, access_id=access_id
)
    logger.info(f"Crawler started, connected to blockchain: {blockchain_type}")
@@ -229,7 +333,11 @@
have_subcalls = True
abi["inputs"].append(input)
abi["address"] = method_abi["address"]
generated_hash = hashlib.md5(json.dumps(abi).encode("utf-8")).hexdigest()
generated_hash = hashlib.md5(
json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode(
"utf-8"
)
).hexdigest()
abi["generated_hash"] = generated_hash
if have_subcalls:
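
Serializing with sort_keys=True (and fixed separators) makes generated_hash deterministic: two ABI dicts with the same content but different key order now hash identically, so job hashes are stable across runs. A self-contained check of that property:

import hashlib
import json

a = {"name": "tokenURI", "type": "function"}
b = {"type": "function", "name": "tokenURI"}

def stable_hash(d: dict) -> str:
    return hashlib.md5(
        json.dumps(d, sort_keys=True, indent=4, separators=(",", ": ")).encode("utf-8")
    ).hexdigest()

assert stable_hash(a) == stable_hash(b)
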
@@ -297,8 +405,10 @@
# initial call of level 0 all call without subcalls directly moved there
logger.info("Crawl level: 0")
logger.info(f"Jobs amount: {len(calls[0])}")
logger.info(f"call_tree_levels: {call_tree_levels}")
crawl_calls_level(
batch_size = crawl_calls_level(
web3_client,
db_session,
calls[0],
responces,
@@ -316,7 +426,8 @@
logger.info(f"Crawl level: {level}")
logger.info(f"Jobs amount: {len(calls[level])}")
crawl_calls_level(
batch_size = crawl_calls_level(
web3_client,
db_session,
calls[level],
responces,
@@ -341,38 +452,28 @@ def handle_crawl(args: argparse.Namespace) -> None:
Read all view methods of the contracts and crawl
"""
my_job = {
"type": "function",
"stateMutability": "view",
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256",
"value": {
"type": "function",
"name": "totalSupply",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256",
}
],
"address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
"inputs": [],
},
}
],
"name": "tokenURI",
"outputs": [{"internalType": "string", "name": "", "type": "string"}],
"address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
}
with open(args.jobs_file, "r") as f:
jobs = json.load(f)
blockchain_type = AvailableBlockchainType(args.blockchain)
custom_web3_provider = args.custom_web3_provider
if args.infura and INFURA_PROJECT_ID is not None:
if blockchain_type not in infura_networks:
raise ValueError(
f"Infura is not supported for {blockchain_type} blockchain type"
)
        logger.info("Using Infura!")
custom_web3_provider = infura_networks[blockchain_type]["url"]
parse_jobs(
[my_job], blockchain_type, args.block_number, args.batch_size, args.access_id
jobs,
blockchain_type,
custom_web3_provider,
args.block_number,
args.batch_size,
args.access_id,
)
@@ -382,12 +483,12 @@ def parse_abi(args: argparse.Namespace) -> None:
"""
with open(args.abi_file, "r") as f:
# read json and parse only stateMutability equal to view
abi = json.load(f)
output_json = []
for method in abi:
# read json and parse only stateMutability equal to view
if method.get("stateMutability") and method["stateMutability"] == "view":
output_json.append(method)
@@ -446,9 +547,27 @@
        help="Type of blockchain which is written to the database",
required=True,
)
view_state_crawler_parser.add_argument(
"--infura",
action="store_true",
help="Use infura as web3 provider",
)
view_state_crawler_parser.add_argument(
"--custom-web3-provider",
"-w3",
type=str,
        help="Custom web3 provider URI",
)
view_state_crawler_parser.add_argument(
"--block-number", "-N", type=str, help="Block number."
)
view_state_crawler_parser.add_argument(
"--jobs-file",
"-j",
type=str,
        help="Path to JSON file with jobs",
required=True,
)
view_state_crawler_parser.add_argument(
"--batch-size",
"-s",

View file

@@ -0,0 +1,35 @@
[
{
"type": "function",
"stateMutability": "view",
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256",
"value": {
"type": "function",
"name": "totalSupply",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"address": "0x39858b1A4e48CfFB1019F0A15ff54899213B3f8b",
"inputs": []
}
}
],
"name": "tokenURI",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"address": "0x39858b1A4e48CfFB1019F0A15ff54899213B3f8b"
}
]
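
These jobs files encode call dependencies: the tokenURI input named tokenId takes its value from the nested totalSupply call, so the crawler resolves totalSupply at a deeper call-tree level first and then, via the totalSupply hack in crawl_calls_level, fans tokenURI out over every token id. Roughly (the supply value below is hypothetical):

total_supply = 1000  # decoded result of the inner totalSupply call
token_ids = list(range(1, total_supply + 1))  # one tokenURI call per token id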

View file

@@ -0,0 +1,35 @@
[
{
"type": "function",
"stateMutability": "view",
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256",
"value": {
"type": "function",
"name": "totalSupply",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
"inputs": []
}
}
],
"name": "tokenURI",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f"
}
]

View file

@@ -36,4 +36,5 @@ export COINMARKETCAP_API_KEY="<API_key_to_parse_conmarketcap>"
# Custom crawler
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="<public_bucket>"
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev"
export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="<access token to run queries for public dashboards>"
export INFURA_PROJECT_ID="<infura_project_id>"

View file

@@ -41,7 +41,7 @@ setup(
"moonstream>=0.1.1",
"moonworm[moonstream]>=0.5.2",
"humbug",
"pydantic",
"pydantic==1.9.2",
"python-dateutil",
"requests",
"tqdm",