Merge pull request #919 from moonstream-to/migrate-moonworm-tasks

Migrate moonworm tasks
pull/1011/head
Andrey Dolgolev 2024-02-01 12:33:47 +02:00 zatwierdzone przez GitHub
commit 38947e1379
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
3 zmienionych plików z 243 dodań i 25 usunięć

Wyświetl plik

@ -473,7 +473,7 @@ def upload_abi_to_s3(
def get_all_entries_from_search(
journal_id: str, search_query: str, limit: int, token: str
journal_id: str, search_query: str, limit: int, token: str, content: bool = False
) -> List[BugoutSearchResult]:
"""
Get all required entries from journal using search interface
@ -486,7 +486,7 @@ def get_all_entries_from_search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
content=content,
timeout=10.0,
limit=limit,
offset=offset,
@ -499,7 +499,7 @@ def get_all_entries_from_search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
content=content,
timeout=10.0,
limit=limit,
offset=offset,
@ -529,47 +529,45 @@ def apply_moonworm_tasks(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
# create historical crawl task in journal
# will use create_entries_pack for creating entries in journal
existing_tags = [entry.tags for entry in entries]
existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_method_hash" in tag
existing_selectors = [
tag.split(":")[-1] for tag in chain(*existing_tags) if "abi_selector" in tag
]
abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
abi_selectors_dict = {
Web3.keccak(
text=method["name"]
+ "("
+ ",".join(map(lambda x: x["type"], method["inputs"]))
+ ")"
)[:4].hex(): method
for method in abi
if (method["type"] in ("event", "function"))
and (method.get("stateMutability", "") != "view")
}
for hash in abi_hashes_dict:
if hash not in existing_hashes:
abi_selector = Web3.keccak(
text=abi_hashes_dict[hash]["name"]
+ "("
+ ",".join(
map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"])
)
+ ")"
)[:4].hex()
for abi_selector in abi_selectors_dict:
if abi_selector not in existing_selectors:
hash = hashlib.md5(
json.dumps(abi_selectors_dict[abi_selector]).encode("utf-8")
).hexdigest()
moonworm_abi_tasks_entries_pack.append(
{
"title": address,
"content": json.dumps(abi_hashes_dict[hash], indent=4),
"content": json.dumps(
abi_selectors_dict[abi_selector], indent=4
),
"tags": [
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"type:{abi_selectors_dict[abi_selector]['type']}",
f"abi_method_hash:{hash}",
f"abi_selector:{abi_selector}",
f"subscription_type:{subscription_type}",
f"abi_name:{abi_hashes_dict[hash]['name']}",
f"abi_name:{abi_selectors_dict[abi_selector]['name']}",
f"status:active",
f"task_type:moonworm",
f"moonworm_task_pickedup:False", # True if task picked up by moonworm-crawler(default each 120 sec)

Wyświetl plik

@ -12,7 +12,12 @@ from sqlalchemy.orm import with_expression
from moonstreamdb.db import SessionLocal
from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..settings import (
BUGOUT_BROOD_URL,
BUGOUT_SPIRE_URL,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions, moonworm_tasks, queries
@ -20,6 +25,7 @@ from .migrations import (
checksum_address,
update_dashboard_subscription_key,
generate_entity_subscriptions,
add_selectors,
)
@ -87,6 +93,9 @@ steps:
- id: 20230501
name: fix_duplicates_keys_in_entity_subscription
description: Fix entity duplicates keys for all subscriptions introduced in 20230213
- id: 20230904
name: fill_missing_selectors_in_moonworm_tasks
description: Get all moonworm jobs from the moonworm journal and add a selector tag if it is not present
"""
logger.info(entity_migration_overview)
@ -117,6 +126,30 @@ def migrations_run(args: argparse.Namespace) -> None:
web3_session = yield_web3_provider()
db_session = SessionLocal()
try:
if args.id == 20230904:
step_order = [
"fill_missing_selectors_in_moonworm_tasks",
"deduplicate_moonworm_tasks",
]
step_map: Dict[str, Dict[str, Any]] = {
"upgrade": {
"fill_missing_selectors_in_moonworm_tasks": {
"action": add_selectors.fill_missing_selectors_in_moonworm_tasks,
"description": "Get all moonworm jobs from the moonworm journal and add a selector tag if it is not present",
},
"deduplicate_moonworm_tasks": {
"action": add_selectors.deduplicate_moonworm_task_by_selector,
"description": "Deduplicate moonworm tasks by selector",
},
},
"downgrade": {},
}
if args.command not in ["upgrade", "downgrade"]:
logger.info("Wrong command. Please use upgrade or downgrade")
step = args.step
migration_run(step_map, args.command, step, step_order)
if args.id == 20230501:
# fix entity duplicates keys for all subscriptions introduced in 20230213

Wyświetl plik

@ -0,0 +1,187 @@
"""
Add selectors to all moonworm tasks.
"""
import logging
import json
from bugout.exceptions import BugoutResponseException
from web3 import Web3
from ...settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ...settings import bugout_client as bc
from ...actions import get_all_entries_from_search
logger = logging.getLogger(__name__)
def fill_missing_selectors_in_moonworm_tasks() -> None:
    """
    Add selectors to all moonworm tasks.

    Searches the moonworm tasks journal for entries not yet migrated to
    version 2.0, derives the 4-byte EVM selector (keccak256 of the ABI
    signature, first 4 bytes) from the ABI stored in each entry's content,
    and tags the entry with ``abi_selector:<selector>`` plus ``version:2.0``.
    Tag updates are pushed to the journal one batch at a time.
    """
    batch_size = 100

    moonworm_tasks = get_all_entries_from_search(
        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        search_query="#task_type:moonworm !#version:2.0",
        limit=batch_size,
        content=True,
    )

    logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 1.0")

    ## batch tasks
    for task_batch in [
        moonworm_tasks[i : i + batch_size]
        for i in range(0, len(moonworm_tasks), batch_size)
    ]:
        # Collect tag updates for THIS batch only. Accumulating across
        # batches would re-send already-applied updates with every
        # subsequent bc.create_entries_tags call.
        entries_tags = []
        count = 0
        for task in task_batch:
            tags = ["version:2.0"]

            # Journal entry id is the last path segment of the entry URL.
            entry_id = task.entry_url.split("/")[-1]

            ## get abi
            try:
                abi = json.loads(task.content)
            except Exception as e:
                logger.warning(f"Unable to parse abi from task: {entry_id}: {e}")
                continue

            if "name" not in abi:
                logger.warning(f"Unable to find abi name in task: {entry_id}")
                continue

            if not any([tag.startswith("abi_selector:") for tag in task.tags]):
                ## generate selector: keccak256("name(type1,type2,...)")[:4]
                abi_selector = Web3.keccak(
                    text=abi["name"]
                    + "("
                    + ",".join(map(lambda x: x["type"], abi["inputs"]))
                    + ")"
                )[:4].hex()

                tags.append(f"abi_selector:{abi_selector}")

                count += 1

            entries_tags.append(
                {
                    "entry_id": entry_id,
                    "tags": tags,
                }
            )

        logger.info(f"Found {count} missing selectors in batch {len(task_batch)} tasks")

        ## update entries
        try:
            bc.create_entries_tags(
                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                entries_tags=entries_tags,
                timeout=15,
            )
        except BugoutResponseException as e:
            # Best-effort: log and move on to the next batch.
            logger.error(f"Unable to update entries tags: {e}")
            continue
def deduplicate_moonworm_task_by_selector() -> None:
    """
    Find moonworm tasks that share the same (address, selector) pair and
    delete the duplicates, keeping only the earliest-created task.
    """
    moonworm_tasks = get_all_entries_from_search(
        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        search_query="#task_type:moonworm #version:2.0",
        limit=100,
        content=False,
    )

    logger.info(f"Found {len(moonworm_tasks)} moonworm tasks versions 2.0")

    ## Group entry ids (with created_at) by contract address and selector.
    selectors = {}
    for task in moonworm_tasks:
        tags = task.tags

        # Journal entry id is the last path segment of the entry URL.
        entry_id = task.entry_url.split("/")[-1]

        selector_tags = [tag for tag in tags if tag.startswith("abi_selector:")]
        address_tags = [tag for tag in tags if tag.startswith("address:")]

        if len(selector_tags) == 0:
            logger.warning(f"Unable to find selector in task: {entry_id}")
            continue
        selector = selector_tags[0].split(":")[1]

        if len(address_tags) == 0:
            logger.warning(f"Unable to find address in task: {entry_id}")
            continue
        address = address_tags[0].split(":")[1]

        if address not in selectors:
            selectors[address] = {}
        if selector not in selectors[address]:
            selectors[address][selector] = {"entries": {}}

        selectors[address][selector]["entries"][entry_id] = task.created_at

    logger.info(f"Found {len(selectors)} addresses")

    for address, selectors_dict in selectors.items():
        for selector, tasks_dict in selectors_dict.items():
            entries = tasks_dict["entries"]
            if len(entries) == 1:
                continue

            ## Keep the earliest task; everything else is a duplicate.
            earliest_task_id = min(entries, key=lambda key: entries[key])

            logger.info(
                f"Found {len(entries)} tasks with selector {selector} "
                f"earliest task {earliest_task_id} with created_at: "
                f"{entries[earliest_task_id]}"
            )

            for task_id in entries:
                if task_id == earliest_task_id:
                    continue
                try:
                    bc.delete_entry(
                        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
                        entry_id=task_id,
                        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                    )
                except BugoutResponseException as e:
                    # Best-effort: log the failure and keep deleting the rest.
                    logger.error(f"Unable to delete entry with id {task_id} : {e}")
                    continue
                logger.info(f"Deleted entry: {task_id}")