From ac852f51bb66d4e8837b9d9e27ef64cd3cd1682f Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Mon, 6 Dec 2021 21:42:30 +0200 Subject: [PATCH] Move create moonworm task to subscriptions api. --- backend/moonstreamapi/actions.py | 105 ++++++------------ backend/moonstreamapi/routes/dashboards.py | 21 +--- backend/moonstreamapi/routes/subscriptions.py | 22 +++- 3 files changed, 55 insertions(+), 93 deletions(-) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 48f072cf..d3af87b6 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -480,8 +480,8 @@ def get_all_entries_from_search( def apply_moonworm_tasks( s3_client: Any, - dashboard_subscriptions: List[data.DashboardMeta], - available_subscriptions: Dict[uuid.UUID, Dict[str, Any]], + abi: Any, + address: str, ) -> None: """ Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist @@ -489,78 +489,45 @@ def apply_moonworm_tasks( entries_pack = [] - for dashboard_subscription in dashboard_subscriptions: - if dashboard_subscription.subscription_id in available_subscriptions.keys(): + try: + entries = get_all_entries_from_search( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + search_query=f"tag:address:{address}", + limit=100, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) - try: - bucket = available_subscriptions[ - dashboard_subscription.subscription_id - ]["bucket"] - key = available_subscriptions[dashboard_subscription.subscription_id][ - "s3_path" - ] + existing_tags = [entry.tags for entry in entries] - if bucket is None or key is None: - logger.error( - f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi" - ) + existing_hashes = [ + tag.split(":")[-1] + for tag in chain(*existing_tags) + if "abi_metod_hash" in tag + ] - s3_path = f"s3://{bucket}/{key}" + abi_hashes_dict = { + hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method + for method in abi + if (method["type"] in ("event", "function")) + and (method.get("stateMutability", "") != "view") + } - try: - - response = s3_client.get_object( - Bucket=bucket, - Key=key, - ) - - except s3_client.exceptions.NoSuchKey as e: - logger.error( - f"Error getting Abi for subscription {str(dashboard_subscription.subscription_id)} S3 {s3_path} does not exist : {str(e)}" - ) - - abi = json.loads(response["Body"].read()) - - entries = get_all_entries_from_search( - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - search_query=f"tag:address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}", - limit=100, - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + for hash in abi_hashes_dict: + if hash not in existing_hashes: + entries_pack.append( + { + "title": address, + "content": json.dumps(abi_hashes_dict[hash], indent=4), + "tags": [ + f"address:{address}", + f"type:{abi_hashes_dict[hash]['type']}", + f"abi_metod_hash:{hash}", + f"status:active", + ], + } ) - - existing_tags = [entry.tags for entry in entries] - - existing_hashes = [ - tag.split(":")[-1] - for tag in chain(*existing_tags) - if "abi_metod_hash" in tag - ] - - abi_hashes_dict = { - hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method - for method in abi - if (method["type"] in ("event", "function")) - and (method.get("stateMutability", "") != "view") - } - - for hash in abi_hashes_dict: - if hash not in existing_hashes: - entries_pack.append( - { - "title": available_subscriptions[ - dashboard_subscription.subscription_id - ]["address"], - "content": json.dumps(abi_hashes_dict[hash], indent=4), - "tags": [ - f"address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}", - f"type:{abi_hashes_dict[hash]['type']}", - f"abi_metod_hash:{hash}", - f"status:active", - ], - } - ) - except Exception as e: - reporter.error_report(e) + except Exception as e: + reporter.error_report(e) if len(entries_pack) > 0: bc.create_entries_pack( diff --git a/backend/moonstreamapi/routes/dashboards.py b/backend/moonstreamapi/routes/dashboards.py index 609578d2..f4331112 100644 --- a/backend/moonstreamapi/routes/dashboards.py +++ b/backend/moonstreamapi/routes/dashboards.py @@ -7,7 +7,7 @@ from uuid import UUID import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Request, Query, Body, BackgroundTasks +from fastapi import APIRouter, Request, Query, Body from .. import actions from .. import data @@ -41,7 +41,6 @@ blockchain_by_subscription_id = { @router.post("/", tags=["dashboards"], response_model=BugoutResource) async def add_dashboard_handler( request: Request, - background_tasks: BackgroundTasks, dashboard: data.DashboardCreate = Body(...), ) -> BugoutResource: """ @@ -150,15 +149,7 @@ async def add_dashboard_handler( except Exception as e: logger.error(f"Error creating dashboard resource: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) - - # Generate tasks for moonworm - - background_tasks.add_task( - actions.apply_moonworm_tasks, - s3_client, - dashboard_subscriptions, - available_subscriptions, - ) + return resource @@ -242,7 +233,6 @@ async def get_dashboard_handler( async def update_dashboard_handler( request: Request, dashboard_id: str, - background_tasks: BackgroundTasks, dashboard: data.DashboardUpdate = Body(...), ) -> BugoutResource: """ @@ -350,13 +340,6 @@ async def update_dashboard_handler( logger.error(f"Error updating subscription resource: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) - background_tasks.add_task( - actions.apply_moonworm_tasks, - s3_client, - dashboard_subscriptions, - available_subscriptions, - ) - return resource diff --git a/backend/moonstreamapi/routes/subscriptions.py b/backend/moonstreamapi/routes/subscriptions.py index c81014a2..de89a21b 100644 --- a/backend/moonstreamapi/routes/subscriptions.py +++ b/backend/moonstreamapi/routes/subscriptions.py @@ -10,13 +10,10 @@ from typing import List, Optional, Dict, Any import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Depends, Request, Form +from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks from web3 import Web3 -from ..actions import ( - validate_abi_json, - upload_abi_to_s3, -) +from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks from ..admin import subscription_types from .. import data from ..middleware import MoonstreamHTTPException @@ -41,6 +38,7 @@ BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" @router.post("/", tags=["subscriptions"], response_model=data.SubscriptionResourceData) async def add_subscription_handler( request: Request, # subscription_data: data.CreateSubscriptionRequest = Body(...) + background_tasks: BackgroundTasks, address: str = Form(...), color: str = Form(...), label: str = Form(...), @@ -143,6 +141,12 @@ async def add_subscription_handler( logger.error(f"Error getting user subscriptions: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + background_tasks.add_task( + apply_moonworm_tasks, + json_abi, + address, + ) + return data.SubscriptionResourceData( id=str(resource.id), user_id=resource.resource_data["user_id"], @@ -234,6 +238,7 @@ async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListR async def update_subscriptions_handler( request: Request, subscription_id: str, + background_tasks: BackgroundTasks, color: Optional[str] = Form(None), label: Optional[str] = Form(None), abi: Optional[str] = Form(None), @@ -301,6 +306,13 @@ async def update_subscriptions_handler( logger.error(f"Error getting user subscriptions: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + if abi: + background_tasks.add_task( + apply_moonworm_tasks, + json_abi, + subscription_resource.resource_data["address"], + ) + return data.SubscriptionResourceData( id=str(resource.id), user_id=resource.resource_data["user_id"],