Merge branch 'main' into query-parameters-endpoint

pull/778/head
Andrey 2023-05-02 14:30:10 +03:00
commit aa0f2458d7
21 zmienionych plików z 1736 dodań i 572 usunięć

Wyświetl plik

@ -3,7 +3,7 @@ import hashlib
import json
from itertools import chain
import logging
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Union
from enum import Enum
import uuid
@ -17,6 +17,8 @@ from bugout.data import (
)
from bugout.journal import SearchOrder
from bugout.exceptions import BugoutResponseException
from entity.data import EntityCollectionsResponse, EntityCollectionResponse # type: ignore
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import EthereumLabel
@ -38,8 +40,9 @@ from .settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
from .settings import bugout_client as bc
from .settings import bugout_client as bc, entity_client as ec
logger = logging.getLogger(__name__)
@ -76,6 +79,12 @@ class ResourceQueryFetchException(Exception):
"""
class EntityCollectionNotFoundException(Exception):
"""
Raised when entity collection is not found
"""
class LabelNames(Enum):
ETHERSCAN_SMARTCONTRACT = "etherscan_smartcontract"
COINMARKETCAP_TOKEN = "coinmarketcap_token"
@ -315,7 +324,6 @@ def json_type(evm_type: str) -> type:
def dashboards_abi_validation(
dashboard_subscription: data.DashboardMeta,
abi: Any,
s3_path: str,
):
"""
Validate current dashboard subscription : https://github.com/bugout-dev/moonstream/issues/345#issuecomment-953052444
@ -336,7 +344,7 @@ def dashboards_abi_validation(
logger.error(
f"Error on dashboard resource validation method:{method['name']}"
f" of subscription: {dashboard_subscription.subscription_id}"
f"does not exists in Abi {s3_path}"
f"does not exists in Abi "
)
raise MoonstreamHTTPException(status_code=400)
if method.get("filters") and isinstance(method["filters"], dict):
@ -346,7 +354,7 @@ def dashboards_abi_validation(
logger.error(
f"Error on dashboard resource validation type argument: {input_argument_name} of method:{method['name']} "
f" of subscription: {dashboard_subscription.subscription_id} has incorrect"
f"does not exists in Abi {s3_path}"
f"does not exists in Abi"
)
raise MoonstreamHTTPException(status_code=400)
@ -373,7 +381,7 @@ def dashboards_abi_validation(
logger.error(
f"Error on dashboard resource validation event:{event['name']}"
f" of subscription: {dashboard_subscription.subscription_id}"
f"does not exists in Abi {s3_path}"
f"does not exists in Abi"
)
raise MoonstreamHTTPException(status_code=400)
@ -384,7 +392,7 @@ def dashboards_abi_validation(
logger.error(
f"Error on dashboard resource validation type argument: {input_argument_name} of method:{event['name']} "
f" of subscription: {dashboard_subscription.subscription_id} has incorrect"
f"does not exists in Abi {s3_path}"
f"does not exists in Abi"
)
raise MoonstreamHTTPException(status_code=400)
@ -598,6 +606,81 @@ def get_query_by_name(query_name: str, token: uuid.UUID) -> str:
return query_id
def get_entity_subscription_collection_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
create_if_not_exist: bool = False,
) -> Optional[str]:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
"""
params = {
"type": resource_type,
"user_id": str(user_id),
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({user_id}) with token ({token}), error: {str(e)}"
)
reporter.error_report(e)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(resources.resources) == 0:
if not create_if_not_exist:
raise EntityCollectionNotFoundException(
"Subscription collection not found."
)
try:
# try get collection
collections: EntityCollectionsResponse = ec.list_collections(token=token)
available_collections: Dict[str, str] = {
collection.name: collection.collection_id
for collection in collections.collections
}
if f"subscriptions_{user_id}" not in available_collections:
collection: EntityCollectionResponse = ec.add_collection(
token=token, name=f"subscriptions_{user_id}"
)
collection_id = collection.collection_id
else:
collection_id = available_collections[f"subscriptions_{user_id}"]
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
raise MoonstreamHTTPException(
status_code=500, detail="Can't create collection for subscriptions"
)
resource_data = {
"type": resource_type,
"user_id": str(user_id),
"collection_id": str(collection_id),
}
try:
resource: BugoutResource = bc.create_resource(
token=token,
application_id=MOONSTREAM_APPLICATION_ID,
resource_data=resource_data,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
else:
resource = resources.resources[0]
return resource.resource_data["collection_id"]
def generate_s3_access_links(
method_name: str,
bucket: str,

Wyświetl plik

@ -6,7 +6,7 @@ import json
import logging
import os
from posix import listdir
from typing import Optional
from typing import Optional, List, Dict, Any, Union, Callable
from sqlalchemy.orm import with_expression
@ -16,13 +16,17 @@ from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATIO
from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions, moonworm_tasks
from .migrations import checksum_address, update_dashboard_subscription_key
from .migrations import (
checksum_address,
update_dashboard_subscription_key,
generate_entity_subscriptions,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MIGRATIONS_FOLDER = "./moonstream/admin/migrations"
MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations"
def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]:
@ -43,6 +47,17 @@ name: {checksum_address.__name__}
description: {checksum_address.__doc__}
"""
logger.info(migrations_overview)
entity_migration_overview = f"""
- id: 20230213
name: {generate_entity_subscriptions.__name__}
description: {generate_entity_subscriptions.__doc__}
steps:
- step 1: generate_entity_subscriptions_from_brood_resources - Generate entity subscriptions from brood resources
- step 2: update_dashboards_connection - Update dashboards connection
"""
logger.info(entity_migration_overview)
json_migrations_oreview = "Available migrations files."
for file in os.listdir(MIGRATIONS_FOLDER):
if file.endswith(".json"):
@ -70,14 +85,62 @@ def migrations_run(args: argparse.Namespace) -> None:
web3_session = yield_web3_provider()
db_session = SessionLocal()
try:
if args.id == 20211101:
if args.id == 20230213:
step_map: Dict[str, Dict[str, Any]] = {
"upgrade": {
"generate_entity_subscriptions_from_brood_resources": {
"action": generate_entity_subscriptions.generate_entity_subscriptions_from_brood_resources,
"description": "Generate entity subscriptions from brood resources",
},
"update_dashboards_connection": {
"action": generate_entity_subscriptions.update_dashboards_connection,
"description": "Update dashboards connection",
},
},
"downgrade": {
"generate_entity_subscriptions_from_brood_resources": {
"action": generate_entity_subscriptions.delete_generated_entity_subscriptions_from_brood_resources,
"description": "Delete generated entity subscriptions from brood resources",
},
"update_dashboards_connection": {
"action": generate_entity_subscriptions.restore_dashboard_state,
"description": "Restore dashboard state",
},
},
}
if args.command not in ["upgrade", "downgrade"]:
logger.info("Wrong command. Please use upgrade or downgrade")
step = args.step
if step is None:
# run all steps
for step in step_map[args.command]:
logger.info(
f"Starting step {step}: {step_map[args.command][step]['description']}"
)
migration_function = step_map[args.command][step]["action"]
if callable(migration_function):
migration_function()
elif step in step_map[args.command]:
logger.info(
f"Starting step {step}: {step_map[args.command][step]['description']}"
)
migration_function = step_map[args.command][step]["action"]
if callable(migration_function):
migration_function()
else:
logger.info(f"Step {step} does not exist")
logger.info(f"Available steps: {step_map[args.command].keys()}")
elif args.id == 20211101:
logger.info("Starting update of subscriptions in Brood resource...")
checksum_address.checksum_all_subscription_addresses(web3_session)
logger.info("Starting update of ethereum_labels in database...")
checksum_address.checksum_all_labels_addresses(db_session, web3_session)
elif args.id == 20211202:
update_dashboard_subscription_key.update_dashboard_resources_key()
else:
elif args.id == 20211108:
drop_keys = []
if args.file is not None:
@ -343,6 +406,13 @@ This CLI is configured to work with the following API URLs:
type=str,
help="Command for migration",
)
parser_migrations_run.add_argument(
"-s",
"--step",
required=False,
type=str,
help="How many steps to run",
)
parser_migrations_run.set_defaults(func=migrations_run)
parser_moonworm_tasks = subcommands.add_parser(

Wyświetl plik

@ -0,0 +1,718 @@
"""
Generate entity subscriptions from existing brood resources subscriptions
"""
import hashlib
import logging
import json
import os
import traceback
from typing import List, Optional, Dict, Any, Union, Tuple
import uuid
import time
import boto3 # type: ignore
from bugout.data import BugoutResources, BugoutResource
from bugout.exceptions import BugoutResponseException
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from entity.data import EntityCollectionResponse, EntityResponse # type: ignore
from ...settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_APPLICATION_ID,
BUGOUT_RESOURCE_TYPE_DASHBOARD,
)
from ...settings import bugout_client as bc, entity_client as ec
from ..subscription_types import CANONICAL_SUBSCRIPTION_TYPES
logger = logging.getLogger(__name__)
### create collection for user
def create_collection_for_user(user_id: uuid.UUID) -> str:
"""
Create collection for user if not exist
"""
try:
# try get collection
collection: EntityCollectionResponse = ec.add_collection(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, name=f"subscriptions_{user_id}"
)
collection_id = collection.collection_id
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
return str(collection_id)
def add_entity_subscription(
user_id: uuid.UUID,
subscription_type_id: str,
collection_id: str,
address: str,
color: str,
label: str,
content: Dict[str, Any],
) -> EntityResponse:
"""
Add subscription to collection
"""
if subscription_type_id not in CANONICAL_SUBSCRIPTION_TYPES:
raise ValueError(
f"Unknown subscription type ID: {subscription_type_id}. "
f"Known subscription type IDs: {CANONICAL_SUBSCRIPTION_TYPES.keys()}"
)
elif CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain is None:
raise ValueError(
f"Subscription type ID {subscription_type_id} is not a blockchain subscription type."
)
entity = ec.add_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
address=address,
blockchain=CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain,
name=label,
required_fields=[
{"type": "subscription"},
{"subscription_type_id": f"{subscription_type_id}"},
{"color": f"{color}"},
{"label": f"{label}"},
{"user_id": f"{user_id}"},
],
secondary_fields=content,
)
return entity
def get_abi_from_s3(s3_path: str, bucket: str):
"""
Get ABI from S3
"""
try:
s3 = boto3.resource("s3")
obj = s3.Object(bucket, s3_path)
abi = obj.get()["Body"].read().decode("utf-8")
return abi
except Exception as e:
logger.error(f"Error get ABI from S3: {str(e)}")
def revoke_collection_permissions_from_user(
user_id: uuid.UUID, collection_id: str, permissions: List[str]
):
"""
Remove all permissions from user
"""
bc.delete_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
holder_type="user",
holder_id=user_id,
permission_list=permissions,
)
def find_user_collection(
user_id: uuid.UUID,
create_if_not_exists: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Find user collection in Brood resources
Can create new collection if not exists and create_if_not_exists = True
"""
params = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
}
logger.info(f"Looking for collection for user {user_id}")
try:
user_entity_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params
)
except BugoutResponseException as e:
logger.error(
f"Error listing subscriptions for user ({user_id}) Bugout error: {str(e)}"
)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({user_id}) error: {str(e)}"
)
if len(user_entity_resources.resources) > 0:
collection_id = user_entity_resources.resources[0].resource_data[
"collection_id"
]
logger.info(
f"Collection found for user {user_id}. collection_id: {collection_id}"
)
return collection_id, str(user_entity_resources.resources[0].id)
elif create_if_not_exists:
# Create collection new collection for user
logger.info(f"Creating new collection")
collection = create_collection_for_user(user_id)
return collection, None
return None, None
def generate_entity_subscriptions_from_brood_resources() -> None:
"""
Parse all existing dashboards at Brood resource
and replace key to correct one.
Schema can use for rename first level keys
"""
### Get admin user id
admin_user = bc.get_user(token=MOONSTREAM_ADMIN_ACCESS_TOKEN)
admin_user_id = admin_user.id
logger.info(f"admin user :{admin_user.username}")
### Get all subscription resources type = "subscription"
resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
logger.info(f"Admin own {len(resources.resources)} subscriptions")
### initial users_subscriptions, dashboards_by_user, stages is empty
users_subscriptions: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Subscriptions parsing and save to users_subscriptions
for resource in resources.resources:
resource_data = resource.resource_data
resource_data["subscription_id"] = resource.id
if "user_id" not in resource_data:
continue
user_id = resource_data["user_id"]
if user_id not in users_subscriptions:
users_subscriptions[user_id] = []
# Stages object
if user_id not in stages:
stages[user_id] = {}
users_subscriptions[user_id].append(resource_data)
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
try:
for user_id, subscriptions in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
"subscription_resource_id"
]
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
user_id, create_if_not_exists=True
)
if collection_id is None:
logger.info(f"Collection not found or create for user {user_id}")
continue
stages[user_id]["collection_id"] = collection_id
# Create user subscription collection resource
if "subscription_resource_id" not in stages[user_id]:
if resource_id_of_user_collection is not None:
stages[user_id]["subscription_resource_id"] = str(
resource_id_of_user_collection
)
else:
resource_data = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
"collection_id": str(collection_id),
"version": "1.0.0",
}
### Create resource for user collection
try:
subscription_resource: BugoutResource = bc.create_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
application_id=MOONSTREAM_APPLICATION_ID,
resource_data=resource_data,
)
stages[user_id]["subscription_resource_id"] = str(
subscription_resource.id
)
except Exception as e:
logger.error(
f"Failed to create subscription brood resource: {str(e)}"
)
if "processed_subscriptions" not in stages[user_id]:
stages[user_id]["processed_subscriptions"] = {}
### Add subscriptions to collection
for subscription in subscriptions:
if (
str(subscription["subscription_id"])
in stages[user_id]["processed_subscriptions"]
):
continue
subscription_type_id = subscription["subscription_type_id"]
address = subscription["address"]
color = subscription["color"]
label = subscription["label"]
# try to get abi from S3
abi = None
if subscription.get("bucket") and subscription.get("s3_path"):
try:
abi_body = get_abi_from_s3(
bucket=subscription["bucket"],
s3_path=subscription["s3_path"],
)
# abi hash
abi_hash = hashlib.sha256(abi_body.encode("utf-8")).hexdigest()
abi = True
logger.info(
f"Got abi from S3 from path {subscription['s3_path']}"
)
except Exception as e:
logger.error(f"Failed to get abi from S3: {str(e)}")
abi = None
# Add subscription to collection
logger.info(f"Add subscription to collection: {collection_id}")
entity = add_entity_subscription(
user_id=user_id,
collection_id=collection_id,
subscription_type_id=subscription_type_id,
address=address,
color=color,
label=label,
content={"abi": abi_body, "abi_hash": abi_hash} if abi else {},
)
stages[user_id]["processed_subscriptions"][
str(subscription["subscription_id"])
] = {"entity_id": str(entity.entity_id), "dashboard_ids": []}
# Add permissions to user
if user_id != admin_user_id:
# Add permissions to user
if "permissions_granted" not in stages[user_id]:
try:
bc.update_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
holder_type="user",
holder_id=user_id,
permission_list=[
"journals.read",
"journals.update",
"journals.entries.read",
"journals.entries.create",
"journals.entries.update",
"journals.entries.delete",
],
)
stages[user_id]["permissions_granted"] = True
except Exception as e:
logger.error(f"Failed to add permissions to user: {str(e)}")
continue
else:
logger.warn(
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behaivior!"
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
finally:
try:
with open("stages.json", "w") as f:
json.dump(stages, f)
except Exception as e:
logger.error(f"Failed to save stages: {str(e)}")
# write as text
with open("stages-json-failed.txt", "w") as f:
f.write(str(stages))
def update_dashboards_connection():
"""
Look up all dashboards and update their connection to the user subscription
"""
dashboards_by_user: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Dashboards parsing and save to dashboards_by_user
dashboard_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
for dashboard in dashboard_resources.resources:
if "user_id" not in dashboard.resource_data:
continue
user_id = dashboard.resource_data["user_id"]
if user_id not in dashboards_by_user:
dashboards_by_user[user_id] = []
dashboards_by_user[user_id].append(dashboard)
try:
for user in dashboards_by_user:
logger.info(f"dashboards: {len(dashboards_by_user[user])}")
if user not in stages:
continue
for dashboard in dashboards_by_user[user]:
dashboard_data = dashboard.resource_data
dashboard_subscription_settings = dashboard_data.get(
"subscription_settings"
)
if dashboard_subscription_settings is None:
continue
if len(dashboard_subscription_settings) == 0:
continue
logger.info(f"dashboard {dashboard.id}")
for setting_index, subscription_setting in enumerate(
dashboard_subscription_settings
):
logger.info(
f"Find subscripton: {subscription_setting['subscription_id']}"
)
old_subscription_id = str(subscription_setting["subscription_id"])
if old_subscription_id in stages[user]["processed_subscriptions"]:
logger.info(
f"subscription found: {subscription_setting['subscription_id']}"
)
subscription_stages_metadata = stages[user][
"processed_subscriptions"
][subscription_setting["subscription_id"]]
if (
str(dashboard.id)
in subscription_stages_metadata["dashboard_ids"]
):
continue
# change original dashboard subscription settings
dashboard_data["subscription_settings"][setting_index][
"subscription_id"
] = subscription_stages_metadata["entity_id"]
# Update brood resource in bugout client
stages[user]["processed_subscriptions"][old_subscription_id][
"dashboard_ids"
].append(str(dashboard.id))
try:
logger.info(f"Update dashboard: {dashboard.id} for user {user}")
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {
"subscription_settings": dashboard_data[
"subscription_settings"
]
}
},
)
except Exception as e:
traceback.print_exc()
logger.error(
f"**Failed to update dashboard: {str(e)} for user {user}**"
)
continue
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess dashboards: {str(e)}")
finally:
try:
with open("stages.json", "w") as f:
json.dump(stages, f)
except Exception as e:
logger.error(f"Failed to save stages: {str(e)}")
# write as text
with open("stages-json-failed.txt", "w") as f:
f.write(str(stages))
def delete_generated_entity_subscriptions_from_brood_resources():
"""
Delete all generated entity subscriptions previously created by the script
Also delete all generated entity subscriptions from the brood resources
"""
### stages file example
admin_user = bc.get_user(token=MOONSTREAM_ADMIN_ACCESS_TOKEN)
logger.info(f"admin user :{admin_user.username}")
# Get all subscriptions
### Get all subscription resources type = "subscription"
resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
logger.info(f"Admin own {len(resources.resources)} subscriptions")
### initial users_subscriptions, dashboards_by_user, stages is empty
users_subscriptions: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Subscriptions parsing and save to users_subscriptions
for resource in resources.resources:
resource_data = resource.resource_data
resource_data["subscription_id"] = resource.id
if "user_id" not in resource_data:
continue
user_id = resource_data["user_id"]
if user_id not in users_subscriptions:
users_subscriptions[user_id] = []
users_subscriptions[user_id].append(resource_data)
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
try:
for user_id, _ in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
"subscription_resource_id"
]
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
user_id, create_if_not_exists=False
)
if collection_id is None:
logger.info(f"Collection not found or create for user {user_id}")
continue
### Delete collection
try:
ec.delete_collection(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, collection_id=collection_id
)
logger.info(f"Collection deleted {collection_id}")
except Exception as e:
logger.error(f"Failed to delete collection: {str(e)}")
### Delete collection resource
try:
logger.info(f"Collection resource id {resource_id_of_user_collection}")
bc.delete_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=resource_id_of_user_collection,
)
logger.info(
f"Collection resource deleted {resource_id_of_user_collection}"
)
# clear stages
stages[user_id] = {}
except Exception as e:
logger.error(f"Failed to delete collection resource: {str(e)}")
continue
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
def restore_dashboard_state():
### initial users_subscriptions, dashboards_by_user, stages is empty
dashboards_by_user: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Subscriptions parsing and save to users_subscriptions
### Dashboards parsing and save to dashboards_by_user
dashboards: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
for dashboard in dashboards.resources:
if "user_id" not in dashboard.resource_data:
continue
user_id = dashboard.resource_data["user_id"]
if user_id not in dashboards_by_user:
dashboards_by_user[user_id] = []
dashboards_by_user[user_id].append(dashboard)
### Retunr all dashboards to old state
logger.info(f"Amount of users: {len(dashboards_by_user)}")
for user_id in dashboards_by_user:
logger.info(
f"Amount of dashboards: {len(dashboards_by_user[user_id])} of user {user_id}"
)
user_entity_subscriptions = {
subscription["entity_id"]: key
for key, subscription in stages[user_id]["processed_subscriptions"].items()
}
for dashboard in dashboards_by_user[user_id]:
try:
dashboard_data = dashboard.resource_data
if "subscription_settings" not in dashboard_data:
logger.info("no subscription_settings")
continue
if len(dashboard_data["subscription_settings"]) == 0:
logger.info("no subscription_settings")
continue
dashboard_metadata = dashboard_data["subscription_settings"]
for index, settings in enumerate(dashboard_metadata):
if "subscription_id" not in settings:
logger.info("no subscription_id")
continue
subscription_id = settings["subscription_id"]
if subscription_id not in user_entity_subscriptions:
continue
logger.info(
f"Update dashboard {dashboard.id} with subscription {subscription_id} to old state"
)
dashboard_metadata[index][
"subscription_id"
] = user_entity_subscriptions[subscription_id]
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {"subscription_settings": dashboard_metadata}
},
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to update dashboard: {str(e)}")
continue

Wyświetl plik

@ -20,6 +20,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"ethereum_smartcontract": SubscriptionTypeResourceData(
id="ethereum_smartcontract",
name="Ethereum smartcontracts",
blockchain="ethereum",
choices=["input:address", "tag:erc721"],
description="Contracts events and tx_calls of contract of Ethereum blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/ethereum/eth-diamond-purple.png",
@ -30,6 +31,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"polygon_smartcontract": SubscriptionTypeResourceData(
id="polygon_smartcontract",
name="Polygon smartcontracts",
blockchain="polygon",
choices=["input:address", "tag:erc721"],
description="Contracts events and tx_calls of contract of Polygon blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/matic-token-inverted-icon.png",
@ -40,6 +42,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"mumbai_smartcontract": SubscriptionTypeResourceData(
id="mumbai_smartcontract",
name="Mumbai smartcontracts",
blockchain="mumbai",
choices=["input:address", "tag:erc721"],
description="Contracts events and tx_calls of contract of Mumbai blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/matic-token-inverted-icon.png",
@ -50,6 +53,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"xdai_smartcontract": SubscriptionTypeResourceData(
id="xdai_smartcontract",
name="XDai smartcontract",
blockchain="xdai",
choices=["input:address", "tag:erc721"],
description="Contracts events and tx_calls of contract of XDai blockchain.",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/xdai-token-logo.png",
@ -60,6 +64,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"wyrm_smartcontract": SubscriptionTypeResourceData(
id="wyrm_smartcontract",
name="Wyrm smartcontract",
blockchain="wyrm",
choices=["input:address", "tag:erc721"],
description="Contracts events and tx_calls of contract of Wyrm blockchain.",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/great-wyrm-network-logo.png",
@ -70,6 +75,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"ethereum_blockchain": SubscriptionTypeResourceData(
id="ethereum_blockchain",
name="Ethereum transactions",
blockchain="ethereum",
choices=["input:address", "tag:erc721"],
description="Transactions that have been mined into the Ethereum blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/ethereum/eth-diamond-purple.png",
@ -80,6 +86,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"polygon_blockchain": SubscriptionTypeResourceData(
id="polygon_blockchain",
name="Polygon transactions",
blockchain="polygon",
choices=["input:address", "tag:erc721"],
description="Transactions that have been mined into the Polygon blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/matic-token-inverted-icon.png",
@ -90,6 +97,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"mumbai_blockchain": SubscriptionTypeResourceData(
id="mumbai_blockchain",
name="Mumbai transactions",
blockchain="mumbai",
choices=["input:address", "tag:erc721"],
description="Transactions that have been mined into the Mumbai blockchain",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/matic-token-inverted-icon.png",
@ -100,6 +108,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"xdai_blockchain": SubscriptionTypeResourceData(
id="xdai_blockchain",
name="XDai transactions",
blockchain="xdai",
choices=["input:address", "tag:erc721"],
description="Gnosis chain transactions subscription.",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/xdai-token-logo.png",
@ -110,6 +119,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"wyrm_blockchain": SubscriptionTypeResourceData(
id="wyrm_blockchain",
name="Wyrm transactions",
blockchain="wyrm",
choices=["input:address", "tag:erc721"],
description="Wyrm chain transactions subscription.",
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/great-wyrm-network-logo.png",
@ -120,6 +130,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"ethereum_whalewatch": SubscriptionTypeResourceData(
id="ethereum_whalewatch",
name="Ethereum whale watch",
blockchain="ethereum",
description="Ethereum accounts that have experienced a lot of recent activity",
choices=[],
# Icon taken from: https://www.maxpixel.net/Whale-Cetacean-Wildlife-Symbol-Ocean-Sea-Black-99310
@ -131,6 +142,7 @@ CANONICAL_SUBSCRIPTION_TYPES = {
"ethereum_txpool": SubscriptionTypeResourceData(
id="ethereum_txpool",
name="Ethereum transaction pool",
blockchain="ethereum",
description="Transactions that have been submitted into the Ethereum transaction pool but not necessarily mined yet",
choices=["input:address", "tag:erc721"],
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/ethereum/eth-diamond-rainbow.png",

Wyświetl plik

@ -31,6 +31,7 @@ class SubscriptionTypeResourceData(BaseModel):
description: str
choices: List[str] = Field(default_factory=list)
icon_url: str
blockchain: Optional[str] = None
stripe_product_id: Optional[str] = None
stripe_price_id: Optional[str] = None
active: bool = False
@ -42,14 +43,14 @@ class SubscriptionTypesListResponse(BaseModel):
class SubscriptionResourceData(BaseModel):
id: str
address: str
address: Optional[str]
abi: Optional[str]
color: Optional[str]
label: Optional[str]
user_id: str
subscription_type_id: str
created_at: datetime
updated_at: datetime
subscription_type_id: Optional[str]
created_at: Optional[datetime]
updated_at: Optional[datetime]
class CreateSubscriptionRequest(BaseModel):
@ -239,7 +240,7 @@ class OnboardingState(BaseModel):
class SubdcriptionsAbiResponse(BaseModel):
url: str
abi: str
class DashboardMeta(BaseModel):

Wyświetl plik

@ -1,13 +1,14 @@
import json
import logging
from os import read
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
import boto3 # type: ignore
import requests
import requests # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from entity.data import EntitiesResponse, EntityResponse # type: ignore
from fastapi import APIRouter, Body, Path, Query, Request
from .. import actions, data
@ -15,13 +16,15 @@ from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc
from ..settings import bugout_client as bc, entity_client as ec
logger = logging.getLogger(__name__)
@ -49,42 +52,36 @@ async def add_dashboard_handler(
subscription_settings = dashboard.subscription_settings
# Get all user subscriptions
params = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"user_id": str(user.id),
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({request.user.id}) with token ({request.state.token}), error: {str(e)}"
)
reporter.error_report(e)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# Get user collection id
collection_id = actions.get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
limit=1000,
)
# process existing subscriptions with supplied ids
s3_client = boto3.client("s3")
available_subscriptions: Dict[UUID, Dict[str, Any]] = {
resource.id: resource.resource_data for resource in resources.resources
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions.keys():
# TODO(Andrey): Add some dedublication for get object from s3 for repeated subscription_id
bucket = available_subscriptions[dashboard_subscription.subscription_id][
"bucket"
]
key = available_subscriptions[dashboard_subscription.subscription_id][
"s3_path"
]
if bucket is None or key is None:
if dashboard_subscription.subscription_id in available_subscriptions_ids.keys():
if (
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
is None
):
logger.error(
f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi"
)
@ -92,28 +89,16 @@ async def add_dashboard_handler(
status_code=404,
detail=f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi",
)
s3_path = f"s3://{bucket}/{key}"
try:
response = s3_client.get_object(
Bucket=bucket,
Key=key,
)
except s3_client.exceptions.NoSuchKey as e:
logger.error(
f"Error getting Abi for subscription {str(dashboard_subscription.subscription_id)} S3 {s3_path} does not exist : {str(e)}"
)
raise MoonstreamHTTPException(
status_code=500,
internal_error=e,
detail=f"We can't access the abi for subscription with id:{str(dashboard_subscription.subscription_id)}.",
)
abi = json.loads(response["Body"].read())
abi = json.loads(
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields["abi"]
)
actions.dashboards_abi_validation(
dashboard_subscription, abi, s3_path=s3_path
dashboard_subscription,
abi,
)
else:
@ -239,39 +224,34 @@ async def update_dashboard_handler(
subscription_settings = dashboard.subscription_settings
params = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"user_id": str(user.id),
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({request.user.id}) with token ({request.state.token}), error: {str(e)}"
)
reporter.error_report(e)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# Get user collection id
s3_client = boto3.client("s3")
collection_id = actions.get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
available_subscriptions = {
resource.id: resource.resource_data for resource in resources.resources
subscriprions_list = ec.search_entities(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
limit=1000,
)
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions:
# TODO(Andrey): Add some dedublication for get object from s3 for repeated subscription_id
bucket = available_subscriptions[dashboard_subscription.subscription_id][
"bucket"
]
abi_path = available_subscriptions[dashboard_subscription.subscription_id][
"s3_path"
]
if bucket is None or abi_path is None:
if dashboard_subscription.subscription_id in available_subscriptions_ids:
if (
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
is None
):
logger.error(
f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi"
)
@ -279,29 +259,15 @@ async def update_dashboard_handler(
status_code=404,
detail=f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi",
)
s3_path = f"s3://{bucket}/{abi_path}"
try:
response = s3_client.get_object(
Bucket=bucket,
Key=abi_path,
)
except s3_client.exceptions.NoSuchKey as e:
logger.error(
f"Error getting Abi for subscription {dashboard_subscription.subscription_id} S3 {s3_path} does not exist : {str(e)}"
)
raise MoonstreamHTTPException(
status_code=500,
internal_error=e,
detail=f"We can't access the abi for subscription with id:{dashboard_subscription.subscription_id}.",
)
abi = json.loads(response["Body"].read())
actions.dashboards_abi_validation(
dashboard_subscription, abi, s3_path=s3_path
abi = json.loads(
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
)
actions.dashboards_abi_validation(dashboard_subscription, abi)
else:
logger.error(
f"Error subscription_id: {dashboard_subscription.subscription_id} not exists."
@ -336,9 +302,9 @@ async def update_dashboard_handler(
@router.get("/{dashboard_id}/stats", tags=["dashboards"])
async def get_dashboard_data_links_handler(
request: Request, dashboard_id: str
) -> Dict[UUID, Any]:
) -> Dict[Union[UUID, str], Any]:
"""
Get s3 presign urls for dshaboard grafics
Get s3 presign urls for dashboard grafics
"""
token = request.state.token
@ -362,49 +328,51 @@ async def get_dashboard_data_links_handler(
# get subscriptions
params = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"user_id": str(user.id),
}
try:
subscription_resources: BugoutResources = bc.list_resources(
token=token, params=params
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({request.user.id}) with token ({request.state.token}), error: {str(e)}"
)
reporter.error_report(e)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
collection_id = actions.get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
limit=1000,
)
# filter out dasboards
subscriptions_ids = [
UUID(subscription_meta["subscription_id"])
subscription_meta["subscription_id"]
for subscription_meta in dashboard_resource.resource_data[
"subscription_settings"
]
]
dashboard_subscriptions = [
subscription
for subscription in subscription_resources.resources
if subscription.id in subscriptions_ids
]
dashboard_subscriptions: Dict[Union[UUID, str], EntitiesResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
if str(subscription.entity_id) in subscriptions_ids
}
# generate s3 links
s3_client = boto3.client("s3")
stats: Dict[UUID, Any] = {}
stats: Dict[Union[str, UUID], Dict[str, Any]] = {}
for subscription in dashboard_subscriptions:
for id, subscription in dashboard_subscriptions.items():
available_timescales = [timescale.value for timescale in data.TimeScale]
stats[subscription.id] = {}
stats[id] = {}
for fields in subscription.required_fields:
if "subscription_type_id" in fields:
subscription_type_id = fields["subscription_type_id"]
for timescale in available_timescales:
try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{actions.blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{actions.blockchain_by_subscription_id[subscription_type_id]}/contracts_data/{subscription.address}/{dashboard_id}/v1/{timescale}.json"
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
@ -414,10 +382,10 @@ async def get_dashboard_data_links_handler(
ExpiresIn=300,
HttpMethod="GET",
)
stats[subscription.id][timescale] = {"url": stats_presigned_url}
stats[id][timescale] = {"url": stats_presigned_url}
except Exception as err:
logger.warning(
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, Key:{result_key} get error:{err}"
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, get error:{err}"
)
return stats
@ -435,6 +403,7 @@ async def update_dashbord_data_handler(
"""
token = request.state.token
user = request.state.user
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/stats_update",
@ -442,6 +411,7 @@ async def update_dashbord_data_handler(
"dashboard_id": dashboard_id,
"timescales": updatestats.timescales,
"token": token,
"user_id": str(user.id),
},
)
if responce.status_code != 200:

Wyświetl plik

@ -6,27 +6,26 @@ import json
import logging
from typing import Any, Dict, List, Optional
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks
from web3 import Web3
from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks
from ..actions import (
validate_abi_json,
apply_moonworm_tasks,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from ..admin import subscription_types
from .. import data
from ..actions import upload_abi_to_s3, validate_abi_json
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from ..settings import bugout_client as bc
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN
from ..web3_provider import yield_web3_provider
logger = logging.getLogger(__name__)
router = APIRouter(
@ -34,11 +33,12 @@ router = APIRouter(
)
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
@router.post("/", tags=["subscriptions"], response_model=data.SubscriptionResourceData)
async def add_subscription_handler(
request: Request, # subscription_data: data.CreateSubscriptionRequest = Body(...)
request: Request,
background_tasks: BackgroundTasks,
address: str = Form(...),
color: str = Form(...),
@ -68,6 +68,11 @@ async def add_subscription_handler(
internal_error=e,
detail="Currently unable to convert address to checksum address",
)
else:
raise MoonstreamHTTPException(
status_code=400,
detail="Currently ethereum_whalewatch not supported",
)
active_subscription_types_response = subscription_types.list_subscription_types(
active_only=True
@ -86,29 +91,7 @@ async def add_subscription_handler(
user = request.state.user
resource_data = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"user_id": str(user.id),
"subscription_type_id": subscription_type_id,
"address": address,
"color": color,
"label": label,
"abi": None,
"bucket": None,
"s3_path": None,
}
try:
resource: BugoutResource = bc.create_resource(
token=token,
application_id=MOONSTREAM_APPLICATION_ID,
resource_data=resource_data,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
content: Dict[str, Any] = {}
if abi:
try:
@ -118,28 +101,12 @@ async def add_subscription_handler(
validate_abi_json(json_abi)
update_resource = upload_abi_to_s3(resource=resource, abi=abi, update={})
abi_string = json.dumps(json_abi, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
update_resource["abi_hash"] = hash
try:
updated_resource: BugoutResource = bc.update_resource(
token=token,
resource_id=resource.id,
resource_data=data.SubscriptionUpdate(
update=update_resource,
).dict(),
)
resource = updated_resource
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error getting user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
content["abi"] = abi
content["abi_hash"] = hash
background_tasks.add_task(
apply_moonworm_tasks,
@ -148,16 +115,55 @@ async def add_subscription_handler(
address,
)
try:
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
create_if_not_exist=True,
)
entity = ec.add_entity(
token=token,
collection_id=collection_id,
address=address,
blockchain=subscription_types.CANONICAL_SUBSCRIPTION_TYPES[
subscription_type_id
].blockchain,
name=label,
required_fields=[
{"type": "subscription"},
{"subscription_type_id": f"{subscription_type_id}"},
{"color": f"{color}"},
{"label": f"{label}"},
{"user_id": f"{user.id}"},
],
secondary_fields=content,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Failed to get collection id")
raise MoonstreamHTTPException(
status_code=500,
internal_error=e,
detail="Currently unable to get collection id",
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],
address=resource.resource_data["address"],
color=resource.resource_data["color"],
label=resource.resource_data["label"],
abi=resource.resource_data.get("abi"),
subscription_type_id=resource.resource_data["subscription_type_id"],
updated_at=resource.updated_at,
created_at=resource.created_at,
id=str(entity.entity_id),
user_id=str(user.id),
address=address,
color=color,
label=label,
abi=entity.secondary_fields.get("abi"),
subscription_type_id=subscription_type_id,
updated_at=entity.updated_at,
created_at=entity.created_at,
)
@ -171,24 +177,63 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
Delete subscriptions.
"""
token = request.state.token
user = request.state.user
try:
deleted_resource = bc.delete_resource(token=token, resource_id=subscription_id)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
deleted_entity = ec.delete_entity(
token=token,
collection_id=collection_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Error deleting subscription: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
logger.error(f"Failed to delete subscription")
raise MoonstreamHTTPException(
status_code=500,
detail="Internal error",
)
tags = deleted_entity.required_fields
subscription_type_id = None
color = None
label = None
abi = None
if tags is not None:
for tag in tags:
if "subscription_type_id" in tag:
subscription_type_id = tag["subscription_type_id"]
if "color" in tag:
color = tag["color"]
if "label" in tag:
label = tag["label"]
if deleted_entity.secondary_fields is not None:
abi = deleted_entity.secondary_fields.get("abi")
return data.SubscriptionResourceData(
id=str(deleted_resource.id),
user_id=deleted_resource.resource_data["user_id"],
address=deleted_resource.resource_data["address"],
color=deleted_resource.resource_data["color"],
label=deleted_resource.resource_data["label"],
abi=deleted_resource.resource_data.get("abi"),
subscription_type_id=deleted_resource.resource_data["subscription_type_id"],
updated_at=deleted_resource.updated_at,
created_at=deleted_resource.created_at,
id=str(deleted_entity.entity_id),
user_id=str(user.id),
address=deleted_entity.address,
color=color,
label=label,
abi=abi,
subscription_type_id=subscription_type_id,
updated_at=deleted_entity.updated_at,
created_at=deleted_entity.created_at,
)
@ -198,37 +243,66 @@ async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListR
Get user's subscriptions.
"""
token = request.state.token
params = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"user_id": str(request.state.user.id),
}
user = request.state.user
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
subscriprions_list = ec.search_entities(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
limit=1000,
)
# resources: BugoutResources = bc.list_resources(token=token, params=params)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({request.user.id}) with token ({request.state.token}), error: {str(e)}"
f"Error listing subscriptions for user ({user.id}) with token ({token}), error: {str(e)}"
)
reporter.error_report(e)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return data.SubscriptionsListResponse(
subscriptions=[
subscriptions = []
for subscription in subscriprions_list.entities:
tags = subscription.required_fields
label, color, subscription_type_id = None, None, None
for tag in tags:
if "subscription_type_id" in tag:
subscription_type_id = tag["subscription_type_id"]
if "color" in tag:
color = tag["color"]
if "label" in tag:
label = tag["label"]
subscriptions.append(
data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],
address=resource.resource_data["address"],
color=resource.resource_data["color"],
label=resource.resource_data["label"],
abi=resource.resource_data.get("abi"),
subscription_type_id=resource.resource_data["subscription_type_id"],
updated_at=resource.updated_at,
created_at=resource.created_at,
id=str(subscription.entity_id),
user_id=str(user.id),
address=subscription.address,
color=color,
label=label,
abi="True" if subscription.secondary_fields.get("abi") else None,
subscription_type_id=subscription_type_id,
updated_at=subscription.updated_at,
created_at=subscription.created_at,
)
for resource in resources.resources
]
)
)
return data.SubscriptionsListResponse(subscriptions=subscriptions)
@router.put(
@ -249,13 +323,61 @@ async def update_subscriptions_handler(
"""
token = request.state.token
update: Dict[str, Any] = {}
user = request.state.user
if color:
update["color"] = color
update_required_fields = []
if label:
update["label"] = label
update_secondary_fields = {}
try:
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_entity = ec.get_entity(
token=token,
collection_id=collection_id,
entity_id=subscription_id,
)
subscription_type_id = None
update_required_fields = subscription_entity.required_fields
for field in update_required_fields:
if "subscription_type_id" in field:
subscription_type_id = field["subscription_type_id"]
if not subscription_type_id:
logger.error(
f"Subscription entity {subscription_id} in collection {collection_id} has no subscription_type_id malformed subscription entity"
)
raise MoonstreamHTTPException(
status_code=404,
detail="Not valid subscription entity",
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(
f"Error get subscriptions for user ({user.id}) with token ({token}), error: {str(e)}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
for field in update_required_fields:
if "color" in field and color is not None:
field["color"] = color
if "label" in field and label is not None:
field["label"] = label
if abi:
try:
@ -267,63 +389,47 @@ async def update_subscriptions_handler(
abi_string = json.dumps(json_abi, sort_keys=True, indent=2)
update_secondary_fields["abi"] = abi_string
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
try:
subscription_resource: BugoutResource = bc.get_resource(
token=token,
resource_id=subscription_id,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if subscription_resource.resource_data["abi"] is not None:
raise MoonstreamHTTPException(
status_code=400,
detail="Subscription already have ABI. For add a new ABI create new subscription.",
)
update = upload_abi_to_s3(
resource=subscription_resource, abi=abi, update=update
)
update["abi_hash"] = hash
update_secondary_fields["abi_hash"] = hash
else:
update_secondary_fields = subscription_entity.secondary_fields
try:
resource: BugoutResource = bc.update_resource(
subscription = ec.update_entity(
token=token,
resource_id=subscription_id,
resource_data=data.SubscriptionUpdate(
update=update,
).dict(),
collection_id=collection_id,
entity_id=subscription_id,
address=subscription_entity.address,
blockchain=subscription_entity.blockchain,
name=subscription_entity.name,
required_fields=update_required_fields,
secondary_fields=update_secondary_fields,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error getting user subscriptions: {str(e)}")
logger.error(f"Error update user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if abi:
background_tasks.add_task(
apply_moonworm_tasks,
subscription_resource.resource_data["subscription_type_id"],
subscription_type_id,
json_abi,
subscription_resource.resource_data["address"],
subscription.address,
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],
address=resource.resource_data["address"],
color=resource.resource_data["color"],
label=resource.resource_data["label"],
abi=resource.resource_data.get("abi"),
subscription_type_id=resource.resource_data["subscription_type_id"],
updated_at=resource.updated_at,
created_at=resource.created_at,
id=str(subscription.entity_id),
user_id=str(user.id),
address=subscription.address,
color=color,
label=label,
abi=subscription.secondary_fields.get("abi"),
subscription_type_id=subscription_type_id,
updated_at=subscription_entity.updated_at,
created_at=subscription_entity.created_at,
)
@ -337,39 +443,41 @@ async def get_subscription_abi_handler(
subscription_id: str,
) -> data.SubdcriptionsAbiResponse:
token = request.state.token
user = request.state.user
try:
subscription_resource: BugoutResource = bc.get_resource(
token=token,
resource_id=subscription_id,
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if subscription_resource.resource_data["abi"] is None:
# get subscription entity
subscription_resource = ec.get_entity(
token=token,
collection_id=collection_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="Subscription abi not exists.",
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(
f"Error get subscriptions for user ({user}) with token ({token}), error: {str(e)}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
s3_client = boto3.client("s3")
if "abi" not in subscription_resource.secondary_fields.keys():
raise MoonstreamHTTPException(status_code=404, detail="Abi not found")
result_key = f"{subscription_resource.resource_data['s3_path']}"
presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": subscription_resource.resource_data["bucket"],
"Key": result_key,
},
ExpiresIn=300,
HttpMethod="GET",
return data.SubdcriptionsAbiResponse(
abi=subscription_resource.secondary_fields["abi"]
)
return data.SubdcriptionsAbiResponse(url=presigned_url)
@router.get(
"/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse

Wyświetl plik

@ -1,6 +1,8 @@
import os
from bugout.app import Bugout
from entity.client import Entity # type: ignore
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
@ -9,6 +11,15 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
BUGOUT_REQUEST_TIMEOUT_SECONDS = 5
HUMBUG_REPORTER_BACKEND_TOKEN = os.environ.get("HUMBUG_REPORTER_BACKEND_TOKEN")
@ -111,3 +122,10 @@ if MOONSTREAM_S3_QUERIES_BUCKET_PREFIX == "":
raise ValueError(
"MOONSTREAM_S3_QUERIES_BUCKET_PREFIX environment variable must be set"
)
## Moonstream resources types
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.2.3"
MOONSTREAMAPI_VERSION = "0.2.4"

Wyświetl plik

@ -14,6 +14,7 @@ setup(
"appdirs",
"boto3",
"bugout>=0.1.19",
"moonstream-entity>=0.0.4",
"fastapi",
"moonstreamdb>=0.3.3",
"humbug",
@ -23,7 +24,7 @@ setup(
"python-multipart",
"python-slugify",
"uvicorn",
"web3",
"web3>=5.30.0, <6",
],
extras_require={
"dev": ["black", "isort", "mypy", "types-requests", "types-python-dateutil"],

Wyświetl plik

@ -2,7 +2,16 @@ from collections import OrderedDict
import hashlib
import json
import logging
from typing import Any, Dict
from typing import Any, Dict, Optional, Union
import uuid
from bugout.data import (
BugoutResources,
)
from bugout.exceptions import BugoutResponseException
from .middleware import MoonstreamHTTPException
from .settings import bugout_client as bc
import boto3 # type: ignore
@ -10,6 +19,12 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EntityCollectionNotFoundException(Exception):
"""
Raised when entity collection is not found
"""
def push_data_to_bucket(
data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {}
) -> None:
@ -56,3 +71,33 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
).hexdigest()
return hash
def get_entity_subscription_collection_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> Optional[str]:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
"""
params = {
"type": resource_type,
"user_id": str(user_id),
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({user_id}) with token ({token}), error: {str(e)}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(resources.resources) == 0:
raise EntityCollectionNotFoundException("Subscription collection not found.")
else:
resource = resources.resources[0]
return resource.resource_data["collection_id"]

Wyświetl plik

@ -9,16 +9,24 @@ from typing import Any, Dict, List
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.data import BugoutResource
from entity.data import EntityResponse # type: ignore
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from .actions import generate_s3_access_links, query_parameter_hash
from .actions import (
generate_s3_access_links,
query_parameter_hash,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from . import data
from .middleware import MoonstreamHTTPException
from .settings import (
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
DOCS_TARGET_PATH,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
@ -26,8 +34,9 @@ from .settings import (
NB_CONTROLLER_ACCESS_ID,
ORIGINS,
LINKS_EXPIRATION_TIME,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
)
from .settings import bugout_client as bc
from .settings import bugout_client as bc, entity_client as ec
from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION
@ -98,21 +107,43 @@ async def status_handler(
timeout=10,
)
# get all user subscriptions
try:
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=UUID(stats_update.user_id),
)
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=stats_update.token,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=10,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({stats_update.user_id}) with token: {stats_update.token}, error: {str(e)}"
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
# get subscription entities
s3_client = boto3.client("s3")
subscription_by_id: Dict[str, EntityResponse] = {}
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
# get subscription by id
subscription: EntityResponse = ec.get_entity(
token=stats_update.token,
collection_id=collection_id,
entity_id=dashboard_subscription_filters["subscription_id"],
)
subscription_by_id[str(subscription.entity_id)] = subscription
try:
background_tasks.add_task(
dashboard.stats_generate_api_task,
@ -133,31 +164,37 @@ async def status_handler(
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
subscription = subscription_by_id[
# get subscription by id
subscription_entity = subscription_by_id[
dashboard_subscription_filters["subscription_id"]
]
for reqired_field in subscription.required_fields:
if "subscription_type_id" in reqired_field:
subscriprions_type = reqired_field["subscription_type_id"]
for timescale in stats_update.timescales:
presigned_urls_response[subscription.id] = {}
presigned_urls_response[subscription_entity.entity_id] = {}
try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{stats_update.dashboard_id}/v1/{timescale}.json'
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription_entity.address}/{stats_update.dashboard_id}/v1/{timescale}.json"
object = s3_client.head_object(
Bucket=subscription.resource_data["bucket"], Key=result_key
Bucket=MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, Key=result_key
)
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": subscription.resource_data["bucket"],
"Bucket": MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
"Key": result_key,
},
ExpiresIn=300,
HttpMethod="GET",
)
presigned_urls_response[subscription.id][timescale] = {
presigned_urls_response[subscription_entity.entity_id][timescale] = {
"url": stats_presigned_url,
"headers": {
"If-Modified-Since": (
@ -167,7 +204,7 @@ async def status_handler(
}
except Exception as err:
logger.warning(
f"Can't generate S3 presigned url in stats endpoint for Bucket:{subscription.resource_data['bucket']}, Key:{result_key} get error:{err}"
f"Can't generate S3 presigned url in stats endpoint for Bucket:{MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET}, Key:{result_key} get error:{err}"
)
return presigned_urls_response

Wyświetl plik

@ -10,6 +10,7 @@ class StatsUpdateRequest(BaseModel):
dashboard_id: str
timescales: List[str]
token: str
user_id: str
@dataclass

Wyświetl plik

@ -3,17 +3,27 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from entity.client import Entity # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
)
@ -139,6 +149,14 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
"MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX environment variable must be set"
)
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET = os.environ.get(
"MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET"
)
if MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET is None:
raise ValueError(
"MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET environment variable must be set"
)
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)
@ -254,3 +272,10 @@ infura_networks = {
"url": f"https://polygon-mumbai.infura.io/v3/{INFURA_PROJECT_ID}",
},
}
## Moonstream resources types
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"

Wyświetl plik

@ -14,6 +14,7 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from entity.data import EntityResponse, EntityCollectionResponse # type: ignore
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
@ -32,19 +33,21 @@ from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc
from ..settings import bugout_client as bc, entity_client as ec
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
subscription_ids_by_blockchain = {
"ethereum": ["ethereum_blockchain", "ethereum_smartcontract"],
"polygon": ["polygon_blockchain", "polygon_smartcontract"],
"mumbai": ["mumbai_blockchain", "mumbai_smartcontract"],
"xdai": ["xdai_blockchain", "xdai_smartcontract"],
"wyrm": ["wyrm_blockchain", "wyrm_smartcontract"],
subscription_id_by_blockchain = {
"ethereum": "ethereum_smartcontract",
"polygon": "polygon_smartcontract",
"mumbai": "mumbai_smartcontract",
"xdai": "xdai_smartcontract",
"wyrm": "wyrm_smartcontract",
}
blockchain_by_subscription_id = {
@ -93,13 +96,14 @@ BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
def push_statistics(
statistics_data: Dict[str, Any],
subscription: Any,
subscription_type_id: str,
address: str,
timescale: str,
bucket: str,
dashboard_id: Union[UUID, str],
) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription_type_id]}/contracts_data/{address}/{dashboard_id}/v1/{timescale}.json"
s3 = boto3.client("s3")
s3.put_object(
@ -585,32 +589,41 @@ def stats_generate_handler(args: argparse.Namespace):
timeout=10,
)
dashboards_by_subscription: Dict[str, List[BugoutResource]] = {}
for dashboard in dashboard_resources.resources:
dashboard_subscription_settings = dashboard.resource_data.get(
"subscription_settings"
)
if dashboard_subscription_settings:
for dashboard_setting in dashboard_subscription_settings:
subscription_id = dashboard_setting.get("subscription_id")
if subscription_id:
if subscription_id not in dashboards_by_subscription:
dashboards_by_subscription[subscription_id] = []
dashboards_by_subscription[subscription_id].append(dashboard)
logger.info(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# get all subscriptions
available_subscriptions: List[BugoutResource] = []
# Get all users entity collections
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"subscription_type_id": subscription_type,
},
timeout=10,
)
available_subscriptions.extend(blockchain_subscriptions.resources)
user_entity_collections: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
},
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in available_subscriptions
user_collection_by_id = {
str(collection.resource_data["user_id"]): collection.resource_data[
"collection_id"
]
for collection in user_entity_collections.resources
if collection.resource_data.get("collection_id")
}
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
subscriptions_count = 0
# generate merged events and functions calls for all subscriptions
@ -636,164 +649,196 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
for user_id, collection_id in user_collection_by_id.items():
# request all subscriptions for user
if subscription_id not in subscription_by_id:
# Mean it's are different blockchain type
continue
user_subscriptions: EntityCollectionResponse = ec.search_entities(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
required_field=[
"subscription_type_id:{}".format(
subscription_id_by_blockchain[args.blockchain]
)
],
)
try:
UUID(subscription_id)
except Exception as err:
logger.error(
f"Subscription id {subscription_id} is not valid UUID: {err}"
)
continue
logger.info(
f"Amount of user subscriptions: {len(user_subscriptions.entities)}"
)
address = subscription_by_id[subscription_id].resource_data[
"address"
]
if address not in address_dashboard_id_subscription_id_tree:
address_dashboard_id_subscription_id_tree[address] = {}
for subscription in user_subscriptions.entities:
subscription_id = str(subscription.entity_id)
if (
str(dashboard.id)
not in address_dashboard_id_subscription_id_tree
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
] = []
if subscription_id not in dashboards_by_subscription:
logger.info(
f"Subscription {subscription_id} has no dashboard. Skipping."
)
continue
if (
subscription_id
not in address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
]
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
].append(subscription_id)
dashboards = dashboards_by_subscription[subscription_id]
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
for dashboard in dashboards:
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters[
"subscription_id"
]
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
if address not in merged_events:
merged_events[address] = {}
merged_events[address]["merged"] = set()
if address not in merged_functions:
merged_functions[address] = {}
merged_functions[address]["merged"] = set()
if str(dashboard.id) not in merged_events[address]:
merged_events[address][str(dashboard.id)] = {}
if str(dashboard.id) not in merged_functions[address]:
merged_functions[address][str(dashboard.id)] = {}
merged_events[address][str(dashboard.id)][subscription_id] = events
merged_functions[address][str(dashboard.id)][
subscription_id
] = methods
# Get external calls from ABI.
# external_calls merging required direct hash of external_call object.
# or if more correct hash of address and function call signature.
# create external_call selectors.
external_calls = [
external_call
for external_call in abi_json
if external_call["type"] == "external_call"
]
if len(external_calls) > 0:
for external_call in external_calls:
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
"type": "external_call",
"address": external_call["address"],
"name": external_call["name"],
"inputs": external_call["inputs"],
"outputs": external_call["outputs"],
}
external_call_hash = hashlib.md5(
json.dumps(external_call_without_display_name).encode(
"utf-8"
try:
UUID(subscription_id)
except Exception as err:
logger.error(
f"Subscription id {subscription_id} is not valid UUID: {err}"
)
).hexdigest()
continue
if str(dashboard.id) not in merged_external_calls:
merged_external_calls[str(dashboard.id)] = {}
address = subscription.address
if address not in address_dashboard_id_subscription_id_tree:
address_dashboard_id_subscription_id_tree[address] = {}
if (
str(dashboard.id)
not in address_dashboard_id_subscription_id_tree
):
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
] = []
if (
subscription_id
not in merged_external_calls[str(dashboard.id)]
not in address_dashboard_id_subscription_id_tree[
address
][str(dashboard.id)]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {}
address_dashboard_id_subscription_id_tree[address][
str(dashboard.id)
].append(subscription_id)
if (
external_call_hash
not in merged_external_calls[str(dashboard.id)][
subscription_id
]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {external_call_hash: external_call["display_name"]}
if (
external_call_hash
not in merged_external_calls["merged"]
):
merged_external_calls["merged"][
external_call_hash
] = external_call_without_display_name
abi = None
if "abi" in subscription.secondary_fields:
abi = subscription.secondary_fields["abi"]
# Fill merged events and functions calls for all subscriptions
# Read required events, functions and web3_call form ABI
if abi is None:
methods = []
events = []
abi_json = {}
for event in events:
merged_events[address]["merged"].add(event)
else:
abi_json = json.loads(abi)
for method in methods:
merged_functions[address]["merged"].add(method)
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters[
"all_methods"
],
abi_json=abi_json,
)
except Exception as e:
logger.error(f"Error while merging subscriptions: {e}")
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters[
"all_events"
],
abi_json=abi_json,
)
if address not in merged_events:
merged_events[address] = {}
merged_events[address]["merged"] = set()
if address not in merged_functions:
merged_functions[address] = {}
merged_functions[address]["merged"] = set()
if str(dashboard.id) not in merged_events[address]:
merged_events[address][str(dashboard.id)] = {}
if str(dashboard.id) not in merged_functions[address]:
merged_functions[address][str(dashboard.id)] = {}
merged_events[address][str(dashboard.id)][
subscription_id
] = events
merged_functions[address][str(dashboard.id)][
subscription_id
] = methods
# Get external calls from ABI.
# external_calls merging required direct hash of external_call object.
# or if more correct hash of address and function call signature.
# create external_call selectors.
external_calls = [
external_call
for external_call in abi_json
if external_call["type"] == "external_call"
]
if len(external_calls) > 0:
for external_call in external_calls:
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
"type": "external_call",
"address": external_call["address"],
"name": external_call["name"],
"inputs": external_call["inputs"],
"outputs": external_call["outputs"],
}
external_call_hash = hashlib.md5(
json.dumps(
external_call_without_display_name
).encode("utf-8")
).hexdigest()
if str(dashboard.id) not in merged_external_calls:
merged_external_calls[str(dashboard.id)] = {}
if (
subscription_id
not in merged_external_calls[str(dashboard.id)]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {}
if (
external_call_hash
not in merged_external_calls[str(dashboard.id)][
subscription_id
]
):
merged_external_calls[str(dashboard.id)][
subscription_id
] = {
external_call_hash: external_call[
"display_name"
]
}
if (
external_call_hash
not in merged_external_calls["merged"]
):
merged_external_calls["merged"][
external_call_hash
] = external_call_without_display_name
# Fill merged events and functions calls for all subscriptions
for event in events:
merged_events[address]["merged"].add(event)
for method in methods:
merged_functions[address]["merged"].add(method)
except Exception as e:
logger.error(f"Error while merging subscriptions: {e}")
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
@ -919,19 +964,15 @@ def stats_generate_handler(args: argparse.Namespace):
function
] = functions_calls_data[function]
bucket = subscription_by_id[
subscription_id
].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
# Push data to S3 bucket
push_statistics(
statistics_data=s3_subscription_data_object,
subscription=subscription_by_id[subscription_id],
subscription_type_id=subscription_id_by_blockchain[
args.blockchain
],
address=address,
timescale=timescale,
bucket=bucket,
bucket=MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, # type: ignore
dashboard_id=dashboard_id,
)
except Exception as err:
@ -970,7 +1011,7 @@ def stats_generate_handler(args: argparse.Namespace):
def stats_generate_api_task(
timescales: List[str],
dashboard: BugoutResource,
subscription_by_id: Dict[str, BugoutResource],
subscription_by_id: Dict[str, EntityResponse],
access_id: Optional[UUID] = None,
):
"""
@ -980,47 +1021,49 @@ def stats_generate_api_task(
with yield_db_read_only_session_ctx() as db_session:
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
subscription_type_id = None
for required_field in subscription_by_id[
subscription_id
].required_fields:
if "subscription_type_id" in required_field:
subscription_type_id = required_field["subscription_type_id"]
if not subscription_type_id:
logger.warning(
f"Subscription type not found for subscription: {subscription_id}"
)
continue
blockchain_type = AvailableBlockchainType(
blockchain_by_subscription_id[
subscription_by_id[subscription_id].resource_data[
"subscription_type_id"
]
]
blockchain_by_subscription_id[subscription_type_id]
)
s3_data_object: Dict[str, Any] = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data["address"]
address = subscription_by_id[subscription_id].address
crawler_label = CRAWLER_LABEL
if address in ("0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",):
crawler_label = "moonworm"
abi = None
if "abi" in subscription_by_id[subscription_id].secondary_fields:
abi = subscription_by_id[subscription_id].secondary_fields["abi"]
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
if abi is None:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_json = json.loads(abi)
methods = generate_list_of_names(
type="function",
@ -1037,6 +1080,7 @@ def stats_generate_api_task(
)
# Data for cards components
extention_data = generate_web3_metrics(
db_session=db_session,
events=events,
@ -1091,18 +1135,19 @@ def stats_generate_api_task(
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data
# push data to S3 bucket
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
subscription_type_id=subscription_type_id,
address=address,
timescale=timescale,
bucket=bucket,
bucket=MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, # type: ignore
dashboard_id=dashboard.id,
)
except Exception as err:
traceback.print_exc()
reporter.error_report(
err,
[

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.2.9"
MOONCRAWL_VERSION = "0.3.0"

Wyświetl plik

@ -3,6 +3,10 @@ export BUGOUT_BROOD_URL="https://auth.bugout.dev"
export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Entity environment variables
export MOONSTREAM_ENTITY_URL="https://api.moonstream.to/entity"
# Moonstream environment variables
export MOONSTREAM_BUGOUT_TIMEOUT_SECONDS=30
export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to"
@ -28,6 +32,7 @@ export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_for_smart_contracts>"
export MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX="<Previx_for_AWS_S3_bucket_(prod,dev,..)>"
export MOONSTREAM_S3_QUERIES_BUCKET="<AWS_S3_bucket_to_store_sql_queries>"
export MOONSTREAM_S3_QUERIES_BUCKET_PREFIX="dev"
export MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET="<AWS_S3_bucket_to_store_dashboards_data>"
# 3rd parties environment variables
export MOONSTREAM_ETHERSCAN_TOKEN="<Token_for_etherscan>"

Wyświetl plik

@ -39,6 +39,7 @@ setup(
"fastapi",
"moonstreamdb>=0.3.3",
"moonstream>=0.1.1",
"moonstream-entity>=0.0.4",
"moonworm[moonstream]>=0.6.2",
"humbug",
"pydantic==1.9.2",

Wyświetl plik

@ -1,38 +1,61 @@
import React, { useContext, useMemo } from "react";
import React, { useContext, useMemo, useState, useEffect } from "react";
import { useQuery } from "react-query";
import { useToast } from "../core/hooks";
import { chakra, Stack, Spinner } from "@chakra-ui/react";
import { useSubscription, usePresignedURL } from "../core/hooks";
import { queryCacheProps } from "../core/hooks/hookCommon";
import { SubscriptionsService } from "../core/services";
import CheckboxGrouped from "./CheckboxGrouped";
import UIContext from "../core/providers/UIProvider/context";
import UserContext from "../core/providers/UserProvider/context";
import {
DASHBOARD_CONFIGURE_SETTING_SCOPES,
DASHBOARD_UPDATE_ACTIONS,
} from "../core/constants";
const SuggestABI = ({ subscriptionId, state }) => {
const { subscriptionLinksCache } = useSubscription({
id: subscriptionId,
});
const toast = useToast();
const user = useContext(UserContext);
const subscriptionLinksCache = useQuery(
["dashboardLinks", subscriptionId],
SubscriptionsService.getSubscriptionABI(subscriptionId),
{
...queryCacheProps,
onError: (error) => {
toast(error, "error");
},
enabled: !!user && !!subscriptionId,
}
);
const { dispatchDashboardUpdate } = useContext(UIContext);
const { data, isLoading } = usePresignedURL({
url: subscriptionLinksCache?.data?.data?.url,
isEnabled: true,
id: subscriptionId,
cacheType: "abi",
requestNewURLCallback: subscriptionLinksCache.refetch,
});
const [abi, setAbi] = useState(null);
useEffect(() => {
if (subscriptionLinksCache?.data?.data?.abi) {
setAbi(JSON.parse(subscriptionLinksCache?.data?.data?.abi));
}
}, [subscriptionLinksCache.data]);
const abiEvents = useMemo(
() => data && data.filter((abiItem) => abiItem.type === "event"),
[data]
() => abi && abi.filter((abiItem) => abiItem.type === "event"),
[abi]
);
const abiMethods = useMemo(
() => data && data.filter((abiItem) => abiItem.type === "function"),
[data]
() => abi && abi.filter((abiItem) => abiItem.type === "function"),
[abi]
);
if (isLoading) return <Spinner />;
if (!data) return "";
// Waiting for abi to be available
if (!abi) return <Spinner />;
return (
<>

Wyświetl plik

@ -308,20 +308,21 @@ const SubscriptionCard = ({ subscription, isDesktopView, iconLink }) => {
{subscription.abi ? (
<CheckIcon />
) : (
<Button
colorScheme="orange"
size="xs"
py={2}
disabled={!subscription.address}
onClick={() =>
overlay.toggleModal({
type: MODAL_TYPES.UPLOAD_ABI,
props: { id: subscription.id },
})
}
>
Upload
</Button>
<></>
// <Button
// colorScheme="orange"
// size="xs"
// py={2}
// disabled={!subscription.address}
// onClick={() =>
// overlay.toggleModal({
// type: MODAL_TYPES.UPLOAD_ABI,
// props: { id: subscription.id },
// })
// }
// >
// Upload
// </Button>
)}
</Td>
<Td {...cellProps}>

Wyświetl plik

@ -9,7 +9,7 @@ const useSubscription = ({ id }) => {
const toast = useToast();
const user = useContext(UserContext);
const subscriptionLinksCache = useQuery(
const { subscriptionLinksCache } = useQuery(
["dashboardLinks", id],
SubscriptionsService.getSubscriptionABI(id),
{