Merge pull request #1082 from moonstream-to/moonworm-jobs-v3-sync-cli

Moonworm jobs v3 sync cli
pull/1086/head
Sergei Sumarokov 2024-06-11 19:47:05 +03:00 zatwierdzone przez GitHub
commit c7afa53b05
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
5 zmienionych plików z 403 dodań i 3 usunięć

Wyświetl plik

@ -6,6 +6,7 @@ import argparse
import json
import logging
import os
import uuid
from posix import listdir
from typing import Any, Callable, Dict, List, Optional, Union
@ -35,6 +36,13 @@ logger = logging.getLogger(__name__)
MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations"
def uuid_type(value):
try:
return uuid.UUID(value)
except ValueError:
raise argparse.ArgumentTypeError(f"{value} is not a valid UUID.")
def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]:
if raw_arg is None:
return None
@ -285,6 +293,28 @@ def generate_usage_handler(args: argparse.Namespace) -> None:
output_file.write(json.dumps(usage_info, indent=4))
def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None:
"""
Read users subsriptions and rewrite them to v3 jobs table
"""
### Request user resources from brood
moonworm_tasks.migrate_v3_tasks(
user_id=args.user_id, customer_id=args.customer_id, blockchain=args.blockchain
)
def create_v3_task_handler(args: argparse.Namespace) -> None:
moonworm_tasks.create_v3_task(
user_id=args.user_id,
customer_id=args.customer_id,
blockchain=args.blockchain,
address=args.address,
abi=json.loads(args.abi.read()),
)
def main() -> None:
cli_description = f"""Moonstream Admin CLI
@ -539,6 +569,76 @@ This CLI is configured to work with the following API URLs:
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
parser_moonworm_tasks_migrate = subcommands_moonworm_tasks.add_parser(
"migrate-v2-tasks",
description="Migrate moonworm tasks to abi_jobs of moonstream index",
)
parser_moonworm_tasks_migrate.add_argument(
"--user-id",
required=True,
type=uuid_type,
help="user-id of which we want see subscription.",
)
parser_moonworm_tasks_migrate.add_argument(
"--customer-id",
required=True,
type=uuid_type,
help="customer-id of which we want see subscription.",
)
parser_moonworm_tasks_migrate.add_argument(
"--blockchain",
required=False,
type=str,
help="Blockchain of which we want see subscription.",
)
parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate)
parser_moonworm_tasks_v3_create = subcommands_moonworm_tasks.add_parser(
"create_v3_tasks",
description="Create v3 tasks from v2 tasks",
)
parser_moonworm_tasks_v3_create.add_argument(
"--user-id",
required=True,
type=uuid_type,
help="user-id of which we want see subscription.",
)
parser_moonworm_tasks_v3_create.add_argument(
"--customer-id",
required=True,
type=uuid_type,
help="customer-id of which we want see subscription.",
)
parser_moonworm_tasks_v3_create.add_argument(
"--blockchain",
required=True,
type=str,
help="Blockchain of which we want see subscription.",
)
parser_moonworm_tasks_v3_create.add_argument(
"--address",
required=True,
type=str,
help="Address of which we want see subscription.",
)
parser_moonworm_tasks_v3_create.add_argument(
"--abi",
required=True,
type=argparse.FileType("r"),
help="ABI of which we want see subscription.",
)
parser_moonworm_tasks_v3_create.set_defaults(func=create_v3_task_handler)
queries_parser = subcommands.add_parser(
"queries", description="Manage Moonstream queries"
)
@ -610,6 +710,44 @@ This CLI is configured to work with the following API URLs:
generate_usage_parser.set_defaults(func=generate_usage_handler)
### databases commands
databases_parser = subcommands.add_parser(
"databases", description="Manage Moonstream databases"
)
databases_parser.set_defaults(func=lambda _: databases_parser.print_help())
databases_subcommands = databases_parser.add_subparsers(
description="Database commands"
)
database_labels_migration_parser = databases_subcommands.add_parser(
"v2-to-v3-labels-migration",
description="Migrate labels in database",
)
database_labels_migration_parser.add_argument(
"--user-id",
type=uuid_type,
help="User ID for which to migrate labels",
)
database_labels_migration_parser.add_argument(
"--customer-id",
type=uuid_type,
help="Customer ID for which to migrate labels",
)
database_labels_migration_parser.add_argument(
"--blockchain",
type=str,
help="Blockchain for which to migrate labels",
)
database_labels_migration_parser.set_defaults(
func=lambda args: print("Not implemented yet")
)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -1,13 +1,25 @@
import json
import logging
import os
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult
from bugout.exceptions import BugoutResponseException
from moonstreamdbv3.db import MoonstreamDBIndexesEngine
from moonstreamdbv3.models_indexes import AbiJobs
from sqlalchemy.dialects.postgresql import insert
from web3 import Web3
from ..actions import apply_moonworm_tasks, get_all_entries_from_search
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from ..settings import bugout_client as bc
from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES
logger = logging.getLogger(__name__)
@ -81,3 +93,251 @@ def add_subscription(id: str):
)
else:
logging.info("For apply to moonworm tasks subscriptions must have an abi.")
def create_v3_task(
customer_id: str,
user_id: str,
abi: Dict[str, Any],
address: str,
blockchain: str,
):
"""
Create moonworm task for v3
"""
abi_tasks = []
db_engine = MoonstreamDBIndexesEngine()
with db_engine.yield_db_session_ctx() as db_session_v3:
for abi_task in abi:
if abi_task["type"] != "event" and abi_task["type"] != "function":
continue
abi_selector = Web3.keccak(
text=abi_task["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi_task["inputs"]))
+ ")"
)
if abi_task["type"] == "function":
abi_selector = abi_selector[:4]
abi_selector = abi_selector.hex()
try:
abi_tasks.append(
{
"address": bytes.fromhex(address[2:]),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": blockchain,
"abi_name": abi_task["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi_task),
}
)
except Exception as e:
logger.error(
f"Error creating subscription for subscription for abi {abi_task['name']}: {str(e)}"
)
db_session_v3.rollback()
raise e
insert_statement = insert(AbiJobs).values(abi_tasks)
result_stmt = insert_statement.on_conflict_do_nothing(
index_elements=[
AbiJobs.chain,
AbiJobs.address,
AbiJobs.abi_selector,
AbiJobs.customer_id,
]
)
try:
db_session_v3.execute(result_stmt)
db_session_v3.commit()
except Exception as e:
logger.error(f"Error inserting subscriptions: {str(e)}")
db_session_v3.rollback()
return None
def migrate_v3_tasks(
user_id: UUID, customer_id: UUID, blockchain: Optional[str] = None
):
"""
Migrate moonworm tasks
"""
### get user subscription entity journal id
subscription_resources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
params={
"user_id": user_id,
"type": "entity_subscription",
},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
chain_to_subscription_type = {
CANONICAL_SUBSCRIPTION_TYPES[key].blockchain: key
for key in CANONICAL_SUBSCRIPTION_TYPES.keys()
if key.endswith("smartcontract")
}
logger.info(
"Found users collection resources: %s", len(subscription_resources.resources)
)
db_engine = MoonstreamDBIndexesEngine()
if len(subscription_resources.resources) == 0:
raise Exception("User has no subscriptions")
collection_id = subscription_resources.resources[0].resource_data["collection_id"]
query = f"tag:type:subscription"
if blockchain is not None:
query += f" tag:subscription_type_id:{chain_to_subscription_type[blockchain]}"
subscriptions: List[BugoutSearchResult] = get_all_entries_from_search(
journal_id=collection_id,
search_query=query,
token=os.environ.get("SPECIFIC_ACCESS_TOKEN"),
limit=100,
content=True,
)
logger.info("Found users subscriptions: %s", len(subscriptions))
if len(subscriptions) == 0:
raise Exception("User has no subscriptions")
with db_engine.yield_db_session_ctx() as session:
user_subscriptions = []
for index, subscription in enumerate(subscriptions):
abis = None
address = None
subscription_type_id = None
if subscription.content is None:
continue
subscription_data = json.loads(subscription.content)
if "abi" in subscription_data:
abis_container = subscription_data["abi"]
for tag in subscription.tags:
if tag.startswith("subscription_type_id:"):
subscription_type_id = tag.split(":")[1]
if tag.startswith("address:"):
address = tag.split(":")[1]
if subscription_type_id is None:
continue
if abis_container is not None:
try:
abis = json.loads(abis_container)
except Exception as e:
logger.error(
f"Error loading abi for subscription {subscription.id}: {str(e)}"
)
continue
### reformat abi to separate abi tasks
chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain
for abi_task in abis:
if abi_task["type"] not in ("event", "function"):
continue
abi_selector = Web3.keccak(
text=abi_task["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi_task["inputs"]))
+ ")"
)
if abi_task["type"] == "function":
abi_selector = abi_selector[:4]
abi_selector = abi_selector.hex()
try:
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi_task["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi_task),
}
try:
AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
continue
user_subscriptions.append(abi_job)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
session.rollback()
continue
insert_statement = insert(AbiJobs).values(user_subscriptions)
result_stmt = insert_statement.on_conflict_do_nothing(
index_elements=[
AbiJobs.chain,
AbiJobs.address,
AbiJobs.abi_selector,
AbiJobs.customer_id,
]
)
try:
session.execute(result_stmt)
session.commit()
except Exception as e:
logger.error(f"Error inserting subscriptions: {str(e)}")
session.rollback()
logger.info(f"Processed {index} subscriptions")
return None

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.4.2"
MOONSTREAMAPI_VERSION = "0.4.3"

Wyświetl plik

@ -38,6 +38,7 @@ Mako==1.2.3
MarkupSafe==2.1.1
moonstream==0.1.1
moonstreamdb==0.4.4
moonstreamdb-v3==0.0.9
multiaddr==0.0.9
multidict==6.0.2
netaddr==0.8.0

Wyświetl plik

@ -17,6 +17,7 @@ setup(
"fastapi",
"moonstream",
"moonstreamdb>=0.4.4",
"moonstreamdb-v3>=0.0.9",
"humbug",
"pydantic==1.10.2",
"pyevmasm",