# moonstream/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py
"""
Generates dashboard.
"""
2021-11-11 15:16:21 +00:00
import argparse
import hashlib
2021-11-13 14:32:55 +00:00
import json
import logging
2021-11-11 15:16:21 +00:00
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Union, Optional
from uuid import UUID
2021-11-15 14:29:19 +00:00
import traceback
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
2022-08-10 16:55:49 +00:00
from moonstreamdb.blockchain import (
2022-08-09 16:29:19 +00:00
AvailableBlockchainType,
get_label_model,
get_transaction_model,
)
from moonstreamdb.db import yield_db_read_only_session_ctx
2022-05-26 10:35:19 +00:00
from sqlalchemy import and_, distinct, func, text, extract, cast
2022-05-03 13:53:34 +00:00
from sqlalchemy.orm import Session
2021-11-14 14:15:07 +00:00
from sqlalchemy.sql.operators import in_op
from web3 import Web3
2021-11-11 15:16:21 +00:00
2022-08-09 16:29:19 +00:00
from ..blockchain import connect
from ..reporter import reporter
2021-11-14 12:58:15 +00:00
from ..settings import (
CRAWLER_LABEL,
2021-11-14 12:58:15 +00:00
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
2022-05-26 12:41:58 +00:00
NB_CONTROLLER_ACCESS_ID,
2021-11-14 12:58:15 +00:00
)
2021-11-13 15:51:32 +00:00
from ..settings import bugout_client as bc
2021-11-11 15:16:21 +00:00
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Subscription type ids grouped by blockchain name; used to collect every
# subscription relevant to the --blockchain CLI argument.
subscription_ids_by_blockchain = {
    "ethereum": ["ethereum_blockchain", "ethereum_smartcontract"],
    "polygon": ["polygon_blockchain", "polygon_smartcontract"],
    "mumbai": ["mumbai_blockchain", "mumbai_smartcontract"],
    "xdai": ["xdai_blockchain", "xdai_smartcontract"],
}

# Reverse mapping: subscription type id -> blockchain name.
# Used when building per-subscription S3 keys in push_statistics.
blockchain_by_subscription_id = {
    "ethereum_blockchain": "ethereum",
    "polygon_blockchain": "polygon",
    "mumbai_blockchain": "mumbai",
    "xdai_blockchain": "xdai",
    "ethereum_smartcontract": "ethereum",
    "polygon_smartcontract": "polygon",
    "mumbai_smartcontract": "mumbai",
    "xdai_smartcontract": "xdai",
}
2021-11-11 15:16:21 +00:00
class TimeScale(Enum):
    """Supported statistics aggregation windows."""

    # TODO(Andrey): Re-enable "year" once aggregation performance on the
    # transactions table is acceptable; right now it can hang.
    # year = "year"
    month = "month"
    week = "week"
    day = "day"
# SQL time-bucketing parameters per timescale: the generate_series step size and
# the Postgres to_char() format used to align timeseries points with label
# timestamps. "year" is kept for when TimeScale.year is re-enabled.
timescales_params: Dict[str, Dict[str, str]] = {
    "year": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
    "month": {"timestep": "1 hours", "timeformat": "YYYY-MM-DD HH24"},
    "week": {"timestep": "1 hours", "timeformat": "YYYY-MM-DD HH24"},
    "day": {"timestep": "1 minutes", "timeformat": "YYYY-MM-DD HH24 MI"},
}

# Length of the window covered by each timescale.
timescales_delta: Dict[str, Dict[str, timedelta]] = {
    "year": {"timedelta": timedelta(days=365)},
    "month": {"timedelta": timedelta(days=27)},
    "week": {"timedelta": timedelta(days=6)},
    "day": {"timedelta": timedelta(hours=24)},
}

# Maps ABI entry types to the dashboard subscription_settings keys that hold
# the user-selected names for that type.
abi_type_to_dashboards_type = {"function": "methods", "event": "events"}

# Bugout resource types used to list subscriptions and dashboards.
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
2021-11-11 15:16:21 +00:00
def push_statistics(
    statistics_data: Dict[str, Any],
    subscription: Any,
    timescale: str,
    bucket: str,
    dashboard_id: Union[UUID, str],
) -> None:
    """
    Serialize the computed statistics to JSON and upload them to the
    subscription's per-dashboard S3 location for the given timescale.
    """
    blockchain = blockchain_by_subscription_id[
        subscription.resource_data["subscription_type_id"]
    ]
    address = subscription.resource_data["address"]
    result_key = (
        f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain}"
        f"/contracts_data/{address}/{dashboard_id}/v1/{timescale}.json"
    )

    s3 = boto3.client("s3")
    s3.put_object(
        Body=json.dumps(statistics_data).encode("utf-8"),
        Bucket=bucket,
        Key=result_key,
        ContentType="application/json",
        Metadata={"drone": "statistics"},
    )

    logger.info(f"Statistics push to bucket: s3://{bucket}/{result_key}")
2021-11-11 15:16:21 +00:00
def generate_data(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    timescale: str,
    functions: List[str],
    start: Any,
    metric_type: str,
    crawler_label: str,
):
    """
    Build a per-label timeseries of occurrence counts for the given contract.

    Counts labels (events or tx calls, selected by ``metric_type``) whose name
    is in ``functions``, bucketed by the timescale's time format, over the
    window [start, start + timescale delta). Buckets with no data are filled
    via an outer join against a generated empty time series, so every label
    gets a point for every bucket (count is NULL/None when empty).

    Returns a dict: label name -> list of {"date": bucket, "count": n},
    ordered newest bucket first.
    """
    label_model = get_label_model(blockchain_type)

    # Bucket step and to_char() format for this timescale.
    time_step = timescales_params[timescale]["timestep"]
    time_format = timescales_params[timescale]["timeformat"]

    # if end is None:
    #     end = datetime.utcnow()

    # Raw counts per (time bucket, label name) inside the selected time range.
    # block_timestamp is stored as an epoch number, so `start` (a datetime) is
    # converted with extract("epoch", ...) and cast to the column's type.
    query_data = (
        db_session.query(
            func.to_char(
                func.to_timestamp(label_model.block_timestamp), time_format
            ).label("timeseries_points"),
            func.count(label_model.id).label("count"),
            label_model.label_data["name"].astext.label("label"),
        )
        .filter(label_model.address == address)
        .filter(label_model.label == crawler_label)
        .filter(label_model.label_data["type"].astext == metric_type)
        .filter(in_op(label_model.label_data["name"].astext, functions))
        .filter(
            label_model.block_timestamp
            >= cast(extract("epoch", start), label_model.block_timestamp.type)
        )
        .filter(
            label_model.block_timestamp
            < cast(
                extract("epoch", (start + timescales_delta[timescale]["timedelta"])),
                label_model.block_timestamp.type,
            )
        )
        .group_by(text("timeseries_points"), label_model.label_data["name"].astext)
        .order_by(text("timeseries_points DESC"))
    )

    with_timetrashold_data = query_data.cte(name="timetrashold_data")

    # Distinct label names that actually appeared in the window.
    requested_labels = db_session.query(
        with_timetrashold_data.c.label.label("label")
    ).distinct()

    with_requested_labels = requested_labels.cte(name="requested_labels")

    # Empty time series covering the whole window at `time_step` resolution.
    time_series = db_session.query(
        func.generate_series(
            start,
            start + timescales_delta[timescale]["timedelta"],
            time_step,
        ).label("timeseries_points")
    )

    with_time_series = time_series.cte(name="time_series")

    # Cross product of (every bucket) x (every observed label): the scaffold
    # that guarantees a row per bucket per label.
    empty_times_series_with_tags = db_session.query(
        func.to_char(with_time_series.c.timeseries_points, time_format).label(
            "timeseries_points"
        ),
        with_requested_labels.c.label.label("label"),
    )

    with_empty_times_series_with_tags = empty_times_series_with_tags.cte(
        name="empty_times_series_with_tags"
    )

    # Outer-join real counts onto the scaffold; missing buckets keep count NULL.
    labels_time_series = (
        db_session.query(
            with_empty_times_series_with_tags.c.timeseries_points.label(
                "timeseries_points"
            ),
            with_empty_times_series_with_tags.c.label.label("label"),
            with_timetrashold_data.c.count.label("count"),
        )
        .join(
            with_empty_times_series_with_tags,
            and_(
                with_empty_times_series_with_tags.c.label
                == with_timetrashold_data.c.label,
                with_empty_times_series_with_tags.c.timeseries_points
                == with_timetrashold_data.c.timeseries_points,
            ),
            isouter=True,
        )
        .order_by(text("timeseries_points DESC"))
    )

    # Fold rows into {label: [{"date": ..., "count": ...}, ...]}.
    response_labels: Dict[Any, Any] = {}

    for created_date, label, count in labels_time_series:
        if not response_labels.get(label):
            response_labels[label] = []

        response_labels[label].append({"date": created_date, "count": count})

    return response_labels
2021-11-15 15:00:29 +00:00
2021-11-15 14:29:19 +00:00
def cast_to_python_type(evm_type: str) -> Callable:
    """
    Map an EVM ABI type name to a Python callable that casts a raw value
    to the corresponding Python type.

    Raises ValueError for unsupported ABI types.
    """
    # Exact-name types first, then prefixed numeric/bytes families.
    if evm_type == "string":
        return str
    if evm_type == "bool":
        return bool
    if evm_type == "address":
        return Web3.toChecksumAddress
    if evm_type.startswith(("uint", "int")):
        return int
    if evm_type.startswith("bytes"):
        return bytes
    raise ValueError(f"Cannot convert to python type {evm_type}")
2021-11-11 15:16:21 +00:00
2021-11-15 18:41:18 +00:00
def get_unique_address(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    crawler_label: str,
):
    """
    Count distinct recipients ("to" argument) of Transfer events emitted by
    the given contract address under the given crawler label.
    """
    label_model = get_label_model(blockchain_type)

    recipients = db_session.query(label_model.label_data["args"]["to"])
    recipients = recipients.filter(label_model.address == address)
    recipients = recipients.filter(label_model.label == crawler_label)
    recipients = recipients.filter(label_model.label_data["type"].astext == "event")
    recipients = recipients.filter(label_model.label_data["name"].astext == "Transfer")

    return recipients.distinct().count()
2021-12-04 16:34:29 +00:00
def get_blocks_state(
    db_session: Session, blockchain_type: AvailableBlockchainType
) -> Dict[str, int]:
    """
    Return block-coverage metadata for the blockchain:

    - latest_stored_block: highest block number in the transactions table,
    - latest_labelled_block: highest labelled (CRAWLER_LABEL) block number,
    - earliest_labelled_block: lowest labelled block number.

    All values default to 0 when the query yields no row.
    """
    blocks_state = {
        "latest_stored_block": 0,
        "latest_labelled_block": 0,
        "earliest_labelled_block": 0,
    }

    label_model = get_label_model(blockchain_type)
    transactions_model = get_transaction_model(blockchain_type)

    max_transactions_number = (
        db_session.query(transactions_model.block_number)
        .order_by(transactions_model.block_number.desc())
        .limit(1)
    ).subquery("max_transactions_number")

    max_label_number = (
        db_session.query(label_model.block_number)
        .order_by(label_model.block_number.desc())
        .filter(label_model.label == CRAWLER_LABEL)
        .limit(1)
    ).subquery("max_label_models_number")

    min_label_number = (
        db_session.query(label_model.block_number)
        .order_by(label_model.block_number.asc())
        .filter(label_model.label == CRAWLER_LABEL)
        .limit(1)
    ).subquery("min_label_models_number")

    result = (
        db_session.query(
            max_label_number.c.block_number.label("latest_labelled_block"),
            min_label_number.c.block_number.label("earliest_labelled_block"),
            max_transactions_number.c.block_number,
        )
    ).one_or_none()

    if result:
        # FIX: the selected columns are (latest_labelled, earliest_labelled,
        # latest_stored) — the previous unpack swapped latest and earliest.
        latest_labelled_block, earliest_labelled_block, latest_stored_block = result
        blocks_state = {
            "latest_stored_block": latest_stored_block,
            "latest_labelled_block": latest_labelled_block,
            "earliest_labelled_block": earliest_labelled_block,
        }
    return blocks_state
def generate_list_of_names(
    type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any
):
    """
    Return the entry names to aggregate for a given ABI entry type.

    When ``read_abi`` is true the names come straight from the ABI; otherwise
    they come from the dashboard's subscription filter settings (keyed by
    abi_type_to_dashboards_type).
    """
    if read_abi:
        return [entry["name"] for entry in abi_json if entry["type"] == type]

    dashboards_key = abi_type_to_dashboards_type[type]
    return [entry["name"] for entry in subscription_filters[dashboards_key]]
def process_external_merged(
    external_calls: Dict[str, Dict[str, Any]],
    blockchain: AvailableBlockchainType,
    access_id: Optional[UUID] = None,
):
    """
    Execute deduplicated external (view) calls on-chain.

    Returns a mapping of external_call_hash -> value returned by the contract.
    Calls that fail to normalize or to execute are logged and skipped.
    """
    normalized_calls: List[Dict[str, Any]] = []
    result: Dict[str, Any] = {}

    for call_hash, call_spec in external_calls.items():
        try:
            inputs_abi: List[Dict[str, Any]] = []
            call_args: List[Any] = []
            for spec_input in call_spec["inputs"]:
                inputs_abi.append(
                    {"name": spec_input["name"], "type": spec_input["type"]}
                )
                # Cast the stored raw value to the Python type the ABI expects.
                call_args.append(
                    cast_to_python_type(spec_input["type"])(spec_input["value"])
                )

            normalized_calls.append(
                {
                    "external_call_hash": call_hash,
                    "address": Web3.toChecksumAddress(call_spec["address"]),
                    "name": call_spec["name"],
                    "abi": [
                        {
                            "name": call_spec["name"],
                            "inputs": inputs_abi,
                            "outputs": call_spec["outputs"],
                            "type": "function",
                            "stateMutability": "view",
                        }
                    ],
                    "input_args": call_args,
                }
            )
        except Exception as e:
            logger.error(f"Error processing external call: {e}")

    if normalized_calls:
        web3_client = connect(blockchain, access_id=access_id)
        for extcall in normalized_calls:
            try:
                contract = web3_client.eth.contract(
                    address=extcall["address"], abi=extcall["abi"]
                )
                response = contract.functions[extcall["name"]](
                    *extcall["input_args"]
                ).call()
                result[extcall["external_call_hash"]] = response
            except Exception as e:
                logger.error(f"Failed to call {extcall['external_call_hash']} error: {e}")

    return result
2021-11-25 11:23:34 +00:00
def process_external(
    abi_external_calls: List[Dict[str, Any]],
    blockchain: AvailableBlockchainType,
    access_id: Optional[UUID] = None,
):
    """
    Execute all external (view) calls declared in a subscription's ABI and
    return a list of {"display_name": ..., "value": ...} entries.

    TODO:(Andrey) Check posibility do it via AsyncHttpProvider(not supported for some of middlewares).
    """
    extention_data: List[Dict[str, Any]] = []
    prepared_calls: List[Dict[str, Any]] = []

    for call_spec in abi_external_calls:
        try:
            inputs_abi: List[Dict[str, Any]] = []
            call_args: List[Any] = []
            for spec_input in call_spec["inputs"]:
                inputs_abi.append(
                    {"name": spec_input["name"], "type": spec_input["type"]}
                )
                # Cast the stored raw value to the Python type the ABI expects.
                call_args.append(
                    cast_to_python_type(spec_input["type"])(spec_input["value"])
                )

            prepared_calls.append(
                {
                    "display_name": call_spec["display_name"],
                    "address": Web3.toChecksumAddress(call_spec["address"]),
                    "name": call_spec["name"],
                    "abi": [
                        {
                            "name": call_spec["name"],
                            "inputs": inputs_abi,
                            "outputs": call_spec["outputs"],
                            "type": "function",
                            "stateMutability": "view",
                        }
                    ],
                    "input_args": call_args,
                }
            )
        except Exception as e:
            logger.error(f"Error processing external call: {e}")

    if prepared_calls:
        web3_client = connect(blockchain, access_id=access_id)
        for extcall in prepared_calls:
            try:
                contract = web3_client.eth.contract(
                    address=extcall["address"], abi=extcall["abi"]
                )
                response = contract.functions[extcall["name"]](
                    *extcall["input_args"]
                ).call()

                extention_data.append(
                    {"display_name": extcall["display_name"], "value": response}
                )
            except Exception as e:
                logger.error(f"Failed to call {extcall['name']} error: {e}")

    return extention_data
2021-11-23 18:23:33 +00:00
def get_count(
    name: str,
    type: str,
    db_session: Session,
    select_expression: Any,
    blockchain_type: AvailableBlockchainType,
    address: str,
    crawler_label: str,
):
    """
    Count labels of the given type and name (e.g. a specific event) recorded
    for the contract address. ``select_expression`` controls what is counted
    (a column, a distinct() wrapper, etc.).
    """
    label_model = get_label_model(blockchain_type)

    counted = db_session.query(select_expression)
    counted = counted.filter(label_model.address == address)
    counted = counted.filter(label_model.label == crawler_label)
    counted = counted.filter(label_model.label_data["type"].astext == type)
    counted = counted.filter(label_model.label_data["name"].astext == name)

    return counted.count()
def generate_web3_metrics(
    db_session: Session,
    events: List[str],
    blockchain_type: AvailableBlockchainType,
    address: str,
    crawler_label: str,
    abi_json: Any,
    access_id: Optional[UUID] = None,
) -> List[Any]:
    """
    Generate stats for cards components.

    Returns a list of {"display_name": ..., "value": ...} entries: the results
    of the ABI's external calls, plus a distinct-token-owner count, plus
    hardcoded hatch metrics when the corresponding events are subscribed.
    """
    extention_data = []

    # Execute any external (view) calls declared in the ABI.
    abi_external_calls = [item for item in abi_json if item["type"] == "external_call"]

    extention_data = process_external(
        abi_external_calls=abi_external_calls,
        blockchain=blockchain_type,
        access_id=access_id,
    )

    # Distinct Transfer recipients for this contract.
    extention_data.append(
        {
            "display_name": "Overall unique token owners.",
            "value": get_unique_address(
                db_session=db_session,
                blockchain_type=blockchain_type,
                address=address,
                crawler_label=crawler_label,
            ),
        }
    )

    # TODO: Remove it if ABI already have correct web3_call signature.
    # Hardcoded metrics for contracts emitting hatch events.
    if "HatchStartedEvent" in events:
        extention_data.append(
            {
                "display_name": "Number of hatches started.",
                "value": get_count(
                    name="HatchStartedEvent",
                    type="event",
                    db_session=db_session,
                    select_expression=get_label_model(blockchain_type),
                    blockchain_type=blockchain_type,
                    address=address,
                    crawler_label=crawler_label,
                ),
            }
        )

    if "HatchFinishedEvent" in events:
        extention_data.append(
            {
                "display_name": "Number of hatches finished.",
                # Count distinct tokenIds rather than raw event rows.
                "value": get_count(
                    name="HatchFinishedEvent",
                    type="event",
                    db_session=db_session,
                    select_expression=distinct(
                        get_label_model(blockchain_type).label_data["args"]["tokenId"]
                    ),
                    blockchain_type=blockchain_type,
                    address=address,
                    crawler_label=crawler_label,
                ),
            }
        )

    return extention_data
def stats_generate_handler(args: argparse.Namespace):
    """
    Start crawler with generate.

    Builds dashboard statistics for every dashboard/subscription pair on the
    given blockchain. Events, methods and external calls are merged per
    contract address so each timeseries query runs once per address, then the
    per-subscription slices are extracted and pushed to S3.
    """
    blockchain_type = AvailableBlockchainType(args.blockchain)

    with yield_db_read_only_session_ctx() as db_session:
        start_time = time.time()

        dashboard_resources: BugoutResources = bc.list_resources(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
            timeout=10,
        )

        logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}")

        # Collect all subscriptions for this blockchain.
        available_subscriptions: List[BugoutResource] = []

        for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
            blockchain_subscriptions: BugoutResources = bc.list_resources(
                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                params={
                    "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
                    "subscription_type_id": subscription_type,
                },
                timeout=10,
            )
            available_subscriptions.extend(blockchain_subscriptions.resources)

        # Index subscriptions by id for fast lookup.
        subscription_by_id = {
            str(blockchain_subscription.id): blockchain_subscription
            for blockchain_subscription in available_subscriptions
        }

        logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")

        s3_client = boto3.client("s3")

        # Number of subscriptions successfully merged into the generation plan.
        subscriptions_count = 0

        # Merged events and function calls across all subscriptions, keyed by
        # contract address. Each address maps dashboard ids to per-subscription
        # name lists, plus a "merged" set with the union of all names.
        merged_events: Dict[str, Any] = {}
        merged_functions: Dict[str, Any] = {}

        # Layout:
        # {
        #     dashboard_id: {subscription_id: {external_call_hash: display_name}},
        #     "merged": {external_call_hash: external_call_spec},
        # }
        merged_external_calls: Dict[str, Any] = {}
        merged_external_calls["merged"] = {}

        # address -> dashboard_id -> [subscription_id, ...]
        address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}

        for dashboard in dashboard_resources.resources:
            for dashboard_subscription_filters in dashboard.resource_data[
                "subscription_settings"
            ]:
                try:
                    subscription_id = dashboard_subscription_filters["subscription_id"]

                    if subscription_id not in subscription_by_id:
                        # Subscription belongs to a different blockchain type.
                        continue

                    try:
                        UUID(subscription_id)
                    except Exception as err:
                        logger.error(
                            f"Subscription id {subscription_id} is not valid UUID: {err}"
                        )
                        continue

                    address = subscription_by_id[subscription_id].resource_data[
                        "address"
                    ]

                    if address not in address_dashboard_id_subscription_id_tree:
                        address_dashboard_id_subscription_id_tree[address] = {}

                    # FIX: previously this membership test looked at the
                    # top-level (address-keyed) dict, so the subscription list
                    # was re-created — and earlier ids lost — for every
                    # subscription of the same dashboard/address pair.
                    if (
                        str(dashboard.id)
                        not in address_dashboard_id_subscription_id_tree[address]
                    ):
                        address_dashboard_id_subscription_id_tree[address][
                            str(dashboard.id)
                        ] = []

                    if (
                        subscription_id
                        not in address_dashboard_id_subscription_id_tree[address][
                            str(dashboard.id)
                        ]
                    ):
                        address_dashboard_id_subscription_id_tree[address][
                            str(dashboard.id)
                        ].append(subscription_id)

                    if not subscription_by_id[subscription_id].resource_data["abi"]:
                        methods = []
                        events = []
                        abi_json = {}
                    else:
                        bucket = subscription_by_id[subscription_id].resource_data[
                            "bucket"
                        ]
                        key = subscription_by_id[subscription_id].resource_data[
                            "s3_path"
                        ]
                        abi = s3_client.get_object(
                            Bucket=bucket,
                            Key=key,
                        )
                        abi_json = json.loads(abi["Body"].read())

                        methods = generate_list_of_names(
                            type="function",
                            subscription_filters=dashboard_subscription_filters,
                            read_abi=dashboard_subscription_filters["all_methods"],
                            abi_json=abi_json,
                        )

                        events = generate_list_of_names(
                            type="event",
                            subscription_filters=dashboard_subscription_filters,
                            read_abi=dashboard_subscription_filters["all_events"],
                            abi_json=abi_json,
                        )

                    if address not in merged_events:
                        merged_events[address] = {}
                        merged_events[address]["merged"] = set()

                    if address not in merged_functions:
                        merged_functions[address] = {}
                        merged_functions[address]["merged"] = set()

                    if str(dashboard.id) not in merged_events[address]:
                        merged_events[address][str(dashboard.id)] = {}

                    if str(dashboard.id) not in merged_functions[address]:
                        merged_functions[address][str(dashboard.id)] = {}

                    merged_events[address][str(dashboard.id)][subscription_id] = events
                    merged_functions[address][str(dashboard.id)][
                        subscription_id
                    ] = methods

                    # Get external calls from ABI.
                    # Merging external_calls requires a direct hash of the
                    # external_call object (address + function call signature).
                    external_calls = [
                        external_call
                        for external_call in abi_json
                        if external_call["type"] == "external_call"
                    ]
                    if len(external_calls) > 0:
                        for external_call in external_calls:
                            # Create external_call selector.
                            # display_name is not included in the hash.
                            external_call_without_display_name = {
                                "type": "external_call",
                                "address": external_call["address"],
                                "name": external_call["name"],
                                "inputs": external_call["inputs"],
                                "outputs": external_call["outputs"],
                            }

                            external_call_hash = hashlib.md5(
                                json.dumps(external_call_without_display_name).encode(
                                    "utf-8"
                                )
                            ).hexdigest()

                            if str(dashboard.id) not in merged_external_calls:
                                merged_external_calls[str(dashboard.id)] = {}

                            if (
                                subscription_id
                                not in merged_external_calls[str(dashboard.id)]
                            ):
                                merged_external_calls[str(dashboard.id)][
                                    subscription_id
                                ] = {}

                            # FIX: previously a new hash replaced the whole
                            # per-subscription dict, dropping earlier external
                            # calls; now the hash key is added to it.
                            if (
                                external_call_hash
                                not in merged_external_calls[str(dashboard.id)][
                                    subscription_id
                                ]
                            ):
                                merged_external_calls[str(dashboard.id)][
                                    subscription_id
                                ][external_call_hash] = external_call["display_name"]

                            if (
                                external_call_hash
                                not in merged_external_calls["merged"]
                            ):
                                merged_external_calls["merged"][
                                    external_call_hash
                                ] = external_call_without_display_name

                    # Fill merged events and functions calls for all subscriptions.
                    for event in events:
                        merged_events[address]["merged"].add(event)

                    for method in methods:
                        merged_functions[address]["merged"].add(method)

                    # FIX: previously never incremented, so the final report
                    # always claimed 0 subscriptions.
                    subscriptions_count += 1
                except Exception as e:
                    logger.error(f"Error while merging subscriptions: {e}")

        # Request contracts for external calls.
        # result is a {call_hash: value} dictionary.
        external_calls_results = process_external_merged(
            external_calls=merged_external_calls["merged"],
            blockchain=blockchain_type,
            access_id=args.access_id,
        )

        for address in address_dashboard_id_subscription_id_tree.keys():
            current_blocks_state = get_blocks_state(
                db_session=db_session, blockchain_type=blockchain_type
            )

            s3_data_object_for_contract: Dict[str, Any] = {}

            crawler_label = CRAWLER_LABEL

            for timescale in [timescale.value for timescale in TimeScale]:
                try:
                    start_date = (
                        datetime.utcnow() - timescales_delta[timescale]["timedelta"]
                    )

                    logger.info(f"Timescale: {timescale}")

                    # Write state of blocks in database.
                    s3_data_object_for_contract["blocks_state"] = current_blocks_state

                    # TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
                    s3_data_object_for_contract["generic"] = {}

                    # Generate functions call timeseries for the merged set.
                    functions_calls_data = generate_data(
                        db_session=db_session,
                        blockchain_type=blockchain_type,
                        address=address,
                        timescale=timescale,
                        functions=merged_functions[address]["merged"],
                        start=start_date,
                        metric_type="tx_call",
                        crawler_label=crawler_label,
                    )
                    s3_data_object_for_contract["methods"] = functions_calls_data

                    # Generate events timeseries for the merged set.
                    events_data = generate_data(
                        db_session=db_session,
                        blockchain_type=blockchain_type,
                        address=address,
                        timescale=timescale,
                        functions=merged_events[address]["merged"],
                        start=start_date,
                        metric_type="event",
                        crawler_label=crawler_label,
                    )
                    s3_data_object_for_contract["events"] = events_data

                    for dashboard_id in address_dashboard_id_subscription_id_tree[
                        address
                    ]:  # Dashboards loop for address
                        for (
                            subscription_id
                        ) in address_dashboard_id_subscription_id_tree[address][
                            dashboard_id
                        ]:
                            try:
                                extention_data = []

                                s3_subscription_data_object: Dict[str, Any] = {}

                                s3_subscription_data_object[
                                    "blocks_state"
                                ] = s3_data_object_for_contract["blocks_state"]

                                if dashboard_id in merged_external_calls:
                                    for (
                                        external_call_hash,
                                        display_name,
                                    ) in merged_external_calls[dashboard_id][
                                        subscription_id
                                    ].items():
                                        if external_call_hash in external_calls_results:
                                            extention_data.append(
                                                {
                                                    "display_name": display_name,
                                                    "value": external_calls_results[
                                                        external_call_hash
                                                    ],
                                                }
                                            )

                                s3_subscription_data_object[
                                    "web3_metric"
                                ] = extention_data

                                # Slice of merged events for this subscription.
                                events_list = merged_events[address][dashboard_id][
                                    subscription_id
                                ]

                                s3_subscription_data_object["events"] = {}

                                for event in events_list:
                                    if event in events_data:
                                        s3_subscription_data_object["events"][
                                            event
                                        ] = events_data[event]

                                # Slice of merged functions for this subscription.
                                functions_list = merged_functions[address][
                                    dashboard_id
                                ][subscription_id]

                                s3_subscription_data_object["methods"] = {}

                                for function in functions_list:
                                    if function in functions_calls_data:
                                        s3_subscription_data_object["methods"][
                                            function
                                        ] = functions_calls_data[function]

                                bucket = subscription_by_id[
                                    subscription_id
                                ].resource_data["bucket"]
                                key = subscription_by_id[subscription_id].resource_data[
                                    "s3_path"
                                ]

                                # Push data to S3 bucket.
                                push_statistics(
                                    statistics_data=s3_subscription_data_object,
                                    subscription=subscription_by_id[subscription_id],
                                    timescale=timescale,
                                    bucket=bucket,
                                    dashboard_id=dashboard_id,
                                )
                            except Exception as err:
                                db_session.rollback()
                                reporter.error_report(
                                    err,
                                    [
                                        "dashboard",
                                        "statistics",
                                        # FIX: missing comma fused these two
                                        # tags into one string.
                                        f"blockchain:{args.blockchain}",
                                        f"subscriptions:{subscription_id}",
                                        # FIX: tag the current dashboard id,
                                        # not the leftover merge-loop variable.
                                        f"dashboard:{dashboard_id}",
                                    ],
                                )
                                logger.error(err)
                except Exception as err:
                    db_session.rollback()
                    reporter.error_report(
                        err,
                        [
                            "dashboard",
                            "statistics",
                            # FIX: missing comma fused these two tags.
                            f"blockchain:{args.blockchain}",
                            f"timescale:{timescale}",
                            "data_generation_failed",
                        ],
                    )
                    logger.error(err)

        reporter.custom_report(
            title=f"Dashboard stats generated.",
            content=f"Generate statistics for {args.blockchain}. \n Generation time: {time.time() - start_time}. \n Total amount of dashboards: {len(dashboard_resources.resources)}. Generate stats for {subscriptions_count}.",
            tags=["dashboard", "statistics", f"blockchain:{args.blockchain}"],
        )
2021-11-11 15:16:21 +00:00
2021-12-21 15:23:26 +00:00
def stats_generate_api_task(
    timescales: List[str],
    dashboard: BugoutResource,
    subscription_by_id: Dict[str, BugoutResource],
    access_id: Optional[UUID] = None,
):
    """
    Generate and push statistics for a single dashboard, on demand (API-driven).

    Unlike stats_generate_handler, nothing is merged across subscriptions:
    each subscription in the dashboard gets its own timeseries queries and
    its own S3 object per requested timescale.
    """
    with yield_db_read_only_session_ctx() as db_session:
        logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")

        s3_client = boto3.client("s3")

        for dashboard_subscription_filters in dashboard.resource_data[
            "subscription_settings"
        ]:
            try:
                subscription_id = dashboard_subscription_filters["subscription_id"]

                # Resolve the blockchain from the subscription's type id.
                blockchain_type = AvailableBlockchainType(
                    blockchain_by_subscription_id[
                        subscription_by_id[subscription_id].resource_data[
                            "subscription_type_id"
                        ]
                    ]
                )

                s3_data_object: Dict[str, Any] = {}

                extention_data = []

                address = subscription_by_id[subscription_id].resource_data["address"]

                crawler_label = CRAWLER_LABEL

                # Special-case contract crawled under a different label.
                if address in ("0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",):
                    crawler_label = "moonworm"

                # Read required events, functions and web3_call form ABI
                if not subscription_by_id[subscription_id].resource_data["abi"]:
                    # NOTE(review): in this branch `bucket` is never assigned;
                    # the later push_statistics call would raise NameError and
                    # be swallowed by the outer except — confirm intended.
                    methods = []
                    events = []
                    abi_json = {}
                else:
                    bucket = subscription_by_id[subscription_id].resource_data["bucket"]
                    key = subscription_by_id[subscription_id].resource_data["s3_path"]
                    abi = s3_client.get_object(
                        Bucket=bucket,
                        Key=key,
                    )
                    abi_json = json.loads(abi["Body"].read())

                    methods = generate_list_of_names(
                        type="function",
                        subscription_filters=dashboard_subscription_filters,
                        read_abi=dashboard_subscription_filters["all_methods"],
                        abi_json=abi_json,
                    )

                    events = generate_list_of_names(
                        type="event",
                        subscription_filters=dashboard_subscription_filters,
                        read_abi=dashboard_subscription_filters["all_events"],
                        abi_json=abi_json,
                    )

                # Data for cards components
                extention_data = generate_web3_metrics(
                    db_session=db_session,
                    events=events,
                    blockchain_type=blockchain_type,
                    address=address,
                    crawler_label=crawler_label,
                    abi_json=abi_json,
                    access_id=access_id,
                )

                # Generate blocks state information
                current_blocks_state = get_blocks_state(
                    db_session=db_session, blockchain_type=blockchain_type
                )

                for timescale in timescales:
                    start_date = (
                        datetime.utcnow() - timescales_delta[timescale]["timedelta"]
                    )

                    logger.info(f"Timescale: {timescale}")

                    s3_data_object["web3_metric"] = extention_data

                    # Write state of blocks in database
                    s3_data_object["blocks_state"] = current_blocks_state

                    # TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
                    s3_data_object["generic"] = {}

                    # Generate functions call timeseries
                    functions_calls_data = generate_data(
                        db_session=db_session,
                        blockchain_type=blockchain_type,
                        address=address,
                        timescale=timescale,
                        functions=methods,
                        start=start_date,
                        metric_type="tx_call",
                        crawler_label=crawler_label,
                    )
                    s3_data_object["methods"] = functions_calls_data

                    # Generate events timeseries
                    events_data = generate_data(
                        db_session=db_session,
                        blockchain_type=blockchain_type,
                        address=address,
                        timescale=timescale,
                        functions=events,
                        start=start_date,
                        metric_type="event",
                        crawler_label=crawler_label,
                    )
                    s3_data_object["events"] = events_data

                    # push data to S3 bucket
                    push_statistics(
                        statistics_data=s3_data_object,
                        subscription=subscription_by_id[subscription_id],
                        timescale=timescale,
                        bucket=bucket,
                        dashboard_id=dashboard.id,
                    )
            except Exception as err:
                traceback.print_exc()
                reporter.error_report(
                    err,
                    [
                        "dashboard",
                        "statistics",
                        f"subscriptions:{subscription_id}",
                        f"dashboard:{str(dashboard.id)}",
                    ],
                )
                logger.error(err)
2021-12-16 13:26:04 +00:00
2021-11-13 14:30:13 +00:00
def main() -> None:
    """CLI entry point: parse arguments and dispatch to the chosen subcommand."""
    parser = argparse.ArgumentParser(description="Command Line Interface")
    parser.add_argument(
        "--access-id",
        default=NB_CONTROLLER_ACCESS_ID,
        type=UUID,
        help="User access ID",
    )
    # With no subcommand, print top-level help.
    parser.set_defaults(func=lambda _: parser.print_help())

    subcommands = parser.add_subparsers(
        description="Drone dashboard statistics commands"
    )

    # "generate" subcommand: build and push dashboard statistics.
    generate_parser = subcommands.add_parser(
        "generate", description="Generate statistics"
    )
    generate_parser.set_defaults(func=lambda _: generate_parser.print_help())
    generate_parser.add_argument(
        "--blockchain",
        required=True,
        help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
    )
    generate_parser.set_defaults(func=stats_generate_handler)

    parsed = parser.parse_args()
    parsed.func(parsed)
2021-11-13 14:30:13 +00:00
2021-11-11 15:16:21 +00:00
if __name__ == "__main__":
main()