Add metrics timeseries.

pull/391/head
Andrey Dolgolev 2021-11-13 16:30:13 +02:00
parent e7fe8f9003
commit 362b266496
1 changed file with 196 additions and 98 deletions


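The new generate_metrics function below buckets transactions onto a timestamp series produced by generate_series, using a step and a to_char format looked up in timescales_params. That mapping is defined elsewhere in the file and is not part of this diff; a minimal sketch of what it is assumed to look like (keys and values here are illustrative, not the committed ones):

    from datetime import timedelta

    # Assumed shape only: the real timescales_params is outside the displayed hunks.
    # "timestep" feeds func.generate_series(), "timeformat" feeds func.to_char().
    timescales_params = {
        "day": {"timestep": timedelta(hours=1), "timeformat": "YYYY-MM-DD HH24"},
        "week": {"timestep": timedelta(days=1), "timeformat": "YYYY-MM-DD"},
        "month": {"timestep": timedelta(days=1), "timeformat": "YYYY-MM-DD"},
    }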
@@ -4,7 +4,7 @@ from enum import Enum
import hashlib
from datetime import timedelta, datetime
import logging
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Callable
import os
import time
import pprint
@@ -17,8 +17,8 @@ from moonstreamdb.models import EthereumLabel, EthereumTransaction, EthereumBloc
from bugout.app import Bugout
from bugout.data import BugoutResources
-from sqlalchemy.orm import Session
-from sqlalchemy import func, text, and_, Date, cast
+from sqlalchemy.orm import Session, Query
+from sqlalchemy import func, text, and_, Date, cast, Column, desc
from datetime import date
@@ -66,6 +66,7 @@ bc = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
+BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
MOONSTREAM_ADMIN_ACCESS_TOKEN = os.getenv("MOONSTREAM_ADMIN_ACCESS_TOKEN")
@@ -81,15 +82,6 @@ def push_statistics(
    hash: str,
) -> None:

-    # statistics_prefix = os.environ.get(
-    #     "AWS_S3_DRONES_BUCKET_STATISTICS_PREFIX", ""
-    # ).rstrip("/")

-    # if bucket is None:
-    #     logger.warning(
-    #         "AWS_STATS_S3_BUCKET environment variable not defined, skipping storage of search results"
-    #     )
-    #     return

    result_bytes = json.dumps(statistics_data).encode("utf-8")
    result_key = f'contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json'
@@ -102,61 +94,147 @@ def push_statistics(
        Metadata={"drone": "statistics"},
    )
    # TODO (Andrey) Understand why logger won't show messages sometimes and put them beside print
    # without print here the exception won't show.
    print(f"Statistics push to bucket: s3://{bucket}/{result_key}")


-# import moonworm


-def add_events_to_database(db_session: Session, contract: str, events: List[Any]):
-    """
-    Apply events of contract to dataabase
-    """
-    for event in events:
-        try:
-            eth_label = EthereumLabel(
-                label="mooonworm",
-                address=contract,
-                label_data={
-                    "type": "event",
-                    "contract": contract,
-                    "tx_hash": event,
-                },
-            )
-            try:
-                db_session.add(eth_label)
-                db_session.commit()
-            except Exception as e:
-                db_session.rollback()
-                raise e
-        except Exception as e:
-            logger.error(
-                f"Failed to add addresss label ${contract} to database\n{str(e)}"
-            )


-def read_events(contract: str, abi: Dict[str, Any]):
-    """
-    Read events for provided abi
-    """
-    # moonworm


-def read_functions_calls(contract: str, abi: Dict[str, Any]):
-    """
-    Read functions for provided abi
-    """
-    # token of admin
-    pass


+def generate_metrics(
+    db_session: Session, address: str, timescale: str, metrics: List[str], start: Any
+):
+    """
+    Generate metrics
+    """
+    start = start
+    end = datetime.utcnow()
+
+    start_timestamp = int(start.timestamp())
+    end_timestamp = int(end.timestamp())
+
+    results: Dict[str, Any] = {}
+
+    time_step = timescales_params[timescale]["timestep"]
+    time_format = timescales_params[timescale]["timeformat"]
+
+    def make_query(
+        db_session: Session,
+        identifying_column: Column,
+        statistic_column: Column,
+    ) -> Query:
+
+        unformated_time_series_subquery = db_session.query(
+            func.generate_series(
+                start,
+                end,
+                time_step,
+            ).label("timeseries_points")
+        ).subquery(name="unformated_time_series_subquery")
+
+        time_series_formated = db_session.query(
+            func.to_char(
+                unformated_time_series_subquery.c.timeseries_points, time_format
+            ).label("timeseries_points")
+        )
+
+        time_series_formated_subquery = time_series_formated.subquery(
+            name="time_series_subquery"
+        )
+
+        metric_count_subquery = (
+            db_session.query(
+                func.count(statistic_column).label("count"),
+                func.to_char(
+                    func.to_timestamp(EthereumBlock.timestamp).cast(Date), time_format
+                ).label("timeseries_points"),
+            )
+            .join(
+                EthereumBlock,
+                EthereumTransaction.block_number == EthereumBlock.block_number,
+            )
+            .filter(identifying_column == address)
+            .filter(EthereumBlock.timestamp >= start_timestamp)
+            .filter(EthereumBlock.timestamp <= end_timestamp)
+            .group_by(text("timeseries_points"))
+        ).subquery(name="metric_counts")
+
+        metrics_time_series = (
+            db_session.query(
+                time_series_formated_subquery.c.timeseries_points.label(
+                    "timeseries_points"
+                ),
+                func.coalesce(metric_count_subquery.c.count.label("count"), 0),
+            )
+            .join(
+                metric_count_subquery,
+                time_series_formated_subquery.c.timeseries_points
+                == metric_count_subquery.c.timeseries_points,
+                isouter=True,
+            )
+            .order_by(text("timeseries_points DESC"))
+        )
+
+        response_metric: List[Any] = []
+
+        for created_date, count in metrics_time_series:
+            if time_format == "YYYY-MM-DD HH24":
+                created_date, hour = created_date.split(" ")
+                response_metric.append(
+                    {"date": created_date, "hour": hour, "count": count}
+                )
+            else:
+                response_metric.append({"date": created_date, "count": count})
+
+        return response_metric
+
+    try:
+        start_time = time.time()
+        results["transactions_out"] = make_query(
+            db_session,
+            EthereumTransaction.from_address,
+            EthereumTransaction.hash,
+        )
+        print("--- transactions_out %s seconds ---" % (time.time() - start_time))
+
+        start_time = time.time()
+        results["transactions_in"] = make_query(
+            db_session,
+            EthereumTransaction.to_address,
+            EthereumTransaction.hash,
+        )
+        print("--- transactions_in %s seconds ---" % (time.time() - start_time))
+        pprint.pprint(results)
+
+        start_time = time.time()
+        results["value_out"] = make_query(
+            db_session,
+            EthereumTransaction.from_address,
+            EthereumTransaction.value,
+        )
+        print("--- value_out %s seconds ---" % (time.time() - start_time))
+
+        start_time = time.time()
+        results["value_in"] = make_query(
+            db_session,
+            EthereumTransaction.to_address,
+            EthereumTransaction.value,
+        )
+        print("--- value_in %s seconds ---" % (time.time() - start_time))
+    except Exception as err:
+        print(err)
+        pass
+
+    return results
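For reference, make_query left-joins the per-bucket counts back onto the formatted series, so buckets with no matching transactions coalesce to 0, and generate_metrics then splits the hour off when the hourly format is in use. A sketch of the expected shapes, with invented values:

    # Rows yielded by make_query() for the hourly format "YYYY-MM-DD HH24" (illustrative):
    hourly_rows = [
        ("2021-11-13 16", 42),
        ("2021-11-13 15", 0),  # empty bucket kept by the outer join, coalesced to 0
    ]

    # Corresponding entries appended to response_metric:
    response_metric = [
        {"date": "2021-11-13", "hour": "16", "count": 42},
        {"date": "2021-11-13", "hour": "15", "count": 0},
    ]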
def generate_data(
-    session: Session, address: str, timescale: str, functions: List[str], start: Any
+    db_session: Session, address: str, timescale: str, functions: List[str], start: Any
):
# create empty time series
@@ -168,7 +246,7 @@ def generate_data(
# if end is None:
end = datetime.utcnow()
-    time_series_subquery = session.query(
+    time_series_subquery = db_session.query(
func.generate_series(
start,
end,
@@ -176,14 +254,14 @@
).label("timeseries_points")
)
-    # print(time_series_subquery.all())
+    print(time_series_subquery.all())
time_series_subquery = time_series_subquery.subquery(name="time_series_subquery")
# get distinct tags labels in that range
label_requested = (
-        session.query(EthereumLabel.label.label("label"))
+        db_session.query(EthereumLabel.label.label("label"))
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label.in_(functions))
.distinct()
@@ -191,40 +269,33 @@
if start is not None:
label_requested = label_requested.filter(
-            cast(EthereumLabel.block_timestamp, Date) > start
+            func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start
)
if end is not None:
label_requested = label_requested.filter(
-            cast(EthereumLabel.block_timestamp, Date) < end
+            func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end
)
# print(functions)
# print(label_requested)
label_requested = label_requested.subquery(name="label_requested")
# empty timeseries with tags
-    empty_time_series_subquery = session.query(
+    empty_time_series_subquery = db_session.query(
func.to_char(time_series_subquery.c.timeseries_points, time_format).label(
"timeseries_points"
),
label_requested.c.label.label("label"),
)
print(empty_time_series_subquery)
print(empty_time_series_subquery.all())
empty_time_series_subquery = empty_time_series_subquery.subquery(
name="empty_time_series_subquery"
)
# tags count
label_counts = (
-        session.query(
-            func.to_char(EthereumLabel.created_at, time_format).label(
-                "timeseries_points"
-            ),
+        db_session.query(
+            func.to_char(
+                func.to_timestamp(EthereumLabel.block_timestamp).cast(Date), time_format
+            ).label("timeseries_points"),
func.count(EthereumLabel.id).label("count"),
EthereumLabel.label.label("label"),
)
@@ -233,9 +304,13 @@ def generate_data(
)
if start is not None:
-        label_counts = label_counts.filter(EthereumLabel.created_at > start)
+        label_counts = label_counts.filter(
+            func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start
+        )
if end is not None:
-        label_counts = label_counts.filter(EthereumLabel.created_at < end)
+        label_counts = label_counts.filter(
+            func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end
+        )
label_counts_subquery = (
label_counts.group_by(
@@ -248,7 +323,7 @@ def generate_data(
# Join empty tags_time_series with tags count eg apply tags counts to time series.
labels_time_series = (
-        session.query(
+        db_session.query(
empty_time_series_subquery.c.timeseries_points.label("timeseries_points"),
empty_time_series_subquery.c.label.label("label"),
func.coalesce(label_counts_subquery.c.count.label("count"), 0),
@@ -290,18 +365,6 @@ def generate_data(
return response_labels
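The labels_time_series query in generate_data selects a formatted timestamp, a label, and a coalesced count; the loop that turns those rows into response_labels is outside the displayed hunks, so the shape below is an assumption for illustration only:

    # Assumed shape of generate_data()'s return value (not shown in this diff):
    response_labels = [
        {"date": "2021-11-12", "label": "transfer", "count": 0},
        {"date": "2021-11-13", "label": "transfer", "count": 5},
    ]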
-def month_stats(type, abi):
-    pass


-def week_stats(type, abi):
-    pass


-def days_stats(type, abi):
-    pass
def crawlers_start(db_session):
# read all subscriptions
@@ -310,12 +373,23 @@ def crawlers_start(db_session):
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
    # already processed
already_proccessd = []
for subscription in required_subscriptions.resources:
if (
subscription.resource_data["address"]
!= "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d"
):
continue
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
@@ -334,6 +408,9 @@ def crawlers_start(db_session):
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
+        if f"{address}/{hash}" in already_proccessd:
+            continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
@@ -345,24 +422,15 @@ def crawlers_start(db_session):
print(f"Timescale: {timescale}")
-            functions_metrics = {}

            abi_functions_names = [item["name"] for item in abi_functions]

-            data = generate_data(
+            functions_calls_data = generate_data(
                db_session, address, timescale, abi_functions_names, start=start_date
            )
-            print(data)
-            print()
-            functions_metrics[timescale] = data
-            s3_data_object["functions"] = functions_metrics
+            s3_data_object["functions"] = functions_calls_data

            # generate data

-            events_metrics = {}
abi_events_names = [
item["name"]
if item["name"] not in lable_filters
@@ -370,7 +438,7 @@ def crawlers_start(db_session):
for item in abi_events
]
-            data = generate_data(
+            events_data = generate_data(
db_session,
address,
timescale,
@@ -378,9 +446,17 @@ def crawlers_start(db_session):
start=start_date,
)
-            events_metrics[timescale] = data
-            s3_data_object["events"] = events_metrics
+            s3_data_object["events"] = events_data

+            s3_data_object["metrics"] = generate_metrics(
+                db_session,
+                address,
+                timescale,
+                abi_events_names,
+                start=start_date,
+            )
pprint.pprint(s3_data_object)
push_statistics(
statistics_data=s3_data_object,
@@ -389,15 +465,37 @@ def crawlers_start(db_session):
bucket=bucket,
hash=hash,
)
+        already_proccessd.append(f"{address}/{hash}")
time.sleep(10)
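Putting the pieces together, each subscription/timescale pass uploads one JSON document whose top-level keys are assembled in this loop; the values below are invented for illustration, and the exact entry shapes follow generate_data and generate_metrics above:

    # Illustrative shape of the object pushed by push_statistics() (values invented):
    s3_data_object = {
        "functions": [{"date": "2021-11-13", "label": "transfer", "count": 7}],
        "events": [{"date": "2021-11-13", "label": "Transfer", "count": 7}],
        "metrics": {
            "transactions_in": [{"date": "2021-11-13", "count": 3}],
            "transactions_out": [{"date": "2021-11-13", "count": 2}],
            "value_in": [{"date": "2021-11-13", "count": 10}],
            "value_out": [{"date": "2021-11-13", "count": 4}],
        },
    }
    # Uploaded to s3://{bucket}/contracts_data/{address}/{hash}/v1/{timescale}.json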
-def main():
+def stats_generate_handler(args: argparse.Namespace):
+    with yield_db_session_ctx() as db_session:
+        crawlers_start(db_session)


+def main() -> None:
    parser = argparse.ArgumentParser(description="Command Line Interface")
    parser.set_defaults(func=lambda _: parser.print_help())
    subcommands = parser.add_subparsers(description="Dashboard worker commands")

    # Statistics parser
    parser_statistics = subcommands.add_parser(
        "statistics", description="Drone statistics"
    )
    parser_statistics.set_defaults(func=lambda _: parser_statistics.print_help())
    subcommands_statistics = parser_statistics.add_subparsers(
        description="Drone dashboard statistics commands"
    )

    parser_statistics_generate = subcommands_statistics.add_parser(
        "generate", description="Generate statistics"
    )
    parser_statistics_generate.set_defaults(func=stats_generate_handler)
if __name__ == "__main__":
    main()
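Assuming the worker module is run directly (its file name is not shown on this page), the new subcommand tree would be invoked roughly as:

    python <dashboard_worker>.py statistics generate

which routes through stats_generate_handler and runs crawlers_start inside a database session; the final parse_args()/args.func(args) call in main() sits outside the displayed hunk.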