Merge branch 'ABI-Defined-Dashboars' into subscriptions-and-resource-fixes

pull/402/head
Andrey Dolgolev 2021-11-14 08:51:37 +02:00
commit 143fcea6c6
11 zmienionych plików z 678 dodań i 6 usunięć

Wyświetl plik

@ -0,0 +1,9 @@
#!/usr/bin/env sh
# Development launcher for the Mooncrawl HTTP API.
# Expects access to Python environment with the requirements for this project installed.
set -e

# Bind host/port; overridable via environment, with development defaults.
MOONSTREAM_HOST="${MOONSTREAM_HOST:-0.0.0.0}"
MOONSTREAM_PORT="${MOONSTREAM_PORT:-7491}"

# --reload restarts the server on source changes (development use only).
uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" mooncrawl.api:app --reload

Wyświetl plik

@ -0,0 +1,81 @@
"""
The Mooncrawl HTTP API
"""
import logging
import time
from typing import Dict
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from . import data
from .middleware import MoonstreamHTTPException
from .settings import DOCS_TARGET_PATH, ORIGINS
from .version import MOONCRAWL_VERSION
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Tag metadata rendered in the generated OpenAPI documentation.
tags_metadata = [
    {"name": "jobs", "description": "Trigger crawler jobs."},
    {"name": "time", "description": "Server timestamp endpoints."},
]

app = FastAPI(
    # Fix: plain literal — the original used an f-string with no placeholders.
    title="Mooncrawl HTTP API",
    description="Mooncrawl API endpoints.",
    version=MOONCRAWL_VERSION,
    openapi_tags=tags_metadata,
    openapi_url="/openapi.json",
    docs_url=None,
    redoc_url=f"/{DOCS_TARGET_PATH}",
)

# CORS policy: allowed origins come from settings
# (MOONSTREAM_CORS_ALLOWED_ORIGINS environment variable).
app.add_middleware(
    CORSMiddleware,
    allow_origins=ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/ping", response_model=data.PingResponse)
async def ping_handler() -> data.PingResponse:
    """
    Liveness probe: confirm that the server is up and responding.
    """
    response = data.PingResponse(status="ok")
    return response
@app.get("/version", response_model=data.VersionResponse)
async def version_handler() -> data.VersionResponse:
    """
    Report the running mooncrawl package version.
    """
    current_version = MOONCRAWL_VERSION
    return data.VersionResponse(version=current_version)
@app.get("/now", tags=["time"])
async def now_handler() -> data.NowResponse:
    """
    Report the server's current time as a UNIX epoch timestamp.
    """
    epoch = time.time()
    return data.NowResponse(epoch_time=epoch)
@app.get("/jobs/stats_update", tags=["jobs"])
async def status_handler():
    """
    Find latest crawlers records with creation timestamp:
    - ethereum_txpool
    - ethereum_trending
    """
    try:
        # TODO(review): placeholder body — the lookup of the latest crawler
        # records is not implemented yet, so this endpoint currently always
        # responds 200 with an empty body.
        pass
    except Exception as e:
        # Unexpected failures are logged and surfaced as a crash-reported 500.
        logger.error(f"Unhandled status exception, error: {e}")
        raise MoonstreamHTTPException(status_code=500)

    return

Wyświetl plik

@ -5,8 +5,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
EthereumBlock,
EthereumLabel,
EthereumTransaction,
PolygonBlock,
PolygonLabel,
PolygonTransaction,
)
from psycopg2.errors import UniqueViolation # type: ignore
@ -78,6 +80,24 @@ def get_block_model(
return block_model
def get_label_model(
    blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumLabel, PolygonLabel]]:
    """
    Resolve the ORM label model for the requested blockchain.

    Returns EthereumLabel for Ethereum and PolygonLabel for Polygon;
    any other blockchain type is rejected.
    """
    if blockchain_type == AvailableBlockchainType.ETHEREUM:
        return EthereumLabel
    if blockchain_type == AvailableBlockchainType.POLYGON:
        return PolygonLabel
    raise Exception("Unsupported blockchain type provided")
def get_transaction_model(
blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumTransaction, PolygonTransaction]]:

Wyświetl plik

@ -2,6 +2,8 @@ from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
class AvailableBlockchainType(Enum):
ETHEREUM = "ethereum"
@ -14,3 +16,27 @@ class DateRange:
end_time: datetime
include_start: bool
include_end: bool
class PingResponse(BaseModel):
    """
    Schema for responses on the /ping endpoint.
    """

    status: str  # health indicator, e.g. "ok"
class VersionResponse(BaseModel):
    """
    Schema for responses on /version endpoint
    """

    version: str  # mooncrawl package version string
class NowResponse(BaseModel):
    """
    Schema for responses on /now endpoint
    """

    epoch_time: float  # server time as seconds since the UNIX epoch

Wyświetl plik

@ -0,0 +1,26 @@
import logging
from typing import Any, Dict, Optional
from fastapi import HTTPException
from .reporter import reporter
logger = logging.getLogger(__name__)
class MoonstreamHTTPException(HTTPException):
    """
    Extended HTTPException to handle 500 Internal server errors
    and send crash reports.
    """

    def __init__(
        self,
        status_code: int,
        detail: Any = None,
        headers: Optional[Dict[str, Any]] = None,
        # Annotation fix: default is None, so the parameter is Optional.
        internal_error: Optional[Exception] = None,
    ):
        super().__init__(status_code, detail, headers)
        # Only report when an underlying exception is supplied; plain HTTP
        # errors raised without an internal error are not crash-reported.
        if internal_error is not None:
            reporter.error_report(internal_error)

Wyświetl plik

@ -1,9 +1,26 @@
import os
from typing import cast
from bugout.app import Bugout
# Bugout service endpoints and shared client.
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)

# Humbug crash-reporter token; may be None if reporting is not configured.
HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN")

# CORS origins are mandatory: fail fast at import time rather than serving
# with a missing/misconfigured CORS policy.
RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS")
if RAW_ORIGINS is None:
    raise ValueError(
        "MOONSTREAM_CORS_ALLOWED_ORIGINS environment variable must be set (comma-separated list of CORS allowed origins)"
    )
ORIGINS = RAW_ORIGINS.split(",")

# OpenAPI docs path (served at /{DOCS_TARGET_PATH}).
DOCS_TARGET_PATH = "docs"
# Geth connection address
MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.environ.get(
"MOONSTREAM_NODE_ETHEREUM_IPC_ADDR", "127.0.0.1"

Wyświetl plik

@ -0,0 +1,488 @@
"""
Generates dashboard.
"""
import argparse
import hashlib
import json
import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
import boto3 # type: ignore
from bugout.data import BugoutResources
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, Date, and_, func, text
from sqlalchemy.orm import Query, Session
from ..blockchain import get_block_model, get_label_model, get_transaction_model
from ..data import AvailableBlockchainType
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Maps raw ABI event names to the label name stored in the database
# (e.g. ERC-721 "Transfer" events are labeled "nft_transfer").
# NOTE(review): "lable" is a typo for "label", but the name is referenced
# elsewhere in this module — rename only in one coordinated change.
lable_filters = {"Transfer": "nft_transfer"}
class TimeScale(Enum):
    # Supported reporting windows; the values double as keys into
    # timescales_params / timescales_delta and appear in S3 object keys.
    year = "year"
    month = "month"
    week = "week"
    day = "day"
# Per-timescale series parameters: the SQL generate_series step size and the
# to_char() format used to bucket/label time-series points.
timescales_params: Dict[str, Dict[str, str]] = {
    "year": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
    "month": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
    "week": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"},
    "day": {"timestep": "1 hours", "timeformat": "YYYY-MM-DD HH24"},
}

# Lookback window per timescale.
# NOTE(review): "month" is 27 days and "week" is 6 days — presumably
# intentional boundary choices; confirm with the data owners.
timescales_delta: Dict[str, Dict[str, timedelta]] = {
    "year": {"timedelta": timedelta(days=365)},
    "month": {"timedelta": timedelta(days=27)},
    "week": {"timedelta": timedelta(days=6)},
    "day": {"timedelta": timedelta(hours=24)},
}

# Bugout resource types used when listing subscriptions/dashboards.
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
def push_statistics(
    statistics_data: Dict[str, Any],
    subscription: Any,
    timescale: str,
    bucket: str,
    hash: str,
) -> None:
    """
    Serialize the computed statistics as JSON and upload them to S3.

    The object key is derived from the subscription's address, the ABI hash
    and the timescale, so each (address, ABI, timescale) combination has a
    stable location in the bucket.
    """
    address = subscription.resource_data["address"]
    result_key = f"contracts_data/{address}/{hash}/v1/{timescale}.json"
    payload = json.dumps(statistics_data).encode("utf-8")

    s3_client = boto3.client("s3")
    s3_client.put_object(
        Body=payload,
        Bucket=bucket,
        Key=result_key,
        ContentType="application/json",
        Metadata={"drone": "statistics"},
    )

    print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
def generate_metrics(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    timescale: str,
    metrics: List[str],
    start: Any,
) -> Dict[str, Any]:
    """
    Generate base transaction metrics (in/out transaction counts and values)
    for an address over the requested timescale.

    Args:
        db_session: Open SQLAlchemy session against the moonstream database.
        blockchain_type: Selects the block/transaction models to query.
        address: Account/contract address the metrics are computed for.
        timescale: Key into timescales_params ("year"/"month"/"week"/"day");
            controls the series step and timestamp label format.
        metrics: NOTE(review): currently unused in this body — TODO confirm
            whether it was meant to select which metrics are produced.
        start: Start of the reporting window (datetime-like; must support
            .timestamp()).

    Returns:
        Dict mapping metric name ("transactions_out", "transactions_in",
        "value_out", "value_in") to a list of {"date", ["hour"], "count"}
        points ordered newest-first. On query failure the error is printed
        and a partial (possibly empty) dict is returned.
    """
    block_model = get_block_model(blockchain_type)
    transaction_model = get_transaction_model(blockchain_type)

    start = start  # NOTE(review): no-op self-assignment, kept as-is.
    end = datetime.utcnow()

    # Blocks store UNIX timestamps; convert the window bounds once.
    start_timestamp = int(start.timestamp())
    end_timestamp = int(end.timestamp())

    results: Dict[str, Any] = {}

    time_step = timescales_params[timescale]["timestep"]
    time_format = timescales_params[timescale]["timeformat"]

    def make_query(
        db_session: Session,
        identifying_column: Column,
        statistic_column: Column,
    ) -> List[Any]:
        # Annotation fix: this helper returns the assembled point list, not a
        # SQLAlchemy Query (the original said -> Query).
        #
        # Dense series over [start, end] so periods with no activity still
        # appear in the output (with count 0 after the outer join below).
        unformated_time_series_subquery = db_session.query(
            func.generate_series(
                start,
                end,
                time_step,
            ).label("timeseries_points")
        ).subquery(name="unformated_time_series_subquery")

        # Normalize the generated timestamps to the bucket label format.
        time_series_formated = db_session.query(
            func.to_char(
                unformated_time_series_subquery.c.timeseries_points, time_format
            ).label("timeseries_points")
        )

        time_series_formated_subquery = time_series_formated.subquery(
            name="time_series_subquery"
        )

        # Count the statistic per bucket, joining transactions to their
        # blocks to obtain timestamps, restricted to the address and window.
        metric_count_subquery = (
            db_session.query(
                func.count(statistic_column).label("count"),
                func.to_char(
                    func.to_timestamp(block_model.timestamp).cast(Date), time_format
                ).label("timeseries_points"),
            )
            .join(
                block_model,
                transaction_model.block_number == block_model.block_number,
            )
            .filter(identifying_column == address)
            .filter(block_model.timestamp >= start_timestamp)
            .filter(block_model.timestamp <= end_timestamp)
            .group_by(text("timeseries_points"))
        ).subquery(name="metric_counts")

        # Left outer join keeps empty buckets; coalesce turns NULL into 0.
        metrics_time_series = (
            db_session.query(
                time_series_formated_subquery.c.timeseries_points.label(
                    "timeseries_points"
                ),
                func.coalesce(metric_count_subquery.c.count.label("count"), 0),
            )
            .join(
                metric_count_subquery,
                time_series_formated_subquery.c.timeseries_points
                == metric_count_subquery.c.timeseries_points,
                isouter=True,
            )
            .order_by(text("timeseries_points DESC"))
        )

        response_metric: List[Any] = []

        # "day" timescale buckets carry an hour component ("YYYY-MM-DD HH24");
        # split it into a separate field for the API payload.
        for created_date, count in metrics_time_series:
            if time_format == "YYYY-MM-DD HH24":
                created_date, hour = created_date.split(" ")
                response_metric.append(
                    {"date": created_date, "hour": hour, "count": count}
                )
            else:
                response_metric.append({"date": created_date, "count": count})

        return response_metric

    try:
        start_time = time.time()

        results["transactions_out"] = make_query(
            db_session,
            transaction_model.from_address,
            transaction_model.hash,
        )

        print("--- transactions_out %s seconds ---" % (time.time() - start_time))

        start_time = time.time()
        results["transactions_in"] = make_query(
            db_session,
            transaction_model.to_address,
            transaction_model.hash,
        )

        print("--- transactions_in %s seconds ---" % (time.time() - start_time))

        start_time = time.time()

        results["value_out"] = make_query(
            db_session,
            transaction_model.from_address,
            transaction_model.value,
        )

        print("--- value_out %s seconds ---" % (time.time() - start_time))

        start_time = time.time()

        results["value_in"] = make_query(
            db_session,
            transaction_model.to_address,
            transaction_model.value,
        )

        print("--- value_in %s seconds ---" % (time.time() - start_time))

    except Exception as err:
        # NOTE(review): broad best-effort swallow — a failed query leaves the
        # remaining metrics missing from the result rather than aborting the
        # whole dashboard generation.
        print(err)
        pass

    return results
def generate_data(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    timescale: str,
    functions: List[str],
    start: Any,
) -> Dict[Any, Any]:
    """
    Build a per-label time series of occurrence counts for an address.

    Args:
        db_session: Open SQLAlchemy session against the moonstream database.
        blockchain_type: Selects the label model (Ethereum or Polygon).
        address: Address whose labels are counted.
        timescale: Key into timescales_params; controls series step/format.
        functions: Label names (ABI function/event names) to count.
        start: Start of the reporting window, or None for no lower bound.

    Returns:
        Dict mapping label name to a list of {"date", ["hour"], "count"}
        points ordered newest-first; buckets with no activity have count 0.
    """
    label_model = get_label_model(blockchain_type)

    # Series step and bucket label format for the requested timescale.
    time_step = timescales_params[timescale]["timestep"]
    time_format = timescales_params[timescale]["timeformat"]

    end = datetime.utcnow()

    # Dense series of time points over [start, end].
    # Fix: the original materialized this query with a debug
    # print(time_series_subquery.all()) before turning it into a subquery —
    # that executed an extra round-trip just to print; removed.
    time_series_subquery = db_session.query(
        func.generate_series(
            start,
            end,
            time_step,
        ).label("timeseries_points")
    ).subquery(name="time_series_subquery")

    # Distinct labels of interest that actually occur for this address
    # within the window.
    label_requested = (
        db_session.query(label_model.label.label("label"))
        .filter(label_model.address == address)
        .filter(label_model.label.in_(functions))
        .distinct()
    )

    if start is not None:
        label_requested = label_requested.filter(
            func.to_timestamp(label_model.block_timestamp).cast(Date) > start
        )
    if end is not None:
        label_requested = label_requested.filter(
            func.to_timestamp(label_model.block_timestamp).cast(Date) < end
        )

    label_requested = label_requested.subquery(name="label_requested")

    # Empty time series per label: cross join of time points x labels, so
    # every (bucket, label) pair exists even with zero occurrences.
    empty_time_series_subquery = db_session.query(
        func.to_char(time_series_subquery.c.timeseries_points, time_format).label(
            "timeseries_points"
        ),
        label_requested.c.label.label("label"),
    )

    empty_time_series_subquery = empty_time_series_subquery.subquery(
        name="empty_time_series_subquery"
    )

    # Actual occurrence counts per (bucket, label).
    label_counts = (
        db_session.query(
            func.to_char(
                func.to_timestamp(label_model.block_timestamp).cast(Date), time_format
            ).label("timeseries_points"),
            func.count(label_model.id).label("count"),
            label_model.label.label("label"),
        )
        .filter(label_model.label.in_(functions))
        .filter(label_model.address == address)
    )

    if start is not None:
        label_counts = label_counts.filter(
            func.to_timestamp(label_model.block_timestamp).cast(Date) > start
        )
    if end is not None:
        label_counts = label_counts.filter(
            func.to_timestamp(label_model.block_timestamp).cast(Date) < end
        )

    label_counts_subquery = (
        label_counts.group_by(
            text("timeseries_points"),
            label_model.label,
        )
        .order_by(text("timeseries_points desc"))
        .subquery(name="label_counts")
    )

    # Apply the counts to the empty series; the outer join keeps buckets with
    # no activity and coalesce turns their NULL counts into 0.
    labels_time_series = (
        db_session.query(
            empty_time_series_subquery.c.timeseries_points.label("timeseries_points"),
            empty_time_series_subquery.c.label.label("label"),
            func.coalesce(label_counts_subquery.c.count.label("count"), 0),
        )
        .join(
            label_counts_subquery,
            and_(
                empty_time_series_subquery.c.label == label_counts_subquery.c.label,
                empty_time_series_subquery.c.timeseries_points
                == label_counts_subquery.c.timeseries_points,
            ),
            isouter=True,
        )
        .order_by(text("timeseries_points DESC"))
    )

    response_labels: Dict[Any, Any] = {}

    # Fix: the original duplicated the append logic in both branches of an
    # if/else on whether the label key existed; setdefault collapses it.
    for created_date, label, count in labels_time_series:
        points = response_labels.setdefault(label, [])
        if time_format == "YYYY-MM-DD HH24":
            # "day" buckets carry an hour component; split it out.
            created_date, hour = created_date.split(" ")
            points.append({"date": created_date, "hour": hour, "count": count})
        else:
            points.append({"date": created_date, "count": count})

    return response_labels
def stats_generate_handler(args: argparse.Namespace):
    """
    Generate and publish dashboard statistics for every ABI-enabled
    subscription on the given blockchain.

    For each subscription: fetch its ABI from S3, hash it, compute function
    call / event / base-metric time series for every timescale, and push the
    resulting JSON documents back to the subscription's S3 bucket. Identical
    (address, ABI-hash) pairs are processed only once per run.
    """
    blockchain_type = AvailableBlockchainType(args.blockchain)
    with yield_db_session_ctx() as db_session:
        # Fetch all subscriptions that have an ABI attached.
        required_subscriptions: BugoutResources = bc.list_resources(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"},
            timeout=10,
        )

        print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")

        s3_client = boto3.client("s3")

        # Tracks "{address}/{abi-hash}" pairs already handled this run so
        # duplicate subscriptions are skipped.
        already_processed = []

        for subscription in required_subscriptions.resources:
            bucket = subscription.resource_data["bucket"]
            key = subscription.resource_data["s3_path"]
            address = subscription.resource_data["address"]

            print(f"Expected bucket: s3://{bucket}/{key}")

            # Load the subscription's ABI document from S3.
            abi = s3_client.get_object(
                Bucket=bucket,
                Key=key,
            )
            abi_json = json.loads(abi["Body"].read())
            # Canonical serialization so the hash is stable across
            # key-ordering differences.
            abi_string = json.dumps(abi_json, sort_keys=True, indent=2)

            # NOTE(review): "hash" shadows the builtin; it is also the
            # keyword argument name push_statistics expects, so renaming
            # must be coordinated.
            hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()

            if f"{address}/{hash}" in already_processed:
                continue

            s3_data_object: Dict[str, Any] = {}

            abi_functions = [item for item in abi_json if item["type"] == "function"]
            abi_events = [item for item in abi_json if item["type"] == "event"]

            for timescale in [timescale.value for timescale in TimeScale]:

                start_date = (
                    datetime.utcnow() - timescales_delta[timescale]["timedelta"]
                )

                print(f"Timescale: {timescale}")

                # Function-call counts per ABI function name.
                abi_functions_names = [item["name"] for item in abi_functions]

                functions_calls_data = generate_data(
                    db_session=db_session,
                    blockchain_type=blockchain_type,
                    address=address,
                    timescale=timescale,
                    functions=abi_functions_names,
                    start=start_date,
                )

                s3_data_object["functions"] = functions_calls_data

                # Event counts — some event names are remapped to the label
                # actually stored in the database (see lable_filters).
                abi_events_names = [
                    item["name"]
                    if item["name"] not in lable_filters
                    else lable_filters[item["name"]]
                    for item in abi_events
                ]

                events_data = generate_data(
                    db_session=db_session,
                    blockchain_type=blockchain_type,
                    address=address,
                    timescale=timescale,
                    functions=abi_events_names,
                    start=start_date,
                )

                s3_data_object["events"] = events_data

                # Base transaction metrics (counts/values in and out).
                s3_data_object["metrics"] = generate_metrics(
                    db_session=db_session,
                    blockchain_type=blockchain_type,
                    address=address,
                    timescale=timescale,
                    metrics=abi_events_names,
                    start=start_date,
                )

                push_statistics(
                    statistics_data=s3_data_object,
                    subscription=subscription,
                    timescale=timescale,
                    bucket=bucket,
                    hash=hash,
                )
            already_processed.append(f"{address}/{hash}")
            # Crude throttle between subscriptions — presumably to ease load
            # on the database/S3; confirm before tuning.
            time.sleep(10)
def main() -> None:
    """
    CLI entry point for the dashboard statistics worker.

    Usage: `statistics generate --blockchain <type>` — computes and pushes
    dashboard statistics for all ABI-enabled subscriptions.
    """
    parser = argparse.ArgumentParser(description="Command Line Interface")
    # With no subcommand, print help instead of failing.
    parser.set_defaults(func=lambda _: parser.print_help())
    subcommands = parser.add_subparsers(
        description="Drone dashboard statistics commands"
    )

    # "generate" subcommand.
    parser_generate = subcommands.add_parser(
        "generate", description="Generate statistics"
    )
    parser_generate.add_argument(
        "--blockchain",
        required=True,
        help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
    )
    # Fix: the original first set a print-help default here and immediately
    # overrode it with the real handler; only the handler assignment is kept.
    parser_generate.set_defaults(func=stats_generate_handler)

    args = parser.parse_args()
    args.func(args)
if __name__ == "__main__":
main()

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.1.0"
# NOTE(review): the assignment above is immediately shadowed by the one
# below — this looks like diff residue (old vs. bumped value) from the
# scraped commit view; only "0.1.1" takes effect. Confirm and drop the
# stale line.
MOONCRAWL_VERSION = "0.1.1"

Wyświetl plik

@ -1,4 +1,5 @@
# Bugout service endpoints
export BUGOUT_BROOD_URL="https://auth.bugout.dev"
export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
# Path to IPC socket to use for web3 connections
export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1"
export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545"
export MOONSTREAM_NODE_POLYGON_IPC_ADDR="127.0.0.1"

Wyświetl plik

@ -36,11 +36,14 @@ setup(
"boto3",
"bugout",
"chardet",
"fastapi",
"moonstreamdb",
"humbug",
"pydantic",
"python-dateutil",
"requests",
"tqdm",
"uvicorn",
"web3",
],
extras_require={
@ -50,11 +53,12 @@ setup(
entry_points={
"console_scripts": [
"crawler=mooncrawl.crawler:main",
"esd=mooncrawl.esd:main",
"identity=mooncrawl.identity:main",
"etherscan=mooncrawl.etherscan:main",
"nft=mooncrawl.nft.cli:main",
"contractcrawler=mooncrawl.contract.cli:main",
"esd=mooncrawl.esd:main",
"etherscan=mooncrawl.etherscan:main",
"identity=mooncrawl.identity:main",
"nft=mooncrawl.nft.cli:main",
"statistics=mooncrawl.stats_worker.dashboard:main",
]
},
)