diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index d482be89..f4d35cb6 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -1,4 +1,5 @@ import hashlib +from hexbytes import HexBytes import json import logging import uuid @@ -654,13 +655,13 @@ def add_abi_to_db( subscription_id: Optional[str] = None, ) -> None: abis_to_insert = [] - subscriptions_to_insert = [] + subscriptions_to_insert = {} try: existing_abi_job = ( db_session.query(AbiJobs) .filter(AbiJobs.chain == chain) - .filter(AbiJobs.address == bytes.fromhex(address[2:])) + .filter(AbiJobs.address == HexBytes(address)) .filter(AbiJobs.customer_id == customer_id) ).all() except Exception as e: @@ -688,22 +689,16 @@ def add_abi_to_db( abi_selector = abi_selector.hex() try: - if abi_selector in job_by_abi_selector: # ABI job already exists, create subscription link if subscription_id: - subscriptions_to_insert.append( - { - "abi_job_id": job_by_abi_selector[abi_selector].id, - "subscription_id": subscription_id, - } - ) + subscriptions_to_insert[ + str(job_by_abi_selector[abi_selector].id) + ] = subscription_id else: # ABI job does not exist, create new ABI job abi_job = { - "address": ( - bytes.fromhex(address[2:]) if address is not None else None - ), + "address": HexBytes(address), "user_id": user_id, "customer_id": customer_id, "abi_selector": abi_selector, @@ -717,21 +712,14 @@ def add_abi_to_db( } try: - abi_job_instance = AbiJobs(**abi_job) + AbiJobs(**abi_job) except Exception as e: logger.error( f"Error validating abi for address {address}:{abi} {str(e)}" ) - continue + raise MoonstreamHTTPException(status_code=409, detail=str(e)) - abis_to_insert.append(abi_job_instance) - if subscription_id: - subscriptions_to_insert.append( - { - "abi_job_id": abi_job_instance.id, - "subscription_id": subscription_id, - } - ) + abis_to_insert.append(abi_job) except Exception as e: logger.error(f"Error creating abi for address {address}:{abi} {str(e)}") @@ -743,19 +731,26 @@ def add_abi_to_db( insert_stmt = ( insert(AbiJobs) - .values([abi_job.__dict__ for abi_job in abis_to_insert]) - .on_conflict_do_nothing( - index_elements=["address", "chain", "abi_selector"] - ) + .values([abi for abi in abis_to_insert]) + .on_conflict_do_nothing() + .returning(AbiJobs.id) ) - db_session.execute(insert_stmt) + abi_job_ids = db_session.execute(insert_stmt).fetchall() + + for id in abi_job_ids: + subscriptions_to_insert[id[0]] = subscription_id # Insert corresponding subscriptions if subscriptions_to_insert: insert_stmt = ( insert(AbiSubscriptions) - .values(subscriptions_to_insert) + .values( + [ + {"abi_job_id": k, "subscription_id": v} + for k, v in subscriptions_to_insert.items() + ] + ) .on_conflict_do_nothing( index_elements=["abi_job_id", "subscription_id"] ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index 29b8da26..f6dd1dd5 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -1,4 +1,5 @@ import json +from hexbytes import HexBytes import logging import os from typing import Any, Dict, List, Optional, Union @@ -8,7 +9,7 @@ import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult from bugout.exceptions import BugoutResponseException from moonstreamdbv3.db import MoonstreamDBIndexesEngine -from moonstreamdbv3.models_indexes import AbiJobs +from moonstreamdbv3.models_indexes import AbiJobs, AbiSubscriptions from sqlalchemy.dialects.postgresql import insert from web3 import Web3 @@ -229,14 +230,18 @@ def migrate_v3_tasks( with db_engine.yield_db_session_ctx() as session: - user_subscriptions = [] - for index, subscription in enumerate(subscriptions): + user_subscription_abis = [] + + subscriptions_to_insert = {} + abis = None address = None subscription_type_id = None + subscription_id = subscription.entry_url.split("/")[-1] + if subscription.content is None: continue @@ -267,6 +272,15 @@ def migrate_v3_tasks( chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain + existing_abi_jobs = ( + session.query(AbiJobs) + .filter(AbiJobs.customer_id == customer_id) + .filter(AbiJobs.user_id == user_id) + .filter(AbiJobs.chain == blockchain) + ).all() + + job_by_abi_selector = {abi.abi_selector: abi for abi in existing_abi_jobs} + for abi_task in abis: if abi_task["type"] not in ("event", "function"): @@ -284,55 +298,80 @@ def migrate_v3_tasks( abi_selector = abi_selector.hex() - try: - - abi_job = { - "address": ( - bytes.fromhex(address[2:]) if address is not None else None - ), - "user_id": user_id, - "customer_id": customer_id, - "abi_selector": abi_selector, - "chain": chain, - "abi_name": abi_task["name"], - "status": "active", - "historical_crawl_status": "pending", - "progress": 0, - "moonworm_task_pickedup": False, - "abi": json.dumps(abi_task), - } + if abi_selector in job_by_abi_selector: + # ABI job already exists, create subscription link + if subscription_id: + subscriptions_to_insert[ + str(job_by_abi_selector[abi_selector].id) + ] = subscription_id + else: try: - AbiJobs(**abi_job) + + abi_job = { + "address": (HexBytes(address)), + "user_id": user_id, + "customer_id": customer_id, + "abi_selector": abi_selector, + "chain": chain, + "abi_name": abi_task["name"], + "status": "active", + "historical_crawl_status": "pending", + "progress": 0, + "moonworm_task_pickedup": False, + "abi": json.dumps(abi_task), + } + + try: + AbiJobs(**abi_job) + except Exception as e: + logger.error( + f"Error creating subscription for subscription {subscription.id}: {str(e)}" + ) + continue + + user_subscription_abis.append(abi_job) + except Exception as e: logger.error( f"Error creating subscription for subscription {subscription.id}: {str(e)}" ) + session.rollback() continue + if len(user_subscription_abis) > 0: + insert_statement = insert(AbiJobs).values(user_subscription_abis) - user_subscriptions.append(abi_job) + result_stmt = insert_statement.on_conflict_do_nothing().returning( + AbiJobs.id + ) + try: + ids = session.execute(result_stmt) except Exception as e: - logger.error( - f"Error creating subscription for subscription {subscription.id}: {str(e)}" - ) + logger.error(f"Error inserting subscriptions: {str(e)}") session.rollback() - continue - insert_statement = insert(AbiJobs).values(user_subscriptions) + for id in ids: + subscriptions_to_insert[id[0]] = subscription_id - result_stmt = insert_statement.on_conflict_do_nothing( - index_elements=[ - AbiJobs.chain, - AbiJobs.address, - AbiJobs.abi_selector, - AbiJobs.customer_id, - ] - ) + if len(subscriptions_to_insert) > 0: + + insert_statement = insert(AbiSubscriptions).values( + [ + {"abi_job_id": k, "subscription_id": v} + for k, v in subscriptions_to_insert.items() + ] + ) + + result_stmt = insert_statement.on_conflict_do_nothing() + + try: + session.execute(result_stmt) + except Exception as e: + logger.error(f"Error inserting subscriptions: {str(e)}") + session.rollback() try: - session.execute(result_stmt) - session.commit() except Exception as e: logger.error(f"Error inserting subscriptions: {str(e)}") diff --git a/moonstreamapi/moonstreamapi/routes/subscriptions.py b/moonstreamapi/moonstreamapi/routes/subscriptions.py index 07a15d72..f53a3982 100644 --- a/moonstreamapi/moonstreamapi/routes/subscriptions.py +++ b/moonstreamapi/moonstreamapi/routes/subscriptions.py @@ -224,9 +224,10 @@ async def add_subscription_handler( db_session=db_session, user_id=user.id, customer_id=customer_id, + address=address, subscription_id=entity.id, - abi=abi, - subscription_type_id=subscription_type_id, + abi=json_abi, + subscription_type=subscription_type_id, ) return data.SubscriptionResourceData( diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 653faa76..bc857d8e 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.4.5" +MOONSTREAMAPI_VERSION = "0.4.6"