Added user tasks migration.

pull/1076/head
Andrey 2024-05-30 17:29:23 +03:00
rodzic 62adb3becd
commit a28270ef07
3 zmienionych plików z 162 dodań i 2 usunięć

Wyświetl plik

@ -8,6 +8,7 @@ import logging
import os import os
from posix import listdir from posix import listdir
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Union
import uuid
from moonstreamdb.db import SessionLocal from moonstreamdb.db import SessionLocal
from sqlalchemy.orm import with_expression from sqlalchemy.orm import with_expression
@ -35,6 +36,13 @@ logger = logging.getLogger(__name__)
MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations" MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations"
def uuid_type(value):
try:
return uuid.UUID(value)
except ValueError:
raise argparse.ArgumentTypeError(f"{value} is not a valid UUID.")
def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]: def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]:
if raw_arg is None: if raw_arg is None:
return None return None
@ -285,6 +293,15 @@ def generate_usage_handler(args: argparse.Namespace) -> None:
output_file.write(json.dumps(usage_info, indent=4)) output_file.write(json.dumps(usage_info, indent=4))
def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None:
"""
Read users subsriptions and rewrite them to v3 jobs table
"""
### Request user resources from brood
moonworm_tasks.migrate_v3_tasks(user_id=args.user_id, customer_id=args.customer_id)
def main() -> None: def main() -> None:
cli_description = f"""Moonstream Admin CLI cli_description = f"""Moonstream Admin CLI
@ -539,6 +556,27 @@ This CLI is configured to work with the following API URLs:
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
parser_moonworm_tasks_migrate = subcommands_moonworm_tasks.add_parser(
"migrate-v3",
description="Migrate moonworm tasks to abi_jobs of moonstream index",
)
parser_moonworm_tasks_migrate.add_argument(
"--user-id",
required=True,
type=uuid_type,
help="user-id of which we want see subscription.",
)
parser_moonworm_tasks_migrate.add_argument(
"--customer-id",
required=True,
type=uuid_type,
help="customer-id of which we want see subscription.",
)
parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate)
queries_parser = subcommands.add_parser( queries_parser = subcommands.add_parser(
"queries", description="Manage Moonstream queries" "queries", description="Manage Moonstream queries"
) )

Wyświetl plik

@ -1,13 +1,24 @@
import json import json
import logging import logging
from typing import List, Dict, Union, Any
from uuid import UUID
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult
from bugout.exceptions import BugoutResponseException from bugout.exceptions import BugoutResponseException
from moonstreamdbv3.db import MoonstreamDBEngine
from moonstreamdbv3.models_indexes import AbiJobs
from web3 import Web3
from ..actions import apply_moonworm_tasks, get_all_entries_from_search from ..actions import apply_moonworm_tasks, get_all_entries_from_search
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from ..settings import bugout_client as bc from ..settings import bugout_client as bc
from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -81,3 +92,113 @@ def add_subscription(id: str):
) )
else: else:
logging.info("For apply to moonworm tasks subscriptions must have an abi.") logging.info("For apply to moonworm tasks subscriptions must have an abi.")
def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None:
"""
Migrate moonworm tasks
"""
### get user subscription entity journal id
subscription_resources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
params={
"user_id": user_id,
"type": "entity_subscription",
},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
logger.info(
"Found users collection resources: %s", len(subscription_resources.resources)
)
if len(subscription_resources.resources) == 0:
raise Exception("User has no subscriptions")
collection_id = subscription_resources.resources[0].resource_data["collection_id"]
subscriptions: List[BugoutSearchResult] = get_all_entries_from_search(
journal_id=collection_id,
search_query=f"tag:type:subscription",
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
logger.info("Found users subscriptions: %s", len(subscriptions))
if len(subscriptions) == 0:
raise Exception("User has no subscriptions")
for subscription in subscriptions:
abi = None
address = None
subscription_type_id = None
if subscription.content is None:
continue
subscription_data = json.loads(subscription.content)
if "abi" in subscription_data:
abi = subscription_data["abi"]
for tag in subscription.tags:
if tag.startswith("subscription_type_id:"):
subscription_type_id = tag.split(":")[1]
if tag.startswith("address:"):
address = tag.split(":")[1]
if subscription_type_id is None:
continue
### reformat abi to separate abi tasks
if abi is None:
continue
chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id]["blockchain"]
with MoonstreamDBEngine.yield_db_session_ctx() as session:
for abi_task in abi:
if abi_task["type"] not in ("event", "function"):
continue
abi_selector = Web3.keccak(
text=abi_task["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi_task["inputs"]))
+ ")"
)[:4].hex()
try:
subscription = AbiJobs(
address=address,
user_id=user_id,
customer_id=customer_id,
abi_selector=abi_selector,
chain=chain,
abi_name=abi_task["name"],
status="active",
historical_crawl_status="pending",
progress=0,
moonworm_task_pickedup=False,
abi=abi_task,
)
session.add(subscription)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
session.rollback()
continue
session.commit()
return None

Wyświetl plik

@ -17,6 +17,7 @@ setup(
"fastapi", "fastapi",
"moonstream", "moonstream",
"moonstreamdb>=0.4.4", "moonstreamdb>=0.4.4",
"moonstreamdb-v3>=0.0.6",
"humbug", "humbug",
"pydantic==1.10.2", "pydantic==1.10.2",
"pyevmasm", "pyevmasm",