diff --git a/backend/moonstreamapi/abi_decoder.py b/backend/moonstreamapi/abi_decoder.py index 2d469e41..240ef3f0 100644 --- a/backend/moonstreamapi/abi_decoder.py +++ b/backend/moonstreamapi/abi_decoder.py @@ -1,15 +1,16 @@ import argparse import binascii import sys -from typing import List, Optional, Union, Type, cast +from typing import List, Optional, Type, Union, cast import pyevmasm - -from moonstreamdb.db import yield_db_session from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import text -from .data import EVMEventSignature, EVMFunctionSignature, ContractABI + +from moonstreamdb.db import yield_db_session + +from .data import ContractABI, EVMEventSignature, EVMFunctionSignature def query_for_text_signatures( diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index f4ddc6ce..8468b00b 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -2,41 +2,36 @@ import hashlib import json from itertools import chain import logging -from typing import List, Optional, Dict, Any, Union -import time +from typing import List, Optional, Dict, Any from enum import Enum import uuid import boto3 # type: ignore -from bugout.data import BugoutSearchResults, BugoutSearchResult + +from bugout.data import BugoutSearchResults, BugoutSearchResult, BugoutResource from bugout.journal import SearchOrder from ens.utils import is_valid_ens_name # type: ignore from eth_utils.address import is_address # type: ignore -from moonstreamdb.models import ( - EthereumLabel, -) +from moonstreamdb.models import EthereumLabel from sqlalchemy import text from sqlalchemy.orm import Session +from web3 import Web3 from web3._utils.validation import validate_abi - -from .middleware import MoonstreamHTTPException from . import data -from .reporter import reporter from .middleware import MoonstreamHTTPException -from .settings import ETHERSCAN_SMARTCONTRACTS_BUCKET -from bugout.data import BugoutResource +from .reporter import reporter from .settings import ( - MOONSTREAM_APPLICATION_ID, - bugout_client as bc, BUGOUT_REQUEST_TIMEOUT_SECONDS, + ETHERSCAN_SMARTCONTRACTS_BUCKET, MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_APPLICATION_ID, MOONSTREAM_DATA_JOURNAL_ID, MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_MOONWORM_TASKS_JOURNAL, ) -from web3 import Web3 +from .settings import bugout_client as bc logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/admin/cli.py b/backend/moonstreamapi/admin/cli.py index 934a819c..48e3a1c7 100644 --- a/backend/moonstreamapi/admin/cli.py +++ b/backend/moonstreamapi/admin/cli.py @@ -2,13 +2,13 @@ Moonstream CLI """ import argparse - -import logging import json +import logging import os from posix import listdir from typing import Optional +from sqlalchemy.orm import with_expression from moonstreamdb.db import SessionLocal diff --git a/backend/moonstreamapi/admin/subscriptions.py b/backend/moonstreamapi/admin/subscriptions.py index 88321140..35f842f6 100644 --- a/backend/moonstreamapi/admin/subscriptions.py +++ b/backend/moonstreamapi/admin/subscriptions.py @@ -7,12 +7,9 @@ from typing import Dict, List, Optional, Union from bugout.data import BugoutResources -from ..settings import ( - MOONSTREAM_ADMIN_ACCESS_TOKEN, - bugout_client as bc, - BUGOUT_REQUEST_TIMEOUT_SECONDS, -) from .. import reporter +from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN +from ..settings import bugout_client as bc def migrate_subscriptions( diff --git a/backend/moonstreamapi/api.py b/backend/moonstreamapi/api.py index a625b7dd..2f7db4f8 100644 --- a/backend/moonstreamapi/api.py +++ b/backend/moonstreamapi/api.py @@ -8,16 +8,15 @@ from typing import Dict from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from . import actions -from . import data +from . import actions, data +from .middleware import BroodAuthMiddleware, MoonstreamHTTPException from .routes.address_info import router as addressinfo_router +from .routes.dashboards import router as dashboards_router from .routes.streams import router as streams_router from .routes.subscriptions import router as subscriptions_router from .routes.txinfo import router as txinfo_router from .routes.users import router as users_router from .routes.whales import router as whales_router -from .routes.dashboards import router as dashboards_router -from .middleware import BroodAuthMiddleware, MoonstreamHTTPException from .settings import DOCS_TARGET_PATH, ORIGINS from .version import MOONSTREAMAPI_VERSION diff --git a/backend/moonstreamapi/data.py b/backend/moonstreamapi/data.py index 432256e1..f333d618 100644 --- a/backend/moonstreamapi/data.py +++ b/backend/moonstreamapi/data.py @@ -17,6 +17,10 @@ class TimeScale(Enum): day = "day" +class UpdateStats(BaseModel): + timescales: List[str] + + class SubscriptionTypeResourceData(BaseModel): id: str name: str diff --git a/backend/moonstreamapi/middleware.py b/backend/moonstreamapi/middleware.py index 4fa84a2f..9030e9c1 100644 --- a/backend/moonstreamapi/middleware.py +++ b/backend/moonstreamapi/middleware.py @@ -7,7 +7,8 @@ from fastapi import HTTPException, Request, Response from starlette.middleware.base import BaseHTTPMiddleware from .reporter import reporter -from .settings import MOONSTREAM_APPLICATION_ID, bugout_client as bc +from .settings import MOONSTREAM_APPLICATION_ID +from .settings import bugout_client as bc logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/providers/__init__.py b/backend/moonstreamapi/providers/__init__.py index c92329cd..116fc21c 100644 --- a/backend/moonstreamapi/providers/__init__.py +++ b/backend/moonstreamapi/providers/__init__.py @@ -32,9 +32,9 @@ from bugout.app import Bugout from bugout.data import BugoutResource from sqlalchemy.orm import Session -from . import bugout, transactions, moonworm_provider from .. import data from ..stream_queries import StreamQuery +from . import bugout, moonworm_provider, transactions logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) diff --git a/backend/moonstreamapi/providers/bugout.py b/backend/moonstreamapi/providers/bugout.py index 371cdec7..c25c6c06 100644 --- a/backend/moonstreamapi/providers/bugout.py +++ b/backend/moonstreamapi/providers/bugout.py @@ -1,9 +1,9 @@ """ Event providers powered by Bugout journals. """ -from datetime import datetime import json import logging +from datetime import datetime from typing import Dict, List, Optional, Tuple from bugout.app import Bugout @@ -14,9 +14,8 @@ from dateutil.tz import UTC from sqlalchemy.orm import Session from .. import data -from ..stream_queries import StreamQuery - from ..settings import HUMBUG_TXPOOL_CLIENT_ID +from ..stream_queries import StreamQuery logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) diff --git a/backend/moonstreamapi/providers/moonworm_provider.py b/backend/moonstreamapi/providers/moonworm_provider.py index 9b870f18..819231eb 100644 --- a/backend/moonstreamapi/providers/moonworm_provider.py +++ b/backend/moonstreamapi/providers/moonworm_provider.py @@ -1,24 +1,18 @@ -from dataclasses import dataclass, field import logging -from typing import cast, Dict, Any, List, Optional, Tuple +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple, cast from bugout.app import Bugout from bugout.data import BugoutResource - -from moonstreamdb.blockchain import ( - get_label_model, - AvailableBlockchainType, -) -from sqlalchemy import or_, and_, text -from sqlalchemy.orm import Session, Query, query_expression +from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model +from sqlalchemy import and_, or_, text +from sqlalchemy.orm import Query, Session, query_expression from sqlalchemy.sql.expression import label - from .. import data from ..stream_boundaries import validate_stream_boundary from ..stream_queries import StreamQuery - logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) diff --git a/backend/moonstreamapi/providers/transactions.py b/backend/moonstreamapi/providers/transactions.py index a79d5eb8..0cfe2db1 100644 --- a/backend/moonstreamapi/providers/transactions.py +++ b/backend/moonstreamapi/providers/transactions.py @@ -1,25 +1,22 @@ -from dataclasses import dataclass, field import logging -from typing import cast, Dict, Any, List, Optional, Tuple +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple, cast from bugout.app import Bugout from bugout.data import BugoutResource - from moonstreamdb.blockchain import ( - get_label_model, - get_block_model, - get_transaction_model, AvailableBlockchainType, + get_block_model, + get_label_model, + get_transaction_model, ) -from sqlalchemy import or_, and_, text -from sqlalchemy.orm import Session, Query - +from sqlalchemy import and_, or_, text +from sqlalchemy.orm import Query, Session from .. import data from ..stream_boundaries import validate_stream_boundary from ..stream_queries import StreamQuery - logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) diff --git a/backend/moonstreamapi/routes/address_info.py b/backend/moonstreamapi/routes/address_info.py index c861b1c1..cbaff452 100644 --- a/backend/moonstreamapi/routes/address_info.py +++ b/backend/moonstreamapi/routes/address_info.py @@ -2,12 +2,12 @@ import logging from typing import Optional from fastapi import APIRouter, Depends, Query -from moonstreamdb.db import yield_db_session from sqlalchemy.orm import Session from web3 import Web3 -from .. import actions -from .. import data +from moonstreamdb.db import yield_db_session + +from .. import actions, data from ..middleware import MoonstreamHTTPException from ..web3_provider import yield_web3_provider diff --git a/backend/moonstreamapi/routes/dashboards.py b/backend/moonstreamapi/routes/dashboards.py index 4a6065da..7606d13e 100644 --- a/backend/moonstreamapi/routes/dashboards.py +++ b/backend/moonstreamapi/routes/dashboards.py @@ -1,25 +1,27 @@ +import json import logging from os import read -import json -from typing import Any, List, Optional, Dict +from typing import Any, Dict, List, Optional from uuid import UUID import boto3 # type: ignore +import requests from bugout.data import BugoutResource, BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Request, Query, Body +from fastapi import APIRouter, Body, Path, Query, Request -from .. import actions -from .. import data +from .. import actions, data from ..middleware import MoonstreamHTTPException from ..reporter import reporter from ..settings import ( - MOONSTREAM_APPLICATION_ID, - bugout_client as bc, BUGOUT_REQUEST_TIMEOUT_SECONDS, + MOONSTREAM_APPLICATION_ID, + MOONSTREAM_CRAWLERS_SERVER_URL, + MOONSTREAM_CRAWLERS_SERVER_PORT, MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, ) +from ..settings import bugout_client as bc logger = logging.getLogger(__name__) @@ -350,7 +352,7 @@ async def get_dashboard_data_links_handler( request: Request, dashboard_id: str ) -> Dict[UUID, Any]: """ - Update dashboards mainly fully overwrite name and subscription metadata + Get s3 presign urls for dshaboard grafics """ token = request.state.token @@ -427,10 +429,39 @@ async def get_dashboard_data_links_handler( ExpiresIn=300, HttpMethod="GET", ) - stats[subscription.id][timescale] = stats_presigned_url + stats[subscription.id][timescale] = {"url": stats_presigned_url} except Exception as err: logger.warning( f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, Key:{result_key} get error:{err}" ) return stats + + +@router.post("/{dashboard_id}/stats_update", tags=["dashboards"]) +async def update_dashbord_data_handler( + request: Request, + dashboard_id: str = Path(...), + updatestats: data.UpdateStats = Body(...), +) -> Dict[str, Any]: + """ + Return journal statistics + journal.read permission required. + """ + + token = request.state.token + + responce = requests.post( + f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/stats_update", + json={ + "dashboard_id": dashboard_id, + "timescales": updatestats.timescales, + "token": token, + }, + ) + if responce.status_code != 200: + raise MoonstreamHTTPException( + status_code=responce.status_code, + detail="Task for start generate stats failed.", + ) + return responce.json() diff --git a/backend/moonstreamapi/routes/streams.py b/backend/moonstreamapi/routes/streams.py index 8e592329..1917b6ea 100644 --- a/backend/moonstreamapi/routes/streams.py +++ b/backend/moonstreamapi/routes/streams.py @@ -5,11 +5,12 @@ import logging from typing import Any, Dict, List, Optional from bugout.data import BugoutResource -from fastapi import APIRouter, Request, Query, Depends -from moonstreamdb import db +from fastapi import APIRouter, Depends, Query, Request from sqlalchemy.orm import Session -from .. import data +from moonstreamdb import db + +from .. import data, stream_queries from ..middleware import MoonstreamHTTPException from ..providers import ( ReceivingEventsException, @@ -20,12 +21,11 @@ from ..providers import ( previous_event, ) from ..settings import ( + BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_DATA_JOURNAL_ID, - bugout_client as bc, - BUGOUT_REQUEST_TIMEOUT_SECONDS, ) -from .. import stream_queries +from ..settings import bugout_client as bc from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/routes/subscriptions.py b/backend/moonstreamapi/routes/subscriptions.py index beff02c4..40a66ec1 100644 --- a/backend/moonstreamapi/routes/subscriptions.py +++ b/backend/moonstreamapi/routes/subscriptions.py @@ -2,10 +2,9 @@ The Moonstream subscriptions HTTP API """ import hashlib -import logging import json -from typing import List, Optional, Dict, Any - +import logging +from typing import Any, Dict, List, Optional import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources @@ -16,14 +15,16 @@ from web3 import Web3 from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks from ..admin import subscription_types from .. import data +from ..actions import upload_abi_to_s3, validate_abi_json +from ..admin import subscription_types from ..middleware import MoonstreamHTTPException from ..reporter import reporter from ..settings import ( MOONSTREAM_APPLICATION_ID, - bugout_client as bc, MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, ) +from ..settings import bugout_client as bc from ..web3_provider import yield_web3_provider logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/routes/txinfo.py b/backend/moonstreamapi/routes/txinfo.py index c840c360..3ea6e706 100644 --- a/backend/moonstreamapi/routes/txinfo.py +++ b/backend/moonstreamapi/routes/txinfo.py @@ -9,12 +9,12 @@ import logging from typing import Optional from fastapi import APIRouter, Depends -from moonstreamdb.db import yield_db_session from sqlalchemy.orm import Session +from moonstreamdb.db import yield_db_session + +from .. import actions, data from ..abi_decoder import decode_abi -from .. import actions -from .. import data logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/routes/users.py b/backend/moonstreamapi/routes/users.py index acd8c328..226919be 100644 --- a/backend/moonstreamapi/routes/users.py +++ b/backend/moonstreamapi/routes/users.py @@ -2,27 +2,18 @@ The Moonstream users HTTP API """ import logging -from typing import Any, Dict, Optional import uuid +from typing import Any, Dict, Optional -from bugout.data import BugoutToken, BugoutUser, BugoutResource, BugoutUserTokens +from bugout.data import BugoutResource, BugoutToken, BugoutUser, BugoutUserTokens from bugout.exceptions import BugoutResponseException -from fastapi import ( - APIRouter, - Body, - Form, - Request, -) +from fastapi import APIRouter, Body, Form, Request from .. import data -from ..middleware import MoonstreamHTTPException - -from ..settings import ( - MOONSTREAM_APPLICATION_ID, - bugout_client as bc, - BUGOUT_REQUEST_TIMEOUT_SECONDS, -) from ..actions import create_onboarding_resource +from ..middleware import MoonstreamHTTPException +from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID +from ..settings import bugout_client as bc logger = logging.getLogger(__name__) diff --git a/backend/moonstreamapi/routes/whales.py b/backend/moonstreamapi/routes/whales.py index dbe5bd9e..0b1b87d8 100644 --- a/backend/moonstreamapi/routes/whales.py +++ b/backend/moonstreamapi/routes/whales.py @@ -7,15 +7,16 @@ import logging from typing import Optional from fastapi import APIRouter, Depends, Query -from moonstreamdb import db from sqlalchemy.orm import Session +from moonstreamdb import db + from .. import data from ..providers.bugout import ethereum_whalewatch_provider from ..settings import ( - bugout_client, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_DATA_JOURNAL_ID, + bugout_client, ) from ..stream_queries import StreamQuery diff --git a/backend/moonstreamapi/settings.py b/backend/moonstreamapi/settings.py index db2269bb..2882644a 100644 --- a/backend/moonstreamapi/settings.py +++ b/backend/moonstreamapi/settings.py @@ -66,6 +66,17 @@ MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/") ) +MOONSTREAM_CRAWLERS_SERVER_URL = os.environ.get("MOONSTREAM_CRAWLERS_SERVER_URL") +if MOONSTREAM_CRAWLERS_SERVER_URL is None: + raise ValueError("MOONSTREAM_CRAWLERS_SERVER_URL environment variable must be set") +MOONSTREAM_CRAWLERS_SERVER_URL = MOONSTREAM_CRAWLERS_SERVER_URL.rstrip("/") + +MOONSTREAM_CRAWLERS_SERVER_PORT = os.environ.get("MOONSTREAM_CRAWLERS_SERVER_PORT") +if MOONSTREAM_CRAWLERS_SERVER_PORT is None: + raise ValueError("MOONSTREAM_CRAWLERS_SERVER_PORT environment variable must be set") +MOONSTREAM_CRAWLERS_SERVER_PORT = MOONSTREAM_CRAWLERS_SERVER_PORT.rstrip("/") + + MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get( "MOONSTREAM_MOONWORM_TASKS_JOURNAL", "" ) diff --git a/backend/moonstreamapi/stream_queries.py b/backend/moonstreamapi/stream_queries.py index 16218751..2fd78ca8 100644 --- a/backend/moonstreamapi/stream_queries.py +++ b/backend/moonstreamapi/stream_queries.py @@ -1,9 +1,9 @@ """ Stream queries - data structure, and parser. """ -from dataclasses import dataclass, field import logging -from typing import cast, List, Tuple +from dataclasses import dataclass, field +from typing import List, Tuple, cast logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) diff --git a/backend/moonstreamapi/test_actions_abi_validations.py b/backend/moonstreamapi/test_actions_abi_validations.py index b5ef11f5..4d0251b6 100644 --- a/backend/moonstreamapi/test_actions_abi_validations.py +++ b/backend/moonstreamapi/test_actions_abi_validations.py @@ -1,10 +1,9 @@ import unittest -from .data import DashboardMeta from .actions import dashboards_abi_validation +from .data import DashboardMeta from .middleware import MoonstreamHTTPException - abi_example = [ { "inputs": [ diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 68aed72d..a6c50cf2 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -1,17 +1,30 @@ """ The Mooncrawl HTTP API """ +from datetime import datetime, timedelta import logging +from os import times import time -from typing import Dict +from typing import Dict, Any, List +from uuid import UUID -from fastapi import FastAPI +import boto3 # type: ignore +from fastapi import FastAPI, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware +from bugout.data import BugoutResource, BugoutResources + from . import data from .middleware import MoonstreamHTTPException -from .settings import DOCS_TARGET_PATH, ORIGINS +from .settings import ( + DOCS_TARGET_PATH, + ORIGINS, + bugout_client as bc, + BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, + MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, +) from .version import MOONCRAWL_VERSION +from .stats_worker import dashboard logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -65,17 +78,91 @@ async def now_handler() -> data.NowResponse: return data.NowResponse(epoch_time=time.time()) -@app.get("/jobs/stats_update", tags=["jobs"]) -async def status_handler(): +@app.post("/jobs/stats_update", tags=["jobs"]) +async def status_handler( + stats_update: data.StatsUpdateRequest, + background_tasks: BackgroundTasks, +): """ - Find latest crawlers records with creation timestamp: - - ethereum_txpool - - ethereum_trending + Update dashboard endpoint create are tasks for update. """ + + dashboard_resource: BugoutResource = bc.get_resource( + token=stats_update.token, + resource_id=stats_update.dashboard_id, + timeout=10, + ) + + # get all user subscriptions + + blockchain_subscriptions: BugoutResources = bc.list_resources( + token=stats_update.token, + params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION}, + timeout=10, + ) + + subscription_by_id = { + str(blockchain_subscription.id): blockchain_subscription + for blockchain_subscription in blockchain_subscriptions.resources + } + + s3_client = boto3.client("s3") + try: - pass + + background_tasks.add_task( + dashboard.stats_generate_api_task, + timescales=stats_update.timescales, + dashboard=dashboard_resource, + subscription_by_id=subscription_by_id, + ) + except Exception as e: logger.error(f"Unhandled status exception, error: {e}") raise MoonstreamHTTPException(status_code=500) - return + presigned_urls_response: Dict[UUID, Any] = {} + + for dashboard_subscription_filters in dashboard_resource.resource_data[ + "subscription_settings" + ]: + + subscription = subscription_by_id[ + dashboard_subscription_filters["subscription_id"] + ] + + for timescale in stats_update.timescales: + + presigned_urls_response[subscription.id] = {} + + try: + result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{stats_update.dashboard_id}/v1/{timescale}.json' + + object = s3_client.head_object( + Bucket=subscription.resource_data["bucket"], Key=result_key + ) + + stats_presigned_url = s3_client.generate_presigned_url( + "get_object", + Params={ + "Bucket": subscription.resource_data["bucket"], + "Key": result_key, + }, + ExpiresIn=300, + HttpMethod="GET", + ) + + presigned_urls_response[subscription.id][timescale] = { + "url": stats_presigned_url, + "headers": { + "If-Modified-Since": ( + object["LastModified"] + timedelta(seconds=1) + ).strftime("%c") + }, + } + except Exception as err: + logger.warning( + f"Can't generate S3 presigned url in stats endpoint for Bucket:{subscription.resource_data['bucket']}, Key:{result_key} get error:{err}" + ) + + return presigned_urls_response diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 0420d5cf..4bf694c9 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum +from typing import List from pydantic import BaseModel @@ -10,6 +11,12 @@ class AvailableBlockchainType(Enum): POLYGON = "polygon" +class StatsUpdateRequest(BaseModel): + dashboard_id: str + timescales: List[str] + token: str + + @dataclass class DateRange: start_time: datetime diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 2b4abbde..89d2751f 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -3,6 +3,11 @@ from typing import cast from bugout.app import Bugout + +BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" +BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" + + # Bugout BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py index af8cb55b..044dfbe0 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -8,7 +8,7 @@ import logging import time from datetime import datetime, timedelta from enum import Enum -from typing import Any, Callable, Dict, List, Union +from typing import Any, Callable, Dict, List from uuid import UUID import boto3 # type: ignore @@ -21,7 +21,6 @@ from web3 import Web3 from ..blockchain import ( connect, - get_block_model, get_label_model, get_transaction_model, ) @@ -34,8 +33,8 @@ from ..settings import ( ) from ..settings import bugout_client as bc +logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) subscription_ids_by_blockchain = { @@ -101,7 +100,7 @@ def push_statistics( Metadata={"drone": "statistics"}, ) - print(f"Statistics push to bucket: s3://{bucket}/{result_key}") + logger.info(f"Statistics push to bucket: s3://{bucket}/{result_key}") def generate_data( @@ -381,9 +380,10 @@ def process_external( } ) except Exception as e: - print(f"Error processing external call: {e}") + logger.error(f"Error processing external call: {e}") - web3_client = connect(blockchain) + if external_calls: + web3_client = connect(blockchain) for extcall in external_calls: try: @@ -398,7 +398,7 @@ def process_external( {"display_name": extcall["display_name"], "value": response} ) except Exception as e: - print(f"Failed to call {extcall['name']} error: {e}") + logger.error(f"Failed to call {extcall['name']} error: {e}") return extention_data @@ -427,6 +427,79 @@ def get_count( ) +def generate_web3_metrics( + db_session: Session, + events: List[str], + blockchain_type: AvailableBlockchainType, + address: str, + crawler_label: str, + abi_json: Any, +) -> List[Any]: + """ + Generate stats for cards components + """ + + extention_data = [] + + abi_external_calls = [item for item in abi_json if item["type"] == "external_call"] + + extention_data = process_external( + abi_external_calls=abi_external_calls, + blockchain=blockchain_type, + ) + + extention_data.append( + { + "display_name": "Overall unique token owners.", + "value": get_unique_address( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + crawler_label=crawler_label, + ), + } + ) + + # TODO: Remove it if ABI already have correct web3_call signature. + + if "HatchStartedEvent" in events: + + extention_data.append( + { + "display_name": "Number of hatches started.", + "value": get_count( + name="HatchStartedEvent", + type="event", + db_session=db_session, + select_expression=get_label_model(blockchain_type), + blockchain_type=blockchain_type, + address=address, + crawler_label=crawler_label, + ), + } + ) + + if "HatchFinishedEvent" in events: + + extention_data.append( + { + "display_name": "Number of hatches finished.", + "value": get_count( + name="HatchFinishedEvent", + type="event", + db_session=db_session, + select_expression=distinct( + get_label_model(blockchain_type).label_data["args"]["tokenId"] + ), + blockchain_type=blockchain_type, + address=address, + crawler_label=crawler_label, + ), + } + ) + return extention_data + + def stats_generate_handler(args: argparse.Namespace): """ Start crawler with generate. @@ -434,10 +507,8 @@ def stats_generate_handler(args: argparse.Namespace): blockchain_type = AvailableBlockchainType(args.blockchain) with yield_db_session_ctx() as db_session: - # read all subscriptions start_time = time.time() - blockchain_type = AvailableBlockchainType(args.blockchain) dashboard_resources: BugoutResources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, @@ -445,10 +516,9 @@ def stats_generate_handler(args: argparse.Namespace): timeout=10, ) - print(f"Amount of dashboards: {len(dashboard_resources.resources)}") + logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}") # get all subscriptions - available_subscriptions: List[BugoutResource] = [] for subscription_type in subscription_ids_by_blockchain[args.blockchain]: @@ -469,7 +539,7 @@ def stats_generate_handler(args: argparse.Namespace): for blockchain_subscription in available_subscriptions } - print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}") + logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}") s3_client = boto3.client("s3") @@ -489,11 +559,11 @@ def stats_generate_handler(args: argparse.Namespace): continue subscriptions_count += 1 - - s3_data_object: Dict[str, Any] = {} - extention_data = [] + # The resulting object whivh be pushed to S3 + s3_data_object: Dict[str, Any] = {} + address = subscription_by_id[subscription_id].resource_data[ "address" ] @@ -506,33 +576,30 @@ def stats_generate_handler(args: argparse.Namespace): ): crawler_label = "moonworm" + # Read required events, functions and web3_call form ABI if not subscription_by_id[subscription_id].resource_data["abi"]: - methods = [] events = [] + abi_json = {} else: - bucket = subscription_by_id[subscription_id].resource_data[ "bucket" ] key = subscription_by_id[subscription_id].resource_data[ "s3_path" ] - abi = s3_client.get_object( Bucket=bucket, Key=key, ) abi_json = json.loads(abi["Body"].read()) - methods = generate_list_of_names( type="function", subscription_filters=dashboard_subscription_filters, read_abi=dashboard_subscription_filters["all_methods"], abi_json=abi_json, ) - events = generate_list_of_names( type="event", subscription_filters=dashboard_subscription_filters, @@ -540,65 +607,16 @@ def stats_generate_handler(args: argparse.Namespace): abi_json=abi_json, ) - abi_external_calls = [ - item for item in abi_json if item["type"] == "external_call" - ] - - extention_data = process_external( - abi_external_calls=abi_external_calls, - blockchain=blockchain_type, - ) - - extention_data.append( - { - "display_name": "Overall unique token owners.", - "value": get_unique_address( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - crawler_label=crawler_label, - ), - } + extention_data = generate_web3_metrics( + db_session=db_session, + events=events, + blockchain_type=blockchain_type, + address=address, + crawler_label=crawler_label, + abi_json=abi_json, ) - if "HatchStartedEvent" in events: - - extention_data.append( - { - "display_name": "Number of hatches started.", - "value": get_count( - name="HatchStartedEvent", - type="event", - db_session=db_session, - select_expression=get_label_model(blockchain_type), - blockchain_type=blockchain_type, - address=address, - crawler_label=crawler_label, - ), - } - ) - - if "HatchFinishedEvent" in events: - - extention_data.append( - { - "display_name": "Number of hatches finished.", - "value": get_count( - name="HatchFinishedEvent", - type="event", - db_session=db_session, - select_expression=distinct( - get_label_model(blockchain_type).label_data[ - "args" - ]["tokenId"] - ), - blockchain_type=blockchain_type, - address=address, - crawler_label=crawler_label, - ), - } - ) - + # Generate blocks state information current_blocks_state = get_blocks_state( db_session=db_session, blockchain_type=blockchain_type ) @@ -609,12 +627,17 @@ def stats_generate_handler(args: argparse.Namespace): datetime.utcnow() - timescales_delta[timescale]["timedelta"] ) - print(f"Timescale: {timescale}") - - s3_data_object["blocks_state"] = current_blocks_state + logger.info(f"Timescale: {timescale}") s3_data_object["web3_metric"] = extention_data + # Write state of blocks in database + s3_data_object["blocks_state"] = current_blocks_state + + # TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524 + s3_data_object["generic"] = {} + + # Generate functions call timeseries functions_calls_data = generate_data( db_session=db_session, blockchain_type=blockchain_type, @@ -625,9 +648,9 @@ def stats_generate_handler(args: argparse.Namespace): metric_type="tx_call", crawler_label=crawler_label, ) - s3_data_object["functions"] = functions_calls_data + # Generte events timeseries events_data = generate_data( db_session=db_session, blockchain_type=blockchain_type, @@ -638,11 +661,9 @@ def stats_generate_handler(args: argparse.Namespace): metric_type="event", crawler_label=crawler_label, ) - s3_data_object["events"] = events_data - s3_data_object["generic"] = {} - + # Push data to S3 bucket push_statistics( statistics_data=s3_data_object, subscription=subscription_by_id[subscription_id], @@ -671,6 +692,160 @@ def stats_generate_handler(args: argparse.Namespace): ) +def stats_generate_api_task( + timescales: List[str], + dashboard: BugoutResource, + subscription_by_id: Dict[str, BugoutResource], +): + """ + Start crawler with generate. + """ + + with yield_db_session_ctx() as db_session: + + logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}") + + s3_client = boto3.client("s3") + + for dashboard_subscription_filters in dashboard.resource_data[ + "subscription_settings" + ]: + + try: + + subscription_id = dashboard_subscription_filters["subscription_id"] + + blockchain_type = AvailableBlockchainType( + blockchain_by_subscription_id[ + subscription_by_id[subscription_id].resource_data[ + "subscription_type_id" + ] + ] + ) + + s3_data_object: Dict[str, Any] = {} + + extention_data = [] + + address = subscription_by_id[subscription_id].resource_data["address"] + + crawler_label = CRAWLER_LABEL + + if address in ( + "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f", + "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8", + ): + crawler_label = "moonworm" + + # Read required events, functions and web3_call form ABI + if not subscription_by_id[subscription_id].resource_data["abi"]: + + methods = [] + events = [] + abi_json = {} + + else: + + bucket = subscription_by_id[subscription_id].resource_data["bucket"] + key = subscription_by_id[subscription_id].resource_data["s3_path"] + abi = s3_client.get_object( + Bucket=bucket, + Key=key, + ) + abi_json = json.loads(abi["Body"].read()) + + methods = generate_list_of_names( + type="function", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters["all_methods"], + abi_json=abi_json, + ) + + events = generate_list_of_names( + type="event", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters["all_events"], + abi_json=abi_json, + ) + + # Data for cards components + extention_data = generate_web3_metrics( + db_session=db_session, + events=events, + blockchain_type=blockchain_type, + address=address, + crawler_label=crawler_label, + abi_json=abi_json, + ) + + # Generate blocks state information + current_blocks_state = get_blocks_state( + db_session=db_session, blockchain_type=blockchain_type + ) + + for timescale in timescales: + + start_date = ( + datetime.utcnow() - timescales_delta[timescale]["timedelta"] + ) + + logger.info(f"Timescale: {timescale}") + + s3_data_object["web3_metric"] = extention_data + + # Write state of blocks in database + s3_data_object["blocks_state"] = current_blocks_state + + # TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524 + s3_data_object["generic"] = {} + + # Generate functions call timeseries + functions_calls_data = generate_data( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + functions=methods, + start=start_date, + metric_type="tx_call", + crawler_label=crawler_label, + ) + s3_data_object["functions"] = functions_calls_data + + # Generate events timeseries + events_data = generate_data( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + functions=events, + start=start_date, + metric_type="event", + crawler_label=crawler_label, + ) + s3_data_object["events"] = events_data + + # push data to S3 bucket + push_statistics( + statistics_data=s3_data_object, + subscription=subscription_by_id[subscription_id], + timescale=timescale, + bucket=bucket, + dashboard_id=dashboard.id, + ) + except Exception as err: + reporter.error_report( + err, + [ + "dashboard", + "statistics", + f"subscriptions:{subscription_id}", + f"dashboard:{dashboard.id}", + ], + ) + logger.error(err) + + def main() -> None: parser = argparse.ArgumentParser(description="Command Line Interface") parser.set_defaults(func=lambda _: parser.print_help()) diff --git a/frontend/pages/dashboard/[dashboardId].js b/frontend/pages/dashboard/[dashboardId].js index 05c9e230..878a5e0c 100644 --- a/frontend/pages/dashboard/[dashboardId].js +++ b/frontend/pages/dashboard/[dashboardId].js @@ -13,6 +13,7 @@ import { EditablePreview, Button, } from "@chakra-ui/react"; +import { RepeatIcon } from "@chakra-ui/icons"; import Scrollable from "../../src/components/Scrollable"; import RangeSelector from "../../src/components/RangeSelector"; import useDashboard from "../../src/core/hooks/useDashboard"; @@ -44,6 +45,7 @@ const Analytics = () => { dashboardLinksCache, deleteDashboard, updateDashboard, + refreshDashboard, } = useDashboard(dashboardId); const { subscriptionsCache } = useSubscriptions(); @@ -86,6 +88,20 @@ const Analytics = () => { const plotMinW = "250px"; + const refereshCharts = () => { + refreshDashboard.mutate({ + dashboardId: dashboardCache.data.id, + timeRange: timeRange, + }); + }; + + const retryCallbackFn = (attempts, status) => { + if (status === 304 && attempts > 5) { + refereshCharts(); + } + return status === 404 || status === 403 ? false : true; + }; + return ( { variant="outline" icon={} /> + } + variant="ghost" + colorScheme="green" + size="sm" + onClick={() => { + refereshCharts(); + }} + /> @@ -175,9 +203,11 @@ const Analytics = () => { > {name ?? ""} + diff --git a/frontend/src/components/SubscriptionReport.js b/frontend/src/components/SubscriptionReport.js index df01c9bd..05006882 100644 --- a/frontend/src/components/SubscriptionReport.js +++ b/frontend/src/components/SubscriptionReport.js @@ -1,6 +1,7 @@ import React, { useMemo } from "react"; import { usePresignedURL } from "../core/hooks"; import Report from "./Report"; + import { Spinner, Flex, @@ -19,16 +20,24 @@ timeMap[HOUR_KEY] = "hour"; timeMap[DAY_KEY] = "day"; timeMap[WEEK_KEY] = "week"; -const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => { - const { data, isLoading, failureCount } = usePresignedURL({ - url: url, +const SubscriptionReport = ({ + timeRange, + presignedRequest, + id, + refetchLinks, + retryCallbackFn, +}) => { + const { data, isLoading, failureCount, isFetching } = usePresignedURL({ + ...presignedRequest, isEnabled: true, id: id, cacheType: `${timeRange} subscription_report`, requestNewURLCallback: refetchLinks, hideToastOn404: true, + retryCallbackFn: retryCallbackFn, }); const plotMinW = "250px"; + const eventKeys = useMemo( () => Object.keys(data?.events ?? {}).length > 0 @@ -50,8 +59,9 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => { : undefined, [data] ); + if (failureCount < 1 && (!data || isLoading)) return ; - if (failureCount >= 1 && (!data || isLoading)) + if (failureCount >= 1 && (!data || isLoading)) { return ( {
); + } + return ( { flexBasis={plotMinW} direction="column" > + {`Latest block number: ${ + data?.blocks_state?.latest_labelled_block ?? "Not available" + }`} { boxShadow="md" m={2} > - - {key} - + {key} + {isFetching && } + { boxShadow="md" m={2} > - - {key} - + {key} + {isFetching && } + { boxShadow="md" m={2} > - - {key} - + {key} + {isFetching && } +
{ const toast = useToast(); const router = useRouter(); + const queryClient = useQueryClient(); const { user } = useContext(UserContext); const dashboardsListCache = useQuery( @@ -119,12 +120,35 @@ const useDashboard = (dashboardId) => { } ); + const refreshDashboard = useMutation(DashboardService.refreshDashboard, { + onSuccess: (data) => { + queryClient.setQueryData( + ["dashboardLinks", { dashboardId: dashboardId }], + (oldData) => { + let newData = { ...oldData }; + + Object.keys(data.data).forEach((subscription) => { + Object.keys(data.data[subscription]).forEach((timeScale) => { + newData.data[subscription][timeScale] = + data.data[subscription][timeScale]; + }); + }); + return newData; + } + ); + }, + onError: (error) => { + toast(error.error, "error", "Fail"); + }, + }); + return { createDashboard, dashboardsListCache, dashboardCache, deleteDashboard, dashboardLinksCache, + refreshDashboard, updateDashboard, }; }; diff --git a/frontend/src/core/hooks/usePresignedURL.js b/frontend/src/core/hooks/usePresignedURL.js index 14fc7b3e..f091f4ba 100644 --- a/frontend/src/core/hooks/usePresignedURL.js +++ b/frontend/src/core/hooks/usePresignedURL.js @@ -5,25 +5,36 @@ import axios from "axios"; const usePresignedURL = ({ url, + headers, cacheType, id, requestNewURLCallback, isEnabled, hideToastOn404, + retryCallbackFn, }) => { const toast = useToast(); const getFromPresignedURL = async () => { - const response = await axios({ + let requestParameters = { url: url, // You can uncomment this to use mockupsLibrary in development // url: `https://example.com/s3`, + headers: {}, method: "GET", - }); + }; + + if (headers) { + Object.keys(headers).map((key) => { + requestParameters["headers"][key] = headers[key]; + }); + } + + const response = await axios(requestParameters); return response.data; }; - const { data, isLoading, error, failureCount } = useQuery( + const { data, isLoading, error, failureCount, isFetching } = useQuery( ["presignedURL", cacheType, id, url], getFromPresignedURL, { @@ -34,6 +45,9 @@ const usePresignedURL = ({ staleTime: Infinity, enabled: isEnabled && url ? true : false, keepPreviousData: true, + retry: (attempts, e) => { + return retryCallbackFn(attempts, e?.response?.status); + }, onError: (e) => { if ( e?.response?.data?.includes("Request has expired") || @@ -41,7 +55,9 @@ const usePresignedURL = ({ ) { requestNewURLCallback(); } else { - !hideToastOn404 && toast(error, "error"); + !hideToastOn404 && + e?.response?.status !== 304 && + toast(error, "error"); } }, } @@ -52,6 +68,7 @@ const usePresignedURL = ({ isLoading, error, failureCount, + isFetching, }; }; diff --git a/frontend/src/core/services/dashboard.service.js b/frontend/src/core/services/dashboard.service.js index 5f6e248e..e645e9db 100644 --- a/frontend/src/core/services/dashboard.service.js +++ b/frontend/src/core/services/dashboard.service.js @@ -45,3 +45,13 @@ export const getDashboardLinks = (dashboardId) => { url: `${API_URL}/dashboards/${dashboardId}/stats`, }); }; + +export const refreshDashboard = ({ dashboardId, timeRange }) => { + return http({ + method: "POST", + url: `${API_URL}/dashboards/${dashboardId}/stats_update`, + data: { + timescales: [timeRange], + }, + }); +};