From 9fa949ec0fecf2d1891fc160c38425b9014b25e0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Dec 2023 22:42:12 +0200 Subject: [PATCH] Add changes. --- moonstreamapi/moonstreamapi/admin/cli.py | 61 ++++ .../moonstreamapi/admin/moonworm_tasks.py | 315 ++++++++++++++++++ 2 files changed, 376 insertions(+) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index b05a595a..e9faf125 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -39,6 +39,15 @@ def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]: return False +def get_moonworm_tasks_report(args: argparse.Namespace) -> str: + repot = moonworm_tasks.get_moonworm_tasks_state( + blockchain=args.blockchain, + ) + + with open("moonworm_tasks_report.json", "w") as f: + json.dump(repot, f, indent=4) + + def migration_run(step_map, command, step, step_order): if step is None: # run all steps @@ -478,6 +487,58 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) + parser_moonworm_tasks_report = subcommands_moonworm_tasks.add_parser( + "report", description="Return report of moonworm tasks." + ) + + parser_moonworm_tasks_report.add_argument( + "-b", + "--blockchain", + type=str, + help="Blockchain for report.", + ) + + parser_moonworm_tasks_report.set_defaults(func=get_moonworm_tasks_report) + + parser_moonworm_tasks_manage = subcommands_moonworm_tasks.add_parser( + "manage", description="Manage moonworm tasks." + ) + + parser_moonworm_tasks_manage.add_argument( + "-a", + "--action", + type=str, + choices=["view", "delete", "restart", "mark_as_finished"], + default="view", + help="Action to perform on the tasks (view, delete, restart, mark_as_finished).", + ) + + # Add argument for specifying the blockchain + parser_moonworm_tasks_manage.add_argument( + "-b", "--blockchain", type=str, required=True, help="Blockchain for the tasks." + ) + + # Add argument for specifying the addresses + parser_moonworm_tasks_manage.add_argument( + "-ad", + "--addresses", + nargs="+", + required=True, + help="List of addresses associated with the tasks.", + ) + + # Add argument for specifying the task type + parser_moonworm_tasks_manage.add_argument( + "-t", + "--task-type", + type=str, + help="Type of the tasks (event or function).", + ) + + parser_moonworm_tasks_manage.set_defaults( + func=moonworm_tasks.moonworm_tasks_manage_handler + ) + queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 33fc6d7a..129d2aed 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,5 +1,7 @@ import logging import json +from enum import Enum +from typing import List, Optional, Literal import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources @@ -9,10 +11,19 @@ from bugout.exceptions import BugoutResponseException from ..actions import get_all_entries_from_search, apply_moonworm_tasks from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL from ..settings import bugout_client as bc +from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES + logger = logging.getLogger(__name__) +class ActionType(Enum): + DELETE = "delete" + RESTART = "restart" + MARK_AS_FINISHED = "mark_as_finished" + VIEW = "view" + + def get_list_of_addresses(): """ Return list of addresses of tasks @@ -82,3 +93,307 @@ def add_subscription(id: str): ) else: logging.info("For apply to moonworm tasks subscriptions must have an abi.") + + +def restart_tasks(bc, token, journal_id, tasks): + delete_entries_tags = [] + add_entries_tags = [] + for task in tasks: + entry_id = task["entry_id"] + # Find tags to delete + tags_to_delete = [ + tag + for tag in task["tags"] + if tag.startswith("historical_crawl_status:") or tag.startswith("progress:") + ] + + # Delete tags + if tags_to_delete: + delete_entries_tags.append({"entry_id": entry_id, "tags": tags_to_delete}) + + # Add new tags + tags_to_add = ["progress:0", "historical_crawl_status:pending"] + add_entries_tags.append({"entry_id": entry_id, "tags": tags_to_add}) + + print(delete_entries_tags) + + # Delete tags + if delete_entries_tags: + bc.delete_entries_tags( + token=token, journal_id=journal_id, entries_tags=delete_entries_tags + ) + + # Add tags + if add_entries_tags: + bc.create_entries_tags( + token=token, journal_id=journal_id, entries_tags=add_entries_tags + ) + + +def mark_tasks_as_finished(bc, token, journal_id, tasks): + add_entries_tags = [] + for task in tasks: + entry_id = task["entry_id"] + # Add new tags + tags_to_add = ["progress:100", "historical_crawl_status:finished"] + add_entries_tags.append({"entry_id": entry_id, "tags": tags_to_add}) + + # Add tags + if add_entries_tags: + bc.create_entries_tags( + token=token, journal_id=journal_id, entries_tags=add_entries_tags + ) + + +def delete_tasks(bc, token, journal_id, tasks): + for task in tasks: + entry_id = task["entry_id"] + # Delete the task + bc.delete_entry(token, journal_id, entry_id) + + +def moonworm_tasks_manage_handler(args): + print( + f"Managing moonworm tasks with action: {args.action}, blockchain: {args.blockchain}, addresses: {args.addresses}, task type: {args.task_type}" + ) + return manage_moonworm_tasks( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + blockchain=args.blockchain, + addresses=args.addresses, + task_type=args.task_type, + action=ActionType(args.action), + ) + + +def get_moonworm_tasks_by_filters( + journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL, + token: str = MOONSTREAM_ADMIN_ACCESS_TOKEN, + blockchain: str = "ethereum", + addresses: List[str] = [], + task_type: Optional[Literal["event", "function"]] = None, + task_names: Optional[List[str]] = None, +): + """ + Return list of tags depends on query and tag + """ + + blockchain_to_subscription_type = { + value.blockchain: key + for key, value in CANONICAL_SUBSCRIPTION_TYPES.items() + if "smartcontract" in key + } + + if blockchain not in blockchain_to_subscription_type: + logger.error(f"Unknown blockchain {blockchain}") + return + + search_query = f"#subscription_type:{blockchain_to_subscription_type[blockchain]} #moonworm_task_pickedup:True" + + if addresses: + search_query += " " + " ".join([f"#address:{address}" for address in addresses]) + + if task_type: + search_query += f" #type:{task_type}" + + if task_names: + search_query += " " + " ".join([f"abi_name:{name}" for name in task_names]) + + print(search_query) + + entries = get_all_entries_from_search( + journal_id=journal_id, + search_query=search_query, + limit=100, + token=token, + ) + + print(f"Found {len(entries)} moonworm tasks") + + tasks = [] + + for entry in entries: + historical_crawl_status = [ + tag.split(":")[-1] + for tag in entry.tags + if tag.startswith("historical_crawl_status:") + ] + + address = [tag for tag in entry.tags if tag.startswith("address:")] + + progress = [tag for tag in entry.tags if tag.startswith("progress:")] + + abi_name = [tag for tag in entry.tags if tag.startswith("abi_name:")] + + moonworm_task_pickedup = [ + tag.split(":")[-1] + for tag in entry.tags + if tag.startswith("moonworm_task_pickedup:") + ] + + historical_crawl_status = " ".join(historical_crawl_status) + address = address[0].split(":")[1] if len(address) > 0 else "address not found" + progress = ( + progress[0].split(":")[1] if len(progress) > 0 else "progress not found" + ) + abi_name = ( + abi_name[0].split(":")[1] if len(abi_name) > 0 else "abi_name not found" + ) + + tasks.append( + { + "entry_id": entry.entry_url.split("/")[-1], + "tags": entry.tags, + "historical_crawl_status": historical_crawl_status, + "address": address, + "progress": progress, + "abi_name": abi_name, + "moonworm_task_pickedup": " ".join(moonworm_task_pickedup), + } + ) + + return tasks + + +def get_moonworm_tasks_state( + journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL, + token: str = MOONSTREAM_ADMIN_ACCESS_TOKEN, + blockchain: str = "ethereum", +): + """ + Return list of tags depends on query and tag + """ + + blockchain_to_subscription_type = { + value["blockchain"]: key + for key, value in CANONICAL_SUBSCRIPTION_TYPES.items() + if "smartcontract" in value + } + + if blockchain not in blockchain_to_subscription_type: + logger.error(f"Unknown blockchain {blockchain}") + return + + entries = get_all_entries_from_search( + journal_id=journal_id, + search_query=f"#subscription_type:{blockchain_to_subscription_type[blockchain]} #moonworm_task_pickedup:True", + limit=100, + token=token, + ) + + print(f"Found {len(entries)} moonworm tasks") + + ### loop over tasks split by historical_crawl_status:in_progress and historical_crawl_status:finished and historical_crawl_status:pending + + tasks = { + "in_progress": {}, + "finished": {}, + "pending": {}, + } + + for entry in entries: + historical_crawl_status = [ + tag for tag in entry.tags if tag.startswith("historical_crawl_status:") + ] + + address = [tag for tag in entry.tags if tag.startswith("address:")] + + progress = [tag for tag in entry.tags if tag.startswith("progress:")] + + abi_name = [tag for tag in entry.tags if tag.startswith("abi_name:")] + + if len(historical_crawl_status) == 0: + logger.warn( + f"Unable to find historical_crawl_status in task: {entry.entry_url.split()[-1]}" + ) + continue + + historical_crawl_status = historical_crawl_status[0].split(":")[1] + address = address[0].split(":")[1] + progress = progress[0].split(":")[1] + abi_name = abi_name[0].split(":")[1] + + if historical_crawl_status not in tasks: + tasks[historical_crawl_status] = {} + if address not in tasks[historical_crawl_status]: + tasks[historical_crawl_status][address] = {} + + if abi_name not in tasks[historical_crawl_status][address]: + tasks[historical_crawl_status][address][abi_name] = progress + + return tasks + + +def confirm_action(action, tasks): + """Ask the user for confirmation before proceeding with the action.""" + print(f"You are about to {action} the following tasks:") + tasks = sorted(tasks, key=lambda task: task["address"]) + for task in tasks: + print(f" {task['address']} {task['abi_name']} {task['progress']}%") + + confirmation = input( + f"Do you want to proceed with {action} these tasks? (yes/no): " + ) + return confirmation.lower() == "yes" + + +def view_tasks(tasks): + """Display details of the tasks.""" + if not tasks: + print("No tasks to display.") + return + + tasks = sorted(tasks, key=lambda task: task["address"]) + for task in tasks: + print( + f" {task['address']} {task['abi_name']} {task['progress']}% HC statuses:{task['historical_crawl_status']} moonworm:{task['moonworm_task_pickedup']}" + ) + + +def manage_moonworm_tasks( + journal_id: str, + token: str, + blockchain: str, + addresses: List[str], + task_type: Optional[str], + action: ActionType, +): + """ + Manage moonworm tasks based on the provided parameters. + + :param journal_id: ID of the journal containing the tasks. + :param token: Access token for authentication. + :param blockchain: Type of blockchain (e.g., 'ethereum'). + :param addresses: List of contract addresses to filter tasks. + :param task_type: Type of the task ('event' or 'function'). + :param action: Action to perform ('delete', 'restart', 'mark_as_finished'). + """ + + # Get all tasks matching the provided filters + filtered_tasks = get_moonworm_tasks_by_filters( + journal_id=journal_id, + token=token, + blockchain=blockchain, + addresses=addresses, + task_type=task_type, + ) + + # Confirm action with the user + if action == ActionType.VIEW: + view_tasks(filtered_tasks) + else: + if confirm_action(action.value, filtered_tasks): + if action == ActionType.DELETE: + delete_tasks(bc, token, journal_id, filtered_tasks) + elif action == ActionType.RESTART: + restart_tasks(bc, token, journal_id, filtered_tasks) + elif action == ActionType.MARK_AS_FINISHED: + mark_tasks_as_finished(bc, token, journal_id, filtered_tasks) + print(f"Action '{action.value}' completed on filtered tasks.") + else: + print("Action cancelled by the user.") + + return { + "status": "success", + "message": f"Action '{action.value}' was approved and executed.", + }