diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index eacdf951..ea0ddd37 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -26,13 +26,17 @@ from eth_utils.address import is_address # type: ignore from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.models import EthereumLabel from moonstreamdb.subscriptions import blockchain_by_subscription_id +from moonstreamdbv3.db import MoonstreamDBIndexesEngine +from moonstreamdbv3.models_indexes import AbiJobs from slugify import slugify # type: ignore -from sqlalchemy import text +from sqlalchemy import text, insert from sqlalchemy.orm import Session from web3 import Web3 from web3._utils.validation import validate_abi + from . import data +from .admin.subscription_types import CANONICAL_SUBSCRIPTION_TYPES from .middleware import MoonstreamHTTPException from .reporter import reporter from .selectors_storage import selectors @@ -559,6 +563,7 @@ def apply_moonworm_tasks( ], } ) + except Exception as e: logger.error(f"Error get moonworm tasks: {str(e)}") reporter.error_report(e) @@ -576,6 +581,125 @@ def apply_moonworm_tasks( reporter.error_report(e) +def create_seer_subscription( + db_session: Session, + user_id: uuid.UUID, + customer_id: uuid.UUID, + address: str, + subscription_type: str, + abi: Any, + subscription_id: str, +) -> None: + + chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type].blockchain + + add_abi_to_db( + db_session=db_session, + user_id=user_id, + customer_id=customer_id, + address=address, + abis=abi, + chain=chain, + subscription_id=subscription_id, + ) + + +def delete_seer_subscription( + db_session: Session, + user_id: uuid.UUID, + subscription_id, +) -> None: + """ + Delete seer subscription from db + """ + + db_session.query(AbiJobs).filter( + AbiJobs.user_id == user_id, + AbiJobs.subscription_id == subscription_id, + ).delete() + + db_session.commit() + + +def add_abi_to_db( + db_session: Session, + user_id: uuid.UUID, + customer_id: uuid.UUID, + address: str, + abis: List[Dict[str, Any]], + chain: str, + subscription_id: Optional[str] = None, +) -> None: + + abis_to_insert = [] + + for abi in abis: + if abi["type"] not in ("event", "function"): + continue + + abi_selector = Web3.keccak( + text=abi["name"] + + "(" + + ",".join(map(lambda x: x["type"], abi["inputs"])) + + ")" + ) + + if abi["type"] == "function": + abi_selector = abi_selector[:4] + + abi_selector = abi_selector.hex() + + try: + abi_job = { + "address": ( + bytes.fromhex(address[2:]) if address is not None else None + ), + "user_id": user_id, + "customer_id": customer_id, + "subscription_id": subscription_id, + "abi_selector": abi_selector, + "chain": chain, + "abi_name": abi["name"], + "status": "active", + "historical_crawl_status": "pending", + "progress": 0, + "moonworm_task_pickedup": False, + "abi": json.dumps(abi), + } + + try: + AbiJobs(**abi_job) + except Exception as e: + logger.error( + f"Error validating abi for address {address}:{abi} {str(e)}" + ) + continue + + abis_to_insert.append(abi_job) + + except Exception as e: + logger.error(f"Error creating abi for address {address}:{abi} {str(e)}") + continue + + insert_statement = insert(AbiJobs).values(abis_to_insert) + + result_stmt = insert_statement.on_conflict_do_nothing( + index_elements=[ + AbiJobs.chain, + AbiJobs.address, + AbiJobs.abi_selector, + AbiJobs.customer_id, + ] + ) + + try: + db_session.execute(result_stmt) + db_session.commit() + except Exception as e: + logger.error(f"Error inserting abi to db: {str(e)}") + db_session.rollback() + + def name_normalization(query_name: str) -> str: """ Sanitize provided query name. diff --git a/moonstreamapi/moonstreamapi/data.py b/moonstreamapi/moonstreamapi/data.py index 7484f976..44533111 100644 --- a/moonstreamapi/moonstreamapi/data.py +++ b/moonstreamapi/moonstreamapi/data.py @@ -263,6 +263,7 @@ class CreateSubscriptionRequest(BaseModel): abi: Optional[str] = Form(None) description: Optional[str] = Form(None) tags: Optional[List[Dict[str, str]]] = Form(None) + customer_id: Optional[str] = Form(None) @validator("tags", pre=True, always=True) def transform_to_dict(cls, v): diff --git a/moonstreamapi/moonstreamapi/routes/subscriptions.py b/moonstreamapi/moonstreamapi/routes/subscriptions.py index b8806314..b5d04743 100644 --- a/moonstreamapi/moonstreamapi/routes/subscriptions.py +++ b/moonstreamapi/moonstreamapi/routes/subscriptions.py @@ -12,6 +12,7 @@ from bugout.data import BugoutSearchResult, BugoutSearchResultAsEntity from bugout.exceptions import BugoutResponseException from fastapi import APIRouter, BackgroundTasks, Depends, Form, Path, Query, Request from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamdbv3.db import MoonstreamDBIndexesEngineInstance # type: ignore from web3 import Web3 from .. import data @@ -24,6 +25,7 @@ from ..actions import ( get_list_of_support_interfaces, get_moonworm_tasks, validate_abi_json, + create_seer_subscription, ) from ..admin import subscription_types from ..middleware import MoonstreamHTTPException @@ -51,11 +53,13 @@ async def add_subscription_handler( request: Request, background_tasks: BackgroundTasks, web3: Web3 = Depends(yield_web3_provider), + db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session), ) -> data.SubscriptionResourceData: """ Add subscription to blockchain stream data for user. """ token = request.state.token + user = request.state.user form = await request.form() @@ -71,6 +75,7 @@ async def add_subscription_handler( description = form_data.description tags = form_data.tags subscription_type_id = form_data.subscription_type_id + customer_id = form_data.customer_id if subscription_type_id != "ethereum_whalewatch": try: @@ -129,10 +134,7 @@ async def add_subscription_handler( content["abi_hash"] = hash background_tasks.add_task( - apply_moonworm_tasks, - subscription_type_id, - json_abi, - address, + apply_moonworm_tasks, subscription_type_id, json_abi, address ) if description: @@ -202,6 +204,16 @@ async def add_subscription_handler( if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS ] + if entity_secondary_fields.get("abi"): + create_seer_subscription( + db_session=db_session, + user_id=user.id, + customer_id=customer_id, + subscription_id=entity.id, + abi=abi, + subscription_type_id=subscription_type_id, + ) + return data.SubscriptionResourceData( id=str(entity.id), user_id=str(user.id), @@ -223,7 +235,9 @@ async def add_subscription_handler( response_model=data.SubscriptionResourceData, ) async def delete_subscription_handler( - request: Request, subscription_id: str = Path(...) + request: Request, + subscription_id: str = Path(...), + db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session), ): """ Delete subscriptions. @@ -285,6 +299,12 @@ async def delete_subscription_handler( abi = deleted_entity.secondary_fields.get("abi") description = deleted_entity.secondary_fields.get("description") + delete_seer_subscription( + db_session=db_session, + user_id=user.id, + subscription_id=subscription_id, + ) + return data.SubscriptionResourceData( id=str(deleted_entity.id), user_id=str(user.id), @@ -400,6 +420,7 @@ async def update_subscriptions_handler( request: Request, background_tasks: BackgroundTasks, subscription_id: str = Path(...), + db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session), ) -> data.SubscriptionResourceData: """ Get user's subscriptions. @@ -548,6 +569,15 @@ async def update_subscriptions_handler( json_abi, address, ) + + create_seer_subscription( + db_session=db_session, + user_id=user.id, + subscription_id=subscription_id, + abi=abi, + subscription_type_id=subscription_type_id, + ) + subscription_required_fields = ( subscription.required_fields if subscription.required_fields is not None else {} ) diff --git a/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/6b4ab39794d8_add_subscription_id.py b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/6b4ab39794d8_add_subscription_id.py new file mode 100644 index 00000000..d7a51549 --- /dev/null +++ b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/6b4ab39794d8_add_subscription_id.py @@ -0,0 +1,37 @@ +"""add subscription id + +Revision ID: 6b4ab39794d8 +Revises: e02c90ea67bb +Create Date: 2024-07-05 00:06:29.856486 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "6b4ab39794d8" +down_revision: Union[str, None] = "e02c90ea67bb" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("abi_jobs", sa.Column("subscription_id", sa.UUID(), nullable=True)) + op.create_index( + op.f("ix_abi_jobs_subscription_id"), + "abi_jobs", + ["subscription_id"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("abi_jobs", "subscription_id") + # ### end Alembic commands ### diff --git a/moonstreamdb-v3/moonstreamdbv3/db.py b/moonstreamdb-v3/moonstreamdbv3/db.py index 14a1e02b..9897921b 100644 --- a/moonstreamdb-v3/moonstreamdbv3/db.py +++ b/moonstreamdb-v3/moonstreamdbv3/db.py @@ -260,3 +260,6 @@ class MoonstreamDBIndexesEngineRO(DBEngine): yield session finally: session.close() + + +MoonstreamDBIndexesEngineInstance = MoonstreamDBIndexesEngine() diff --git a/moonstreamdb-v3/moonstreamdbv3/models_indexes.py b/moonstreamdb-v3/moonstreamdbv3/models_indexes.py index 3ac1bcfe..5433ad86 100644 --- a/moonstreamdb-v3/moonstreamdbv3/models_indexes.py +++ b/moonstreamdb-v3/moonstreamdbv3/models_indexes.py @@ -574,6 +574,7 @@ class AbiJobs(Base): address = Column(LargeBinary, nullable=False, index=True) user_id = Column(UUID(as_uuid=True), nullable=False, index=True) customer_id = Column(UUID(as_uuid=True), nullable=True, index=True) + subscription_id = Column(UUID(as_uuid=True), nullable=True, index=True) abi_selector = Column(VARCHAR(256), nullable=False, index=True) chain = Column(VARCHAR(256), nullable=False, index=True) abi_name = Column(VARCHAR(256), nullable=False, index=True)