Add cli step command
Fix actions tokens in moonstream
Fix dashboards API
Fix dashboard resources reconection.
Fix mooncrawl dashboards generation workflow.
pull/751/head
Andrey 2023-04-25 16:49:01 +03:00
rodzic ccfae29520
commit 231382422e
10 zmienionych plików z 725 dodań i 450 usunięć

Wyświetl plik

@ -662,7 +662,7 @@ def get_entity_subscription_collection_id(
resource_data = {
"type": resource_type,
"user_id": str(user_id),
"subscription_collection": str(collection_id),
"collection_id": str(collection_id),
}
try:
@ -678,8 +678,7 @@ def get_entity_subscription_collection_id(
raise MoonstreamHTTPException(status_code=500, internal_error=e)
else:
resource = resources.resources[0]
print(resource.resource_data)
return resource.resource_data["subscription_collection"]
return resource.resource_data["collection_id"]
def generate_s3_access_links(

Wyświetl plik

@ -6,7 +6,7 @@ import json
import logging
import os
from posix import listdir
from typing import Optional
from typing import Optional, List, Dict, Any, Union, Callable
from sqlalchemy.orm import with_expression
@ -20,7 +20,6 @@ from .migrations import (
checksum_address,
update_dashboard_subscription_key,
generate_entity_subscriptions,
update_dashboards_connection,
)
@ -53,6 +52,9 @@ description: {checksum_address.__doc__}
- id: 20230213
name: {generate_entity_subscriptions.__name__}
description: {generate_entity_subscriptions.__doc__}
steps:
- step 1: generate_entity_subscriptions_from_brood_resources - Generate entity subscriptions from brood resources
- step 2: update_dashboards_connection - Update dashboards connection
"""
logger.info(entity_migration_overview)
@ -84,13 +86,40 @@ def migrations_run(args: argparse.Namespace) -> None:
db_session = SessionLocal()
try:
if args.id == 20230213:
step_map: Dict[str, Dict[str, Union[str, Callable]]] = {
"generate_entity_subscriptions_from_brood_resources": {
"action": generate_entity_subscriptions.generate_entity_subscriptions_from_brood_resources,
"description": "Generate entity subscriptions from brood resources",
},
"update_dashboards_connection": {
"action": generate_entity_subscriptions.update_dashboards_connection,
"description": "Update dashboards connection",
},
}
if args.command == "upgrade":
logger.info(
"Starting migrate subscriptions from resources to entity..."
)
generate_entity_subscriptions.Generate_entity_subscriptions_from_brood_resources()
print("Starting update of dashboards...")
update_dashboards_connection()
step = args.step
if step is None:
# run all steps
for step in step_map:
logger.info(
f"Starting step {step}: {step_map[step]['description']}"
)
migration_function = step_map[step]["action"]
if callable(migration_function):
migration_function()
elif step in step_map:
logger.info(
f"Starting step {step}: {step_map[step]['description']}"
)
migration_function = step_map[step]["action"]
if callable(migration_function):
migration_function()
else:
logger.info(f"Step {step} does not exist")
logger.info(f"Available steps: {step_map.keys()}")
elif args.command == "downgrade":
logger.info(
"Starting migrate subscriptions from entity to resources..."
@ -373,6 +402,13 @@ This CLI is configured to work with the following API URLs:
type=str,
help="Command for migration",
)
parser_migrations_run.add_argument(
"-s",
"--step",
required=False,
type=str,
help="How many steps to run",
)
parser_migrations_run.set_defaults(func=migrations_run)
parser_moonworm_tasks = subcommands.add_parser(

Wyświetl plik

@ -167,7 +167,7 @@ def find_user_collection(
if len(user_entity_resources.resources) > 0:
collection_id = user_entity_resources.resources[0].resource_data[
"subscription_collection"
"collection_id"
]
print(f"Collection found for user {user_id}. collection_id: {collection_id}")
return collection_id, str(user_entity_resources.resources[0].id)
@ -180,7 +180,7 @@ def find_user_collection(
return None, None
def Generate_entity_subscriptions_from_brood_resources() -> None:
def generate_entity_subscriptions_from_brood_resources() -> None:
"""
Parse all existing dashboards at Brood resource
and replace key to correct one.
@ -209,6 +209,8 @@ def Generate_entity_subscriptions_from_brood_resources() -> None:
users_subscriptions: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
@ -277,7 +279,8 @@ def Generate_entity_subscriptions_from_brood_resources() -> None:
resource_data = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
"subscription_collection": str(collection_id),
"collection_id": str(collection_id),
"version": "1.0.0",
}
### Create resource for user collection
@ -323,6 +326,9 @@ def Generate_entity_subscriptions_from_brood_resources() -> None:
# abi hash
abi_hash = hashlib.sha256(abi_body.encode("utf-8")).hexdigest()
abi = True
logger.info(
f"Got abi from S3 from path {subscription['s3_path']}"
)
except Exception as e:
logger.error(f"Failed to get abi from S3: {str(e)}")
abi = None
@ -353,7 +359,6 @@ def Generate_entity_subscriptions_from_brood_resources() -> None:
permissions=[
"journals.read",
"journals.update",
"journals.delete",
"journals.entries.read",
"journals.entries.create",
"journals.entries.update",
@ -362,9 +367,12 @@ def Generate_entity_subscriptions_from_brood_resources() -> None:
)
stages[user_id]["permissions_granted"] = True
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to add permissions to user: {str(e)}")
continue
else:
logger.warn(
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behaivior!"
)
except Exception as e:
traceback.print_exc()
@ -389,6 +397,12 @@ def update_dashboards_connection():
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Dashboards parsing and save to dashboards_by_user
dashboard_resources: BugoutResources = bc.list_resources(
@ -410,76 +424,97 @@ def update_dashboards_connection():
dashboards_by_user[user_id].append(dashboard)
for user in dashboards_by_user:
print(f"dashboards: {len(dashboards_by_user[user])}")
try:
for user in dashboards_by_user:
print(f"dashboards: {len(dashboards_by_user[user])}")
if user not in stages:
continue
for dashboard in dashboards_by_user[user]:
dashboard_data = dashboard.resource_data
dashboard_subscription_settings = dashboard_data.get(
"subscription_settings"
)
if dashboard_subscription_settings is None:
if user not in stages:
continue
print(f"dashboard {dashboard.id}")
for dashboard in dashboards_by_user[user]:
dashboard_data = dashboard.resource_data
print(f"dashboard name:{dashboard_data['name']}")
dashboard_subscription_settings = dashboard_data.get(
"subscription_settings"
)
for setting_index, subscription_setting in enumerate(
dashboard_subscription_settings
):
print(f"Find subscripton: {subscription_setting['subscription_id']}")
if dashboard_subscription_settings is None:
continue
if (
str(subscription_setting["subscription_id"])
in stages[user]["proccessed_subscriptions"]
print(f"dashboard {dashboard.id}")
print(f"dashboard name:{dashboard_data['name']}")
for setting_index, subscription_setting in enumerate(
dashboard_subscription_settings
):
print(
f"subscription found: {subscription_setting['subscription_id']}"
f"Find subscripton: {subscription_setting['subscription_id']}"
)
subscription_metadata = stages[user]["proccessed_subscriptions"][
subscription_setting["subscription_id"]
]
if str(dashboard.id) in subscription_metadata["dashboard_ids"]:
continue
try:
# change original dashboard subscription settings
dashboard_data["subscription_settings"][setting_index][
"subscription_id"
] = subscription_metadata["entity_id"]
# Update brood resource in bugout client
print("RECOVERY DASHBOARD")
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {
"subscription_settings": dashboard_data[
"subscription_settings"
]
}
},
if (
str(subscription_setting["subscription_id"])
in stages[user]["proccessed_subscriptions"]
):
print(
f"subscription found: {subscription_setting['subscription_id']}"
)
stages[user]["proccessed_subscriptions"][
str(subscription_setting["subscription_id"])
]["dashboard_ids"].append(str(dashboard.id))
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to update dashboard: {str(e)}")
breakpoint()
continue
subscription_stages_metadata = stages[user][
"proccessed_subscriptions"
][subscription_setting["subscription_id"]]
if (
str(dashboard.id)
in subscription_stages_metadata["dashboard_ids"]
):
continue
try:
# change original dashboard subscription settings
dashboard_data["subscription_settings"][setting_index][
"subscription_id"
] = subscription_stages_metadata["entity_id"]
# Update brood resource in bugout client
print("RECOVERY DASHBOARD")
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {
"subscription_settings": dashboard_data[
"subscription_settings"
]
}
},
)
stages[user]["proccessed_subscriptions"][
str(subscription_setting["subscription_id"])
]["dashboard_ids"].append(str(dashboard.id))
except Exception as e:
print(
f"****Failed to update dashboard: {str(e)} for user {user}****"
)
continue
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess dashboards: {str(e)}")
finally:
try:
with open("stages.json", "w") as f:
json.dump(stages, f)
except Exception as e:
logger.error(f"Failed to save stages: {str(e)}")
# write as text
with open("stages-json-failed.txt", "w") as f:
f.write(str(stages))
def revoke_admin_permissions_from_collections(
@ -588,7 +623,7 @@ def delete_generated_entity_subscriptions_from_brood_resources():
### Create collections and add subscriptions
try:
for user_id, subscriptions in users_subscriptions.items():
for user_id, _ in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
@ -642,6 +677,57 @@ def delete_generated_entity_subscriptions_from_brood_resources():
print(f"Failed to delete collection resource: {str(e)}")
continue
### Retunr all dashboards to old state
if user_id in dashboards_by_user:
for dashboard in dashboards_by_user[user_id]:
try:
dashboard_data = dashboard.resource_data
if "subscription_settings" not in dashboard_data:
continue
if (
"subscription_id"
not in dashboard_data["subscription_settings"]
):
continue
subscription_id = dashboard_data["subscription_settings"][
"subscription_id"
]
if (
subscription_id
not in stages[user_id]["proccessed_subscriptions"]
):
continue
dashboard_data["subscription_settings"][
"subscription_id"
] = stages[user_id]["proccessed_subscriptions"][
subscription_id
][
"old_subscription_id"
]
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {
"subscription_settings": dashboard_data[
"subscription_settings"
]
}
},
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to update dashboard: {str(e)}")
breakpoint()
continue
except Exception as e:
traceback.print_exc()
print(f"Failed to proccess user subscriptions: {str(e)}")

Wyświetl plik

@ -8,7 +8,7 @@ import boto3 # type: ignore
import requests # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from entity.data import EntitiesResponse
from entity.data import EntitiesResponse, EntityResponse
from fastapi import APIRouter, Body, Path, Query, Request
from .. import actions, data
@ -69,7 +69,7 @@ async def add_dashboard_handler(
# process existing subscriptions with supplied ids
available_subscriptions_ids: Dict[Union[UUID, str], Dict[str, Any]] = {
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
@ -77,9 +77,9 @@ async def add_dashboard_handler(
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions_ids.keys():
if (
available_subscriptions_ids[dashboard_subscription.subscription_id][
"secondary_fields"
].get("abi")
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
is None
):
logger.error(
@ -91,9 +91,9 @@ async def add_dashboard_handler(
)
abi = json.loads(
available_subscriptions_ids[dashboard_subscription.subscription_id][
"secondary_fields"
]["abi"]
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields["abi"]
)
actions.dashboards_abi_validation(
@ -239,7 +239,7 @@ async def update_dashboard_handler(
limit=1000,
)
available_subscriptions_ids: Dict[Union[UUID, str], Dict[str, Any]] = {
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
@ -247,9 +247,9 @@ async def update_dashboard_handler(
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions_ids:
if (
available_subscriptions_ids[dashboard_subscription.subscription_id][
"secondary_fields"
].get("abi")
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
is None
):
logger.error(
@ -261,9 +261,9 @@ async def update_dashboard_handler(
)
abi = json.loads(
available_subscriptions_ids[dashboard_subscription.subscription_id][
"secondary_fields"
].get("abi")
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
)
actions.dashboards_abi_validation(dashboard_subscription, abi)
@ -377,7 +377,8 @@ async def get_dashboard_data_links_handler(
available_timescales = [timescale.value for timescale in data.TimeScale]
stats[id] = {}
for fields in subscription["secondary_fields"]:
for fields in subscription.required_fields:
print(fields)
if "subscription_type_id" in fields:
subscription_type_id = fields["subscription_type_id"]
@ -396,7 +397,7 @@ async def get_dashboard_data_links_handler(
stats[id][timescale] = {"url": stats_presigned_url}
except Exception as err:
logger.warning(
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, Key:{result_key} get error:{err}"
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, get error:{err}"
)
return stats
@ -414,6 +415,7 @@ async def update_dashbord_data_handler(
"""
token = request.state.token
user = request.state.user
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/stats_update",
@ -421,6 +423,7 @@ async def update_dashbord_data_handler(
"dashboard_id": dashboard_id,
"timescales": updatestats.timescales,
"token": token,
"user_id": str(user.id),
},
)
if responce.status_code != 200:

Wyświetl plik

@ -2,7 +2,23 @@ from collections import OrderedDict
import hashlib
import json
import logging
from typing import Any, Dict
from typing import Any, Dict, Optional, Union
import uuid
from bugout.data import (
BugoutSearchResults,
BugoutSearchResult,
BugoutResource,
BugoutResources,
)
from bugout.journal import SearchOrder
from bugout.exceptions import BugoutResponseException
from entity.data import EntityCollectionsResponse, EntityCollectionResponse
from entity.exceptions import EntityUnexpectedResponse
from .middleware import MoonstreamHTTPException
from .settings import bugout_client as bc
import boto3 # type: ignore
@ -10,6 +26,12 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EntityCollectionNotFoundException(Exception):
"""
Raised when entity collection is not found
"""
def push_data_to_bucket(
data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {}
) -> None:
@ -56,3 +78,33 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
).hexdigest()
return hash
def get_entity_subscription_collection_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> Optional[str]:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
"""
params = {
"type": resource_type,
"user_id": str(user_id),
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({user_id}) with token ({token}), error: {str(e)}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(resources.resources) == 0:
raise EntityCollectionNotFoundException("Subscription collection not found.")
else:
resource = resources.resources[0]
return resource.resource_data["collection_id"]

Wyświetl plik

@ -10,15 +10,23 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from entity.data import EntityResponse
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from .actions import generate_s3_access_links, query_parameter_hash
from .actions import (
generate_s3_access_links,
query_parameter_hash,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from . import data
from .middleware import MoonstreamHTTPException
from .settings import (
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
DOCS_TARGET_PATH,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
@ -27,7 +35,7 @@ from .settings import (
ORIGINS,
LINKS_EXPIRATION_TIME,
)
from .settings import bugout_client as bc
from .settings import bugout_client as bc, entity_client as ec
from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION
@ -98,18 +106,25 @@ async def status_handler(
timeout=10,
)
# get all user subscriptions
try:
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=stats_update.user_id,
)
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=stats_update.token,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=10,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({stats_update.user_id}) with token: {stats_update.token}, error: {str(e)}"
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
# get subscription entities
s3_client = boto3.client("s3")
@ -133,15 +148,23 @@ async def status_handler(
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
subscription = subscription_by_id[
dashboard_subscription_filters["subscription_id"]
]
# get subscription by id
subscription: EntityResponse = ec.get_entity(
token=stats_update.token,
collection_id=collection_id,
entity_id=dashboard_subscription_filters["subscription_id"],
)
for reqired_field in subscription.required_fields:
if "subscription_type_id" in reqired_field:
subscriprions_type = reqired_field["subscription_type_id"]
for timescale in stats_update.timescales:
presigned_urls_response[subscription.id] = {}
try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{stats_update.dashboard_id}/v1/{timescale}.json'
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription.address}/{stats_update.dashboard_id}/v1/{timescale}.json"
object = s3_client.head_object(
Bucket=subscription.resource_data["bucket"], Key=result_key

Wyświetl plik

@ -10,6 +10,7 @@ class StatsUpdateRequest(BaseModel):
dashboard_id: str
timescales: List[str]
token: str
user_id: str
@dataclass

Wyświetl plik

@ -3,17 +3,22 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from entity.client import Entity
from moonstreamdb.blockchain import AvailableBlockchainType
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
# Entity
ENTITY_URL = os.environ.get("ENTITY_URL", "https://api.moonstream.to")
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
entity_client = Entity(ENTITY_URL)
BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
)
@ -139,6 +144,14 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
"MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX environment variable must be set"
)
MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET = os.environ.get(
"MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET"
)
if MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET is None:
raise ValueError(
"MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET environment variable must be set"
)
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)
@ -254,3 +267,10 @@ infura_networks = {
"url": f"https://polygon-mumbai.infura.io/v3/{INFURA_PROJECT_ID}",
},
}
## Moonstream resources types
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"

Wyświetl plik

@ -14,6 +14,7 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from entity.data import EntityResponse, EntityCollectionResponse
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
@ -32,19 +33,20 @@ from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET,
)
from ..settings import bugout_client as bc
from ..settings import bugout_client as bc, entity_client as ec
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
subscription_ids_by_blockchain = {
"ethereum": ["ethereum_blockchain", "ethereum_smartcontract"],
"polygon": ["polygon_blockchain", "polygon_smartcontract"],
"mumbai": ["mumbai_blockchain", "mumbai_smartcontract"],
"xdai": ["xdai_blockchain", "xdai_smartcontract"],
"wyrm": ["wyrm_blockchain", "wyrm_smartcontract"],
subscription_id_by_blockchain = {
"ethereum": "ethereum_smartcontract",
"polygon": "polygon_smartcontract",
"mumbai": "mumbai_smartcontract",
"xdai": "xdai_smartcontract",
"wyrm": "wyrm_smartcontract",
}
blockchain_by_subscription_id = {
@ -93,13 +95,14 @@ BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
def push_statistics(
statistics_data: Dict[str, Any],
subscription: Any,
subscription_type_id: str,
address: str,
timescale: str,
bucket: str,
dashboard_id: Union[UUID, str],
) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription_type_id]}/contracts_data/{address}/{dashboard_id}/v1/{timescale}.json"
s3 = boto3.client("s3")
s3.put_object(
@ -585,26 +588,46 @@ def stats_generate_handler(args: argparse.Namespace):
timeout=10,
)
dashboards_by_subscription: Dict[str, List[BugoutResource]] = {}
for dashboard in dashboard_resources.resources:
subscription_id = dashboard.resource_data.get("subscription_id")
if subscription_id:
if subscription_id not in dashboards_by_subscription:
dashboards_by_subscription[subscription_id] = []
dashboards_by_subscription[subscription_id].append(dashboard)
logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# get all subscriptions
available_subscriptions: List[BugoutResource] = []
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"subscription_type_id": subscription_type,
},
timeout=10,
)
available_subscriptions.extend(blockchain_subscriptions.resources)
# for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
# # Create subscriptions dict for get subscriptions by id.
# blockchain_subscriptions: BugoutResources = bc.list_resources(
# token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
# params={
# "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
# "subscription_type_id": subscription_type,
# },
# timeout=10,
# )
# available_subscriptions.extend(blockchain_subscriptions.resources)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in available_subscriptions
# Get all users entity collections
user_entity_collections: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_ENTITY_COLLECTION,
},
)
user_collection_by_id = {
str(collection.resource_data["user_id"]): collection.resource_data[
"collection_id"
]
for collection in user_entity_collections.resources
}
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
@ -636,329 +659,355 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
for user_id, collection_id in user_collection_by_id.items():
# request all subscriptions for user
if subscription_id not in subscription_by_id:
# Mean it's are different blockchain type
continue
try:
UUID(subscription_id)
except Exception as err:
logger.error(
f"Subscription id {subscription_id} is not valid UUID: {err}"
)
continue
address = subscription_by_id[subscription_id].resource_data[
"address"
]
if address not in address_dashboard_id_subscription_id_tree:
address_dashboard_id_subscription_id_tree[address] = {}
if (
str(dashboard.id)
not in address_dashboard_id_subscription_id_tree
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
] = []
if (
subscription_id
not in address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
]
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
].append(subscription_id)
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
if address not in merged_events:
merged_events[address] = {}
merged_events[address]["merged"] = set()
if address not in merged_functions:
merged_functions[address] = {}
merged_functions[address]["merged"] = set()
if str(dashboard.id) not in merged_events[address]:
merged_events[address][str(dashboard.id)] = {}
if str(dashboard.id) not in merged_functions[address]:
merged_functions[address][str(dashboard.id)] = {}
merged_events[address][str(dashboard.id)][subscription_id] = events
merged_functions[address][str(dashboard.id)][
subscription_id
] = methods
# Get external calls from ABI.
# external_calls merging required direct hash of external_call object.
# or if more correct hash of address and function call signature.
# create external_call selectors.
external_calls = [
external_call
for external_call in abi_json
if external_call["type"] == "external_call"
]
if len(external_calls) > 0:
for external_call in external_calls:
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
"type": "external_call",
"address": external_call["address"],
"name": external_call["name"],
"inputs": external_call["inputs"],
"outputs": external_call["outputs"],
}
external_call_hash = hashlib.md5(
json.dumps(external_call_without_display_name).encode(
"utf-8"
)
).hexdigest()
if str(dashboard.id) not in merged_external_calls:
merged_external_calls[str(dashboard.id)] = {}
if (
subscription_id
not in merged_external_calls[str(dashboard.id)]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {}
if (
external_call_hash
not in merged_external_calls[str(dashboard.id)][
subscription_id
]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {external_call_hash: external_call["display_name"]}
if (
external_call_hash
not in merged_external_calls["merged"]
):
merged_external_calls["merged"][
external_call_hash
] = external_call_without_display_name
# Fill merged events and functions calls for all subscriptions
for event in events:
merged_events[address]["merged"].add(event)
for method in methods:
merged_functions[address]["merged"].add(method)
except Exception as e:
logger.error(f"Error while merging subscriptions: {e}")
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
external_calls_results = process_external_merged(
external_calls=merged_external_calls["merged"],
blockchain=blockchain_type,
access_id=args.access_id,
)
for address in address_dashboard_id_subscription_id_tree.keys():
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
user_subscriptions: EntityCollectionResponse = ec.search_entities(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
required_field=[
"subscription_type_id:{}".format(
subscription_id_by_blockchain[blockchain_type]
)
],
)
s3_data_object_for_contract: Dict[str, Any] = {}
logger.info(
f"Amount of user subscriptions: {len(user_subscriptions.entities)}"
)
crawler_label = CRAWLER_LABEL
for subscription in user_subscriptions.entities:
subscription_id = str(subscription.entity_id)
for timescale in [timescale.value for timescale in TimeScale]:
try:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
if subscription_id not in dashboards_by_subscription:
continue
logger.info(f"Timescale: {timescale}")
dashboard = dashboards_by_subscription[subscription_id]
# Write state of blocks in database
s3_data_object_for_contract["blocks_state"] = current_blocks_state
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters[
"subscription_id"
]
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object_for_contract["generic"] = {}
try:
UUID(subscription_id)
except Exception as err:
logger.error(
f"Subscription id {subscription_id} is not valid UUID: {err}"
)
continue
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_functions[address]["merged"],
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object_for_contract["methods"] = functions_calls_data
address = subscription.address
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_events[address]["merged"],
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object_for_contract["events"] = events_data
if address not in address_dashboard_id_subscription_id_tree:
address_dashboard_id_subscription_id_tree[address] = {}
for dashboard_id in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for (
if (
str(dashboard.id)
not in address_dashboard_id_subscription_id_tree
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
] = []
if (
subscription_id
) in address_dashboard_id_subscription_id_tree[address][
dashboard_id
]:
try:
extention_data = []
not in address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
]
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
].append(subscription_id)
s3_subscription_data_object: Dict[str, Any] = {}
abi = None
if "abi" in subscription.secondary_fields:
abi = subscription.secondary_fields["abi"]
s3_subscription_data_object[
"blocks_state"
] = s3_data_object_for_contract["blocks_state"]
# Read required events, functions and web3_call form ABI
if abi is None:
methods = []
events = []
abi_json = {}
if dashboard_id in merged_external_calls:
for (
external_call_hash,
display_name,
) in merged_external_calls[dashboard_id][
else:
abi_json = abi
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
if address not in merged_events:
merged_events[address] = {}
merged_events[address]["merged"] = set()
if address not in merged_functions:
merged_functions[address] = {}
merged_functions[address]["merged"] = set()
if str(dashboard.id) not in merged_events[address]:
merged_events[address][str(dashboard.id)] = {}
if str(dashboard.id) not in merged_functions[address]:
merged_functions[address][str(dashboard.id)] = {}
merged_events[address][str(dashboard.id)][
subscription_id
] = events
merged_functions[address][str(dashboard.id)][
subscription_id
] = methods
# Get external calls from ABI.
# external_calls merging required direct hash of external_call object.
# or if more correct hash of address and function call signature.
# create external_call selectors.
external_calls = [
external_call
for external_call in abi_json
if external_call["type"] == "external_call"
]
if len(external_calls) > 0:
for external_call in external_calls:
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
"type": "external_call",
"address": external_call["address"],
"name": external_call["name"],
"inputs": external_call["inputs"],
"outputs": external_call["outputs"],
}
external_call_hash = hashlib.md5(
json.dumps(
external_call_without_display_name
).encode("utf-8")
).hexdigest()
if str(dashboard.id) not in merged_external_calls:
merged_external_calls[str(dashboard.id)] = {}
if (
subscription_id
not in merged_external_calls[str(dashboard.id)]
):
merged_external_calls[str(dashboard.id)][
subscription_id
].items():
if external_call_hash in external_calls_results:
extention_data.append(
{
"display_name": display_name,
"value": external_calls_results[
external_call_hash
],
}
)
] = {}
s3_subscription_data_object[
"web3_metric"
] = extention_data
if (
external_call_hash
not in merged_external_calls[str(dashboard.id)][
subscription_id
]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {
external_call_hash: external_call[
"display_name"
]
}
if (
external_call_hash
not in merged_external_calls["merged"]
):
merged_external_calls["merged"][
external_call_hash
] = external_call_without_display_name
# list of user defined events
# Fill merged events and functions calls for all subscriptions
events_list = merged_events[address][dashboard_id][
subscription_id
]
for event in events:
merged_events[address]["merged"].add(event)
s3_subscription_data_object["events"] = {}
for method in methods:
merged_functions[address]["merged"].add(method)
for event in events_list:
if event in events_data:
s3_subscription_data_object["events"][
event
] = events_data[event]
except Exception as e:
logger.error(f"Error while merging subscriptions: {e}")
# list of user defined functions
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
functions_list = merged_functions[address][
dashboard_id
][subscription_id]
external_calls_results = process_external_merged(
external_calls=merged_external_calls["merged"],
blockchain=blockchain_type,
access_id=args.access_id,
)
s3_subscription_data_object["methods"] = {}
for address in address_dashboard_id_subscription_id_tree.keys():
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
for function in functions_list:
if function in functions_calls_data:
s3_subscription_data_object["methods"][
function
] = functions_calls_data[function]
s3_data_object_for_contract: Dict[str, Any] = {}
bucket = subscription_by_id[
subscription_id
].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
crawler_label = CRAWLER_LABEL
# Push data to S3 bucket
push_statistics(
statistics_data=s3_subscription_data_object,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
dashboard_id=dashboard_id,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard}",
],
)
logger.error(err)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}" f"timescale:{timescale}",
f"data_generation_failed",
],
)
logger.error(err)
for timescale in [timescale.value for timescale in TimeScale]:
try:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
logger.info(f"Timescale: {timescale}")
# Write state of blocks in database
s3_data_object_for_contract[
"blocks_state"
] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object_for_contract["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_functions[address]["merged"],
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object_for_contract["methods"] = functions_calls_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_events[address]["merged"],
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object_for_contract["events"] = events_data
for dashboard_id in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for (
subscription_id
) in address_dashboard_id_subscription_id_tree[address][
dashboard_id
]:
try:
extention_data = []
s3_subscription_data_object: Dict[str, Any] = {}
s3_subscription_data_object[
"blocks_state"
] = s3_data_object_for_contract["blocks_state"]
if dashboard_id in merged_external_calls:
for (
external_call_hash,
display_name,
) in merged_external_calls[dashboard_id][
subscription_id
].items():
if (
external_call_hash
in external_calls_results
):
extention_data.append(
{
"display_name": display_name,
"value": external_calls_results[
external_call_hash
],
}
)
s3_subscription_data_object[
"web3_metric"
] = extention_data
# list of user defined events
events_list = merged_events[address][dashboard_id][
subscription_id
]
s3_subscription_data_object["events"] = {}
for event in events_list:
if event in events_data:
s3_subscription_data_object["events"][
event
] = events_data[event]
# list of user defined functions
functions_list = merged_functions[address][
dashboard_id
][subscription_id]
s3_subscription_data_object["methods"] = {}
for function in functions_list:
if function in functions_calls_data:
s3_subscription_data_object["methods"][
function
] = functions_calls_data[function]
# Push data to S3 bucket
push_statistics(
statistics_data=s3_subscription_data_object,
subscription_type_id=subscription_id_by_blockchain[
blockchain_type
],
address=address,
timescale=timescale,
bucket=MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET,
dashboard_id=dashboard_id,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard}",
],
)
logger.error(err)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"timescale:{timescale}",
f"data_generation_failed",
],
)
logger.error(err)
reporter.custom_report(
title=f"Dashboard stats generated.",
@ -970,7 +1019,7 @@ def stats_generate_handler(args: argparse.Namespace):
def stats_generate_api_task(
timescales: List[str],
dashboard: BugoutResource,
subscription_by_id: Dict[str, BugoutResource],
subscription_by_id: Dict[str, EntityResponse],
access_id: Optional[UUID] = None,
):
"""
@ -980,47 +1029,51 @@ def stats_generate_api_task(
with yield_db_read_only_session_ctx() as db_session:
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
subscription_type_id = None
for required_field in subscription_by_id[
subscription_id
].required_fields:
if "subscription_type_id" in required_field:
subscription_type_id = required_field["subscription_type_id"]
if not subscription_type_id:
logger.warning(
f"Subscription type not found for subscription: {subscription_id}"
)
continue
blockchain_type = AvailableBlockchainType(
blockchain_by_subscription_id[
subscription_by_id[subscription_id].resource_data[
"subscription_type_id"
]
]
blockchain_by_subscription_id[subscription_type_id]
)
s3_data_object: Dict[str, Any] = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data["address"]
address = subscription_by_id[subscription_id].address
crawler_label = CRAWLER_LABEL
if address in ("0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",):
crawler_label = "moonworm"
abi = None
if "abi" in subscription_by_id[subscription_id].secondary_fields:
abi = subscription_by_id[subscription_id].secondary_fields["abi"]
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
if abi is None:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_json = abi
methods = generate_list_of_names(
type="function",
@ -1096,9 +1149,10 @@ def stats_generate_api_task(
# push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
subscription_type_id=subscription_type_id,
address=address,
timescale=timescale,
bucket=bucket,
bucket=MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET,
dashboard_id=dashboard.id,
)
except Exception as err:

Wyświetl plik

@ -39,6 +39,7 @@ setup(
"fastapi",
"moonstreamdb>=0.3.3",
"moonstream>=0.1.1",
"moonstream-entity>=0.0.3",
"moonworm[moonstream]>=0.6.2",
"humbug",
"pydantic==1.9.2",