Add apply of filters wich in general depends on names of dashboard subscriptions.

Reformat code for get support of subscription wich not contain abi and only are generic methods.
pull/452/head
Andrey Dolgolev 2021-11-23 16:59:52 +02:00
rodzic 73a49941d8
commit 4f3326c05b
1 zmienionych plików z 220 dodań i 151 usunięć

Wyświetl plik

@ -8,10 +8,11 @@ import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Union
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResources
from bugout.data import BugoutResource, BugoutResources
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, Date, and_, func, text
from sqlalchemy.orm import Query, Session
@ -31,7 +32,7 @@ from ..settings import (
)
from ..settings import bugout_client as bc
from web3 import Web3
from web3 import Web3, method
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -78,11 +79,11 @@ def push_statistics(
subscription: Any,
timescale: str,
bucket: str,
hash: str,
dashboard_id: UUID,
) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json'
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
s3 = boto3.client("s3")
s3.put_object(
@ -405,59 +406,28 @@ def get_unique_address(
)
def stats_generate_handler(args: argparse.Namespace):
def generate_list_of_names(
type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any
):
"""
Start crawler with generate.
Generate list of names for select from database by name field
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
if read_abi:
names = [item["name"] for item in abi_json if item["type"] == type]
else:
names = [item["name"] for item in subscription_filters[type]]
with yield_db_session_ctx() as db_session:
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"abi": "true",
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
},
timeout=10,
)
return names
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
def process_external(abi_external_calls, blockchain):
"""
Request all required external data
TODO:(Andrey) Check posibility do it via AsyncHttpProvider(not supported for some of middlewares).
"""
# Already processed
already_processed = []
for subscription in required_subscriptions.resources:
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
if f"{address}/{hash}" in already_processed:
continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = []
external_calls = []
@ -495,7 +465,7 @@ def stats_generate_handler(args: argparse.Namespace):
except Exception as e:
print(f"Error processing external call: {e}")
web3_client = connect(blockchain_type)
web3_client = connect(blockchain)
# {
# "type": "external_call"
# "display_name": "Total weth earned"
@ -516,7 +486,6 @@ def stats_generate_handler(args: argparse.Namespace):
# }
# }
extention_data = []
for extcall in external_calls:
try:
contract = web3_client.eth.contract(
@ -532,6 +501,110 @@ def stats_generate_handler(args: argparse.Namespace):
except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}")
return extention_data
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
# read all subscriptions
# ethereum_blockchain
blockchain_type = AvailableBlockchainType(args.blockchain)
# polygon_blockchain
dashboard_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=10,
)
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
},
timeout=10,
)
subscription_by_id = {
blockchain_subscription.id: blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
# print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
# # Already processed
already_processed = []
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"dashboard_subscriptions"
]:
subscription_id = dashboard_subscription_filters["subscription_id"]
if subscription_id not in subscription_by_id:
# Meen it's are different blockchain type
continue
s3_data_object = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data["address"]
generic = dashboard_subscription_filters["generic"]
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="methods",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters.all_methods,
abi_json=abi_json,
)
events = generate_list_of_names(
type="events",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters.all_methods,
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
@ -553,14 +626,12 @@ def stats_generate_handler(args: argparse.Namespace):
s3_data_object["web3_metric"] = extention_data
abi_functions_names = [item["name"] for item in abi_functions]
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_functions_names,
functions=methods,
start=start_date,
metric_type="tx_call",
)
@ -568,14 +639,12 @@ def stats_generate_handler(args: argparse.Namespace):
s3_data_object["functions"] = functions_calls_data
# generate data
abi_events_names = [item["name"] for item in abi_events]
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_events_names,
functions=events,
start=start_date,
metric_type="event",
)
@ -587,16 +656,16 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
metrics=abi_events_names,
metrics=generic,
start=start_date,
)
push_statistics(
statistics_data=s3_data_object,
subscription=subscription,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
hash=hash,
dashboard_id=dashboard.id,
)
already_processed.append(f"{address}/{hash}")