Merge pull request #1108 from moonstream-to/add-v3-subscriptions

Add subcription_id to abi_jobs.
multiple-addresses moonstreamdbv3/v0.0.13
Andrey Dolgolev 2024-07-16 00:19:52 +03:00 zatwierdzone przez GitHub
commit dcc5eeb5e2
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
10 zmienionych plików z 386 dodań i 11 usunięć

Wyświetl plik

@ -26,13 +26,17 @@ from eth_utils.address import is_address # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.models import EthereumLabel
from moonstreamdb.subscriptions import blockchain_by_subscription_id
from moonstreamdbv3.models_indexes import AbiJobs, AbiSubscriptions
from slugify import slugify # type: ignore
from sqlalchemy import text
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from web3 import Web3
from web3._utils.validation import validate_abi
from . import data
from .admin.subscription_types import CANONICAL_SUBSCRIPTION_TYPES
from .middleware import MoonstreamHTTPException
from .reporter import reporter
from .selectors_storage import selectors
@ -559,6 +563,7 @@ def apply_moonworm_tasks(
],
}
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
reporter.error_report(e)
@ -576,6 +581,193 @@ def apply_moonworm_tasks(
reporter.error_report(e)
def create_seer_subscription(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
subscription_type: str,
abi: Any,
subscription_id: str,
) -> None:
chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type].blockchain
add_abi_to_db(
db_session=db_session,
user_id=user_id,
customer_id=customer_id,
address=address,
abis=abi,
chain=chain,
subscription_id=subscription_id,
)
def delete_seer_subscription(
db_session: Session,
subscription_id,
) -> None:
"""
Delete seer subscription from db
If there are no more subscriptions for this address,abi_selector delete all abis
"""
## Delete subscription from db
try:
db_session.query(AbiSubscriptions).filter(
AbiSubscriptions.subscription_id == subscription_id
).delete(synchronize_session=False)
db_session.commit()
except Exception as e:
logger.error(f"Error delete subscription from db: {str(e)}")
db_session.rollback()
not_connected_abi_jobs = (
db_session.query(AbiJobs)
.join(AbiSubscriptions, AbiJobs.id == AbiSubscriptions.abi_job_id, isouter=True)
.filter(AbiSubscriptions.subscription_id == None)
.cte("not_connected_abi_jobs")
)
## Delete abi jobs from db
try:
db_session.query(AbiJobs).filter(
AbiJobs.id.in_(db_session.query(not_connected_abi_jobs.c.id))
).delete(synchronize_session=False)
db_session.commit()
except Exception as e:
logger.error(f"Error delete abi jobs from db: {str(e)}")
db_session.rollback()
def add_abi_to_db(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
abis: List[Dict[str, Any]],
chain: str,
subscription_id: Optional[str] = None,
) -> None:
abis_to_insert = []
subscriptions_to_insert = []
try:
existing_abi_job = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == chain)
.filter(AbiJobs.address == bytes.fromhex(address[2:]))
.filter(AbiJobs.customer_id == customer_id)
).all()
except Exception as e:
logger.error(f"Error get abi from db: {str(e)}")
db_session.rollback()
raise MoonstreamHTTPException(status_code=500, internal_error=e)
job_by_abi_selector = {abi.abi_selector: abi for abi in existing_abi_job}
for abi in abis:
if abi["type"] not in ("event", "function"):
continue
abi_selector = Web3.keccak(
text=abi["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi["inputs"]))
+ ")"
)
if abi["type"] == "function":
abi_selector = abi_selector[:4]
abi_selector = abi_selector.hex()
try:
if abi_selector in job_by_abi_selector:
# ABI job already exists, create subscription link
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": job_by_abi_selector[abi_selector].id,
"subscription_id": subscription_id,
}
)
else:
# ABI job does not exist, create new ABI job
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi),
}
try:
abi_job_instance = AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error validating abi for address {address}:{abi} {str(e)}"
)
continue
abis_to_insert.append(abi_job_instance)
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": abi_job_instance.id,
"subscription_id": subscription_id,
}
)
except Exception as e:
logger.error(f"Error creating abi for address {address}:{abi} {str(e)}")
continue
try:
# Insert ABI jobs
if abis_to_insert:
insert_stmt = (
insert(AbiJobs)
.values([abi_job.__dict__ for abi_job in abis_to_insert])
.on_conflict_do_nothing(
index_elements=["address", "chain", "abi_selector"]
)
)
db_session.execute(insert_stmt)
# Insert corresponding subscriptions
if subscriptions_to_insert:
insert_stmt = (
insert(AbiSubscriptions)
.values(subscriptions_to_insert)
.on_conflict_do_nothing(
index_elements=["abi_job_id", "subscription_id"]
)
)
db_session.execute(insert_stmt)
db_session.commit()
except Exception as e:
logger.error(f"Error inserting abi to db: {str(e)}")
db_session.rollback()
def name_normalization(query_name: str) -> str:
"""
Sanitize provided query name.
@ -992,3 +1184,29 @@ def create_resource_for_user(
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return resource
def chekc_user_resource_access(
customer_id: uuid.UUID,
user_token: uuid.UUID,
) -> bool:
"""
Check if user has access to customer_id
"""
try:
response = bc.get_resource(
token=user_token,
resource_id=str(customer_id),
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as e:
if e.status_code == 404:
return False
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error get customer: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return str(response.id) == customer_id

Wyświetl plik

@ -245,6 +245,7 @@ class UpdateSubscriptionRequest(BaseModel):
abi: Optional[str] = Form(None)
description: Optional[str] = Form(None)
tags: Optional[List[Dict[str, str]]] = Form(None)
customer_id: Optional[str] = Form(None)
@validator("tags", pre=True, always=True)
def transform_to_dict(cls, v):
@ -263,6 +264,7 @@ class CreateSubscriptionRequest(BaseModel):
abi: Optional[str] = Form(None)
description: Optional[str] = Form(None)
tags: Optional[List[Dict[str, str]]] = Form(None)
customer_id: Optional[str] = Form(None)
@validator("tags", pre=True, always=True)
def transform_to_dict(cls, v):

Wyświetl plik

@ -20,10 +20,13 @@ from ..actions import (
EntityJournalNotFoundException,
apply_moonworm_tasks,
check_if_smart_contract,
chekc_user_resource_access,
get_entity_subscription_journal_id,
get_list_of_support_interfaces,
get_moonworm_tasks,
validate_abi_json,
create_seer_subscription,
delete_seer_subscription,
)
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
@ -32,6 +35,7 @@ from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_ENTITIES_RESERVED_TAGS,
THREAD_TIMEOUT_SECONDS,
MOONSTREAM_DB_V3_INDEX_INSTANCE,
)
from ..settings import bugout_client as bc
from ..web3_provider import yield_web3_provider
@ -51,11 +55,13 @@ async def add_subscription_handler(
request: Request,
background_tasks: BackgroundTasks,
web3: Web3 = Depends(yield_web3_provider),
db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session),
) -> data.SubscriptionResourceData:
"""
Add subscription to blockchain stream data for user.
"""
token = request.state.token
user = request.state.user
form = await request.form()
@ -71,6 +77,7 @@ async def add_subscription_handler(
description = form_data.description
tags = form_data.tags
subscription_type_id = form_data.subscription_type_id
customer_id = form_data.customer_id
if subscription_type_id != "ethereum_whalewatch":
try:
@ -94,6 +101,19 @@ async def add_subscription_handler(
detail="Currently ethereum_whalewatch not supported",
)
if customer_id is not None:
results = chekc_user_resource_access(
customer_id=customer_id,
user_token=token,
)
if not results:
raise MoonstreamHTTPException(
status_code=403,
detail="User has no access to this customer",
)
active_subscription_types_response = subscription_types.list_subscription_types(
active_only=True
)
@ -129,10 +149,7 @@ async def add_subscription_handler(
content["abi_hash"] = hash
background_tasks.add_task(
apply_moonworm_tasks,
subscription_type_id,
json_abi,
address,
apply_moonworm_tasks, subscription_type_id, json_abi, address
)
if description:
@ -202,6 +219,16 @@ async def add_subscription_handler(
if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS
]
if entity_secondary_fields.get("abi") and customer_id is not None:
create_seer_subscription(
db_session=db_session,
user_id=user.id,
customer_id=customer_id,
subscription_id=entity.id,
abi=abi,
subscription_type_id=subscription_type_id,
)
return data.SubscriptionResourceData(
id=str(entity.id),
user_id=str(user.id),
@ -223,7 +250,9 @@ async def add_subscription_handler(
response_model=data.SubscriptionResourceData,
)
async def delete_subscription_handler(
request: Request, subscription_id: str = Path(...)
request: Request,
subscription_id: str = Path(...),
db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session),
):
"""
Delete subscriptions.
@ -285,6 +314,11 @@ async def delete_subscription_handler(
abi = deleted_entity.secondary_fields.get("abi")
description = deleted_entity.secondary_fields.get("description")
delete_seer_subscription(
db_session=db_session,
subscription_id=subscription_id,
)
return data.SubscriptionResourceData(
id=str(deleted_entity.id),
user_id=str(user.id),
@ -400,12 +434,12 @@ async def update_subscriptions_handler(
request: Request,
background_tasks: BackgroundTasks,
subscription_id: str = Path(...),
db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session),
) -> data.SubscriptionResourceData:
"""
Get user's subscriptions.
"""
token = request.state.token
user = request.state.user
form = await request.form()
@ -419,6 +453,20 @@ async def update_subscriptions_handler(
abi = form_data.abi
description = form_data.description
tags = form_data.tags
customer_id = form_data.customer_id
if customer_id is not None:
results = chekc_user_resource_access(
customer_id=customer_id,
user_token=token,
)
if not results:
raise MoonstreamHTTPException(
status_code=403,
detail="User has no access to this customer",
)
try:
journal_id = get_entity_subscription_journal_id(
@ -541,13 +589,24 @@ async def update_subscriptions_handler(
logger.error(f"Error update user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if abi:
if abi is not None and customer_id is not None:
background_tasks.add_task(
apply_moonworm_tasks,
subscription_type_id,
json_abi,
address,
)
create_seer_subscription(
db_session=db_session,
user_id=user.id,
customer_id=customer_id,
address=address,
subscription_type=subscription_type_id,
abi=json_abi,
subscription_id=subscription_id,
)
subscription_required_fields = (
subscription.required_fields if subscription.required_fields is not None else {}
)

Wyświetl plik

@ -3,6 +3,7 @@ from typing import Dict, Optional
from bugout.app import Bugout
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdbv3.db import MoonstreamDBIndexesEngine
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
@ -408,3 +409,8 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
MOONSTREAM_USAGE_REPORTS_JOURNAL_ID = os.environ.get(
"MOONSTREAM_USAGE_REPORTS_JOURNAL_ID"
)
### Moonstreamdb v3 instance
MOONSTREAM_DB_V3_INDEX_INSTANCE = MoonstreamDBIndexesEngine()

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.4.3"
MOONSTREAMAPI_VERSION = "0.4.4"

Wyświetl plik

@ -38,7 +38,7 @@ Mako==1.2.3
MarkupSafe==2.1.1
moonstream==0.1.1
moonstreamdb==0.4.5
moonstreamdb-v3==0.0.9
moonstreamdb-v3==0.0.13
multiaddr==0.0.9
multidict==6.0.2
netaddr==0.8.0

Wyświetl plik

@ -17,7 +17,7 @@ setup(
"fastapi",
"moonstream",
"moonstreamdb>=0.4.5",
"moonstreamdb-v3>=0.0.9",
"moonstreamdb-v3>=0.0.13",
"humbug",
"pydantic==1.10.2",
"pyevmasm",

Wyświetl plik

@ -65,6 +65,7 @@ from moonstreamdbv3.models_indexes import (
MantleSepoliaLogIndex,
MantleSepoliaReorgs,
AbiJobs,
AbiSubscriptions,
)
@ -107,6 +108,7 @@ def include_symbol(tablename, schema):
MantleSepoliaLogIndex.__tablename__,
MantleSepoliaReorgs.__tablename__,
AbiJobs.__tablename__,
AbiSubscriptions.__tablename__,
}

Wyświetl plik

@ -0,0 +1,67 @@
"""Add v3 subscription
Revision ID: f2c6aa92e5d2
Revises: 27086791044c
Create Date: 2024-07-11 19:41:49.899157
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "f2c6aa92e5d2"
down_revision: Union[str, None] = "27086791044c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"abi_subscriptions",
sa.Column("abi_job_id", sa.UUID(), nullable=False),
sa.Column("subscription_id", sa.UUID(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("TIMEZONE('utc', statement_timestamp())"),
nullable=False,
),
sa.ForeignKeyConstraint(
["abi_job_id"],
["abi_jobs.id"],
name=op.f("fk_abi_subscriptions_abi_job_id_abi_jobs"),
),
sa.PrimaryKeyConstraint(
"abi_job_id", "subscription_id", name=op.f("pk_abi_subscriptions")
),
)
op.create_index(
op.f("ix_abi_subscriptions_abi_job_id"),
"abi_subscriptions",
["abi_job_id"],
unique=False,
)
op.create_index(
op.f("ix_abi_subscriptions_subscription_id"),
"abi_subscriptions",
["subscription_id"],
unique=False,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
op.f("ix_abi_subscriptions_subscription_id"), table_name="abi_subscriptions"
)
op.drop_index(
op.f("ix_abi_subscriptions_abi_job_id"), table_name="abi_subscriptions"
)
op.drop_table("abi_subscriptions")
# ### end Alembic commands ###

Wyświetl plik

@ -567,7 +567,11 @@ class AbiJobs(Base):
__table_args__ = (
UniqueConstraint(
"chain", "address", "abi_selector", "customer_id", name="uq_abi_jobs"
"chain",
"address",
"abi_selector",
"customer_id",
name="uq_abi_jobs",
),
)
@ -589,3 +593,20 @@ class AbiJobs(Base):
updated_at = Column(
DateTime(timezone=True), server_default=utcnow(), nullable=False
)
class AbiSubscriptions(Base):
__tablename__ = "abi_subscriptions"
__table_args__ = (PrimaryKeyConstraint("abi_job_id", "subscription_id"),)
abi_job_id = Column(
UUID(as_uuid=True),
ForeignKey("abi_jobs.id"),
nullable=False,
index=True,
)
subscription_id = Column(UUID(as_uuid=True), nullable=False, index=True)
created_at = Column(
DateTime(timezone=True), server_default=utcnow(), nullable=False
)