kopia lustrzana https://github.com/bugout-dev/moonstream
Add changes.
rodzic
854a473043
commit
9fa949ec0f
|
@ -39,6 +39,15 @@ def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_moonworm_tasks_report(args: argparse.Namespace) -> str:
|
||||||
|
repot = moonworm_tasks.get_moonworm_tasks_state(
|
||||||
|
blockchain=args.blockchain,
|
||||||
|
)
|
||||||
|
|
||||||
|
with open("moonworm_tasks_report.json", "w") as f:
|
||||||
|
json.dump(repot, f, indent=4)
|
||||||
|
|
||||||
|
|
||||||
def migration_run(step_map, command, step, step_order):
|
def migration_run(step_map, command, step, step_order):
|
||||||
if step is None:
|
if step is None:
|
||||||
# run all steps
|
# run all steps
|
||||||
|
@ -478,6 +487,58 @@ This CLI is configured to work with the following API URLs:
|
||||||
|
|
||||||
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
|
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_report = subcommands_moonworm_tasks.add_parser(
|
||||||
|
"report", description="Return report of moonworm tasks."
|
||||||
|
)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_report.add_argument(
|
||||||
|
"-b",
|
||||||
|
"--blockchain",
|
||||||
|
type=str,
|
||||||
|
help="Blockchain for report.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_report.set_defaults(func=get_moonworm_tasks_report)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_manage = subcommands_moonworm_tasks.add_parser(
|
||||||
|
"manage", description="Manage moonworm tasks."
|
||||||
|
)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_manage.add_argument(
|
||||||
|
"-a",
|
||||||
|
"--action",
|
||||||
|
type=str,
|
||||||
|
choices=["view", "delete", "restart", "mark_as_finished"],
|
||||||
|
default="view",
|
||||||
|
help="Action to perform on the tasks (view, delete, restart, mark_as_finished).",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add argument for specifying the blockchain
|
||||||
|
parser_moonworm_tasks_manage.add_argument(
|
||||||
|
"-b", "--blockchain", type=str, required=True, help="Blockchain for the tasks."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add argument for specifying the addresses
|
||||||
|
parser_moonworm_tasks_manage.add_argument(
|
||||||
|
"-ad",
|
||||||
|
"--addresses",
|
||||||
|
nargs="+",
|
||||||
|
required=True,
|
||||||
|
help="List of addresses associated with the tasks.",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add argument for specifying the task type
|
||||||
|
parser_moonworm_tasks_manage.add_argument(
|
||||||
|
"-t",
|
||||||
|
"--task-type",
|
||||||
|
type=str,
|
||||||
|
help="Type of the tasks (event or function).",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser_moonworm_tasks_manage.set_defaults(
|
||||||
|
func=moonworm_tasks.moonworm_tasks_manage_handler
|
||||||
|
)
|
||||||
|
|
||||||
queries_parser = subcommands.add_parser(
|
queries_parser = subcommands.add_parser(
|
||||||
"queries", description="Manage Moonstream queries"
|
"queries", description="Manage Moonstream queries"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
|
from enum import Enum
|
||||||
|
from typing import List, Optional, Literal
|
||||||
|
|
||||||
import boto3 # type: ignore
|
import boto3 # type: ignore
|
||||||
from bugout.data import BugoutResource, BugoutResources
|
from bugout.data import BugoutResource, BugoutResources
|
||||||
|
@ -9,10 +11,19 @@ from bugout.exceptions import BugoutResponseException
|
||||||
from ..actions import get_all_entries_from_search, apply_moonworm_tasks
|
from ..actions import get_all_entries_from_search, apply_moonworm_tasks
|
||||||
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
|
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
|
||||||
from ..settings import bugout_client as bc
|
from ..settings import bugout_client as bc
|
||||||
|
from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ActionType(Enum):
|
||||||
|
DELETE = "delete"
|
||||||
|
RESTART = "restart"
|
||||||
|
MARK_AS_FINISHED = "mark_as_finished"
|
||||||
|
VIEW = "view"
|
||||||
|
|
||||||
|
|
||||||
def get_list_of_addresses():
|
def get_list_of_addresses():
|
||||||
"""
|
"""
|
||||||
Return list of addresses of tasks
|
Return list of addresses of tasks
|
||||||
|
@ -82,3 +93,307 @@ def add_subscription(id: str):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logging.info("For apply to moonworm tasks subscriptions must have an abi.")
|
logging.info("For apply to moonworm tasks subscriptions must have an abi.")
|
||||||
|
|
||||||
|
|
||||||
|
def restart_tasks(bc, token, journal_id, tasks):
|
||||||
|
delete_entries_tags = []
|
||||||
|
add_entries_tags = []
|
||||||
|
for task in tasks:
|
||||||
|
entry_id = task["entry_id"]
|
||||||
|
# Find tags to delete
|
||||||
|
tags_to_delete = [
|
||||||
|
tag
|
||||||
|
for tag in task["tags"]
|
||||||
|
if tag.startswith("historical_crawl_status:") or tag.startswith("progress:")
|
||||||
|
]
|
||||||
|
|
||||||
|
# Delete tags
|
||||||
|
if tags_to_delete:
|
||||||
|
delete_entries_tags.append({"entry_id": entry_id, "tags": tags_to_delete})
|
||||||
|
|
||||||
|
# Add new tags
|
||||||
|
tags_to_add = ["progress:0", "historical_crawl_status:pending"]
|
||||||
|
add_entries_tags.append({"entry_id": entry_id, "tags": tags_to_add})
|
||||||
|
|
||||||
|
print(delete_entries_tags)
|
||||||
|
|
||||||
|
# Delete tags
|
||||||
|
if delete_entries_tags:
|
||||||
|
bc.delete_entries_tags(
|
||||||
|
token=token, journal_id=journal_id, entries_tags=delete_entries_tags
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add tags
|
||||||
|
if add_entries_tags:
|
||||||
|
bc.create_entries_tags(
|
||||||
|
token=token, journal_id=journal_id, entries_tags=add_entries_tags
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def mark_tasks_as_finished(bc, token, journal_id, tasks):
|
||||||
|
add_entries_tags = []
|
||||||
|
for task in tasks:
|
||||||
|
entry_id = task["entry_id"]
|
||||||
|
# Add new tags
|
||||||
|
tags_to_add = ["progress:100", "historical_crawl_status:finished"]
|
||||||
|
add_entries_tags.append({"entry_id": entry_id, "tags": tags_to_add})
|
||||||
|
|
||||||
|
# Add tags
|
||||||
|
if add_entries_tags:
|
||||||
|
bc.create_entries_tags(
|
||||||
|
token=token, journal_id=journal_id, entries_tags=add_entries_tags
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_tasks(bc, token, journal_id, tasks):
|
||||||
|
for task in tasks:
|
||||||
|
entry_id = task["entry_id"]
|
||||||
|
# Delete the task
|
||||||
|
bc.delete_entry(token, journal_id, entry_id)
|
||||||
|
|
||||||
|
|
||||||
|
def moonworm_tasks_manage_handler(args):
|
||||||
|
print(
|
||||||
|
f"Managing moonworm tasks with action: {args.action}, blockchain: {args.blockchain}, addresses: {args.addresses}, task type: {args.task_type}"
|
||||||
|
)
|
||||||
|
return manage_moonworm_tasks(
|
||||||
|
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
|
||||||
|
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
|
blockchain=args.blockchain,
|
||||||
|
addresses=args.addresses,
|
||||||
|
task_type=args.task_type,
|
||||||
|
action=ActionType(args.action),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_moonworm_tasks_by_filters(
|
||||||
|
journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
|
||||||
|
token: str = MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
|
blockchain: str = "ethereum",
|
||||||
|
addresses: List[str] = [],
|
||||||
|
task_type: Optional[Literal["event", "function"]] = None,
|
||||||
|
task_names: Optional[List[str]] = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Return list of tags depends on query and tag
|
||||||
|
"""
|
||||||
|
|
||||||
|
blockchain_to_subscription_type = {
|
||||||
|
value.blockchain: key
|
||||||
|
for key, value in CANONICAL_SUBSCRIPTION_TYPES.items()
|
||||||
|
if "smartcontract" in key
|
||||||
|
}
|
||||||
|
|
||||||
|
if blockchain not in blockchain_to_subscription_type:
|
||||||
|
logger.error(f"Unknown blockchain {blockchain}")
|
||||||
|
return
|
||||||
|
|
||||||
|
search_query = f"#subscription_type:{blockchain_to_subscription_type[blockchain]} #moonworm_task_pickedup:True"
|
||||||
|
|
||||||
|
if addresses:
|
||||||
|
search_query += " " + " ".join([f"#address:{address}" for address in addresses])
|
||||||
|
|
||||||
|
if task_type:
|
||||||
|
search_query += f" #type:{task_type}"
|
||||||
|
|
||||||
|
if task_names:
|
||||||
|
search_query += " " + " ".join([f"abi_name:{name}" for name in task_names])
|
||||||
|
|
||||||
|
print(search_query)
|
||||||
|
|
||||||
|
entries = get_all_entries_from_search(
|
||||||
|
journal_id=journal_id,
|
||||||
|
search_query=search_query,
|
||||||
|
limit=100,
|
||||||
|
token=token,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Found {len(entries)} moonworm tasks")
|
||||||
|
|
||||||
|
tasks = []
|
||||||
|
|
||||||
|
for entry in entries:
|
||||||
|
historical_crawl_status = [
|
||||||
|
tag.split(":")[-1]
|
||||||
|
for tag in entry.tags
|
||||||
|
if tag.startswith("historical_crawl_status:")
|
||||||
|
]
|
||||||
|
|
||||||
|
address = [tag for tag in entry.tags if tag.startswith("address:")]
|
||||||
|
|
||||||
|
progress = [tag for tag in entry.tags if tag.startswith("progress:")]
|
||||||
|
|
||||||
|
abi_name = [tag for tag in entry.tags if tag.startswith("abi_name:")]
|
||||||
|
|
||||||
|
moonworm_task_pickedup = [
|
||||||
|
tag.split(":")[-1]
|
||||||
|
for tag in entry.tags
|
||||||
|
if tag.startswith("moonworm_task_pickedup:")
|
||||||
|
]
|
||||||
|
|
||||||
|
historical_crawl_status = " ".join(historical_crawl_status)
|
||||||
|
address = address[0].split(":")[1] if len(address) > 0 else "address not found"
|
||||||
|
progress = (
|
||||||
|
progress[0].split(":")[1] if len(progress) > 0 else "progress not found"
|
||||||
|
)
|
||||||
|
abi_name = (
|
||||||
|
abi_name[0].split(":")[1] if len(abi_name) > 0 else "abi_name not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
tasks.append(
|
||||||
|
{
|
||||||
|
"entry_id": entry.entry_url.split("/")[-1],
|
||||||
|
"tags": entry.tags,
|
||||||
|
"historical_crawl_status": historical_crawl_status,
|
||||||
|
"address": address,
|
||||||
|
"progress": progress,
|
||||||
|
"abi_name": abi_name,
|
||||||
|
"moonworm_task_pickedup": " ".join(moonworm_task_pickedup),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return tasks
|
||||||
|
|
||||||
|
|
||||||
|
def get_moonworm_tasks_state(
|
||||||
|
journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
|
||||||
|
token: str = MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
|
blockchain: str = "ethereum",
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Return list of tags depends on query and tag
|
||||||
|
"""
|
||||||
|
|
||||||
|
blockchain_to_subscription_type = {
|
||||||
|
value["blockchain"]: key
|
||||||
|
for key, value in CANONICAL_SUBSCRIPTION_TYPES.items()
|
||||||
|
if "smartcontract" in value
|
||||||
|
}
|
||||||
|
|
||||||
|
if blockchain not in blockchain_to_subscription_type:
|
||||||
|
logger.error(f"Unknown blockchain {blockchain}")
|
||||||
|
return
|
||||||
|
|
||||||
|
entries = get_all_entries_from_search(
|
||||||
|
journal_id=journal_id,
|
||||||
|
search_query=f"#subscription_type:{blockchain_to_subscription_type[blockchain]} #moonworm_task_pickedup:True",
|
||||||
|
limit=100,
|
||||||
|
token=token,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Found {len(entries)} moonworm tasks")
|
||||||
|
|
||||||
|
### loop over tasks split by historical_crawl_status:in_progress and historical_crawl_status:finished and historical_crawl_status:pending
|
||||||
|
|
||||||
|
tasks = {
|
||||||
|
"in_progress": {},
|
||||||
|
"finished": {},
|
||||||
|
"pending": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry in entries:
|
||||||
|
historical_crawl_status = [
|
||||||
|
tag for tag in entry.tags if tag.startswith("historical_crawl_status:")
|
||||||
|
]
|
||||||
|
|
||||||
|
address = [tag for tag in entry.tags if tag.startswith("address:")]
|
||||||
|
|
||||||
|
progress = [tag for tag in entry.tags if tag.startswith("progress:")]
|
||||||
|
|
||||||
|
abi_name = [tag for tag in entry.tags if tag.startswith("abi_name:")]
|
||||||
|
|
||||||
|
if len(historical_crawl_status) == 0:
|
||||||
|
logger.warn(
|
||||||
|
f"Unable to find historical_crawl_status in task: {entry.entry_url.split()[-1]}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
historical_crawl_status = historical_crawl_status[0].split(":")[1]
|
||||||
|
address = address[0].split(":")[1]
|
||||||
|
progress = progress[0].split(":")[1]
|
||||||
|
abi_name = abi_name[0].split(":")[1]
|
||||||
|
|
||||||
|
if historical_crawl_status not in tasks:
|
||||||
|
tasks[historical_crawl_status] = {}
|
||||||
|
if address not in tasks[historical_crawl_status]:
|
||||||
|
tasks[historical_crawl_status][address] = {}
|
||||||
|
|
||||||
|
if abi_name not in tasks[historical_crawl_status][address]:
|
||||||
|
tasks[historical_crawl_status][address][abi_name] = progress
|
||||||
|
|
||||||
|
return tasks
|
||||||
|
|
||||||
|
|
||||||
|
def confirm_action(action, tasks):
|
||||||
|
"""Ask the user for confirmation before proceeding with the action."""
|
||||||
|
print(f"You are about to {action} the following tasks:")
|
||||||
|
tasks = sorted(tasks, key=lambda task: task["address"])
|
||||||
|
for task in tasks:
|
||||||
|
print(f" {task['address']} {task['abi_name']} {task['progress']}%")
|
||||||
|
|
||||||
|
confirmation = input(
|
||||||
|
f"Do you want to proceed with {action} these tasks? (yes/no): "
|
||||||
|
)
|
||||||
|
return confirmation.lower() == "yes"
|
||||||
|
|
||||||
|
|
||||||
|
def view_tasks(tasks):
|
||||||
|
"""Display details of the tasks."""
|
||||||
|
if not tasks:
|
||||||
|
print("No tasks to display.")
|
||||||
|
return
|
||||||
|
|
||||||
|
tasks = sorted(tasks, key=lambda task: task["address"])
|
||||||
|
for task in tasks:
|
||||||
|
print(
|
||||||
|
f" {task['address']} {task['abi_name']} {task['progress']}% HC statuses:{task['historical_crawl_status']} moonworm:{task['moonworm_task_pickedup']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def manage_moonworm_tasks(
|
||||||
|
journal_id: str,
|
||||||
|
token: str,
|
||||||
|
blockchain: str,
|
||||||
|
addresses: List[str],
|
||||||
|
task_type: Optional[str],
|
||||||
|
action: ActionType,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Manage moonworm tasks based on the provided parameters.
|
||||||
|
|
||||||
|
:param journal_id: ID of the journal containing the tasks.
|
||||||
|
:param token: Access token for authentication.
|
||||||
|
:param blockchain: Type of blockchain (e.g., 'ethereum').
|
||||||
|
:param addresses: List of contract addresses to filter tasks.
|
||||||
|
:param task_type: Type of the task ('event' or 'function').
|
||||||
|
:param action: Action to perform ('delete', 'restart', 'mark_as_finished').
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Get all tasks matching the provided filters
|
||||||
|
filtered_tasks = get_moonworm_tasks_by_filters(
|
||||||
|
journal_id=journal_id,
|
||||||
|
token=token,
|
||||||
|
blockchain=blockchain,
|
||||||
|
addresses=addresses,
|
||||||
|
task_type=task_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Confirm action with the user
|
||||||
|
if action == ActionType.VIEW:
|
||||||
|
view_tasks(filtered_tasks)
|
||||||
|
else:
|
||||||
|
if confirm_action(action.value, filtered_tasks):
|
||||||
|
if action == ActionType.DELETE:
|
||||||
|
delete_tasks(bc, token, journal_id, filtered_tasks)
|
||||||
|
elif action == ActionType.RESTART:
|
||||||
|
restart_tasks(bc, token, journal_id, filtered_tasks)
|
||||||
|
elif action == ActionType.MARK_AS_FINISHED:
|
||||||
|
mark_tasks_as_finished(bc, token, journal_id, filtered_tasks)
|
||||||
|
print(f"Action '{action.value}' completed on filtered tasks.")
|
||||||
|
else:
|
||||||
|
print("Action cancelled by the user.")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "success",
|
||||||
|
"message": f"Action '{action.value}' was approved and executed.",
|
||||||
|
}
|
||||||
|
|
Ładowanie…
Reference in New Issue