pull/919/head
Andrey 2023-09-19 13:40:24 +03:00
rodzic 827823ab7a
commit 5a6c8f5219
2 zmienionych plików z 101 dodań i 6 usunięć

Wyświetl plik

@ -12,7 +12,12 @@ from sqlalchemy.orm import with_expression
from moonstreamdb.db import SessionLocal
from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..settings import (
BUGOUT_BROOD_URL,
BUGOUT_SPIRE_URL,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions, moonworm_tasks, queries
@ -248,7 +253,7 @@ def moonworm_tasks_add_subscription_handler(args: argparse.Namespace) -> None:
def main() -> None:
cli_description = f"""Moonstream Admin CLI
Please m35ake sure that the following environment variables are set in your environment and exported to
Please make sure that the following environment variables are set in your environment and exported to
subprocesses:
1. MOONSTREAM_APPLICATION_ID
2. MOONSTREAM_ADMIN_ACCESS_TOKEN

Wyświetl plik

@ -29,10 +29,13 @@ def fill_missing_selectors_in_moonworm_tasks() -> None:
moonworm_tasks = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
search_query="type:subscription !#version:2.0",
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
search_query="#task_type:moonworm !#version:2.0",
limit=batch_size,
content=True,
)
logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 1.0")
entries_tags = []
## batch tasks
@ -41,14 +44,24 @@ def fill_missing_selectors_in_moonworm_tasks() -> None:
moonworm_tasks[i : i + batch_size]
for i in range(0, len(moonworm_tasks), batch_size)
]:
count = 0
for task in task_batch:
tags = ["#version:2.0"]
tags = ["version:2.0"]
## get abi
try:
abi = json.loads(task.content)
except Exception as e:
logger.warn(f"Unable to parse abi from task: {task.id}")
logger.warn(
f"Unable to parse abi from task: {task.entry_url.split()[-1]}: {e}"
)
raise e
continue
if "name" not in abi:
logger.warn(
f"Unable to find abi name in task: {task.entry_url.split()[-1]}"
)
continue
if not any([tag.startswith("abi_selector:") for tag in task.tags]):
@ -63,6 +76,8 @@ def fill_missing_selectors_in_moonworm_tasks() -> None:
tags.append(f"abi_selector:{abi_selector}")
count += 1
entries_tags.append(
{
"entry_id": task.entry_url.split("/")[-1], ## 😭
@ -70,6 +85,8 @@ def fill_missing_selectors_in_moonworm_tasks() -> None:
}
)
logger.info(f"Found {count} missing selectors in batch {len(task_batch)} tasks")
## update entries
try:
@ -82,3 +99,76 @@ def fill_missing_selectors_in_moonworm_tasks() -> None:
except BugoutResponseException as e:
logger.error(f"Unable to update entries tags: {e}")
continue
def deduplicate_moonworm_task_by_selector():
"""
Find moonworm tasks with same selector and remove old versions
"""
moonworm_tasks = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
search_query="#task_type:moonworm #version:2.0",
limit=100,
content=False,
)
logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 2.0")
## loop over tasks
selectors = {}
for task in moonworm_tasks:
tags = task.tags
## get selector
selector = [tag for tag in tags if tag.startswith("abi_selector:")]
if len(selector) == 0:
logger.warn(
f"Unable to find selector in task: {task.entry_url.split()[-1]}"
)
continue
selector = selector[0]
if selector not in selectors:
selectors[selector] = {"entries": {}}
selectors[selector]["entries"][task.entry_url.split("/")[-1]] = task.created_at
logger.info(f"Found {len(selectors)} selectors")
for selector, tasks_dict in selectors.items():
if len(tasks_dict["entries"]) == 1:
continue
## find latest task
latest_task_id = max(
tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key]
)
## remove all tasks except latest
logger.info(
f"Found {len(tasks_dict['entries'])} tasks with selector {selector}"
)
for task_id in tasks_dict["entries"]:
if task_id == latest_task_id:
continue
try:
bc.delete_entry(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=task_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
except BugoutResponseException as e:
logger.error(f"Unable to delete entry: {e}")
continue
logger.info(f"Deleted entry: {task_id}")