Merge pull request #516 from bugout-dev/refresh-dashboard-stats

Refresh dashboard stats
pull/525/head
Andrei-Dolgolev 2022-01-17 19:17:48 +02:00 zatwierdzone przez GitHub
commit 9d84650ef3
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
30 zmienionych plików z 620 dodań i 226 usunięć

Wyświetl plik

@ -1,15 +1,16 @@
import argparse
import binascii
import sys
from typing import List, Optional, Union, Type, cast
from typing import List, Optional, Type, Union, cast
import pyevmasm
from moonstreamdb.db import yield_db_session
from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import text
from .data import EVMEventSignature, EVMFunctionSignature, ContractABI
from moonstreamdb.db import yield_db_session
from .data import ContractABI, EVMEventSignature, EVMFunctionSignature
def query_for_text_signatures(

Wyświetl plik

@ -2,41 +2,36 @@ import hashlib
import json
from itertools import chain
import logging
from typing import List, Optional, Dict, Any, Union
import time
from typing import List, Optional, Dict, Any
from enum import Enum
import uuid
import boto3 # type: ignore
from bugout.data import BugoutSearchResults, BugoutSearchResult
from bugout.data import BugoutSearchResults, BugoutSearchResult, BugoutResource
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import (
EthereumLabel,
)
from moonstreamdb.models import EthereumLabel
from sqlalchemy import text
from sqlalchemy.orm import Session
from web3 import Web3
from web3._utils.validation import validate_abi
from .middleware import MoonstreamHTTPException
from . import data
from .reporter import reporter
from .middleware import MoonstreamHTTPException
from .settings import ETHERSCAN_SMARTCONTRACTS_BUCKET
from bugout.data import BugoutResource
from .reporter import reporter
from .settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
ETHERSCAN_SMARTCONTRACTS_BUCKET,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from web3 import Web3
from .settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,13 +2,13 @@
Moonstream CLI
"""
import argparse
import logging
import json
import logging
import os
from posix import listdir
from typing import Optional
from sqlalchemy.orm import with_expression
from moonstreamdb.db import SessionLocal

Wyświetl plik

@ -7,12 +7,9 @@ from typing import Dict, List, Optional, Union
from bugout.data import BugoutResources
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from .. import reporter
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN
from ..settings import bugout_client as bc
def migrate_subscriptions(

Wyświetl plik

@ -8,16 +8,15 @@ from typing import Dict
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from . import actions
from . import data
from . import actions, data
from .middleware import BroodAuthMiddleware, MoonstreamHTTPException
from .routes.address_info import router as addressinfo_router
from .routes.dashboards import router as dashboards_router
from .routes.streams import router as streams_router
from .routes.subscriptions import router as subscriptions_router
from .routes.txinfo import router as txinfo_router
from .routes.users import router as users_router
from .routes.whales import router as whales_router
from .routes.dashboards import router as dashboards_router
from .middleware import BroodAuthMiddleware, MoonstreamHTTPException
from .settings import DOCS_TARGET_PATH, ORIGINS
from .version import MOONSTREAMAPI_VERSION

Wyświetl plik

@ -17,6 +17,10 @@ class TimeScale(Enum):
day = "day"
class UpdateStats(BaseModel):
timescales: List[str]
class SubscriptionTypeResourceData(BaseModel):
id: str
name: str

Wyświetl plik

@ -7,7 +7,8 @@ from fastapi import HTTPException, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from .reporter import reporter
from .settings import MOONSTREAM_APPLICATION_ID, bugout_client as bc
from .settings import MOONSTREAM_APPLICATION_ID
from .settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -32,9 +32,9 @@ from bugout.app import Bugout
from bugout.data import BugoutResource
from sqlalchemy.orm import Session
from . import bugout, transactions, moonworm_provider
from .. import data
from ..stream_queries import StreamQuery
from . import bugout, moonworm_provider, transactions
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,9 +1,9 @@
"""
Event providers powered by Bugout journals.
"""
from datetime import datetime
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from bugout.app import Bugout
@ -14,9 +14,8 @@ from dateutil.tz import UTC
from sqlalchemy.orm import Session
from .. import data
from ..stream_queries import StreamQuery
from ..settings import HUMBUG_TXPOOL_CLIENT_ID
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,24 +1,18 @@
from dataclasses import dataclass, field
import logging
from typing import cast, Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout
from bugout.data import BugoutResource
from moonstreamdb.blockchain import (
get_label_model,
AvailableBlockchainType,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query, query_expression
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Query, Session, query_expression
from sqlalchemy.sql.expression import label
from .. import data
from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,25 +1,22 @@
from dataclasses import dataclass, field
import logging
from typing import cast, Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout
from bugout.data import BugoutResource
from moonstreamdb.blockchain import (
get_label_model,
get_block_model,
get_transaction_model,
AvailableBlockchainType,
get_block_model,
get_label_model,
get_transaction_model,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Query, Session
from .. import data
from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -2,12 +2,12 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from moonstreamdb.db import yield_db_session
from sqlalchemy.orm import Session
from web3 import Web3
from .. import actions
from .. import data
from moonstreamdb.db import yield_db_session
from .. import actions, data
from ..middleware import MoonstreamHTTPException
from ..web3_provider import yield_web3_provider

Wyświetl plik

@ -1,25 +1,27 @@
import json
import logging
from os import read
import json
from typing import Any, List, Optional, Dict
from typing import Any, Dict, List, Optional
from uuid import UUID
import boto3 # type: ignore
import requests
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Request, Query, Body
from fastapi import APIRouter, Body, Path, Query, Request
from .. import actions
from .. import data
from .. import actions, data
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
@ -350,7 +352,7 @@ async def get_dashboard_data_links_handler(
request: Request, dashboard_id: str
) -> Dict[UUID, Any]:
"""
Update dashboards mainly fully overwrite name and subscription metadata
Get s3 presign urls for dshaboard grafics
"""
token = request.state.token
@ -427,10 +429,39 @@ async def get_dashboard_data_links_handler(
ExpiresIn=300,
HttpMethod="GET",
)
stats[subscription.id][timescale] = stats_presigned_url
stats[subscription.id][timescale] = {"url": stats_presigned_url}
except Exception as err:
logger.warning(
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, Key:{result_key} get error:{err}"
)
return stats
@router.post("/{dashboard_id}/stats_update", tags=["dashboards"])
async def update_dashbord_data_handler(
request: Request,
dashboard_id: str = Path(...),
updatestats: data.UpdateStats = Body(...),
) -> Dict[str, Any]:
"""
Return journal statistics
journal.read permission required.
"""
token = request.state.token
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/stats_update",
json={
"dashboard_id": dashboard_id,
"timescales": updatestats.timescales,
"token": token,
},
)
if responce.status_code != 200:
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail="Task for start generate stats failed.",
)
return responce.json()

Wyświetl plik

@ -5,11 +5,12 @@ import logging
from typing import Any, Dict, List, Optional
from bugout.data import BugoutResource
from fastapi import APIRouter, Request, Query, Depends
from moonstreamdb import db
from fastapi import APIRouter, Depends, Query, Request
from sqlalchemy.orm import Session
from .. import data
from moonstreamdb import db
from .. import data, stream_queries
from ..middleware import MoonstreamHTTPException
from ..providers import (
ReceivingEventsException,
@ -20,12 +21,11 @@ from ..providers import (
previous_event,
)
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from .. import stream_queries
from ..settings import bugout_client as bc
from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,10 +2,9 @@
The Moonstream subscriptions HTTP API
"""
import hashlib
import logging
import json
from typing import List, Optional, Dict, Any
import logging
from typing import Any, Dict, List, Optional
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
@ -16,14 +15,16 @@ from web3 import Web3
from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks
from ..admin import subscription_types
from .. import data
from ..actions import upload_abi_to_s3, validate_abi_json
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from ..settings import bugout_client as bc
from ..web3_provider import yield_web3_provider
logger = logging.getLogger(__name__)

Wyświetl plik

@ -9,12 +9,12 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends
from moonstreamdb.db import yield_db_session
from sqlalchemy.orm import Session
from moonstreamdb.db import yield_db_session
from .. import actions, data
from ..abi_decoder import decode_abi
from .. import actions
from .. import data
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,27 +2,18 @@
The Moonstream users HTTP API
"""
import logging
from typing import Any, Dict, Optional
import uuid
from typing import Any, Dict, Optional
from bugout.data import BugoutToken, BugoutUser, BugoutResource, BugoutUserTokens
from bugout.data import BugoutResource, BugoutToken, BugoutUser, BugoutUserTokens
from bugout.exceptions import BugoutResponseException
from fastapi import (
APIRouter,
Body,
Form,
Request,
)
from fastapi import APIRouter, Body, Form, Request
from .. import data
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from ..actions import create_onboarding_resource
from ..middleware import MoonstreamHTTPException
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -7,15 +7,16 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from moonstreamdb import db
from sqlalchemy.orm import Session
from moonstreamdb import db
from .. import data
from ..providers.bugout import ethereum_whalewatch_provider
from ..settings import (
bugout_client,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
bugout_client,
)
from ..stream_queries import StreamQuery

Wyświetl plik

@ -66,6 +66,17 @@ MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/")
)
MOONSTREAM_CRAWLERS_SERVER_URL = os.environ.get("MOONSTREAM_CRAWLERS_SERVER_URL")
if MOONSTREAM_CRAWLERS_SERVER_URL is None:
raise ValueError("MOONSTREAM_CRAWLERS_SERVER_URL environment variable must be set")
MOONSTREAM_CRAWLERS_SERVER_URL = MOONSTREAM_CRAWLERS_SERVER_URL.rstrip("/")
MOONSTREAM_CRAWLERS_SERVER_PORT = os.environ.get("MOONSTREAM_CRAWLERS_SERVER_PORT")
if MOONSTREAM_CRAWLERS_SERVER_PORT is None:
raise ValueError("MOONSTREAM_CRAWLERS_SERVER_PORT environment variable must be set")
MOONSTREAM_CRAWLERS_SERVER_PORT = MOONSTREAM_CRAWLERS_SERVER_PORT.rstrip("/")
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)

Wyświetl plik

@ -1,9 +1,9 @@
"""
Stream queries - data structure, and parser.
"""
from dataclasses import dataclass, field
import logging
from typing import cast, List, Tuple
from dataclasses import dataclass, field
from typing import List, Tuple, cast
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

Wyświetl plik

@ -1,10 +1,9 @@
import unittest
from .data import DashboardMeta
from .actions import dashboards_abi_validation
from .data import DashboardMeta
from .middleware import MoonstreamHTTPException
abi_example = [
{
"inputs": [

Wyświetl plik

@ -1,17 +1,30 @@
"""
The Mooncrawl HTTP API
"""
from datetime import datetime, timedelta
import logging
from os import times
import time
from typing import Dict
from typing import Dict, Any, List
from uuid import UUID
from fastapi import FastAPI
import boto3 # type: ignore
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from bugout.data import BugoutResource, BugoutResources
from . import data
from .middleware import MoonstreamHTTPException
from .settings import DOCS_TARGET_PATH, ORIGINS
from .settings import (
DOCS_TARGET_PATH,
ORIGINS,
bugout_client as bc,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from .version import MOONCRAWL_VERSION
from .stats_worker import dashboard
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -65,17 +78,91 @@ async def now_handler() -> data.NowResponse:
return data.NowResponse(epoch_time=time.time())
@app.get("/jobs/stats_update", tags=["jobs"])
async def status_handler():
@app.post("/jobs/stats_update", tags=["jobs"])
async def status_handler(
stats_update: data.StatsUpdateRequest,
background_tasks: BackgroundTasks,
):
"""
Find latest crawlers records with creation timestamp:
- ethereum_txpool
- ethereum_trending
Update dashboard endpoint create are tasks for update.
"""
dashboard_resource: BugoutResource = bc.get_resource(
token=stats_update.token,
resource_id=stats_update.dashboard_id,
timeout=10,
)
# get all user subscriptions
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=stats_update.token,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=10,
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
s3_client = boto3.client("s3")
try:
pass
background_tasks.add_task(
dashboard.stats_generate_api_task,
timescales=stats_update.timescales,
dashboard=dashboard_resource,
subscription_by_id=subscription_by_id,
)
except Exception as e:
logger.error(f"Unhandled status exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)
return
presigned_urls_response: Dict[UUID, Any] = {}
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
subscription = subscription_by_id[
dashboard_subscription_filters["subscription_id"]
]
for timescale in stats_update.timescales:
presigned_urls_response[subscription.id] = {}
try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{stats_update.dashboard_id}/v1/{timescale}.json'
object = s3_client.head_object(
Bucket=subscription.resource_data["bucket"], Key=result_key
)
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": subscription.resource_data["bucket"],
"Key": result_key,
},
ExpiresIn=300,
HttpMethod="GET",
)
presigned_urls_response[subscription.id][timescale] = {
"url": stats_presigned_url,
"headers": {
"If-Modified-Since": (
object["LastModified"] + timedelta(seconds=1)
).strftime("%c")
},
}
except Exception as err:
logger.warning(
f"Can't generate S3 presigned url in stats endpoint for Bucket:{subscription.resource_data['bucket']}, Key:{result_key} get error:{err}"
)
return presigned_urls_response

Wyświetl plik

@ -1,6 +1,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List
from pydantic import BaseModel
@ -10,6 +11,12 @@ class AvailableBlockchainType(Enum):
POLYGON = "polygon"
class StatsUpdateRequest(BaseModel):
dashboard_id: str
timescales: List[str]
token: str
@dataclass
class DateRange:
start_time: datetime

Wyświetl plik

@ -3,6 +3,11 @@ from typing import cast
from bugout.app import Bugout
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")

Wyświetl plik

@ -8,7 +8,7 @@ import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Union
from typing import Any, Callable, Dict, List
from uuid import UUID
import boto3 # type: ignore
@ -21,7 +21,6 @@ from web3 import Web3
from ..blockchain import (
connect,
get_block_model,
get_label_model,
get_transaction_model,
)
@ -34,8 +33,8 @@ from ..settings import (
)
from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
subscription_ids_by_blockchain = {
@ -101,7 +100,7 @@ def push_statistics(
Metadata={"drone": "statistics"},
)
print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
logger.info(f"Statistics push to bucket: s3://{bucket}/{result_key}")
def generate_data(
@ -381,9 +380,10 @@ def process_external(
}
)
except Exception as e:
print(f"Error processing external call: {e}")
logger.error(f"Error processing external call: {e}")
web3_client = connect(blockchain)
if external_calls:
web3_client = connect(blockchain)
for extcall in external_calls:
try:
@ -398,7 +398,7 @@ def process_external(
{"display_name": extcall["display_name"], "value": response}
)
except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}")
logger.error(f"Failed to call {extcall['name']} error: {e}")
return extention_data
@ -427,6 +427,79 @@ def get_count(
)
def generate_web3_metrics(
db_session: Session,
events: List[str],
blockchain_type: AvailableBlockchainType,
address: str,
crawler_label: str,
abi_json: Any,
) -> List[Any]:
"""
Generate stats for cards components
"""
extention_data = []
abi_external_calls = [item for item in abi_json if item["type"] == "external_call"]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
# TODO: Remove it if ABI already have correct web3_call signature.
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data["args"]["tokenId"]
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
return extention_data
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
@ -434,10 +507,8 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
# read all subscriptions
start_time = time.time()
blockchain_type = AvailableBlockchainType(args.blockchain)
dashboard_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -445,10 +516,9 @@ def stats_generate_handler(args: argparse.Namespace):
timeout=10,
)
print(f"Amount of dashboards: {len(dashboard_resources.resources)}")
logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# get all subscriptions
available_subscriptions: List[BugoutResource] = []
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
@ -469,7 +539,7 @@ def stats_generate_handler(args: argparse.Namespace):
for blockchain_subscription in available_subscriptions
}
print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
@ -489,11 +559,11 @@ def stats_generate_handler(args: argparse.Namespace):
continue
subscriptions_count += 1
s3_data_object: Dict[str, Any] = {}
extention_data = []
# The resulting object whivh be pushed to S3
s3_data_object: Dict[str, Any] = {}
address = subscription_by_id[subscription_id].resource_data[
"address"
]
@ -506,33 +576,30 @@ def stats_generate_handler(args: argparse.Namespace):
):
crawler_label = "moonworm"
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
@ -540,65 +607,16 @@ def stats_generate_handler(args: argparse.Namespace):
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
)
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data[
"args"
]["tokenId"]
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
# Generate blocks state information
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
@ -609,12 +627,17 @@ def stats_generate_handler(args: argparse.Namespace):
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
s3_data_object["blocks_state"] = current_blocks_state
logger.info(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
# Write state of blocks in database
s3_data_object["blocks_state"] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -625,9 +648,9 @@ def stats_generate_handler(args: argparse.Namespace):
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["functions"] = functions_calls_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
@ -638,11 +661,9 @@ def stats_generate_handler(args: argparse.Namespace):
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
s3_data_object["generic"] = {}
# Push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
@ -671,6 +692,160 @@ def stats_generate_handler(args: argparse.Namespace):
)
def stats_generate_api_task(
timescales: List[str],
dashboard: BugoutResource,
subscription_by_id: Dict[str, BugoutResource],
):
"""
Start crawler with generate.
"""
with yield_db_session_ctx() as db_session:
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
blockchain_type = AvailableBlockchainType(
blockchain_by_subscription_id[
subscription_by_id[subscription_id].resource_data[
"subscription_type_id"
]
]
)
s3_data_object: Dict[str, Any] = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data["address"]
crawler_label = CRAWLER_LABEL
if address in (
"0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
"0xA2a13cE1824F3916fC84C65e559391fc6674e6e8",
):
crawler_label = "moonworm"
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
# Data for cards components
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
)
# Generate blocks state information
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
for timescale in timescales:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
logger.info(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
# Write state of blocks in database
s3_data_object["blocks_state"] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=methods,
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["functions"] = functions_calls_data
# Generate events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=events,
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
# push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
dashboard_id=dashboard.id,
)
except Exception as err:
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard.id}",
],
)
logger.error(err)
def main() -> None:
parser = argparse.ArgumentParser(description="Command Line Interface")
parser.set_defaults(func=lambda _: parser.print_help())

Wyświetl plik

@ -13,6 +13,7 @@ import {
EditablePreview,
Button,
} from "@chakra-ui/react";
import { RepeatIcon } from "@chakra-ui/icons";
import Scrollable from "../../src/components/Scrollable";
import RangeSelector from "../../src/components/RangeSelector";
import useDashboard from "../../src/core/hooks/useDashboard";
@ -44,6 +45,7 @@ const Analytics = () => {
dashboardLinksCache,
deleteDashboard,
updateDashboard,
refreshDashboard,
} = useDashboard(dashboardId);
const { subscriptionsCache } = useSubscriptions();
@ -86,6 +88,20 @@ const Analytics = () => {
const plotMinW = "250px";
const refereshCharts = () => {
refreshDashboard.mutate({
dashboardId: dashboardCache.data.id,
timeRange: timeRange,
});
};
const retryCallbackFn = (attempts, status) => {
if (status === 304 && attempts > 5) {
refereshCharts();
}
return status === 404 || status === 403 ? false : true;
};
return (
<Scrollable>
<Flex
@ -146,6 +162,18 @@ const Analytics = () => {
variant="outline"
icon={<BsGear />}
/>
<IconButton
isLoading={
refreshDashboard.isLoading || refreshDashboard.isFetching
}
icon={<RepeatIcon />}
variant="ghost"
colorScheme="green"
size="sm"
onClick={() => {
refereshCharts();
}}
/>
</Stack>
<Flex w="100%" direction="row" flexWrap="wrap-reverse" id="container">
@ -175,9 +203,11 @@ const Analytics = () => {
>
{name ?? ""}
</Text>
<SubscriptionReport
retryCallbackFn={retryCallbackFn}
timeRange={timeRange}
url={s3PresignedURLs[timeRange]}
presignedRequest={s3PresignedURLs[timeRange]}
id={dashboardId}
refetchLinks={dashboardLinksCache.refetch}
/>

Wyświetl plik

@ -1,6 +1,7 @@
import React, { useMemo } from "react";
import { usePresignedURL } from "../core/hooks";
import Report from "./Report";
import {
Spinner,
Flex,
@ -19,16 +20,24 @@ timeMap[HOUR_KEY] = "hour";
timeMap[DAY_KEY] = "day";
timeMap[WEEK_KEY] = "week";
const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
const { data, isLoading, failureCount } = usePresignedURL({
url: url,
const SubscriptionReport = ({
timeRange,
presignedRequest,
id,
refetchLinks,
retryCallbackFn,
}) => {
const { data, isLoading, failureCount, isFetching } = usePresignedURL({
...presignedRequest,
isEnabled: true,
id: id,
cacheType: `${timeRange} subscription_report`,
requestNewURLCallback: refetchLinks,
hideToastOn404: true,
retryCallbackFn: retryCallbackFn,
});
const plotMinW = "250px";
const eventKeys = useMemo(
() =>
Object.keys(data?.events ?? {}).length > 0
@ -50,8 +59,9 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
: undefined,
[data]
);
if (failureCount < 1 && (!data || isLoading)) return <Spinner />;
if (failureCount >= 1 && (!data || isLoading))
if (failureCount >= 1 && (!data || isLoading)) {
return (
<Container
w="100%"
@ -85,6 +95,8 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
<br />
</Container>
);
}
return (
<Flex
w="100%"
@ -93,6 +105,9 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
flexBasis={plotMinW}
direction="column"
>
<Text fontSize="xs" textAlign="right">{`Latest block number: ${
data?.blocks_state?.latest_labelled_block ?? "Not available"
}`}</Text>
<Flex
bgColor="blue.50"
direction={["column", "row", null]}
@ -155,15 +170,16 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
boxShadow="md"
m={2}
>
<Text
<Flex
direction="row"
w="100%"
py={2}
bgColor="gray.50"
fontWeight="600"
textAlign="center"
bgColor="blue.50"
placeItems="center"
justifyContent={"center"}
>
{key}
</Text>
<Text>{key}</Text>
{isFetching && <Spinner size="sm" m={1} />}
</Flex>
<Report
data={data.events[key]}
metric={key}
@ -198,15 +214,16 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
boxShadow="md"
m={2}
>
<Text
<Flex
direction="row"
w="100%"
py={2}
bgColor="gray.50"
fontWeight="600"
textAlign="center"
bgColor="blue.50"
placeItems="center"
justifyContent={"center"}
>
{key}
</Text>
<Text>{key}</Text>
{isFetching && <Spinner size="sm" m={2} />}
</Flex>
<Report
data={data.functions[key]}
metric={key}
@ -241,15 +258,16 @@ const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
boxShadow="md"
m={2}
>
<Text
<Flex
direction="row"
w="100%"
py={2}
bgColor="gray.50"
fontWeight="600"
textAlign="center"
bgColor="blue.50"
placeItems="center"
justifyContent={"center"}
>
{key}
</Text>
<Text>{key}</Text>
{isFetching && <Spinner size="sm" m={2} />}
</Flex>
<Report
data={data.generic[key]}
metric={key}

Wyświetl plik

@ -1,4 +1,4 @@
import { useMutation, useQuery } from "react-query";
import { useMutation, useQuery, useQueryClient } from "react-query";
import { useRouter, useToast } from ".";
import { queryCacheProps } from "./hookCommon";
import { DashboardService } from "../services";
@ -8,6 +8,7 @@ import UserContext from "../providers/UserProvider/context";
const useDashboard = (dashboardId) => {
const toast = useToast();
const router = useRouter();
const queryClient = useQueryClient();
const { user } = useContext(UserContext);
const dashboardsListCache = useQuery(
@ -119,12 +120,35 @@ const useDashboard = (dashboardId) => {
}
);
const refreshDashboard = useMutation(DashboardService.refreshDashboard, {
onSuccess: (data) => {
queryClient.setQueryData(
["dashboardLinks", { dashboardId: dashboardId }],
(oldData) => {
let newData = { ...oldData };
Object.keys(data.data).forEach((subscription) => {
Object.keys(data.data[subscription]).forEach((timeScale) => {
newData.data[subscription][timeScale] =
data.data[subscription][timeScale];
});
});
return newData;
}
);
},
onError: (error) => {
toast(error.error, "error", "Fail");
},
});
return {
createDashboard,
dashboardsListCache,
dashboardCache,
deleteDashboard,
dashboardLinksCache,
refreshDashboard,
updateDashboard,
};
};

Wyświetl plik

@ -5,25 +5,36 @@ import axios from "axios";
const usePresignedURL = ({
url,
headers,
cacheType,
id,
requestNewURLCallback,
isEnabled,
hideToastOn404,
retryCallbackFn,
}) => {
const toast = useToast();
const getFromPresignedURL = async () => {
const response = await axios({
let requestParameters = {
url: url,
// You can uncomment this to use mockupsLibrary in development
// url: `https://example.com/s3`,
headers: {},
method: "GET",
});
};
if (headers) {
Object.keys(headers).map((key) => {
requestParameters["headers"][key] = headers[key];
});
}
const response = await axios(requestParameters);
return response.data;
};
const { data, isLoading, error, failureCount } = useQuery(
const { data, isLoading, error, failureCount, isFetching } = useQuery(
["presignedURL", cacheType, id, url],
getFromPresignedURL,
{
@ -34,6 +45,9 @@ const usePresignedURL = ({
staleTime: Infinity,
enabled: isEnabled && url ? true : false,
keepPreviousData: true,
retry: (attempts, e) => {
return retryCallbackFn(attempts, e?.response?.status);
},
onError: (e) => {
if (
e?.response?.data?.includes("Request has expired") ||
@ -41,7 +55,9 @@ const usePresignedURL = ({
) {
requestNewURLCallback();
} else {
!hideToastOn404 && toast(error, "error");
!hideToastOn404 &&
e?.response?.status !== 304 &&
toast(error, "error");
}
},
}
@ -52,6 +68,7 @@ const usePresignedURL = ({
isLoading,
error,
failureCount,
isFetching,
};
};

Wyświetl plik

@ -45,3 +45,13 @@ export const getDashboardLinks = (dashboardId) => {
url: `${API_URL}/dashboards/${dashboardId}/stats`,
});
};
export const refreshDashboard = ({ dashboardId, timeRange }) => {
return http({
method: "POST",
url: `${API_URL}/dashboards/${dashboardId}/stats_update`,
data: {
timescales: [timeRange],
},
});
};