Merge pull request #421 from bugout-dev/Extentions-metrics

Extentions metrics
pull/424/head
Andrei-Dolgolev 2021-11-15 20:49:31 +02:00 zatwierdzone przez GitHub
commit 46a67d27be
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
1 zmienionych plików z 138 dodań i 3 usunięć

Wyświetl plik

@ -8,7 +8,7 @@ import logging
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from typing import Any, Dict, List from typing import Any, Callable, Dict, List
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResources from bugout.data import BugoutResources
@ -17,7 +17,12 @@ from sqlalchemy import Column, Date, and_, func, text
from sqlalchemy.orm import Query, Session from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.operators import in_op from sqlalchemy.sql.operators import in_op
from ..blockchain import get_block_model, get_label_model, get_transaction_model from ..blockchain import (
get_block_model,
get_label_model,
get_transaction_model,
connect,
)
from ..data import AvailableBlockchainType from ..data import AvailableBlockchainType
from ..settings import ( from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -26,6 +31,8 @@ from ..settings import (
) )
from ..settings import bugout_client as bc from ..settings import bugout_client as bc
from web3 import Web3
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
@ -119,6 +126,7 @@ def generate_metrics(
db_session: Session, db_session: Session,
identifying_column: Column, identifying_column: Column,
statistic_column: Column, statistic_column: Column,
aggregate_func: Callable,
) -> Query: ) -> Query:
unformated_time_series_subquery = db_session.query( unformated_time_series_subquery = db_session.query(
@ -141,7 +149,7 @@ def generate_metrics(
metric_count_subquery = ( metric_count_subquery = (
db_session.query( db_session.query(
func.count(statistic_column).label("count"), aggregate_func(statistic_column).label("count"),
func.to_char( func.to_char(
func.to_timestamp(block_model.timestamp).cast(Date), time_format func.to_timestamp(block_model.timestamp).cast(Date), time_format
).label("timeseries_points"), ).label("timeseries_points"),
@ -176,6 +184,8 @@ def generate_metrics(
for created_date, count in metrics_time_series: for created_date, count in metrics_time_series:
if not isinstance(count, int):
count = int(count)
response_metric.append({"date": created_date, "count": count}) response_metric.append({"date": created_date, "count": count})
return response_metric return response_metric
@ -187,6 +197,7 @@ def generate_metrics(
db_session, db_session,
transaction_model.from_address, transaction_model.from_address,
transaction_model.hash, transaction_model.hash,
func.count,
) )
print("--- transactions_out %s seconds ---" % (time.time() - start_time)) print("--- transactions_out %s seconds ---" % (time.time() - start_time))
@ -196,6 +207,7 @@ def generate_metrics(
db_session, db_session,
transaction_model.to_address, transaction_model.to_address,
transaction_model.hash, transaction_model.hash,
func.count,
) )
print("--- transactions_in %s seconds ---" % (time.time() - start_time)) print("--- transactions_in %s seconds ---" % (time.time() - start_time))
@ -205,6 +217,7 @@ def generate_metrics(
db_session, db_session,
transaction_model.from_address, transaction_model.from_address,
transaction_model.value, transaction_model.value,
func.sum,
) )
print("--- value_out %s seconds ---" % (time.time() - start_time)) print("--- value_out %s seconds ---" % (time.time() - start_time))
@ -213,6 +226,7 @@ def generate_metrics(
db_session, db_session,
transaction_model.to_address, transaction_model.to_address,
transaction_model.value, transaction_model.value,
func.sum,
) )
print("--- value_in %s seconds ---" % (time.time() - start_time)) print("--- value_in %s seconds ---" % (time.time() - start_time))
@ -360,6 +374,37 @@ def generate_data(
return response_labels return response_labels
def cast_to_python_type(evm_type: str) -> Callable:
if evm_type.startswith(("uint", "int")):
return int
elif evm_type.startswith("bytes"):
return bytes
elif evm_type == "string":
return str
elif evm_type == "address":
return Web3.toChecksumAddress
elif evm_type == "bool":
return bool
else:
raise ValueError(f"Cannot convert to python type {evm_type}")
def get_unique_address(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
):
label_model = get_label_model(blockchain_type)
return (
db_session.query(label_model.label_data["args"]["to"])
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label_data["type"].astext == "event")
.filter(label_model.label_data["name"].astext == "Transfer")
.distinct()
.count()
)
def stats_generate_handler(args: argparse.Namespace): def stats_generate_handler(args: argparse.Namespace):
""" """
Start crawler with generate. Start crawler with generate.
@ -410,6 +455,94 @@ def stats_generate_handler(args: argparse.Namespace):
abi_functions = [item for item in abi_json if item["type"] == "function"] abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"] abi_events = [item for item in abi_json if item["type"] == "event"]
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
external_calls = []
for external_call in abi_external_calls:
try:
func_input_abi = []
input_args = []
for func_input in external_call["inputs"]:
func_input_abi.append(
{"name": func_input["name"], "type": func_input["type"]}
)
input_args.append(
cast_to_python_type(func_input["type"])(func_input["value"])
)
func_abi = [
{
"name": external_call["name"],
"inputs": func_input_abi,
"outputs": external_call["outputs"],
"type": "function",
"stateMutability": "view",
}
]
external_calls.append(
{
"display_name": external_call["display_name"],
"address": Web3.toChecksumAddress(external_call["address"]),
"name": external_call["name"],
"abi": func_abi,
"input_args": input_args,
}
)
except Exception as e:
print(f"Error processing external call: {e}")
web3_client = connect(blockchain_type)
# {
# "type": "external_call"
# "display_name": "Total weth earned"
# "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a",
# "name": "balanceOf",
# "inputs": [
# {
# "name": "owner",
# "type": "address"
# "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88"
# }
# ],
# "outputs": [
# {
# "internalType": "uint256",
# "name": "",
# "type": "uint256"
# }
# }
extention_data = []
for extcall in external_calls:
try:
contract = web3_client.eth.contract(
address=extcall["address"], abi=extcall["abi"]
)
response = contract.functions[extcall["name"]](
*extcall["input_args"]
).call()
extention_data.append(
{"display_name": extcall["display_name"], "value": response}
)
except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}")
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
for timescale in [timescale.value for timescale in TimeScale]: for timescale in [timescale.value for timescale in TimeScale]:
start_date = ( start_date = (
@ -418,6 +551,8 @@ def stats_generate_handler(args: argparse.Namespace):
print(f"Timescale: {timescale}") print(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
abi_functions_names = [item["name"] for item in abi_functions] abi_functions_names = [item["name"] for item in abi_functions]
functions_calls_data = generate_data( functions_calls_data = generate_data(