From 5a6c8f52194926ea27bb4f61d4511f5f3a3f4b3b Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 19 Sep 2023 13:40:24 +0300 Subject: [PATCH] Add changes. --- moonstreamapi/moonstreamapi/admin/cli.py | 9 +- .../admin/migrations/add_selectors.py | 98 ++++++++++++++++++- 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 84b14b1e..5a1b0f8b 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -12,7 +12,12 @@ from sqlalchemy.orm import with_expression from moonstreamdb.db import SessionLocal -from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID +from ..settings import ( + BUGOUT_BROOD_URL, + BUGOUT_SPIRE_URL, + MOONSTREAM_APPLICATION_ID, + MOONSTREAM_MOONWORM_TASKS_JOURNAL, +) from ..web3_provider import yield_web3_provider from . import subscription_types, subscriptions, moonworm_tasks, queries @@ -248,7 +253,7 @@ def moonworm_tasks_add_subscription_handler(args: argparse.Namespace) -> None: def main() -> None: cli_description = f"""Moonstream Admin CLI -Please m35ake sure that the following environment variables are set in your environment and exported to +Please make sure that the following environment variables are set in your environment and exported to subprocesses: 1. MOONSTREAM_APPLICATION_ID 2. MOONSTREAM_ADMIN_ACCESS_TOKEN diff --git a/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py b/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py index daa79d90..5c09accf 100644 --- a/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py +++ b/moonstreamapi/moonstreamapi/admin/migrations/add_selectors.py @@ -29,10 +29,13 @@ def fill_missing_selectors_in_moonworm_tasks() -> None: moonworm_tasks = get_all_entries_from_search( journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - search_query="type:subscription !#version:2.0", - timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + search_query="#task_type:moonworm !#version:2.0", + limit=batch_size, + content=True, ) + logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 1.0") + entries_tags = [] ## batch tasks @@ -41,14 +44,24 @@ def fill_missing_selectors_in_moonworm_tasks() -> None: moonworm_tasks[i : i + batch_size] for i in range(0, len(moonworm_tasks), batch_size) ]: + count = 0 for task in task_batch: - tags = ["#version:2.0"] + tags = ["version:2.0"] ## get abi try: abi = json.loads(task.content) except Exception as e: - logger.warn(f"Unable to parse abi from task: {task.id}") + logger.warn( + f"Unable to parse abi from task: {task.entry_url.split()[-1]}: {e}" + ) + raise e + continue + + if "name" not in abi: + logger.warn( + f"Unable to find abi name in task: {task.entry_url.split()[-1]}" + ) continue if not any([tag.startswith("abi_selector:") for tag in task.tags]): @@ -63,6 +76,8 @@ def fill_missing_selectors_in_moonworm_tasks() -> None: tags.append(f"abi_selector:{abi_selector}") + count += 1 + entries_tags.append( { "entry_id": task.entry_url.split("/")[-1], ## 😭 @@ -70,6 +85,8 @@ def fill_missing_selectors_in_moonworm_tasks() -> None: } ) + logger.info(f"Found {count} missing selectors in batch {len(task_batch)} tasks") + ## update entries try: @@ -82,3 +99,76 @@ def fill_missing_selectors_in_moonworm_tasks() -> None: except BugoutResponseException as e: logger.error(f"Unable to update entries tags: {e}") continue + + +def deduplicate_moonworm_task_by_selector(): + """ + Find moonworm tasks with same selector and remove old versions + """ + + moonworm_tasks = get_all_entries_from_search( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + search_query="#task_type:moonworm #version:2.0", + limit=100, + content=False, + ) + + logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 2.0") + + ## loop over tasks + + selectors = {} + + for task in moonworm_tasks: + tags = task.tags + + ## get selector + selector = [tag for tag in tags if tag.startswith("abi_selector:")] + + if len(selector) == 0: + logger.warn( + f"Unable to find selector in task: {task.entry_url.split()[-1]}" + ) + continue + + selector = selector[0] + + if selector not in selectors: + selectors[selector] = {"entries": {}} + + selectors[selector]["entries"][task.entry_url.split("/")[-1]] = task.created_at + + logger.info(f"Found {len(selectors)} selectors") + + for selector, tasks_dict in selectors.items(): + if len(tasks_dict["entries"]) == 1: + continue + + ## find latest task + + latest_task_id = max( + tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key] + ) + + ## remove all tasks except latest + + logger.info( + f"Found {len(tasks_dict['entries'])} tasks with selector {selector}" + ) + + for task_id in tasks_dict["entries"]: + if task_id == latest_task_id: + continue + + try: + bc.delete_entry( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entry_id=task_id, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + except BugoutResponseException as e: + logger.error(f"Unable to delete entry: {e}") + continue + + logger.info(f"Deleted entry: {task_id}")