Merge pull request #1118 from moonstream-to/fix-add-subscription

Add abi id resolution.
pull/1121/head
Andrey Dolgolev 2024-07-29 17:35:40 +03:00 zatwierdzone przez GitHub
commit 1c0e656b1f
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
4 zmienionych plików z 103 dodań i 68 usunięć

Wyświetl plik

@ -1,4 +1,5 @@
import hashlib
from hexbytes import HexBytes
import json
import logging
import uuid
@ -654,13 +655,13 @@ def add_abi_to_db(
subscription_id: Optional[str] = None,
) -> None:
abis_to_insert = []
subscriptions_to_insert = []
subscriptions_to_insert = {}
try:
existing_abi_job = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == chain)
.filter(AbiJobs.address == bytes.fromhex(address[2:]))
.filter(AbiJobs.address == HexBytes(address))
.filter(AbiJobs.customer_id == customer_id)
).all()
except Exception as e:
@ -688,22 +689,16 @@ def add_abi_to_db(
abi_selector = abi_selector.hex()
try:
if abi_selector in job_by_abi_selector:
# ABI job already exists, create subscription link
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": job_by_abi_selector[abi_selector].id,
"subscription_id": subscription_id,
}
)
subscriptions_to_insert[
str(job_by_abi_selector[abi_selector].id)
] = subscription_id
else:
# ABI job does not exist, create new ABI job
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"address": HexBytes(address),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
@ -717,21 +712,14 @@ def add_abi_to_db(
}
try:
abi_job_instance = AbiJobs(**abi_job)
AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error validating abi for address {address}:{abi} {str(e)}"
)
continue
raise MoonstreamHTTPException(status_code=409, detail=str(e))
abis_to_insert.append(abi_job_instance)
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": abi_job_instance.id,
"subscription_id": subscription_id,
}
)
abis_to_insert.append(abi_job)
except Exception as e:
logger.error(f"Error creating abi for address {address}:{abi} {str(e)}")
@ -743,19 +731,26 @@ def add_abi_to_db(
insert_stmt = (
insert(AbiJobs)
.values([abi_job.__dict__ for abi_job in abis_to_insert])
.on_conflict_do_nothing(
index_elements=["address", "chain", "abi_selector"]
)
.values([abi for abi in abis_to_insert])
.on_conflict_do_nothing()
.returning(AbiJobs.id)
)
db_session.execute(insert_stmt)
abi_job_ids = db_session.execute(insert_stmt).fetchall()
for id in abi_job_ids:
subscriptions_to_insert[id[0]] = subscription_id
# Insert corresponding subscriptions
if subscriptions_to_insert:
insert_stmt = (
insert(AbiSubscriptions)
.values(subscriptions_to_insert)
.values(
[
{"abi_job_id": k, "subscription_id": v}
for k, v in subscriptions_to_insert.items()
]
)
.on_conflict_do_nothing(
index_elements=["abi_job_id", "subscription_id"]
)

Wyświetl plik

@ -1,4 +1,5 @@
import json
from hexbytes import HexBytes
import logging
import os
from typing import Any, Dict, List, Optional, Union
@ -8,7 +9,7 @@ import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources, BugoutSearchResult
from bugout.exceptions import BugoutResponseException
from moonstreamdbv3.db import MoonstreamDBIndexesEngine
from moonstreamdbv3.models_indexes import AbiJobs
from moonstreamdbv3.models_indexes import AbiJobs, AbiSubscriptions
from sqlalchemy.dialects.postgresql import insert
from web3 import Web3
@ -229,14 +230,18 @@ def migrate_v3_tasks(
with db_engine.yield_db_session_ctx() as session:
user_subscriptions = []
for index, subscription in enumerate(subscriptions):
user_subscription_abis = []
subscriptions_to_insert = {}
abis = None
address = None
subscription_type_id = None
subscription_id = subscription.entry_url.split("/")[-1]
if subscription.content is None:
continue
@ -267,6 +272,15 @@ def migrate_v3_tasks(
chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain
existing_abi_jobs = (
session.query(AbiJobs)
.filter(AbiJobs.customer_id == customer_id)
.filter(AbiJobs.user_id == user_id)
.filter(AbiJobs.chain == blockchain)
).all()
job_by_abi_selector = {abi.abi_selector: abi for abi in existing_abi_jobs}
for abi_task in abis:
if abi_task["type"] not in ("event", "function"):
@ -284,55 +298,80 @@ def migrate_v3_tasks(
abi_selector = abi_selector.hex()
try:
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi_task["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi_task),
}
if abi_selector in job_by_abi_selector:
# ABI job already exists, create subscription link
if subscription_id:
subscriptions_to_insert[
str(job_by_abi_selector[abi_selector].id)
] = subscription_id
else:
try:
AbiJobs(**abi_job)
abi_job = {
"address": (HexBytes(address)),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi_task["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi_task),
}
try:
AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
continue
user_subscription_abis.append(abi_job)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
session.rollback()
continue
if len(user_subscription_abis) > 0:
insert_statement = insert(AbiJobs).values(user_subscription_abis)
user_subscriptions.append(abi_job)
result_stmt = insert_statement.on_conflict_do_nothing().returning(
AbiJobs.id
)
try:
ids = session.execute(result_stmt)
except Exception as e:
logger.error(
f"Error creating subscription for subscription {subscription.id}: {str(e)}"
)
logger.error(f"Error inserting subscriptions: {str(e)}")
session.rollback()
continue
insert_statement = insert(AbiJobs).values(user_subscriptions)
for id in ids:
subscriptions_to_insert[id[0]] = subscription_id
result_stmt = insert_statement.on_conflict_do_nothing(
index_elements=[
AbiJobs.chain,
AbiJobs.address,
AbiJobs.abi_selector,
AbiJobs.customer_id,
]
)
if len(subscriptions_to_insert) > 0:
insert_statement = insert(AbiSubscriptions).values(
[
{"abi_job_id": k, "subscription_id": v}
for k, v in subscriptions_to_insert.items()
]
)
result_stmt = insert_statement.on_conflict_do_nothing()
try:
session.execute(result_stmt)
except Exception as e:
logger.error(f"Error inserting subscriptions: {str(e)}")
session.rollback()
try:
session.execute(result_stmt)
session.commit()
except Exception as e:
logger.error(f"Error inserting subscriptions: {str(e)}")

Wyświetl plik

@ -224,9 +224,10 @@ async def add_subscription_handler(
db_session=db_session,
user_id=user.id,
customer_id=customer_id,
address=address,
subscription_id=entity.id,
abi=abi,
subscription_type_id=subscription_type_id,
abi=json_abi,
subscription_type=subscription_type_id,
)
return data.SubscriptionResourceData(

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.4.5"
MOONSTREAMAPI_VERSION = "0.4.6"