Add subcription_id to abi_jobs.

pull/1108/head
Andrey 2024-07-05 16:19:10 +03:00
rodzic 82cde6d3b0
commit 07a58abe5f
6 zmienionych plików z 202 dodań i 6 usunięć

Wyświetl plik

@ -26,13 +26,17 @@ from eth_utils.address import is_address # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.models import EthereumLabel
from moonstreamdb.subscriptions import blockchain_by_subscription_id
from moonstreamdbv3.db import MoonstreamDBIndexesEngine
from moonstreamdbv3.models_indexes import AbiJobs
from slugify import slugify # type: ignore
from sqlalchemy import text
from sqlalchemy import text, insert
from sqlalchemy.orm import Session
from web3 import Web3
from web3._utils.validation import validate_abi
from . import data
from .admin.subscription_types import CANONICAL_SUBSCRIPTION_TYPES
from .middleware import MoonstreamHTTPException
from .reporter import reporter
from .selectors_storage import selectors
@ -559,6 +563,7 @@ def apply_moonworm_tasks(
],
}
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
reporter.error_report(e)
@ -576,6 +581,125 @@ def apply_moonworm_tasks(
reporter.error_report(e)
def create_seer_subscription(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
subscription_type: str,
abi: Any,
subscription_id: str,
) -> None:
chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type].blockchain
add_abi_to_db(
db_session=db_session,
user_id=user_id,
customer_id=customer_id,
address=address,
abis=abi,
chain=chain,
subscription_id=subscription_id,
)
def delete_seer_subscription(
db_session: Session,
user_id: uuid.UUID,
subscription_id,
) -> None:
"""
Delete seer subscription from db
"""
db_session.query(AbiJobs).filter(
AbiJobs.user_id == user_id,
AbiJobs.subscription_id == subscription_id,
).delete()
db_session.commit()
def add_abi_to_db(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
abis: List[Dict[str, Any]],
chain: str,
subscription_id: Optional[str] = None,
) -> None:
abis_to_insert = []
for abi in abis:
if abi["type"] not in ("event", "function"):
continue
abi_selector = Web3.keccak(
text=abi["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi["inputs"]))
+ ")"
)
if abi["type"] == "function":
abi_selector = abi_selector[:4]
abi_selector = abi_selector.hex()
try:
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"user_id": user_id,
"customer_id": customer_id,
"subscription_id": subscription_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi),
}
try:
AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error validating abi for address {address}:{abi} {str(e)}"
)
continue
abis_to_insert.append(abi_job)
except Exception as e:
logger.error(f"Error creating abi for address {address}:{abi} {str(e)}")
continue
insert_statement = insert(AbiJobs).values(abis_to_insert)
result_stmt = insert_statement.on_conflict_do_nothing(
index_elements=[
AbiJobs.chain,
AbiJobs.address,
AbiJobs.abi_selector,
AbiJobs.customer_id,
]
)
try:
db_session.execute(result_stmt)
db_session.commit()
except Exception as e:
logger.error(f"Error inserting abi to db: {str(e)}")
db_session.rollback()
def name_normalization(query_name: str) -> str:
"""
Sanitize provided query name.

Wyświetl plik

@ -263,6 +263,7 @@ class CreateSubscriptionRequest(BaseModel):
abi: Optional[str] = Form(None)
description: Optional[str] = Form(None)
tags: Optional[List[Dict[str, str]]] = Form(None)
customer_id: Optional[str] = Form(None)
@validator("tags", pre=True, always=True)
def transform_to_dict(cls, v):

Wyświetl plik

@ -12,6 +12,7 @@ from bugout.data import BugoutSearchResult, BugoutSearchResultAsEntity
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Path, Query, Request
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdbv3.db import MoonstreamDBIndexesEngineInstance # type: ignore
from web3 import Web3
from .. import data
@ -24,6 +25,7 @@ from ..actions import (
get_list_of_support_interfaces,
get_moonworm_tasks,
validate_abi_json,
create_seer_subscription,
)
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
@ -51,11 +53,13 @@ async def add_subscription_handler(
request: Request,
background_tasks: BackgroundTasks,
web3: Web3 = Depends(yield_web3_provider),
db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session),
) -> data.SubscriptionResourceData:
"""
Add subscription to blockchain stream data for user.
"""
token = request.state.token
user = request.state.user
form = await request.form()
@ -71,6 +75,7 @@ async def add_subscription_handler(
description = form_data.description
tags = form_data.tags
subscription_type_id = form_data.subscription_type_id
customer_id = form_data.customer_id
if subscription_type_id != "ethereum_whalewatch":
try:
@ -129,10 +134,7 @@ async def add_subscription_handler(
content["abi_hash"] = hash
background_tasks.add_task(
apply_moonworm_tasks,
subscription_type_id,
json_abi,
address,
apply_moonworm_tasks, subscription_type_id, json_abi, address
)
if description:
@ -202,6 +204,16 @@ async def add_subscription_handler(
if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS
]
if entity_secondary_fields.get("abi"):
create_seer_subscription(
db_session=db_session,
user_id=user.id,
customer_id=customer_id,
subscription_id=entity.id,
abi=abi,
subscription_type_id=subscription_type_id,
)
return data.SubscriptionResourceData(
id=str(entity.id),
user_id=str(user.id),
@ -223,7 +235,9 @@ async def add_subscription_handler(
response_model=data.SubscriptionResourceData,
)
async def delete_subscription_handler(
request: Request, subscription_id: str = Path(...)
request: Request,
subscription_id: str = Path(...),
db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session),
):
"""
Delete subscriptions.
@ -285,6 +299,12 @@ async def delete_subscription_handler(
abi = deleted_entity.secondary_fields.get("abi")
description = deleted_entity.secondary_fields.get("description")
delete_seer_subscription(
db_session=db_session,
user_id=user.id,
subscription_id=subscription_id,
)
return data.SubscriptionResourceData(
id=str(deleted_entity.id),
user_id=str(user.id),
@ -400,6 +420,7 @@ async def update_subscriptions_handler(
request: Request,
background_tasks: BackgroundTasks,
subscription_id: str = Path(...),
db_session: Any = Depends(MoonstreamDBIndexesEngineInstance.yield_db_session),
) -> data.SubscriptionResourceData:
"""
Get user's subscriptions.
@ -548,6 +569,15 @@ async def update_subscriptions_handler(
json_abi,
address,
)
create_seer_subscription(
db_session=db_session,
user_id=user.id,
subscription_id=subscription_id,
abi=abi,
subscription_type_id=subscription_type_id,
)
subscription_required_fields = (
subscription.required_fields if subscription.required_fields is not None else {}
)

Wyświetl plik

@ -0,0 +1,37 @@
"""add subscription id
Revision ID: 6b4ab39794d8
Revises: e02c90ea67bb
Create Date: 2024-07-05 00:06:29.856486
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "6b4ab39794d8"
down_revision: Union[str, None] = "e02c90ea67bb"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("abi_jobs", sa.Column("subscription_id", sa.UUID(), nullable=True))
op.create_index(
op.f("ix_abi_jobs_subscription_id"),
"abi_jobs",
["subscription_id"],
unique=False,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("abi_jobs", "subscription_id")
# ### end Alembic commands ###

Wyświetl plik

@ -260,3 +260,6 @@ class MoonstreamDBIndexesEngineRO(DBEngine):
yield session
finally:
session.close()
MoonstreamDBIndexesEngineInstance = MoonstreamDBIndexesEngine()

Wyświetl plik

@ -574,6 +574,7 @@ class AbiJobs(Base):
address = Column(LargeBinary, nullable=False, index=True)
user_id = Column(UUID(as_uuid=True), nullable=False, index=True)
customer_id = Column(UUID(as_uuid=True), nullable=True, index=True)
subscription_id = Column(UUID(as_uuid=True), nullable=True, index=True)
abi_selector = Column(VARCHAR(256), nullable=False, index=True)
chain = Column(VARCHAR(256), nullable=False, index=True)
abi_name = Column(VARCHAR(256), nullable=False, index=True)