diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index f489c81f..5f018996 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -1,6 +1,9 @@ +import hashlib import json +from itertools import chain import logging -from typing import Optional, Dict, Any, Union +from typing import List, Optional, Dict, Any, Union +import time from enum import Enum import uuid @@ -22,7 +25,7 @@ from . import data from .reporter import reporter from .middleware import MoonstreamHTTPException from .settings import ETHERSCAN_SMARTCONTRACTS_BUCKET -from bugout.data import BugoutResource +from bugout.data import BugoutResource, DashboardMeta from .settings import ( MOONSTREAM_APPLICATION_ID, bugout_client as bc, @@ -31,6 +34,7 @@ from .settings import ( MOONSTREAM_DATA_JOURNAL_ID, MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, + MOONSTREAM_MOONWORM_TASKS_JOURNAL, ) from web3 import Web3 @@ -430,3 +434,132 @@ def upload_abi_to_s3( update["s3_path"] = result_key return update + + +def get_all_entries_from_search( + journal_id: str, search_query: str, limit: int, token: str +) -> List[Any]: + """ + Get all required entries from journal using search interface + """ + skips = 0 + offset = 0 + + results: List[Any] = [] + + while skips < 3: + try: + existing_metods = bc.search( + token=token, + journal_id=journal_id, + query=search_query, + content=False, + timeout=10.0, + limit=limit, + offset=offset, + ) + results.extend(existing_metods.results) + + except: + time.sleep(0.5) + skips += 1 + continue + + if len(results) == existing_metods.total_results: + break + else: + offset += limit + continue + return results + + +def apply_moonworm_tasks( + s3_client: Any, + dashboard_subscriptions: List[DashboardMeta], + available_subscriptions: Dict[uuid.UUID, Dict[str, Any]], +) -> None: + """ + Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist + """ + + entries_pack = [] + + for dashboard_subscription in dashboard_subscriptions: + if dashboard_subscription.subscription_id in available_subscriptions.keys(): + + bucket = available_subscriptions[dashboard_subscription.subscription_id][ + "bucket" + ] + key = available_subscriptions[dashboard_subscription.subscription_id][ + "s3_path" + ] + + if bucket is None or key is None: + logger.error( + f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi" + ) + + s3_path = f"s3://{bucket}/{key}" + + try: + + response = s3_client.get_object( + Bucket=bucket, + Key=key, + ) + + except s3_client.exceptions.NoSuchKey as e: + logger.error( + f"Error getting Abi for subscription {str(dashboard_subscription.subscription_id)} S3 {s3_path} does not exist : {str(e)}" + ) + + abi = json.loads(response["Body"].read()) + + entries = get_all_entries_from_search( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + search_query=f"tag:address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}", + limit=100, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + + existing_tags = [entry.tags for entry in entries] + + existing_hashes = [ + tag.split(":")[-1] + for tag in chain(*existing_tags) + if "abi_metod_hash" in tag + ] + + abi_hashes_dict = { + hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method + for method in abi + if (method["type"] in ("event", "function")) + and ( + "stateMutability" in method and method["stateMutability"] == "view" + ) + } + + for hash in abi_hashes_dict: + if hash not in existing_hashes: + entries_pack.append( + { + "title": available_subscriptions[ + dashboard_subscription.subscription_id + ]["address"], + "content": json.dumps(abi_hashes_dict[hash], indent=4), + "tags": [ + f"address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}", + f"type:{abi_hashes_dict[hash]['type']}", + f"abi_metod_hash:{hash}", + f"status:active", + ], + } + ) + + if len(entries_pack) > 0: + bc.create_entries_pack( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entries=entries_pack, + timeout=10, + ) diff --git a/backend/moonstreamapi/routes/dashboards.py b/backend/moonstreamapi/routes/dashboards.py index 7cfccfc2..609578d2 100644 --- a/backend/moonstreamapi/routes/dashboards.py +++ b/backend/moonstreamapi/routes/dashboards.py @@ -7,7 +7,7 @@ from uuid import UUID import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Request, Query, Body +from fastapi import APIRouter, Request, Query, Body, BackgroundTasks from .. import actions from .. import data @@ -20,7 +20,6 @@ from ..settings import ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, ) -import pprint logger = logging.getLogger(__name__) @@ -41,7 +40,9 @@ blockchain_by_subscription_id = { @router.post("/", tags=["dashboards"], response_model=BugoutResource) async def add_dashboard_handler( - request: Request, dashboard: data.DashboardCreate = Body(...) + request: Request, + background_tasks: BackgroundTasks, + dashboard: data.DashboardCreate = Body(...), ) -> BugoutResource: """ Add subscription to blockchain stream data for user. @@ -73,7 +74,7 @@ async def add_dashboard_handler( s3_client = boto3.client("s3") - available_subscriptions = { + available_subscriptions: Dict[UUID, Dict[str, Any]] = { resource.id: resource.resource_data for resource in resources.resources } @@ -150,6 +151,14 @@ async def add_dashboard_handler( logger.error(f"Error creating dashboard resource: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + # Generate tasks for moonworm + + background_tasks.add_task( + actions.apply_moonworm_tasks, + s3_client, + dashboard_subscriptions, + available_subscriptions, + ) return resource @@ -231,7 +240,10 @@ async def get_dashboard_handler( @router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource) async def update_dashboard_handler( - request: Request, dashboard_id: str, dashboard: data.DashboardUpdate = Body(...) + request: Request, + dashboard_id: str, + background_tasks: BackgroundTasks, + dashboard: data.DashboardUpdate = Body(...), ) -> BugoutResource: """ Update dashboards mainly fully overwrite name and subscription metadata @@ -338,6 +350,13 @@ async def update_dashboard_handler( logger.error(f"Error updating subscription resource: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + background_tasks.add_task( + actions.apply_moonworm_tasks, + s3_client, + dashboard_subscriptions, + available_subscriptions, + ) + return resource diff --git a/backend/moonstreamapi/settings.py b/backend/moonstreamapi/settings.py index c491ddc7..7a510c71 100644 --- a/backend/moonstreamapi/settings.py +++ b/backend/moonstreamapi/settings.py @@ -84,3 +84,11 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None: MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/") ) + +MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get( + "MOONSTREAM_MOONWORM_TASKS_JOURNAL", "" +) +if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "": + raise ValueError( + "MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set" + )