diff --git a/crawlers/mooncrawl/dev.sh b/crawlers/mooncrawl/dev.sh new file mode 100755 index 00000000..7c0e3345 --- /dev/null +++ b/crawlers/mooncrawl/dev.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env sh + +# Expects access to Python environment with the requirements for this project installed. +set -e + +MOONSTREAM_HOST="${MOONSTREAM_HOST:-0.0.0.0}" +MOONSTREAM_PORT="${MOONSTREAM_PORT:-7491}" + +uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" mooncrawl.api:app --reload diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py new file mode 100644 index 00000000..68aed72d --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -0,0 +1,81 @@ +""" +The Mooncrawl HTTP API +""" +import logging +import time +from typing import Dict + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from . import data +from .middleware import MoonstreamHTTPException +from .settings import DOCS_TARGET_PATH, ORIGINS +from .version import MOONCRAWL_VERSION + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +tags_metadata = [ + {"name": "jobs", "description": "Trigger crawler jobs."}, + {"name": "time", "description": "Server timestamp endpoints."}, +] + +app = FastAPI( + title=f"Mooncrawl HTTP API", + description="Mooncrawl API endpoints.", + version=MOONCRAWL_VERSION, + openapi_tags=tags_metadata, + openapi_url="/openapi.json", + docs_url=None, + redoc_url=f"/{DOCS_TARGET_PATH}", +) + +app.add_middleware( + CORSMiddleware, + allow_origins=ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/ping", response_model=data.PingResponse) +async def ping_handler() -> data.PingResponse: + """ + Check server status. + """ + return data.PingResponse(status="ok") + + +@app.get("/version", response_model=data.VersionResponse) +async def version_handler() -> data.VersionResponse: + """ + Get server version. + """ + return data.VersionResponse(version=MOONCRAWL_VERSION) + + +@app.get("/now", tags=["time"]) +async def now_handler() -> data.NowResponse: + """ + Get server current time. + """ + return data.NowResponse(epoch_time=time.time()) + + +@app.get("/jobs/stats_update", tags=["jobs"]) +async def status_handler(): + """ + Find latest crawlers records with creation timestamp: + - ethereum_txpool + - ethereum_trending + """ + try: + pass + except Exception as e: + logger.error(f"Unhandled status exception, error: {e}") + raise MoonstreamHTTPException(status_code=500) + + return diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index 0aef1ab4..ded7a145 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -5,8 +5,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from moonstreamdb.db import yield_db_session, yield_db_session_ctx from moonstreamdb.models import ( EthereumBlock, + EthereumLabel, EthereumTransaction, PolygonBlock, + PolygonLabel, PolygonTransaction, ) from psycopg2.errors import UniqueViolation # type: ignore @@ -78,6 +80,24 @@ def get_block_model( return block_model +def get_label_model( + blockchain_type: AvailableBlockchainType, +) -> Type[Union[EthereumLabel, PolygonLabel]]: + """ + Depends on provided blockchain type: Ethereum or Polygon, + set proper block label model: EthereumLabel or PolygonLabel. + """ + label_model: Type[Union[EthereumLabel, PolygonLabel]] + if blockchain_type == AvailableBlockchainType.ETHEREUM: + label_model = EthereumLabel + elif blockchain_type == AvailableBlockchainType.POLYGON: + label_model = PolygonLabel + else: + raise Exception("Unsupported blockchain type provided") + + return label_model + + def get_transaction_model( blockchain_type: AvailableBlockchainType, ) -> Type[Union[EthereumTransaction, PolygonTransaction]]: diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 04e7235a..0420d5cf 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -2,6 +2,8 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum +from pydantic import BaseModel + class AvailableBlockchainType(Enum): ETHEREUM = "ethereum" @@ -14,3 +16,27 @@ class DateRange: end_time: datetime include_start: bool include_end: bool + + +class PingResponse(BaseModel): + """ + Schema for ping response + """ + + status: str + + +class VersionResponse(BaseModel): + """ + Schema for responses on /version endpoint + """ + + version: str + + +class NowResponse(BaseModel): + """ + Schema for responses on /now endpoint + """ + + epoch_time: float diff --git a/crawlers/mooncrawl/mooncrawl/middleware.py b/crawlers/mooncrawl/mooncrawl/middleware.py new file mode 100644 index 00000000..a722df0c --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/middleware.py @@ -0,0 +1,26 @@ +import logging +from typing import Any, Dict, Optional + +from fastapi import HTTPException + +from .reporter import reporter + +logger = logging.getLogger(__name__) + + +class MoonstreamHTTPException(HTTPException): + """ + Extended HTTPException to handle 500 Internal server errors + and send crash reports. + """ + + def __init__( + self, + status_code: int, + detail: Any = None, + headers: Optional[Dict[str, Any]] = None, + internal_error: Exception = None, + ): + super().__init__(status_code, detail, headers) + if internal_error is not None: + reporter.error_report(internal_error) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index b2684104..bf27068b 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -1,9 +1,26 @@ import os from typing import cast +from bugout.app import Bugout + # Bugout +BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") +BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") +bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) + HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN") +# Origin +RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS") +if RAW_ORIGINS is None: + raise ValueError( + "MOONSTREAM_CORS_ALLOWED_ORIGINS environment variable must be set (comma-separated list of CORS allowed origins)" + ) +ORIGINS = RAW_ORIGINS.split(",") + +# OpenAPI +DOCS_TARGET_PATH = "docs" + # Geth connection address MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.environ.get( "MOONSTREAM_NODE_ETHEREUM_IPC_ADDR", "127.0.0.1" diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/__init__.py b/crawlers/mooncrawl/mooncrawl/stats_worker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py new file mode 100644 index 00000000..a9f896c4 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -0,0 +1,488 @@ +""" +Generates dashboard. +""" +import argparse +import hashlib +import json +import logging +import time +from datetime import datetime, timedelta +from enum import Enum +from typing import Any, Callable, Dict, List + +import boto3 # type: ignore +from bugout.data import BugoutResources +from moonstreamdb.db import yield_db_session_ctx +from sqlalchemy import Column, Date, and_, func, text +from sqlalchemy.orm import Query, Session + +from ..blockchain import get_block_model, get_label_model, get_transaction_model +from ..data import AvailableBlockchainType +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN +from ..settings import bugout_client as bc + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +lable_filters = {"Transfer": "nft_transfer"} + + +class TimeScale(Enum): + year = "year" + month = "month" + week = "week" + day = "day" + + +timescales_params: Dict[str, Dict[str, str]] = { + "year": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"}, + "month": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"}, + "week": {"timestep": "1 day", "timeformat": "YYYY-MM-DD"}, + "day": {"timestep": "1 hours", "timeformat": "YYYY-MM-DD HH24"}, +} + +timescales_delta: Dict[str, Dict[str, timedelta]] = { + "year": {"timedelta": timedelta(days=365)}, + "month": {"timedelta": timedelta(days=27)}, + "week": {"timedelta": timedelta(days=6)}, + "day": {"timedelta": timedelta(hours=24)}, +} + +BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" +BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" + + +def push_statistics( + statistics_data: Dict[str, Any], + subscription: Any, + timescale: str, + bucket: str, + hash: str, +) -> None: + + result_bytes = json.dumps(statistics_data).encode("utf-8") + result_key = f'contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' + + s3 = boto3.client("s3") + s3.put_object( + Body=result_bytes, + Bucket=bucket, + Key=result_key, + ContentType="application/json", + Metadata={"drone": "statistics"}, + ) + + print(f"Statistics push to bucket: s3://{bucket}/{result_key}") + + +def generate_metrics( + db_session: Session, + blockchain_type: AvailableBlockchainType, + address: str, + timescale: str, + metrics: List[str], + start: Any, +): + """ + Generage metrics + """ + block_model = get_block_model(blockchain_type) + transaction_model = get_transaction_model(blockchain_type) + + start = start + end = datetime.utcnow() + + start_timestamp = int(start.timestamp()) + end_timestamp = int(end.timestamp()) + + results: Dict[str, Any] = {} + + time_step = timescales_params[timescale]["timestep"] + + time_format = timescales_params[timescale]["timeformat"] + + def make_query( + db_session: Session, + identifying_column: Column, + statistic_column: Column, + ) -> Query: + + unformated_time_series_subquery = db_session.query( + func.generate_series( + start, + end, + time_step, + ).label("timeseries_points") + ).subquery(name="unformated_time_series_subquery") + + time_series_formated = db_session.query( + func.to_char( + unformated_time_series_subquery.c.timeseries_points, time_format + ).label("timeseries_points") + ) + + time_series_formated_subquery = time_series_formated.subquery( + name="time_series_subquery" + ) + + metric_count_subquery = ( + db_session.query( + func.count(statistic_column).label("count"), + func.to_char( + func.to_timestamp(block_model.timestamp).cast(Date), time_format + ).label("timeseries_points"), + ) + .join( + block_model, + transaction_model.block_number == block_model.block_number, + ) + .filter(identifying_column == address) + .filter(block_model.timestamp >= start_timestamp) + .filter(block_model.timestamp <= end_timestamp) + .group_by(text("timeseries_points")) + ).subquery(name="metric_counts") + + metrics_time_series = ( + db_session.query( + time_series_formated_subquery.c.timeseries_points.label( + "timeseries_points" + ), + func.coalesce(metric_count_subquery.c.count.label("count"), 0), + ) + .join( + metric_count_subquery, + time_series_formated_subquery.c.timeseries_points + == metric_count_subquery.c.timeseries_points, + isouter=True, + ) + .order_by(text("timeseries_points DESC")) + ) + + response_metric: List[Any] = [] + + for created_date, count in metrics_time_series: + + if time_format == "YYYY-MM-DD HH24": + created_date, hour = created_date.split(" ") + response_metric.append( + {"date": created_date, "hour": hour, "count": count} + ) + else: + response_metric.append({"date": created_date, "count": count}) + + return response_metric + + try: + start_time = time.time() + + results["transactions_out"] = make_query( + db_session, + transaction_model.from_address, + transaction_model.hash, + ) + + print("--- transactions_out %s seconds ---" % (time.time() - start_time)) + + start_time = time.time() + results["transactions_in"] = make_query( + db_session, + transaction_model.to_address, + transaction_model.hash, + ) + + print("--- transactions_in %s seconds ---" % (time.time() - start_time)) + + start_time = time.time() + results["value_out"] = make_query( + db_session, + transaction_model.from_address, + transaction_model.value, + ) + print("--- value_out %s seconds ---" % (time.time() - start_time)) + + start_time = time.time() + results["value_in"] = make_query( + db_session, + transaction_model.to_address, + transaction_model.value, + ) + + print("--- value_in %s seconds ---" % (time.time() - start_time)) + + except Exception as err: + print(err) + pass + + return results + + +def generate_data( + db_session: Session, + blockchain_type: AvailableBlockchainType, + address: str, + timescale: str, + functions: List[str], + start: Any, +): + label_model = get_label_model(blockchain_type) + + # create empty time series + + time_step = timescales_params[timescale]["timestep"] + + time_format = timescales_params[timescale]["timeformat"] + + # if end is None: + end = datetime.utcnow() + + time_series_subquery = db_session.query( + func.generate_series( + start, + end, + time_step, + ).label("timeseries_points") + ) + + print(time_series_subquery.all()) + + time_series_subquery = time_series_subquery.subquery(name="time_series_subquery") + + # get distinct tags labels in that range + + label_requested = ( + db_session.query(label_model.label.label("label")) + .filter(label_model.address == address) + .filter(label_model.label.in_(functions)) + .distinct() + ) + + if start is not None: + label_requested = label_requested.filter( + func.to_timestamp(label_model.block_timestamp).cast(Date) > start + ) + if end is not None: + label_requested = label_requested.filter( + func.to_timestamp(label_model.block_timestamp).cast(Date) < end + ) + + label_requested = label_requested.subquery(name="label_requested") + + # empty timeseries with tags + empty_time_series_subquery = db_session.query( + func.to_char(time_series_subquery.c.timeseries_points, time_format).label( + "timeseries_points" + ), + label_requested.c.label.label("label"), + ) + + empty_time_series_subquery = empty_time_series_subquery.subquery( + name="empty_time_series_subquery" + ) + + # tags count + label_counts = ( + db_session.query( + func.to_char( + func.to_timestamp(label_model.block_timestamp).cast(Date), time_format + ).label("timeseries_points"), + func.count(label_model.id).label("count"), + label_model.label.label("label"), + ) + .filter(label_model.label.in_(functions)) + .filter(label_model.address == address) + ) + + if start is not None: + label_counts = label_counts.filter( + func.to_timestamp(label_model.block_timestamp).cast(Date) > start + ) + if end is not None: + label_counts = label_counts.filter( + func.to_timestamp(label_model.block_timestamp).cast(Date) < end + ) + + label_counts_subquery = ( + label_counts.group_by( + text("timeseries_points"), + label_model.label, + ) + .order_by(text("timeseries_points desc")) + .subquery(name="label_counts") + ) + + # Join empty tags_time_series with tags count eg apply tags counts to time series. + labels_time_series = ( + db_session.query( + empty_time_series_subquery.c.timeseries_points.label("timeseries_points"), + empty_time_series_subquery.c.label.label("label"), + func.coalesce(label_counts_subquery.c.count.label("count"), 0), + ) + .join( + label_counts_subquery, + and_( + empty_time_series_subquery.c.label == label_counts_subquery.c.label, + empty_time_series_subquery.c.timeseries_points + == label_counts_subquery.c.timeseries_points, + ), + isouter=True, + ) + .order_by(text("timeseries_points DESC")) + ) + + response_labels: Dict[Any, Any] = {} + + for created_date, label, count in labels_time_series: + + if not response_labels.get(label): + response_labels[label] = [] + if time_format == "YYYY-MM-DD HH24": + created_date, hour = created_date.split(" ") + response_labels[label].append( + {"date": created_date, "hour": hour, "count": count} + ) + else: + response_labels[label].append({"date": created_date, "count": count}) + else: + if time_format == "YYYY-MM-DD HH24": + created_date, hour = created_date.split(" ") + response_labels[label].append( + {"date": created_date, "hour": hour, "count": count} + ) + else: + response_labels[label].append({"date": created_date, "count": count}) + + return response_labels + + +def stats_generate_handler(args: argparse.Namespace): + """ + Start crawler with generate. + """ + blockchain_type = AvailableBlockchainType(args.blockchain) + + with yield_db_session_ctx() as db_session: + # read all subscriptions + required_subscriptions: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"}, + timeout=10, + ) + + print(f"Subscriptions for processing: {len(required_subscriptions.resources)}") + + s3_client = boto3.client("s3") + + # Already processed + already_processed = [] + + for subscription in required_subscriptions.resources: + bucket = subscription.resource_data["bucket"] + key = subscription.resource_data["s3_path"] + address = subscription.resource_data["address"] + + print(f"Expected bucket: s3://{bucket}/{key}") + + abi = s3_client.get_object( + Bucket=bucket, + Key=key, + ) + abi_json = json.loads(abi["Body"].read()) + + abi_string = json.dumps(abi_json, sort_keys=True, indent=2) + + hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest() + + if f"{address}/{hash}" in already_processed: + continue + + s3_data_object = {} + + abi_functions = [item for item in abi_json if item["type"] == "function"] + abi_events = [item for item in abi_json if item["type"] == "event"] + + for timescale in [timescale.value for timescale in TimeScale]: + + start_date = ( + datetime.utcnow() - timescales_delta[timescale]["timedelta"] + ) + + print(f"Timescale: {timescale}") + + abi_functions_names = [item["name"] for item in abi_functions] + + functions_calls_data = generate_data( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + functions=abi_functions_names, + start=start_date, + ) + + s3_data_object["functions"] = functions_calls_data + # generate data + + abi_events_names = [ + item["name"] + if item["name"] not in lable_filters + else lable_filters[item["name"]] + for item in abi_events + ] + + events_data = generate_data( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + functions=abi_events_names, + start=start_date, + ) + + s3_data_object["events"] = events_data + + s3_data_object["metrics"] = generate_metrics( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + metrics=abi_events_names, + start=start_date, + ) + + push_statistics( + statistics_data=s3_data_object, + subscription=subscription, + timescale=timescale, + bucket=bucket, + hash=hash, + ) + already_processed.append(f"{address}/{hash}") + + time.sleep(10) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Command Line Interface") + parser.set_defaults(func=lambda _: parser.print_help()) + subcommands = parser.add_subparsers( + description="Drone dashboard statistics commands" + ) + + # Statistics parser + parser_generate = subcommands.add_parser( + "generate", description="Generate statistics" + ) + parser_generate.set_defaults(func=lambda _: parser_generate.print_help()) + parser_generate.add_argument( + "--blockchain", + required=True, + help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", + ) + parser_generate.set_defaults(func=stats_generate_handler) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index fd80ca83..1ae8a453 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.1.0" +MOONCRAWL_VERSION = "0.1.1" diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 5a2ce208..a5070f10 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -1,4 +1,5 @@ -# Path to IPC socket to use for web3 connections +export BUGOUT_BROOD_URL="https://auth.bugout.dev" +export BUGOUT_SPIRE_URL="https://spire.bugout.dev" export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1" export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545" export MOONSTREAM_NODE_POLYGON_IPC_ADDR="127.0.0.1" diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index 8f755014..f5718927 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -36,11 +36,14 @@ setup( "boto3", "bugout", "chardet", + "fastapi", "moonstreamdb", "humbug", + "pydantic", "python-dateutil", "requests", "tqdm", + "uvicorn", "web3", ], extras_require={ @@ -50,11 +53,12 @@ setup( entry_points={ "console_scripts": [ "crawler=mooncrawl.crawler:main", - "esd=mooncrawl.esd:main", - "identity=mooncrawl.identity:main", - "etherscan=mooncrawl.etherscan:main", - "nft=mooncrawl.nft.cli:main", "contractcrawler=mooncrawl.contract.cli:main", + "esd=mooncrawl.esd:main", + "etherscan=mooncrawl.etherscan:main", + "identity=mooncrawl.identity:main", + "nft=mooncrawl.nft.cli:main", + "statistics=mooncrawl.stats_worker.dashboard:main", ] }, )