diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard_stats_worker.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard_stats_worker.py index 2eb08836..a839fe96 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard_stats_worker.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard_stats_worker.py @@ -4,7 +4,7 @@ from enum import Enum import hashlib from datetime import timedelta, datetime import logging -from typing import Any, Dict, List +from typing import Any, Dict, List, Callable import os import time import pprint @@ -17,8 +17,8 @@ from moonstreamdb.models import EthereumLabel, EthereumTransaction, EthereumBloc from bugout.app import Bugout from bugout.data import BugoutResources -from sqlalchemy.orm import Session -from sqlalchemy import func, text, and_, Date, cast +from sqlalchemy.orm import Session, Query +from sqlalchemy import func, text, and_, Date, cast, Column, desc from datetime import date @@ -66,6 +66,7 @@ bc = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" +BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" MOONSTREAM_ADMIN_ACCESS_TOKEN = os.getenv("MOONSTREAM_ADMIN_ACCESS_TOKEN") @@ -81,15 +82,6 @@ def push_statistics( hash: str, ) -> None: - # statistics_prefix = os.environ.get( - # "AWS_S3_DRONES_BUCKET_STATISTICS_PREFIX", "" - # ).rstrip("/") - # if bucket is None: - # logger.warning( - # "AWS_STATS_S3_BUCKET environment variable not defined, skipping storage of search results" - # ) - # return - result_bytes = json.dumps(statistics_data).encode("utf-8") result_key = f'contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' @@ -102,61 +94,147 @@ def push_statistics( Metadata={"drone": "statistics"}, ) - # TODO (Andrey) Understand why logger wont show messages some time and put them beside print - # without print here exeption wont show. print(f"Statistics push to bucket: s3://{bucket}/{result_key}") -# import moonworm - - -def add_events_to_database(db_session: Session, contract: str, events: List[Any]): - """ - Apply events of contract to dataabase +def generate_metrics( + db_session: Session, address: str, timescale: str, metrics: List[str], start: Any +): """ - for event in events: - try: - eth_label = EthereumLabel( - label="mooonworm", - address=contract, - label_data={ - "type": "event", - "contract": contract, - "tx_hash": event, - }, + Generage metrics + """ + + start = start + end = datetime.utcnow() + + start_timestamp = int(start.timestamp()) + end_timestamp = int(end.timestamp()) + + results: Dict[str, Any] = {} + + time_step = timescales_params[timescale]["timestep"] + + time_format = timescales_params[timescale]["timeformat"] + + def make_query( + db_session: Session, + identifying_column: Column, + statistic_column: Column, + ) -> Query: + + unformated_time_series_subquery = db_session.query( + func.generate_series( + start, + end, + time_step, + ).label("timeseries_points") + ).subquery(name="unformated_time_series_subquery") + + time_series_formated = db_session.query( + func.to_char( + unformated_time_series_subquery.c.timeseries_points, time_format + ).label("timeseries_points") + ) + + time_series_formated_subquery = time_series_formated.subquery( + name="time_series_subquery" + ) + + metric_count_subquery = ( + db_session.query( + func.count(statistic_column).label("count"), + func.to_char( + func.to_timestamp(EthereumBlock.timestamp).cast(Date), time_format + ).label("timeseries_points"), ) - try: - db_session.add(eth_label) - db_session.commit() - except Exception as e: - db_session.rollback() - raise e - except Exception as e: - logger.error( - f"Failed to add addresss label ${contract} to database\n{str(e)}" + .join( + EthereumBlock, + EthereumTransaction.block_number == EthereumBlock.block_number, ) + .filter(identifying_column == address) + .filter(EthereumBlock.timestamp >= start_timestamp) + .filter(EthereumBlock.timestamp <= end_timestamp) + .group_by(text("timeseries_points")) + ).subquery(name="metric_counts") + metrics_time_series = ( + db_session.query( + time_series_formated_subquery.c.timeseries_points.label( + "timeseries_points" + ), + func.coalesce(metric_count_subquery.c.count.label("count"), 0), + ) + .join( + metric_count_subquery, + time_series_formated_subquery.c.timeseries_points + == metric_count_subquery.c.timeseries_points, + isouter=True, + ) + .order_by(text("timeseries_points DESC")) + ) -def read_events(contract: str, abi: Dict[str, Any]): - """ - Read events for provided abi - """ - # moonworm + response_metric: List[Any] = [] + for created_date, count in metrics_time_series: -def read_functions_calls(contract: str, abi: Dict[str, Any]): - """ - Read functions for provided abi - """ + if time_format == "YYYY-MM-DD HH24": + created_date, hour = created_date.split(" ") + response_metric.append( + {"date": created_date, "hour": hour, "count": count} + ) + else: + response_metric.append({"date": created_date, "count": count}) - # token of admin + return response_metric - pass + try: + start_time = time.time() + + results["transactions_out"] = make_query( + db_session, + EthereumTransaction.from_address, + EthereumTransaction.hash, + ) + + print("--- transactions_out %s seconds ---" % (time.time() - start_time)) + + start_time = time.time() + results["transactions_in"] = make_query( + db_session, + EthereumTransaction.to_address, + EthereumTransaction.hash, + ) + + print("--- transactions_in %s seconds ---" % (time.time() - start_time)) + pprint.pprint(results) + + start_time = time.time() + results["value_out"] = make_query( + db_session, + EthereumTransaction.from_address, + EthereumTransaction.value, + ) + print("--- value_out %s seconds ---" % (time.time() - start_time)) + + start_time = time.time() + results["value_in"] = make_query( + db_session, + EthereumTransaction.to_address, + EthereumTransaction.value, + ) + + print("--- value_in %s seconds ---" % (time.time() - start_time)) + + except Exception as err: + print(err) + pass + + return results def generate_data( - session: Session, address: str, timescale: str, functions: List[str], start: Any + db_session: Session, address: str, timescale: str, functions: List[str], start: Any ): # create empty time series @@ -168,7 +246,7 @@ def generate_data( # if end is None: end = datetime.utcnow() - time_series_subquery = session.query( + time_series_subquery = db_session.query( func.generate_series( start, end, @@ -176,14 +254,14 @@ def generate_data( ).label("timeseries_points") ) - # print(time_series_subquery.all()) + print(time_series_subquery.all()) time_series_subquery = time_series_subquery.subquery(name="time_series_subquery") # get distinct tags labels in that range label_requested = ( - session.query(EthereumLabel.label.label("label")) + db_session.query(EthereumLabel.label.label("label")) .filter(EthereumLabel.address == address) .filter(EthereumLabel.label.in_(functions)) .distinct() @@ -191,40 +269,33 @@ def generate_data( if start is not None: label_requested = label_requested.filter( - cast(EthereumLabel.block_timestamp, Date) > start + func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start ) if end is not None: label_requested = label_requested.filter( - cast(EthereumLabel.block_timestamp, Date) < end + func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end ) - # print(functions) - - # print(label_requested) label_requested = label_requested.subquery(name="label_requested") # empty timeseries with tags - empty_time_series_subquery = session.query( + empty_time_series_subquery = db_session.query( func.to_char(time_series_subquery.c.timeseries_points, time_format).label( "timeseries_points" ), label_requested.c.label.label("label"), ) - print(empty_time_series_subquery) - - print(empty_time_series_subquery.all()) - empty_time_series_subquery = empty_time_series_subquery.subquery( name="empty_time_series_subquery" ) # tags count label_counts = ( - session.query( - func.to_char(EthereumLabel.created_at, time_format).label( - "timeseries_points" - ), + db_session.query( + func.to_char( + func.to_timestamp(EthereumLabel.block_timestamp).cast(Date), time_format + ).label("timeseries_points"), func.count(EthereumLabel.id).label("count"), EthereumLabel.label.label("label"), ) @@ -233,9 +304,13 @@ def generate_data( ) if start is not None: - label_counts = label_counts.filter(EthereumLabel.created_at > start) + label_counts = label_counts.filter( + func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start + ) if end is not None: - label_counts = label_counts.filter(EthereumLabel.created_at < end) + label_counts = label_counts.filter( + func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end + ) label_counts_subquery = ( label_counts.group_by( @@ -248,7 +323,7 @@ def generate_data( # Join empty tags_time_series with tags count eg apply tags counts to time series. labels_time_series = ( - session.query( + db_session.query( empty_time_series_subquery.c.timeseries_points.label("timeseries_points"), empty_time_series_subquery.c.label.label("label"), func.coalesce(label_counts_subquery.c.count.label("count"), 0), @@ -290,18 +365,6 @@ def generate_data( return response_labels -def month_stats(type, abi): - pass - - -def week_stats(type, abi): - pass - - -def days_stats(type, abi): - pass - - def crawlers_start(db_session): # read all subscriptions @@ -310,12 +373,23 @@ def crawlers_start(db_session): params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"}, timeout=10, ) + print(f"Subscriptions for processing: {len(required_subscriptions.resources)}") s3_client = boto3.client("s3") + # already proccessd + + already_proccessd = [] + for subscription in required_subscriptions.resources: + if ( + subscription.resource_data["address"] + != "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d" + ): + continue + bucket = subscription.resource_data["bucket"] key = subscription.resource_data["s3_path"] address = subscription.resource_data["address"] @@ -334,6 +408,9 @@ def crawlers_start(db_session): hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest() + if f"{address}/{hash}" in already_proccessd: + continue + s3_data_object = {} abi_functions = [item for item in abi_json if item["type"] == "function"] @@ -345,24 +422,15 @@ def crawlers_start(db_session): print(f"Timescale: {timescale}") - functions_metrics = {} - abi_functions_names = [item["name"] for item in abi_functions] - data = generate_data( + functions_calls_data = generate_data( db_session, address, timescale, abi_functions_names, start=start_date ) - print(data) - print() - - functions_metrics[timescale] = data - - s3_data_object["functions"] = functions_metrics + s3_data_object["functions"] = functions_calls_data # generate data - events_metrics = {} - abi_events_names = [ item["name"] if item["name"] not in lable_filters @@ -370,7 +438,7 @@ def crawlers_start(db_session): for item in abi_events ] - data = generate_data( + events_data = generate_data( db_session, address, timescale, @@ -378,9 +446,17 @@ def crawlers_start(db_session): start=start_date, ) - events_metrics[timescale] = data + s3_data_object["events"] = events_data - s3_data_object["events"] = events_metrics + s3_data_object["metrics"] = generate_metrics( + db_session, + address, + timescale, + abi_events_names, + start=start_date, + ) + + pprint.pprint(s3_data_object) push_statistics( statistics_data=s3_data_object, @@ -389,15 +465,37 @@ def crawlers_start(db_session): bucket=bucket, hash=hash, ) + already_proccessd.append(f"{address}/{hash}") time.sleep(10) -def main(): +def stats_generate_handler(args: argparse.Namespace): with yield_db_session_ctx() as db_session: crawlers_start(db_session) +def main() -> None: + + parser = argparse.ArgumentParser(description="Command Line Interface") + parser.set_defaults(func=lambda _: parser.print_help()) + subcommands = parser.add_subparsers(description="Dashboard worker commands") + + # Statistics parser + parser_statistics = subcommands.add_parser( + "statistics", description="Drone statistics" + ) + parser_statistics.set_defaults(func=lambda _: parser_statistics.print_help()) + subcommands_statistics = parser_statistics.add_subparsers( + description="Drone dashboard statistics commands" + ) + parser_statistics_generate = subcommands_statistics.add_parser( + "generate", description="Generate statistics" + ) + + parser_statistics_generate.set_defaults(func=stats_generate_handler) + + if __name__ == "__main__": main()