Init version of merging in stats generation.

pull/600/head
Andrey Dolgolev 2022-05-02 17:49:55 +03:00
rodzic 0ba2c02300
commit 6a7217a70a
1 zmienionych plików z 354 dodań i 78 usunięć

Wyświetl plik

@ -5,17 +5,20 @@ import argparse
import hashlib
import json
import logging
from pickle import DICT
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
from uuid import UUID
import traceback
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from moonstreamdb.db import yield_db_read_only_session_ctx
from sqlalchemy import Column, and_, distinct, func, text
from sqlalchemy.orm import Query, Session
from sqlalchemy import Column, and_, distinct, func, text, tuple_
from sqlalchemy.orm import Query, Session, Bundle
from sqlalchemy.sql.operators import in_op
from web3 import Web3
@ -34,6 +37,7 @@ from ..settings import (
from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO)
# logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
logger = logging.getLogger(__name__)
@ -126,7 +130,11 @@ def generate_data(
end = datetime.utcnow()
time_series_subquery = db_session.query(
func.generate_series(start, end, time_step,).label("timeseries_points")
func.generate_series(
start,
end,
time_step,
).label("timeseries_points")
)
time_series_subquery = time_series_subquery.subquery(name="time_series_subquery")
@ -332,6 +340,71 @@ def generate_list_of_names(
return names
def process_external_merged(
external_calls: Dict[str, Dict[str, Any]], blockchain: AvailableBlockchainType
):
"""
Process external calls
"""
external_calls_normalized = []
result: Dict[str, Any] = {}
for external_call_hash, external_call in external_calls.values():
try:
func_input_abi = []
input_args = []
for func_input in external_call["inputs"]:
func_input_abi.append(
{"name": func_input["name"], "type": func_input["type"]}
)
input_args.append(
cast_to_python_type(func_input["type"])(func_input["value"])
)
func_abi = [
{
"name": external_call["name"],
"inputs": func_input_abi,
"outputs": external_call["outputs"],
"type": "function",
"stateMutability": "view",
}
]
external_calls.append(
{
"external_call_hash": external_call_hash,
"address": Web3.toChecksumAddress(external_call["address"]),
"name": external_call["name"],
"abi": func_abi,
"input_args": input_args,
}
)
except Exception as e:
logger.error(f"Error processing external call: {e}")
if external_calls_normalized:
web3_client = connect(blockchain)
for extcall in external_calls_normalized:
try:
contract = web3_client.eth.contract(
address=extcall["address"], abi=extcall["abi"]
)
response = contract.functions[extcall["name"]](
*extcall["input_args"]
).call()
result[extcall["external_call_hash"]] = response
except Exception as e:
logger.error(f"Failed to call {extcall['external_call_hash']} error: {e}")
return result
def process_external(
abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType
):
@ -440,7 +513,8 @@ def generate_web3_metrics(
abi_external_calls = [item for item in abi_json if item["type"] == "external_call"]
extention_data = process_external(
abi_external_calls=abi_external_calls, blockchain=blockchain_type,
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
@ -540,35 +614,58 @@ def stats_generate_handler(args: argparse.Namespace):
subscriptions_count = 0
# generate merged events and functions calls for all subscriptions
merged_events = {}
merged_functions = {}
merged_external_calls = {}
merged_external_calls["merged"] = {}
"""
{
address: {
"subscription_id": []
}
...
"merdged": {
}
"""
address_dashboard_id_subscription_id_tree = {}
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
if subscription_id not in subscription_by_id:
# Meen it's are different blockchain type
# Mean it's are different blockchain type
continue
subscriptions_count += 1
extention_data = []
# The resulting object whivh be pushed to S3
s3_data_object: Dict[str, Any] = {}
address = subscription_by_id[subscription_id].resource_data[
"address"
]
crawler_label = CRAWLER_LABEL
if dashboard.id not in address_dashboard_id_subscription_id_tree:
address_dashboard_id_subscription_id_tree[address][
dashboard.id
] = {}
if address in ("0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",):
crawler_label = "moonworm"
if (
subscription_id
not in address_dashboard_id_subscription_id_tree[dashboard.id]
):
address_dashboard_id_subscription_id_tree[address][
dashboard.id
] = subscription_id
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
@ -581,7 +678,10 @@ def stats_generate_handler(args: argparse.Namespace):
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(Bucket=bucket, Key=key,)
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
@ -596,83 +696,256 @@ def stats_generate_handler(args: argparse.Namespace):
abi_json=abi_json,
)
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
)
if address not in merged_events:
merged_events[address] = {}
merged_events[address]["merged"] = set()
# Generate blocks state information
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
if address not in merged_functions:
merged_functions[address] = {}
merged_events[address]["merged"] = set()
for timescale in [timescale.value for timescale in TimeScale]:
if address not in merged_external_calls:
merged_external_calls[address] = {}
merged_events[address]["merged"] = set()
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
if dashboard.id not in merged_events[address]:
merged_events[address][dashboard.id] = {}
logger.info(f"Timescale: {timescale}")
if dashboard.id not in merged_functions[address]:
merged_functions[address][dashboard.id] = {}
s3_data_object["web3_metric"] = extention_data
merged_events[address][dashboard.id][subscription_id] = events
merged_functions[address][dashboard.id][subscription_id] = methods
# Write state of blocks in database
s3_data_object["blocks_state"] = current_blocks_state
# Get external calls from ABI.
# external_calls merging required direct hash of external_call object.
# or if more correct hash of address and function call signature.
# create external_call selectors.
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object["generic"] = {}
external_calls = [
external_call
for external_call in abi_json
if external_call["type"] == "external_call"
]
if len(external_calls) > 0:
for external_call in external_calls:
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=methods,
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["methods"] = functions_calls_data
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
"type": "external_call",
"address": external_call["address"],
"name": external_call["name"],
"inputs": external_call["inputs"],
"outputs": external_call["outputs"],
}
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=events,
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
external_call_hash = hashlib.md5(
json.dumps(external_call_without_display_name).encode(
"utf-8"
)
).hexdigest()
if dashboard.id not in merged_external_calls:
merged_external_calls[dashboard.id] = {}
if (
subscription_id
not in merged_external_calls[dashboard.id]
):
merged_external_calls[dashboard.id][
subscription_id
] = {}
if (
external_call_hash
not in merged_external_calls[dashboard.id][
subscription_id
]
):
merged_external_calls[dashboard.id][subscription_id] = {
external_call_hash: external_call["display_name"]
}
if (
external_call_hash
not in merged_external_calls["merged"]
):
merged_external_calls["merged"][
external_call_hash
] = external_call_without_display_name
# Fill merged events and functions calls for all subscriptions
for event in events:
merged_events[address]["merged"].add(event)
for method in methods:
merged_functions[address]["merged"].add(method)
except Exception as e:
logger.error(f"Error while merging subscriptions: {e}")
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
external_calls_results = process_external_merged(
external_calls=merged_external_calls["merged"],
blockchain=blockchain_type,
)
for address in address_dashboard_id_subscription_id_tree.keys():
# for dashboard_subscription_filters in dashboard.resource_data[
# "subscription_settings"
# ]:
# try:
# subscription_id = dashboard_subscription_filters["subscription_id"]
# if subscription_id not in subscription_by_id:
# # Mean it's are different blockchain type
# continue
# subscriptions_count += 1
# extention_data = []
# The resulting object whivh be pushed to S3
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
s3_data_object_for_contract: Dict[str, Any] = {}
crawler_label = CRAWLER_LABEL
if address in ("0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",):
crawler_label = "moonworm"
for timescale in [timescale.value for timescale in TimeScale]:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
logger.info(f"Timescale: {timescale}")
# Write state of blocks in database
s3_data_object_for_contract["blocks_state"] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object_for_contract["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_functions[address]["merged"],
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object_for_contract["methods"] = functions_calls_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_events[address]["merged"],
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object_for_contract["events"] = events_data
for dashboard in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for subscription_id in address_dashboard_id_subscription_id_tree[
address
][dashboard.id]:
extention_data = []
s3_subscription_data_object = {}
for external_call_hash, display_name in merged_external_calls[
dashboard.id
][subscription_id].values():
extention_data.append(
{
"display_name": display_name,
"value": external_calls_results[external_call_hash],
}
)
s3_subscription_data_object["web3_metric"] = extention_data
# list of user defined events
events_list = merged_events[address][dashboard.id][
subscription_id
]
s3_subscription_data_object["events"] = {}
for event in events_list:
event_data = s3_data_object_for_contract["events"][event]
s3_subscription_data_object["events"][event] = event_data
# list of user defined functions
functions_list = merged_functions[address][dashboard.id][
subscription_id
]
s3_subscription_data_object["methods"] = {}
for function in functions_list:
function_data = s3_data_object_for_contract["methods"][
function
]
s3_subscription_data_object["methods"][
function
] = function_data
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
# Push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
statistics_data=s3_subscription_data_object,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
dashboard_id=dashboard.id,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard.id}",
],
)
logger.error(err)
# except Exception as err:
# traceback.print_exc()
# db_session.rollback()
# reporter.error_report(
# err,
# [
# "dashboard",
# "statistics",
# f"blockchain:{args.blockchain}"
# f"subscriptions:{subscription_id}",
# f"dashboard:{dashboard.id}",
# ],
# )
# logger.error(err)
# break
reporter.custom_report(
title=f"Dashboard stats generated.",
@ -734,7 +1007,10 @@ def stats_generate_api_task(
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(Bucket=bucket, Key=key,)
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(