From a28270ef07bfaab70af419b3fa3d6a16f92a54bb Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 30 May 2024 17:29:23 +0300 Subject: [PATCH 01/10] Added user tasks migration. --- moonstreamapi/moonstreamapi/admin/cli.py | 38 ++++++ .../moonstreamapi/admin/moonworm_tasks.py | 125 +++++++++++++++++- moonstreamapi/setup.py | 1 + 3 files changed, 162 insertions(+), 2 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 0089108f..1c82c600 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -8,6 +8,7 @@ import logging import os from posix import listdir from typing import Any, Callable, Dict, List, Optional, Union +import uuid from moonstreamdb.db import SessionLocal from sqlalchemy.orm import with_expression @@ -35,6 +36,13 @@ logger = logging.getLogger(__name__) MIGRATIONS_FOLDER = "./moonstreamapi/admin/migrations" +def uuid_type(value): + try: + return uuid.UUID(value) + except ValueError: + raise argparse.ArgumentTypeError(f"{value} is not a valid UUID.") + + def parse_boolean_arg(raw_arg: Optional[str]) -> Optional[bool]: if raw_arg is None: return None @@ -285,6 +293,15 @@ def generate_usage_handler(args: argparse.Namespace) -> None: output_file.write(json.dumps(usage_info, indent=4)) +def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None: + """ + Read users subsriptions and rewrite them to v3 jobs table + """ + ### Request user resources from brood + + moonworm_tasks.migrate_v3_tasks(user_id=args.user_id, customer_id=args.customer_id) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -539,6 +556,27 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) + parser_moonworm_tasks_migrate = subcommands_moonworm_tasks.add_parser( + "migrate-v3", + description="Migrate moonworm tasks to abi_jobs of moonstream index", + ) + + parser_moonworm_tasks_migrate.add_argument( + "--user-id", + required=True, + type=uuid_type, + help="user-id of which we want see subscription.", + ) + + parser_moonworm_tasks_migrate.add_argument( + "--customer-id", + required=True, + type=uuid_type, + help="customer-id of which we want see subscription.", + ) + + parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate) + queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 7a368d35..0ed18710 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,13 +1,24 @@ import json import logging +from typing import List, Dict, Union, Any +from uuid import UUID import boto3 # type: ignore -from bugout.data import BugoutResource, BugoutResources +from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult from bugout.exceptions import BugoutResponseException +from moonstreamdbv3.db import MoonstreamDBEngine +from moonstreamdbv3.models_indexes import AbiJobs +from web3 import Web3 + from ..actions import apply_moonworm_tasks, get_all_entries_from_search -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL +from ..settings import ( + MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + BUGOUT_REQUEST_TIMEOUT_SECONDS, +) from ..settings import bugout_client as bc +from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES logger = logging.getLogger(__name__) @@ -81,3 +92,113 @@ def add_subscription(id: str): ) else: logging.info("For apply to moonworm tasks subscriptions must have an abi.") + + +def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: + """ + Migrate moonworm tasks + + """ + + ### get user subscription entity journal id + + subscription_resources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + params={ + "user_id": user_id, + "type": "entity_subscription", + }, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + logger.info( + "Found users collection resources: %s", len(subscription_resources.resources) + ) + + if len(subscription_resources.resources) == 0: + raise Exception("User has no subscriptions") + + collection_id = subscription_resources.resources[0].resource_data["collection_id"] + + subscriptions: List[BugoutSearchResult] = get_all_entries_from_search( + journal_id=collection_id, + search_query=f"tag:type:subscription", + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + + logger.info("Found users subscriptions: %s", len(subscriptions)) + + if len(subscriptions) == 0: + raise Exception("User has no subscriptions") + + for subscription in subscriptions: + + abi = None + address = None + subscription_type_id = None + + if subscription.content is None: + continue + + subscription_data = json.loads(subscription.content) + + if "abi" in subscription_data: + abi = subscription_data["abi"] + + for tag in subscription.tags: + if tag.startswith("subscription_type_id:"): + subscription_type_id = tag.split(":")[1] + if tag.startswith("address:"): + address = tag.split(":")[1] + + if subscription_type_id is None: + continue + + ### reformat abi to separate abi tasks + + if abi is None: + continue + + chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id]["blockchain"] + + with MoonstreamDBEngine.yield_db_session_ctx() as session: + for abi_task in abi: + + if abi_task["type"] not in ("event", "function"): + continue + + abi_selector = Web3.keccak( + text=abi_task["name"] + + "(" + + ",".join(map(lambda x: x["type"], abi_task["inputs"])) + + ")" + )[:4].hex() + + try: + + subscription = AbiJobs( + address=address, + user_id=user_id, + customer_id=customer_id, + abi_selector=abi_selector, + chain=chain, + abi_name=abi_task["name"], + status="active", + historical_crawl_status="pending", + progress=0, + moonworm_task_pickedup=False, + abi=abi_task, + ) + + session.add(subscription) + + except Exception as e: + logger.error( + f"Error creating subscription for subscription {subscription.id}: {str(e)}" + ) + session.rollback() + continue + + session.commit() + + return None diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index 10b43de9..79c93d08 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,6 +17,7 @@ setup( "fastapi", "moonstream", "moonstreamdb>=0.4.4", + "moonstreamdb-v3>=0.0.6", "humbug", "pydantic==1.10.2", "pyevmasm", From 5405eb6929294a9fab3b06d0f54f74355f489695 Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 31 May 2024 18:35:48 +0300 Subject: [PATCH 02/10] Add temp changes. --- moonstreamapi/moonstreamapi/admin/cli.py | 49 +++++++++++++++++++++- moonstreamdb-v3/moonstreamdbv3/db.py | 12 ++++-- moonstreamdb-v3/moonstreamdbv3/version.txt | 2 +- 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 1c82c600..3c544905 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -29,6 +29,7 @@ from .migrations import ( generate_entity_subscriptions, update_dashboard_subscription_key, ) +from .databases import databases_v2_to_v3_labels_migration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -302,6 +303,14 @@ def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None: moonworm_tasks.migrate_v3_tasks(user_id=args.user_id, customer_id=args.customer_id) +def databases_v2_to_v3_labels_migration_handler(args: argparse.Namespace) -> None: + """ + Migrate labels in database + """ + + databases_v2_to_v3_labels_migration(args.user_id, args.blockchain) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -557,7 +566,7 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) parser_moonworm_tasks_migrate = subcommands_moonworm_tasks.add_parser( - "migrate-v3", + "migrate-v2-tasks", description="Migrate moonworm tasks to abi_jobs of moonstream index", ) @@ -648,6 +657,44 @@ This CLI is configured to work with the following API URLs: generate_usage_parser.set_defaults(func=generate_usage_handler) + ### databases commands + databases_parser = subcommands.add_parser( + "databases", description="Manage Moonstream databases" + ) + + databases_parser.set_defaults(func=lambda _: databases_parser.print_help()) + + databases_subcommands = databases_parser.add_subparsers( + description="Database commands" + ) + + database_labels_migration_parser = databases_subcommands.add_parser( + "v2-to-v3-labels-migration", + description="Migrate labels in database", + ) + + database_labels_migration_parser.add_argument( + "--user-id", + type=uuid_type, + help="User ID for which to migrate labels", + ) + + database_labels_migration_parser.add_argument( + "--customer-id", + type=uuid_type, + help="Customer ID for which to migrate labels", + ) + + database_labels_migration_parser.add_argument( + "--blockchain", + type=str, + help="Blockchain for which to migrate labels", + ) + + database_labels_migration_parser.set_defaults( + func=lambda args: print("Not implemented yet") + ) + args = parser.parse_args() args.func(args) diff --git a/moonstreamdb-v3/moonstreamdbv3/db.py b/moonstreamdb-v3/moonstreamdbv3/db.py index 1c983952..04f3f2bf 100644 --- a/moonstreamdb-v3/moonstreamdbv3/db.py +++ b/moonstreamdb-v3/moonstreamdbv3/db.py @@ -14,13 +14,17 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) try: - MOONSTREAM_DB_URI = os.environ.get("MOONSTREAM_DB_URI") + MOONSTREAM_DB_URI = os.environ.get("MOONSTREAM_DB_V3_INDEXES_URI") if MOONSTREAM_DB_URI is None: - raise Warning("MOONSTREAM_DB_URI environment variable must be set") + raise Warning("MOONSTREAM_DB_V3_INDEXES_URI environment variable must be set") - MOONSTREAM_DB_URI_READ_ONLY = os.environ.get("MOONSTREAM_DB_URI_READ_ONLY") + MOONSTREAM_DB_URI_READ_ONLY = os.environ.get( + "MOONSTREAM_DB_V3_INDEXES_URI_READ_ONLY" + ) if MOONSTREAM_DB_URI_READ_ONLY is None: - raise Warning("MOONSTREAM_DB_URI_READ_ONLY environment variable must be set") + raise Warning( + "MOONSTREAM_DB_V3_INDEXES_URI_READ_ONLY environment variable must be set" + ) except ValueError as e: raise ValueError(e) except Warning: diff --git a/moonstreamdb-v3/moonstreamdbv3/version.txt b/moonstreamdb-v3/moonstreamdbv3/version.txt index 99d85ecd..5c4511c3 100644 --- a/moonstreamdb-v3/moonstreamdbv3/version.txt +++ b/moonstreamdb-v3/moonstreamdbv3/version.txt @@ -1 +1 @@ -0.0.6 \ No newline at end of file +0.0.7 \ No newline at end of file From 795b631ce27a876dcbedb7591764417696b93da4 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 4 Jun 2024 23:37:44 +0300 Subject: [PATCH 03/10] Add temp state. --- moonstreamapi/moonstreamapi/admin/cli.py | 11 +- .../moonstreamapi/admin/moonworm_tasks.py | 149 +++++++++++++----- moonstreamapi/setup.py | 2 +- 3 files changed, 118 insertions(+), 44 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 3c544905..91df9538 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -300,7 +300,9 @@ def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None: """ ### Request user resources from brood - moonworm_tasks.migrate_v3_tasks(user_id=args.user_id, customer_id=args.customer_id) + moonworm_tasks.migrate_v3_tasks( + user_id=args.user_id, customer_id=args.customer_id, blockchain=args.blockchain + ) def databases_v2_to_v3_labels_migration_handler(args: argparse.Namespace) -> None: @@ -584,6 +586,13 @@ This CLI is configured to work with the following API URLs: help="customer-id of which we want see subscription.", ) + parser_moonworm_tasks_migrate.add_argument( + "--blockchain", + required=False, + type=str, + help="Blockchain of which we want see subscription.", + ) + parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate) queries_parser = subcommands.add_parser( diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 0ed18710..471b5f02 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,13 +1,15 @@ import json import logging -from typing import List, Dict, Union, Any +import os +from typing import List, Dict, Union, Any, Optional from uuid import UUID import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult from bugout.exceptions import BugoutResponseException -from moonstreamdbv3.db import MoonstreamDBEngine +from moonstreamdbv3.db import MoonstreamDBIndexesEngine from moonstreamdbv3.models_indexes import AbiJobs +from sqlalchemy.dialects.postgresql import insert from web3 import Web3 @@ -94,7 +96,9 @@ def add_subscription(id: str): logging.info("For apply to moonworm tasks subscriptions must have an abi.") -def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: +def migrate_v3_tasks( + user_id: UUID, customer_id: UUID, blockchain: Optional[str] = None +): """ Migrate moonworm tasks @@ -111,19 +115,33 @@ def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) + chain_to_subscription_type = { + CANONICAL_SUBSCRIPTION_TYPES[key].blockchain: key + for key in CANONICAL_SUBSCRIPTION_TYPES.keys() + } + logger.info( "Found users collection resources: %s", len(subscription_resources.resources) ) + db_engine = MoonstreamDBIndexesEngine() + if len(subscription_resources.resources) == 0: raise Exception("User has no subscriptions") collection_id = subscription_resources.resources[0].resource_data["collection_id"] + query = f"tag:type:subscription" + + if blockchain is not None: + query += f" tag:subscription_type_id:{chain_to_subscription_type[blockchain]}" + subscriptions: List[BugoutSearchResult] = get_all_entries_from_search( journal_id=collection_id, search_query=f"tag:type:subscription", - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + token=os.environ.get("SPECIFIC_ACCESS_TOKEN"), + limit=100, + content=True, ) logger.info("Found users subscriptions: %s", len(subscriptions)) @@ -131,38 +149,46 @@ def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: if len(subscriptions) == 0: raise Exception("User has no subscriptions") - for subscription in subscriptions: + with db_engine.yield_db_session_ctx() as session: - abi = None - address = None - subscription_type_id = None + user_subscriptions = [] - if subscription.content is None: - continue + for subscription in subscriptions: - subscription_data = json.loads(subscription.content) + abis = None + address = None + subscription_type_id = None - if "abi" in subscription_data: - abi = subscription_data["abi"] + if subscription.content is None: + continue - for tag in subscription.tags: - if tag.startswith("subscription_type_id:"): - subscription_type_id = tag.split(":")[1] - if tag.startswith("address:"): - address = tag.split(":")[1] + subscription_data = json.loads(subscription.content) - if subscription_type_id is None: - continue + if "abi" in subscription_data: + abis_container = subscription_data["abi"] - ### reformat abi to separate abi tasks + for tag in subscription.tags: + if tag.startswith("subscription_type_id:"): + subscription_type_id = tag.split(":")[1] + if tag.startswith("address:"): + address = tag.split(":")[1] - if abi is None: - continue + if subscription_type_id is None: + continue - chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id]["blockchain"] + if abis_container is not None: + abis = json.loads(abis_container) - with MoonstreamDBEngine.yield_db_session_ctx() as session: - for abi_task in abi: + ### reformat abi to separate abi tasks + + if abis is None: + continue + + chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain + + for abi_task in abis: + + print(abi_task) if abi_task["type"] not in ("event", "function"): continue @@ -176,21 +202,43 @@ def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: try: - subscription = AbiJobs( - address=address, - user_id=user_id, - customer_id=customer_id, - abi_selector=abi_selector, - chain=chain, - abi_name=abi_task["name"], - status="active", - historical_crawl_status="pending", - progress=0, - moonworm_task_pickedup=False, - abi=abi_task, - ) + # subscription = AbiJobs( + # address=address, + # user_id=user_id, + # customer_id=customer_id, + # abi_selector=abi_selector, + # chain=chain, + # abi_name=abi_task["name"], + # status="active", + # historical_crawl_status="pending", + # progress=0, + # moonworm_task_pickedup=False, + # abi=abi_task, + # ) - session.add(subscription) + abi_job = { + "address": address, + "user_id": user_id, + "customer_id": customer_id, + "abi_selector": abi_selector, + "chain": chain, + "abi_name": abi_task["name"], + "status": "active", + "historical_crawl_status": "pending", + "progress": 0, + "moonworm_task_pickedup": False, + "abi": abi_task, + } + + try: + AbiJobs(**abi_job) + except Exception as e: + logger.error( + f"Error creating subscription for subscription {subscription.id}: {str(e)}" + ) + continue + + user_subscriptions.append(abi_job) except Exception as e: logger.error( @@ -199,6 +247,23 @@ def migrate_v3_tasks(user_id: UUID, customer_id: UUID) -> None: session.rollback() continue - session.commit() + insert_statement = insert(AbiJobs).values(user_subscriptions) - return None + result_stmt = insert_statement.on_conflict_do_nothing( + index_elements=[ + abi_job.c.address, + abi_job.c.abi_selector, + abi_job.c.chain, + abi_job.c.customer_id, + ] + ) + + try: + session.execute(result_stmt) + + session.commit() + except Exception as e: + logger.error(f"Error inserting subscriptions: {str(e)}") + session.rollback() + + return None diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index 79c93d08..337e0245 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,7 +17,7 @@ setup( "fastapi", "moonstream", "moonstreamdb>=0.4.4", - "moonstreamdb-v3>=0.0.6", + "moonstreamdb-v3>=0.0.8", "humbug", "pydantic==1.10.2", "pyevmasm", From 17b038b1b1ca4af7e51ccbd7b321fd2be0103e96 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 6 Jun 2024 12:09:54 +0300 Subject: [PATCH 04/10] Add fixes. --- .../moonstreamapi/admin/moonworm_tasks.py | 63 +++++++++++-------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 471b5f02..e7ba9e1c 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -118,6 +118,7 @@ def migrate_v3_tasks( chain_to_subscription_type = { CANONICAL_SUBSCRIPTION_TYPES[key].blockchain: key for key in CANONICAL_SUBSCRIPTION_TYPES.keys() + if key.endswith("smartcontract") } logger.info( @@ -138,7 +139,7 @@ def migrate_v3_tasks( subscriptions: List[BugoutSearchResult] = get_all_entries_from_search( journal_id=collection_id, - search_query=f"tag:type:subscription", + search_query=query, token=os.environ.get("SPECIFIC_ACCESS_TOKEN"), limit=100, content=True, @@ -153,7 +154,7 @@ def migrate_v3_tasks( user_subscriptions = [] - for subscription in subscriptions: + for index, subscription in enumerate(subscriptions): abis = None address = None @@ -177,19 +178,20 @@ def migrate_v3_tasks( continue if abis_container is not None: - abis = json.loads(abis_container) + try: + abis = json.loads(abis_container) + except Exception as e: + logger.error( + f"Error loading abi for subscription {subscription.id}: {str(e)}" + ) + continue ### reformat abi to separate abi tasks - if abis is None: - continue - chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain for abi_task in abis: - print(abi_task) - if abi_task["type"] not in ("event", "function"): continue @@ -198,7 +200,12 @@ def migrate_v3_tasks( + "(" + ",".join(map(lambda x: x["type"], abi_task["inputs"])) + ")" - )[:4].hex() + ) + + if abi_task["type"] == "function": + abi_selector = abi_selector[:4] + + abi_selector = abi_selector.hex() try: @@ -217,7 +224,9 @@ def migrate_v3_tasks( # ) abi_job = { - "address": address, + "address": ( + bytes.fromhex(address[2:]) if address is not None else None + ), "user_id": user_id, "customer_id": customer_id, "abi_selector": abi_selector, @@ -227,7 +236,7 @@ def migrate_v3_tasks( "historical_crawl_status": "pending", "progress": 0, "moonworm_task_pickedup": False, - "abi": abi_task, + "abi": json.dumps(abi_task), } try: @@ -247,23 +256,25 @@ def migrate_v3_tasks( session.rollback() continue - insert_statement = insert(AbiJobs).values(user_subscriptions) + insert_statement = insert(AbiJobs).values(user_subscriptions) - result_stmt = insert_statement.on_conflict_do_nothing( - index_elements=[ - abi_job.c.address, - abi_job.c.abi_selector, - abi_job.c.chain, - abi_job.c.customer_id, - ] - ) + result_stmt = insert_statement.on_conflict_do_nothing( + index_elements=[ + AbiJobs.chain, + AbiJobs.address, + AbiJobs.abi_selector, + AbiJobs.customer_id, + ] + ) - try: - session.execute(result_stmt) + try: + session.execute(result_stmt) - session.commit() - except Exception as e: - logger.error(f"Error inserting subscriptions: {str(e)}") - session.rollback() + session.commit() + except Exception as e: + logger.error(f"Error inserting subscriptions: {str(e)}") + session.rollback() + + logger.info(f"Processed {index} subscriptions") return None From bf21dfb812ea6517f5a53e7030e1a0f38267c7f0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 7 Jun 2024 19:28:31 +0300 Subject: [PATCH 05/10] Add add tasks cli. --- moonstreamapi/moonstreamapi/admin/cli.py | 44 +++++++++ .../moonstreamapi/admin/moonworm_tasks.py | 91 ++++++++++++++++--- 2 files changed, 121 insertions(+), 14 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 91df9538..890ecdc2 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -595,6 +595,50 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate) + parser_moonworm_tasks_v3_create = subcommands_moonworm_tasks.add_parser( + "create_v3_tasks", + description="Create v3 tasks from v2 tasks", + ) + + parser_moonworm_tasks_v3_create.add_argument( + "--user-id", + required=True, + type=uuid_type, + help="user-id of which we want see subscription.", + ) + + parser_moonworm_tasks_v3_create.add_argument( + "--customer-id", + required=True, + type=uuid_type, + help="customer-id of which we want see subscription.", + ) + + parser_moonworm_tasks_v3_create.add_argument( + "--blockchain", + required=True, + type=str, + help="Blockchain of which we want see subscription.", + ) + + parser_moonworm_tasks_v3_create.add_argument( + "--address", + required=True, + type=str, + help="Address of which we want see subscription.", + ) + + parser_moonworm_tasks_v3_create.add_argument( + "--abi", + required=True, + type=argparse.FileType("r"), + help="ABI of which we want see subscription.", + ) + + parser_moonworm_tasks_v3_create.set_defaults( + func=moonworm_tasks.create_v3_task_handler + ) + queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index e7ba9e1c..cf4761c3 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -96,6 +96,83 @@ def add_subscription(id: str): logging.info("For apply to moonworm tasks subscriptions must have an abi.") +def create_v3_task( + customer_id: str, + user_id: str, + abi: Dict[str, Any], + address: str, + blockchain: str, +): + """ + Create moonworm task for v3 + """ + + abi_tasks = [] + + db_engine = MoonstreamDBIndexesEngine() + + with db_engine.yield_db_session_ctx() as db_session_v3: + + for task in abi: + + abi_selector = Web3.keccak( + text=abi["name"] + + "(" + + ",".join(map(lambda x: x["type"], abi["inputs"])) + + ")" + ) + + if abi["type"] == "function": + abi_selector = abi_selector[:4] + + abi_selector = abi_selector.hex() + + try: + + abi_tasks.append( + { + "address": bytes.fromhex(address[2:]), + "user_id": user_id, + "customer_id": customer_id, + "abi_selector": abi_selector, + "chain": blockchain, + "abi_name": abi["name"], + "status": "active", + "historical_crawl_status": "pending", + "progress": 0, + "moonworm_task_pickedup": False, + "abi": json.dumps(abi), + } + ) + + except Exception as e: + logger.error( + f"Error creating subscription for subscription for abi {abi['name']}: {str(e)}" + ) + db_session_v3.rollback() + raise e + + insert_statement = insert(AbiJobs).values(abi_tasks) + + result_stmt = insert_statement.on_conflict_do_nothing( + index_elements=[ + AbiJobs.chain, + AbiJobs.address, + AbiJobs.abi_selector, + AbiJobs.customer_id, + ] + ) + + try: + db_session_v3.execute(result_stmt) + + db_session_v3.commit() + except Exception as e: + logger.error(f"Error inserting subscriptions: {str(e)}") + db_session_v3.rollback() + return None + + def migrate_v3_tasks( user_id: UUID, customer_id: UUID, blockchain: Optional[str] = None ): @@ -209,20 +286,6 @@ def migrate_v3_tasks( try: - # subscription = AbiJobs( - # address=address, - # user_id=user_id, - # customer_id=customer_id, - # abi_selector=abi_selector, - # chain=chain, - # abi_name=abi_task["name"], - # status="active", - # historical_crawl_status="pending", - # progress=0, - # moonworm_task_pickedup=False, - # abi=abi_task, - # ) - abi_job = { "address": ( bytes.fromhex(address[2:]) if address is not None else None From fd66f1a69843255bd5c0e339c9abca2bcd5c336f Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 10 Jun 2024 17:21:20 +0300 Subject: [PATCH 06/10] Add fix function link. --- moonstreamapi/moonstreamapi/admin/cli.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 890ecdc2..60119c07 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -313,6 +313,17 @@ def databases_v2_to_v3_labels_migration_handler(args: argparse.Namespace) -> Non databases_v2_to_v3_labels_migration(args.user_id, args.blockchain) +def create_v3_task_handler(args: argparse.Namespace) -> None: + + moonworm_tasks.create_v3_task( + user_id=args.user_id, + customer_id=args.customer_id, + blockchain=args.blockchain, + address=args.address, + abi=args.abi.read(), + ) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -635,9 +646,7 @@ This CLI is configured to work with the following API URLs: help="ABI of which we want see subscription.", ) - parser_moonworm_tasks_v3_create.set_defaults( - func=moonworm_tasks.create_v3_task_handler - ) + parser_moonworm_tasks_v3_create.set_defaults(func=create_v3_task_handler) queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" From f07ac302453c33d59ace6c3ad568e6f70677cf4f Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jun 2024 14:26:16 +0000 Subject: [PATCH 07/10] Updated version and json loads --- moonstreamapi/moonstreamapi/admin/cli.py | 6 +++--- moonstreamapi/moonstreamapi/admin/moonworm_tasks.py | 6 ++---- moonstreamapi/setup.py | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 60119c07..528f544f 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -29,7 +29,7 @@ from .migrations import ( generate_entity_subscriptions, update_dashboard_subscription_key, ) -from .databases import databases_v2_to_v3_labels_migration +# from .databases import databases_v2_to_v3_labels_migration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -310,7 +310,7 @@ def databases_v2_to_v3_labels_migration_handler(args: argparse.Namespace) -> Non Migrate labels in database """ - databases_v2_to_v3_labels_migration(args.user_id, args.blockchain) + # databases_v2_to_v3_labels_migration(args.user_id, args.blockchain) def create_v3_task_handler(args: argparse.Namespace) -> None: @@ -320,7 +320,7 @@ def create_v3_task_handler(args: argparse.Namespace) -> None: customer_id=args.customer_id, blockchain=args.blockchain, address=args.address, - abi=args.abi.read(), + abi=json.loads(args.abi.read()), ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index cf4761c3..144eb79d 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -106,7 +106,6 @@ def create_v3_task( """ Create moonworm task for v3 """ - abi_tasks = [] db_engine = MoonstreamDBIndexesEngine() @@ -114,9 +113,8 @@ def create_v3_task( with db_engine.yield_db_session_ctx() as db_session_v3: for task in abi: - abi_selector = Web3.keccak( - text=abi["name"] + text=task["name"] + "(" + ",".join(map(lambda x: x["type"], abi["inputs"])) + ")" @@ -141,7 +139,7 @@ def create_v3_task( "historical_crawl_status": "pending", "progress": 0, "moonworm_task_pickedup": False, - "abi": json.dumps(abi), + "abi": json.dumps(task), } ) diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index 337e0245..2e3567be 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,7 +17,7 @@ setup( "fastapi", "moonstream", "moonstreamdb>=0.4.4", - "moonstreamdb-v3>=0.0.8", + "moonstreamdb-v3>=0.0.9", "humbug", "pydantic==1.10.2", "pyevmasm", From fe9476fde4a6af2dd713512edafc28e0a527c00e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jun 2024 14:45:43 +0000 Subject: [PATCH 08/10] Fix for custom abi push jobs to v3 --- .../moonstreamapi/admin/moonworm_tasks.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 144eb79d..06119d35 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,7 +1,7 @@ import json import logging import os -from typing import List, Dict, Union, Any, Optional +from typing import Any, Dict, List, Optional, Union from uuid import UUID import boto3 # type: ignore @@ -12,12 +12,11 @@ from moonstreamdbv3.models_indexes import AbiJobs from sqlalchemy.dialects.postgresql import insert from web3 import Web3 - from ..actions import apply_moonworm_tasks, get_all_entries_from_search from ..settings import ( + BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL, - BUGOUT_REQUEST_TIMEOUT_SECONDS, ) from ..settings import bugout_client as bc from .subscription_types import CANONICAL_SUBSCRIPTION_TYPES @@ -113,14 +112,17 @@ def create_v3_task( with db_engine.yield_db_session_ctx() as db_session_v3: for task in abi: + if task["type"] != "event" and task["type"] != "function": + continue + abi_selector = Web3.keccak( text=task["name"] + "(" - + ",".join(map(lambda x: x["type"], abi["inputs"])) + + ",".join(map(lambda x: x["type"], task["inputs"])) + ")" ) - if abi["type"] == "function": + if task["type"] == "function": abi_selector = abi_selector[:4] abi_selector = abi_selector.hex() @@ -134,7 +136,7 @@ def create_v3_task( "customer_id": customer_id, "abi_selector": abi_selector, "chain": blockchain, - "abi_name": abi["name"], + "abi_name": task["name"], "status": "active", "historical_crawl_status": "pending", "progress": 0, @@ -145,7 +147,7 @@ def create_v3_task( except Exception as e: logger.error( - f"Error creating subscription for subscription for abi {abi['name']}: {str(e)}" + f"Error creating subscription for subscription for abi {task['name']}: {str(e)}" ) db_session_v3.rollback() raise e From 859fc99e0a5e1157039db2e1e8c4934c960eee47 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jun 2024 16:42:01 +0000 Subject: [PATCH 09/10] Fixes for tasks in mnstr, removed unused import --- moonstreamapi/moonstreamapi/admin/cli.py | 11 +---------- .../moonstreamapi/admin/moonworm_tasks.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 528f544f..422aa5a7 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -6,9 +6,9 @@ import argparse import json import logging import os +import uuid from posix import listdir from typing import Any, Callable, Dict, List, Optional, Union -import uuid from moonstreamdb.db import SessionLocal from sqlalchemy.orm import with_expression @@ -29,7 +29,6 @@ from .migrations import ( generate_entity_subscriptions, update_dashboard_subscription_key, ) -# from .databases import databases_v2_to_v3_labels_migration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -305,14 +304,6 @@ def moonworm_tasks_v3_migrate(args: argparse.Namespace) -> None: ) -def databases_v2_to_v3_labels_migration_handler(args: argparse.Namespace) -> None: - """ - Migrate labels in database - """ - - # databases_v2_to_v3_labels_migration(args.user_id, args.blockchain) - - def create_v3_task_handler(args: argparse.Namespace) -> None: moonworm_tasks.create_v3_task( diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 06119d35..29b8da26 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -111,18 +111,18 @@ def create_v3_task( with db_engine.yield_db_session_ctx() as db_session_v3: - for task in abi: - if task["type"] != "event" and task["type"] != "function": + for abi_task in abi: + if abi_task["type"] != "event" and abi_task["type"] != "function": continue abi_selector = Web3.keccak( - text=task["name"] + text=abi_task["name"] + "(" - + ",".join(map(lambda x: x["type"], task["inputs"])) + + ",".join(map(lambda x: x["type"], abi_task["inputs"])) + ")" ) - if task["type"] == "function": + if abi_task["type"] == "function": abi_selector = abi_selector[:4] abi_selector = abi_selector.hex() @@ -136,18 +136,18 @@ def create_v3_task( "customer_id": customer_id, "abi_selector": abi_selector, "chain": blockchain, - "abi_name": task["name"], + "abi_name": abi_task["name"], "status": "active", "historical_crawl_status": "pending", "progress": 0, "moonworm_task_pickedup": False, - "abi": json.dumps(task), + "abi": json.dumps(abi_task), } ) except Exception as e: logger.error( - f"Error creating subscription for subscription for abi {task['name']}: {str(e)}" + f"Error creating subscription for subscription for abi {abi_task['name']}: {str(e)}" ) db_session_v3.rollback() raise e From 8506f310593e03370fb0f9fe312a9bdfcd8d3a65 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 11 Jun 2024 16:43:36 +0000 Subject: [PATCH 10/10] Updated requirements and version --- moonstreamapi/moonstreamapi/version.py | 2 +- moonstreamapi/requirements.txt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 39eec17e..80c607e1 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.4.2" +MOONSTREAMAPI_VERSION = "0.4.3" diff --git a/moonstreamapi/requirements.txt b/moonstreamapi/requirements.txt index 43553a72..57840006 100644 --- a/moonstreamapi/requirements.txt +++ b/moonstreamapi/requirements.txt @@ -38,6 +38,7 @@ Mako==1.2.3 MarkupSafe==2.1.1 moonstream==0.1.1 moonstreamdb==0.4.4 +moonstreamdb-v3==0.0.9 multiaddr==0.0.9 multidict==6.0.2 netaddr==0.8.0