Merge branch 'main' into refresh-dashboard-stats

pull/516/head
Andrey Dolgolev 2022-01-14 12:00:27 +02:00
commit d221ffd6be
2 changed files with 4 additions and 160 deletions

View file

@@ -153,7 +153,7 @@ chmod 644 "${SCRIPT_DIR}/${POLYGON_STATISTICS_SERVICE_FILE}" "${SCRIPT_DIR}/${PO
 cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_STATISTICS_SERVICE_FILE}"
 cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_TIMER_FILE}" "/etc/systemd/system/${POLYGON_STATISTICS_TIMER_FILE}"
 systemctl daemon-reload
-systemctl restart "${POLYGON_STATISTICS_TIMER_FILE}"
+systemctl restart --no-block "${POLYGON_STATISTICS_TIMER_FILE}"
 # echo
 # echo
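
Note: the --no-block flag tells systemctl to enqueue the restart job and return immediately, instead of waiting for the unit to finish starting, so the deploy script no longer blocks on the timer unit.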

View file

@@ -104,152 +104,6 @@ def push_statistics(
     print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
 
 
-def generate_metrics(
-    db_session: Session,
-    blockchain_type: AvailableBlockchainType,
-    address: str,
-    timescale: str,
-    metrics: List[str],
-    start: Any,
-):
-    """
-    Generate metrics
-    """
-    block_model = get_block_model(blockchain_type)
-    transaction_model = get_transaction_model(blockchain_type)
-
-    start = start
-    end = datetime.utcnow()
-
-    start_timestamp = int(start.timestamp())
-    end_timestamp = int(end.timestamp())
-
-    results: Dict[str, Any] = {}
-
-    time_step = timescales_params[timescale]["timestep"]
-    time_format = timescales_params[timescale]["timeformat"]
-
-    def make_query(
-        db_session: Session,
-        identifying_column: Column,
-        statistic_column: Column,
-        aggregate_func: Callable,
-    ) -> Query:
-        unformated_time_series_subquery = db_session.query(
-            func.generate_series(
-                start,
-                end,
-                time_step,
-            ).label("timeseries_points")
-        ).subquery(name="unformated_time_series_subquery")
-
-        time_series_formated = db_session.query(
-            func.to_char(
-                unformated_time_series_subquery.c.timeseries_points, time_format
-            ).label("timeseries_points")
-        )
-        time_series_formated_subquery = time_series_formated.subquery(
-            name="time_series_subquery"
-        )
-
-        metric_count_subquery = (
-            db_session.query(
-                aggregate_func(statistic_column).label("count"),
-                func.to_char(
-                    func.to_timestamp(block_model.timestamp), time_format
-                ).label("timeseries_points"),
-            )
-            .join(
-                block_model,
-                transaction_model.block_number == block_model.block_number,
-            )
-            .filter(identifying_column == address)
-            .filter(block_model.timestamp >= start_timestamp)
-            .filter(block_model.timestamp <= end_timestamp)
-            .group_by(text("timeseries_points"))
-        ).subquery(name="metric_counts")
-
-        metrics_time_series = (
-            db_session.query(
-                time_series_formated_subquery.c.timeseries_points.label(
-                    "timeseries_points"
-                ),
-                func.coalesce(metric_count_subquery.c.count.label("count"), 0),
-            )
-            .join(
-                metric_count_subquery,
-                time_series_formated_subquery.c.timeseries_points
-                == metric_count_subquery.c.timeseries_points,
-                isouter=True,
-            )
-            .order_by(text("timeseries_points DESC"))
-        )
-
-        response_metric: List[Any] = []
-        for created_date, count in metrics_time_series:
-            if not isinstance(count, int):
-                count = int(count)
-            response_metric.append({"date": created_date, "count": count})
-
-        return response_metric
-
-    try:
-        if "transactions_out" in metrics:
-            start_time = time.time()
-            results["transactions_out"] = make_query(
-                db_session,
-                transaction_model.from_address,
-                transaction_model.hash,
-                func.count,
-            )
-            print("--- transactions_out %s seconds ---" % (time.time() - start_time))
-
-        if "transactions_in" in metrics:
-            start_time = time.time()
-            results["transactions_in"] = make_query(
-                db_session,
-                transaction_model.to_address,
-                transaction_model.hash,
-                func.count,
-            )
-            print("--- transactions_in %s seconds ---" % (time.time() - start_time))
-
-        if "value_out" in metrics:
-            start_time = time.time()
-            results["value_out"] = make_query(
-                db_session,
-                transaction_model.from_address,
-                transaction_model.value,
-                func.sum,
-            )
-            print("--- value_out %s seconds ---" % (time.time() - start_time))
-
-        if "value_in" in metrics:
-            start_time = time.time()
-            results["value_in"] = make_query(
-                db_session,
-                transaction_model.to_address,
-                transaction_model.value,
-                func.sum,
-            )
-            print("--- value_in %s seconds ---" % (time.time() - start_time))
-
-    except Exception as err:
-        print(err)
-        pass
-
-    return results
-
-
 def generate_data(
     db_session: Session,
     blockchain_type: AvailableBlockchainType,
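
For context, the deleted generate_metrics relied on a common gap-filling pattern: generate one row per time bucket with Postgres generate_series, LEFT JOIN the aggregated counts onto those buckets, and COALESCE missing buckets to zero. Below is a minimal, self-contained sketch of that pattern, not this repository's code: it assumes SQLAlchemy 1.4+ compiling for PostgreSQL, and the transactions table with its block_timestamp column is a hypothetical stand-in for the project's models.

# Sketch of the gap-filling pattern (hypothetical names, SQLAlchemy 1.4+).
from datetime import datetime, timedelta

from sqlalchemy import Column, Integer, MetaData, Table, func, select, text
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Hypothetical stand-in for the project's per-blockchain transaction model.
transactions = Table("transactions", metadata, Column("block_timestamp", Integer))

end = datetime.utcnow()
start = end - timedelta(days=7)

# One row per day in [start, end], formatted the same way as the aggregate
# below so the join keys match.
buckets = select(
    func.to_char(
        func.generate_series(start, end, timedelta(days=1)), "YYYY-MM-DD"
    ).label("bucket")
).subquery("buckets")

# Per-bucket transaction counts; buckets with no rows are simply absent here.
counts = (
    select(
        func.to_char(
            func.to_timestamp(transactions.c.block_timestamp), "YYYY-MM-DD"
        ).label("bucket"),
        func.count().label("count"),
    )
    .group_by(text("bucket"))
    .subquery("counts")
)

# LEFT JOIN keeps the empty buckets; COALESCE turns their NULL counts into 0.
query = (
    select(buckets.c.bucket, func.coalesce(counts.c.count, 0))
    .join_from(buckets, counts, buckets.c.bucket == counts.c.bucket, isouter=True)
    .order_by(buckets.c.bucket.desc())
)

print(query.compile(dialect=postgresql.dialect()))

Compiling without executing is enough to inspect the generated SQL; run against a live PostgreSQL session, the query returns one (bucket, count) row per day, zeros included.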
@@ -651,10 +505,6 @@ def stats_generate_handler(args: argparse.Namespace):
             ):
                 crawler_label = "moonworm"
 
-            generic = dashboard_subscription_filters["generic"]
-
-            generic_metrics_names = [item["name"] for item in generic]
-
             if not subscription_by_id[subscription_id].resource_data["abi"]:
                 methods = []
@@ -790,14 +640,7 @@ def stats_generate_handler(args: argparse.Namespace):
                     s3_data_object["events"] = events_data
 
-                    s3_data_object["generic"] = generate_metrics(
-                        db_session=db_session,
-                        blockchain_type=blockchain_type,
-                        address=address,
-                        timescale=timescale,
-                        metrics=generic_metrics_names,
-                        start=start_date,
-                    )
+                    s3_data_object["generic"] = {}
 
                     push_statistics(
                         statistics_data=s3_data_object,
@@ -807,6 +650,7 @@ def stats_generate_handler(args: argparse.Namespace):
                         dashboard_id=dashboard.id,
                     )
             except Exception as err:
+                db_session.rollback()
                 reporter.error_report(
                     err,
                     [
@@ -817,7 +661,7 @@ def stats_generate_handler(args: argparse.Namespace):
                         f"dashboard:{dashboard.id}",
                     ],
                 )
-                print(err)
+                logger.error(err)
 
     reporter.custom_report(
         title=f"Dashboard stats generated.",