Merge pull request #452 from bugout-dev/dashboards-worker-apply-names-filter

Add apply of filters.
pull/462/head
Andrei-Dolgolev 2021-11-25 15:11:35 +02:00 zatwierdzone przez GitHub
commit 32a05d0b64
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
2 zmienionych plików z 278 dodań i 185 usunięć

Wyświetl plik

@ -411,12 +411,11 @@ async def get_dashboard_data_links_handler(
for subscription in dashboard_subscriptions: for subscription in dashboard_subscriptions:
hash = subscription.resource_data["abi_hash"]
available_timescales = [timescale.value for timescale in data.TimeScale] available_timescales = [timescale.value for timescale in data.TimeScale]
stats[subscription.id] = {} stats[subscription.id] = {}
for timescale in available_timescales: for timescale in available_timescales:
try: try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
stats_presigned_url = s3_client.generate_presigned_url( stats_presigned_url = s3_client.generate_presigned_url(
"get_object", "get_object",
Params={ Params={

Wyświetl plik

@ -9,11 +9,12 @@ import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List from typing import Any, Callable, Dict, List
from uuid import UUID
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResources from bugout.data import BugoutResources
from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, and_, func, text from sqlalchemy import Column, and_, func, text, distinct
from sqlalchemy.orm import Query, Session from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.operators import in_op from sqlalchemy.sql.operators import in_op
@ -29,6 +30,7 @@ from ..settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
CRAWLER_LABEL, CRAWLER_LABEL,
) )
from ..reporter import reporter
from ..settings import bugout_client as bc from ..settings import bugout_client as bc
from web3 import Web3 from web3 import Web3
@ -49,6 +51,8 @@ blockchain_by_subscription_id = {
class TimeScale(Enum): class TimeScale(Enum):
# TODO(Andrey): Unlock once we are sure about the performance of aggregation on the transactions table.
# Right now the query can hang.
# year = "year" # year = "year"
month = "month" month = "month"
week = "week" week = "week"
@ -69,6 +73,9 @@ timescales_delta: Dict[str, Dict[str, timedelta]] = {
"day": {"timedelta": timedelta(hours=24)}, "day": {"timedelta": timedelta(hours=24)},
} }
abi_type_to_dashboards_type = {"function": "methods", "event": "events"}
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
@ -78,11 +85,11 @@ def push_statistics(
subscription: Any, subscription: Any,
timescale: str, timescale: str,
bucket: str, bucket: str,
hash: str, dashboard_id: UUID,
) -> None: ) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8") result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
s3 = boto3.client("s3") s3 = boto3.client("s3")
s3.put_object( s3.put_object(
@ -336,8 +343,7 @@ def generate_data(
label_counts_subquery = ( label_counts_subquery = (
label_counts.group_by( label_counts.group_by(
text("timeseries_points"), text("timeseries_points"), label_model.label_data["name"].astext
label_model.label_data["name"].astext,
) )
.order_by(text("timeseries_points desc")) .order_by(text("timeseries_points desc"))
.subquery(name="label_counts") .subquery(name="label_counts")
@ -405,81 +411,35 @@ def get_unique_address(
) )
def get_count( def generate_list_of_names(
name: str, type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any
type: str, ):
db_session: Session,
blockchain_type: AvailableBlockchainType, """
address: str, Generate list of names for select from database by name field
"""
if read_abi:
names = [item["name"] for item in abi_json if item["type"] == type]
else:
names = [
item["name"]
for item in subscription_filters[abi_type_to_dashboards_type[type]]
]
return names
def process_external(
abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType
): ):
""" """
Return count of event from database. Request all required external data
TODO(Andrey): Check the possibility of doing it via AsyncHttpProvider (not supported by some of the middlewares).
""" """
label_model = get_label_model(blockchain_type)
return ( extention_data = []
db_session.query(label_model)
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label_data["type"].astext == type)
.filter(label_model.label_data["name"].astext == name)
.count()
)
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"abi": "true",
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
# Already processed
already_processed = []
for subscription in required_subscriptions.resources:
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
if f"{address}/{hash}" in already_processed:
continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
external_calls = [] external_calls = []
@ -517,28 +477,8 @@ def stats_generate_handler(args: argparse.Namespace):
except Exception as e: except Exception as e:
print(f"Error processing external call: {e}") print(f"Error processing external call: {e}")
web3_client = connect(blockchain_type) web3_client = connect(blockchain)
# {
# "type": "external_call"
# "display_name": "Total weth earned"
# "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a",
# "name": "balanceOf",
# "inputs": [
# {
# "name": "owner",
# "type": "address"
# "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88"
# }
# ],
# "outputs": [
# {
# "internalType": "uint256",
# "name": "",
# "type": "uint256"
# }
# }
extention_data = []
for extcall in external_calls: for extcall in external_calls:
try: try:
contract = web3_client.eth.contract( contract = web3_client.eth.contract(
@ -554,6 +494,142 @@ def stats_generate_handler(args: argparse.Namespace):
except Exception as e: except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}") print(f"Failed to call {extcall['name']} error: {e}")
return extention_data
def get_count(
    name: str,
    type: str,
    db_session: Session,
    select_expression: Any,
    blockchain_type: AvailableBlockchainType,
    address: str,
):
    """
    Count the labelled rows recorded for one named ABI entry of a contract.

    The label model is resolved from ``blockchain_type``. Rows are narrowed
    to the given ``address``, to labels equal to ``CRAWLER_LABEL``, and to
    label data whose "type" and "name" fields equal ``type`` and ``name``.

    ``select_expression`` is handed to ``db_session.query`` unchanged, so it
    decides what is being counted (for example the whole label model, or a
    ``distinct(...)`` expression over one of its fields).
    """
    model = get_label_model(blockchain_type)
    label_payload = model.label_data

    # Criteria are applied one by one, in the same order as a chained
    # ``.filter(...).filter(...)`` call would apply them.
    criteria = (
        model.address == address,
        model.label == CRAWLER_LABEL,
        label_payload["type"].astext == type,
        label_payload["name"].astext == name,
    )

    matching_rows = db_session.query(select_expression)
    for criterion in criteria:
        matching_rows = matching_rows.filter(criterion)

    return matching_rows.count()
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
# read all subscriptions
# ethereum_blockchain
start_time = time.time()
blockchain_type = AvailableBlockchainType(args.blockchain)
# polygon_blockchain
dashboard_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=10,
)
print(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
},
timeout=10,
)
print(
f"Amount of blockchain subscriptions: {len(blockchain_subscriptions.resources)}"
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
s3_client = boto3.client("s3")
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"dashboard_subscriptions"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
if subscription_id not in subscription_by_id:
# Means the subscription belongs to a different blockchain type.
continue
s3_data_object = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data[
"address"
]
generic = dashboard_subscription_filters["generic"]
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append( extention_data.append(
{ {
"display_name": "Overall unique token owners.", "display_name": "Overall unique token owners.",
@ -565,11 +641,7 @@ def stats_generate_handler(args: argparse.Namespace):
} }
) )
abi_functions_names = [item["name"] for item in abi_functions] if "HatchStartedEvent" in events:
abi_events_names = [item["name"] for item in abi_events]
if "HatchStartedEvent" in abi_events_names:
extention_data.append( extention_data.append(
{ {
@ -578,13 +650,14 @@ def stats_generate_handler(args: argparse.Namespace):
name="HatchStartedEvent", name="HatchStartedEvent",
type="event", type="event",
db_session=db_session, db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
), ),
} }
) )
if "HatchFinishedEvent" in abi_events_names: if "HatchFinishedEvent" in events:
extention_data.append( extention_data.append(
{ {
@ -593,6 +666,11 @@ def stats_generate_handler(args: argparse.Namespace):
name="HatchFinishedEvent", name="HatchFinishedEvent",
type="event", type="event",
db_session=db_session, db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data[
"args"
]["tokenId"]
),
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
), ),
@ -614,20 +692,19 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
timescale=timescale, timescale=timescale,
functions=abi_functions_names, functions=methods,
start=start_date, start=start_date,
metric_type="tx_call", metric_type="tx_call",
) )
s3_data_object["functions"] = functions_calls_data s3_data_object["functions"] = functions_calls_data
# generate data
events_data = generate_data( events_data = generate_data(
db_session=db_session, db_session=db_session,
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
timescale=timescale, timescale=timescale,
functions=abi_events_names, functions=events,
start=start_date, start=start_date,
metric_type="event", metric_type="event",
) )
@ -639,18 +716,35 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
timescale=timescale, timescale=timescale,
metrics=abi_events_names, metrics=generic,
start=start_date, start=start_date,
) )
push_statistics( push_statistics(
statistics_data=s3_data_object, statistics_data=s3_data_object,
subscription=subscription, subscription=subscription_by_id[subscription_id],
timescale=timescale, timescale=timescale,
bucket=bucket, bucket=bucket,
hash=hash, dashboard_id=dashboard.id,
)
except Exception as err:
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard.id}",
],
)
print(err)
reporter.custom_report(
title=f"Dashboard stats generated.",
content=f"Generate statistics for {args.blockchain}. \n Generation time: {time.time() - start_time}.",
tags=["dashboard", "statistics", f"blockchain:{args.blockchain}"],
) )
already_processed.append(f"{address}/{hash}")
def main() -> None: def main() -> None: