From a28270ef07bfaab70af419b3fa3d6a16f92a54bb Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 30 May 2024 17:29:23 +0300 Subject: [PATCH] Added user tasks migration. --- moonstreamapi/moonstreamapi/admin/cli.py | 38 ++++++ .../moonstreamapi/admin/moonworm_tasks.py | 125 +++++++++++++++++- moonstreamapi/setup.py | 1 + 3 files changed, 162 insertions(+), 2 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 0089108f..1c82c600 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -8,6 +8,7 @@ import logging import os from posix import listdir from typing import Any, Callable, Dict, List, Optional, Union +import uuid from moonstreamdb.db import SessionLocal from sqlalchemy.orm import with_expression @@ -35,6 +36,13 @@ logger = logging.getLogger(__name__) MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations" +def uuid_type(value): + try: + return uuid.UUID(value) + except ValueError: + raise argparse.ArgumentTypeError(f"{value} is not a valid UUID.") + + def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]: if raw_arg is None: return None @@ -285,6 +293,15 @@ def generate_usage_handler(args: argparse.Namespace) -> None: output_file.write(json.dumps(usage_info, indent=4)) +def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None: + """ + Read users subsriptions and rewrite them to v3 jobs table + """ + ### Request user resources from brood + + moonworm_tasks.migrate_v3_tasks(user_id=args.user_id, customer_id=args.customer_id) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -539,6 +556,27 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) + parser_moonworm_tasks_migrate = subcommands_moonworm_tasks.add_parser( + "migrate-v3", + description="Migrate moonworm tasks to abi_jobs of moonstream index", + ) + + parser_moonworm_tasks_migrate.add_argument( + "--user-id", + required=True, + type=uuid_type, + help="user-id of which we want see subscription.", + ) + + parser_moonworm_tasks_migrate.add_argument( + "--customer-id", + required=True, + type=uuid_type, + help="customer-id of which we want see subscription.", + ) + + parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate) + queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 7a368d35..0ed18710 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,13 +1,24 @@ import json import logging +from typing import List, Dict, Union, Any +from uuid import UUID import boto3 # type: ignore -from bugout.data import BugoutResource, BugoutResources +from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult from bugout.exceptions import BugoutResponseException +from moonstreamdbv3.db import MoonstreamDBEngine +from moonstreamdbv3.models_indexes import AbiJobs +from web3 import Web3 + from ..actions import apply_moonworm_tasks, get_all_entries_from_search -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL +from ..settings import ( + MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + BUGOUT_REQUEST_TIMEOUT_SECONDS, +) from ..settings import bugout_client as bc +from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES logger = logging.getLogger(__name__) @@ -81,3 +92,113 @@ def add_subscription(id: str): ) else: logging.info("For apply to moonworm tasks subscriptions must have an abi.") + + +def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: + """ + Migrate moonworm tasks + + """ + + ### get user subscription entity journal id + + subscription_resources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + params={ + "user_id": user_id, + "type": "entity_subscription", + }, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + logger.info( + "Found users collection resources: %s", len(subscription_resources.resources) + ) + + if len(subscription_resources.resources) == 0: + raise Exception("User has no subscriptions") + + collection_id = subscription_resources.resources[0].resource_data["collection_id"] + + subscriptions: List[BugoutSearchResult] = get_all_entries_from_search( + journal_id=collection_id, + search_query=f"tag:type:subscription", + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + + logger.info("Found users subscriptions: %s", len(subscriptions)) + + if len(subscriptions) == 0: + raise Exception("User has no subscriptions") + + for subscription in subscriptions: + + abi = None + address = None + subscription_type_id = None + + if subscription.content is None: + continue + + subscription_data = json.loads(subscription.content) + + if "abi" in subscription_data: + abi = subscription_data["abi"] + + for tag in subscription.tags: + if tag.startswith("subscription_type_id:"): + subscription_type_id = tag.split(":")[1] + if tag.startswith("address:"): + address = tag.split(":")[1] + + if subscription_type_id is None: + continue + + ### reformat abi to separate abi tasks + + if abi is None: + continue + + chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id]["blockchain"] + + with MoonstreamDBEngine.yield_db_session_ctx() as session: + for abi_task in abi: + + if abi_task["type"] not in ("event", "function"): + continue + + abi_selector = Web3.keccak( + text=abi_task["name"] + + "(" + + ",".join(map(lambda x: x["type"], abi_task["inputs"])) + + ")" + )[:4].hex() + + try: + + subscription = AbiJobs( + address=address, + user_id=user_id, + customer_id=customer_id, + abi_selector=abi_selector, + chain=chain, + abi_name=abi_task["name"], + status="active", + historical_crawl_status="pending", + progress=0, + moonworm_task_pickedup=False, + abi=abi_task, + ) + + session.add(subscription) + + except Exception as e: + logger.error( + f"Error creating subscription for subscription {subscription.id}: {str(e)}" + ) + session.rollback() + continue + + session.commit() + + return None diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index 10b43de9..79c93d08 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,6 +17,7 @@ setup( "fastapi", "moonstream", "moonstreamdb>=0.4.4", + "moonstreamdb-v3>=0.0.6", "humbug", "pydantic==1.10.2", "pyevmasm",