Add function for extention data which go to cards componener.

pull/516/head
Andrey Dolgolev 2022-01-17 14:44:58 +02:00
rodzic 6dfbb986a0
commit 261065f91a
1 zmienionych plików z 127 dodań i 148 usunięć

Wyświetl plik

@ -33,8 +33,8 @@ from ..settings import (
)
from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
subscription_ids_by_blockchain = {
@ -100,7 +100,7 @@ def push_statistics(
Metadata={"drone": "statistics"},
)
print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
logger.info(f"Statistics push to bucket: s3://{bucket}/{result_key}")
def generate_data(
@ -380,9 +380,10 @@ def process_external(
}
)
except Exception as e:
print(f"Error processing external call: {e}")
logger.error(f"Error processing external call: {e}")
web3_client = connect(blockchain)
if external_calls:
web3_client = connect(blockchain)
for extcall in external_calls:
try:
@ -397,7 +398,7 @@ def process_external(
{"display_name": extcall["display_name"], "value": response}
)
except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}")
logger.error(f"Failed to call {extcall['name']} error: {e}")
return extention_data
@ -426,6 +427,79 @@ def get_count(
)
def generate_web3_metrics(
db_session: Session,
events: List[str],
blockchain_type: AvailableBlockchainType,
address: str,
crawler_label: str,
abi_json: Any,
) -> List[Any]:
"""
Generate stats for cards components
"""
extention_data = []
abi_external_calls = [item for item in abi_json if item["type"] == "external_call"]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
# TODO: Remove it if ABI already have correct web3_call signature.
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data["args"]["tokenId"]
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
return extention_data
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
@ -433,7 +507,6 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
# read all subscriptions
start_time = time.time()
@ -443,10 +516,9 @@ def stats_generate_handler(args: argparse.Namespace):
timeout=10,
)
print(f"Amount of dashboards: {len(dashboard_resources.resources)}")
logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# get all subscriptions
available_subscriptions: List[BugoutResource] = []
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
@ -467,7 +539,7 @@ def stats_generate_handler(args: argparse.Namespace):
for blockchain_subscription in available_subscriptions
}
print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
@ -487,11 +559,11 @@ def stats_generate_handler(args: argparse.Namespace):
continue
subscriptions_count += 1
s3_data_object: Dict[str, Any] = {}
extention_data = []
# The resulting object whivh be pushed to S3
s3_data_object: Dict[str, Any] = {}
address = subscription_by_id[subscription_id].resource_data[
"address"
]
@ -504,33 +576,30 @@ def stats_generate_handler(args: argparse.Namespace):
):
crawler_label = "moonworm"
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
@ -538,65 +607,16 @@ def stats_generate_handler(args: argparse.Namespace):
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
)
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data[
"args"
]["tokenId"]
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
# Generate blocks state information
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
@ -607,12 +627,17 @@ def stats_generate_handler(args: argparse.Namespace):
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
s3_data_object["blocks_state"] = current_blocks_state
logger.info(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
# Write state of blocks in database
s3_data_object["blocks_state"] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -623,9 +648,9 @@ def stats_generate_handler(args: argparse.Namespace):
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["functions"] = functions_calls_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -636,11 +661,9 @@ def stats_generate_handler(args: argparse.Namespace):
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
s3_data_object["generic"] = {}
# Push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
@ -680,7 +703,7 @@ def stats_generate_api_task(
with yield_db_session_ctx() as db_session:
print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
@ -714,16 +737,17 @@ def stats_generate_api_task(
):
crawler_label = "moonworm"
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
@ -744,65 +768,17 @@ def stats_generate_api_task(
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
# Data for cards components
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
)
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data["args"][
"tokenId"
]
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
# Generate blocks state information
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
@ -813,12 +789,17 @@ def stats_generate_api_task(
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
s3_data_object["blocks_state"] = current_blocks_state
logger.info(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
# Write state of blocks in database
s3_data_object["blocks_state"] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -829,9 +810,9 @@ def stats_generate_api_task(
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["functions"] = functions_calls_data
# Generate events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -842,11 +823,9 @@ def stats_generate_api_task(
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
s3_data_object["generic"] = {}
# push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
@ -864,7 +843,7 @@ def stats_generate_api_task(
f"dashboard:{dashboard.id}",
],
)
print(err)
logger.error(err)
def main() -> None: