Add worker code.

pull/391/head
Andrey Dolgolev 2021-11-11 17:16:21 +02:00
rodzic 321b0e4765
commit e7fe8f9003
1 zmienionych plików z 403 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,403 @@
import json
import argparse
from enum import Enum
import hashlib
from datetime import timedelta, datetime
import logging
from typing import Any, Dict, List
import os
import time
import pprint
import boto3
from bugout import data # type: ignore
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel, EthereumTransaction, EthereumBlock
from bugout.app import Bugout
from bugout.data import BugoutResources
from sqlalchemy.orm import Session
from sqlalchemy import func, text, and_, Date, cast
from datetime import date
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Bugout
lable_filters = {"Transfer": "nft_transfer"}
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
DATA_BUCKET = ""
class TimeScale(Enum):
year = "year"
month = "month"
week = "week"
day = "day"
timescales_params: Dict[str, Dict[str, str]] = {
"year": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
"month": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
"week": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
"day": {"timestep": "1 hours", "timeformat": "YYYY-MM-DD HH24"},
}
timescales_delta: Dict[str, Dict[str, timedelta]] = {
"year": {"timedelta": timedelta(days=365)},
"month": {"timedelta": timedelta(days=27)},
"week": {"timedelta": timedelta(days=6)},
"day": {"timedelta": timedelta(hours=24)},
}
bc = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
MOONSTREAM_ADMIN_ACCESS_TOKEN = os.getenv("MOONSTREAM_ADMIN_ACCESS_TOKEN")
AWS_S3_SMARTCONTRACTS_ABI_PREFIX = os.getenv("AWS_S3_SMARTCONTRACTS_ABI_PREFIX")
def push_statistics(
statistics_data: Dict[str, Any],
subscription: Any,
timescale: str,
bucket: str,
hash: str,
) -> None:
# statistics_prefix = os.environ.get(
# "AWS_S3_DRONES_BUCKET_STATISTICS_PREFIX", ""
# ).rstrip("/")
# if bucket is None:
# logger.warning(
# "AWS_STATS_S3_BUCKET environment variable not defined, skipping storage of search results"
# )
# return
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json'
s3 = boto3.client("s3")
s3.put_object(
Body=result_bytes,
Bucket=bucket,
Key=result_key,
ContentType="application/json",
Metadata={"drone": "statistics"},
)
# TODO (Andrey) Understand why logger wont show messages some time and put them beside print
# without print here exeption wont show.
print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
# import moonworm
def add_events_to_database(db_session: Session, contract: str, events: List[Any]):
"""
Apply events of contract to dataabase
"""
for event in events:
try:
eth_label = EthereumLabel(
label="mooonworm",
address=contract,
label_data={
"type": "event",
"contract": contract,
"tx_hash": event,
},
)
try:
db_session.add(eth_label)
db_session.commit()
except Exception as e:
db_session.rollback()
raise e
except Exception as e:
logger.error(
f"Failed to add addresss label ${contract} to database\n{str(e)}"
)
def read_events(contract: str, abi: Dict[str, Any]):
"""
Read events for provided abi
"""
# moonworm
def read_functions_calls(contract: str, abi: Dict[str, Any]):
"""
Read functions for provided abi
"""
# token of admin
pass
def generate_data(
session: Session, address: str, timescale: str, functions: List[str], start: Any
):
# create empty time series
time_step = timescales_params[timescale]["timestep"]
time_format = timescales_params[timescale]["timeformat"]
# if end is None:
end = datetime.utcnow()
time_series_subquery = session.query(
func.generate_series(
start,
end,
time_step,
).label("timeseries_points")
)
# print(time_series_subquery.all())
time_series_subquery = time_series_subquery.subquery(name="time_series_subquery")
# get distinct tags labels in that range
label_requested = (
session.query(EthereumLabel.label.label("label"))
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label.in_(functions))
.distinct()
)
if start is not None:
label_requested = label_requested.filter(
cast(EthereumLabel.block_timestamp, Date) > start
)
if end is not None:
label_requested = label_requested.filter(
cast(EthereumLabel.block_timestamp, Date) < end
)
# print(functions)
# print(label_requested)
label_requested = label_requested.subquery(name="label_requested")
# empty timeseries with tags
empty_time_series_subquery = session.query(
func.to_char(time_series_subquery.c.timeseries_points, time_format).label(
"timeseries_points"
),
label_requested.c.label.label("label"),
)
print(empty_time_series_subquery)
print(empty_time_series_subquery.all())
empty_time_series_subquery = empty_time_series_subquery.subquery(
name="empty_time_series_subquery"
)
# tags count
label_counts = (
session.query(
func.to_char(EthereumLabel.created_at, time_format).label(
"timeseries_points"
),
func.count(EthereumLabel.id).label("count"),
EthereumLabel.label.label("label"),
)
.filter(EthereumLabel.label.in_(functions))
.filter(EthereumLabel.address == address)
)
if start is not None:
label_counts = label_counts.filter(EthereumLabel.created_at > start)
if end is not None:
label_counts = label_counts.filter(EthereumLabel.created_at < end)
label_counts_subquery = (
label_counts.group_by(
text("timeseries_points"),
EthereumLabel.label,
)
.order_by(text("timeseries_points desc"))
.subquery(name="label_counts")
)
# Join empty tags_time_series with tags count eg apply tags counts to time series.
labels_time_series = (
session.query(
empty_time_series_subquery.c.timeseries_points.label("timeseries_points"),
empty_time_series_subquery.c.label.label("label"),
func.coalesce(label_counts_subquery.c.count.label("count"), 0),
)
.join(
label_counts_subquery,
and_(
empty_time_series_subquery.c.label == label_counts_subquery.c.label,
empty_time_series_subquery.c.timeseries_points
== label_counts_subquery.c.timeseries_points,
),
isouter=True,
)
.order_by(text("timeseries_points DESC"))
)
response_labels: Dict[Any, Any] = {}
for created_date, label, count in labels_time_series:
if not response_labels.get(label):
response_labels[label] = []
if time_format == "YYYY-MM-DD HH24":
created_date, hour = created_date.split(" ")
response_labels[label].append(
{"date": created_date, "hour": hour, "count": count}
)
else:
response_labels[label].append({"date": created_date, "count": count})
else:
if time_format == "YYYY-MM-DD HH24":
created_date, hour = created_date.split(" ")
response_labels[label].append(
{"date": created_date, "hour": hour, "count": count}
)
else:
response_labels[label].append({"date": created_date, "count": count})
return response_labels
def month_stats(type, abi):
pass
def week_stats(type, abi):
pass
def days_stats(type, abi):
pass
def crawlers_start(db_session):
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
for subscription in required_subscriptions.resources:
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
# print(abi_json)
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
for timescale in [timescale.value for timescale in TimeScale]:
start_date = datetime.utcnow() - timescales_delta[timescale]["timedelta"]
print(f"Timescale: {timescale}")
functions_metrics = {}
abi_functions_names = [item["name"] for item in abi_functions]
data = generate_data(
db_session, address, timescale, abi_functions_names, start=start_date
)
print(data)
print()
functions_metrics[timescale] = data
s3_data_object["functions"] = functions_metrics
# generate data
events_metrics = {}
abi_events_names = [
item["name"]
if item["name"] not in lable_filters
else lable_filters[item["name"]]
for item in abi_events
]
data = generate_data(
db_session,
address,
timescale,
abi_events_names,
start=start_date,
)
events_metrics[timescale] = data
s3_data_object["events"] = events_metrics
push_statistics(
statistics_data=s3_data_object,
subscription=subscription,
timescale=timescale,
bucket=bucket,
hash=hash,
)
time.sleep(10)
def main():
with yield_db_session_ctx() as db_session:
crawlers_start(db_session)
if __name__ == "__main__":
main()