kopia lustrzana https://github.com/bugout-dev/moonstream
Fix delete subscritpiton.
rodzic
feb2971b25
commit
bba32bebe3
|
@ -607,19 +607,42 @@ def create_seer_subscription(
|
||||||
|
|
||||||
def delete_seer_subscription(
|
def delete_seer_subscription(
|
||||||
db_session: Session,
|
db_session: Session,
|
||||||
user_id: uuid.UUID,
|
|
||||||
subscription_id,
|
subscription_id,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Delete seer subscription from db
|
Delete seer subscription from db
|
||||||
|
If there are no more subscriptions for this address,abi_selector delete all abis
|
||||||
"""
|
"""
|
||||||
|
|
||||||
db_session.query(AbiJobs).filter(
|
## Delete subscription from db
|
||||||
AbiJobs.user_id == user_id,
|
|
||||||
AbiJobs.subscription_id == subscription_id,
|
|
||||||
).delete()
|
|
||||||
|
|
||||||
db_session.commit()
|
try:
|
||||||
|
db_session.query(AbiSubscriptions).filter(
|
||||||
|
AbiSubscriptions.subscription_id == subscription_id
|
||||||
|
).delete(synchronize_session=False)
|
||||||
|
db_session.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error delete subscription from db: {str(e)}")
|
||||||
|
db_session.rollback()
|
||||||
|
|
||||||
|
not_connected_abi_jobs = (
|
||||||
|
db_session.query(AbiJobs)
|
||||||
|
.join(AbiSubscriptions, AbiJobs.id == AbiSubscriptions.abi_job_id, isouter=True)
|
||||||
|
.filter(AbiSubscriptions.subscription_id == None)
|
||||||
|
.cte("not_connected_abi_jobs")
|
||||||
|
)
|
||||||
|
|
||||||
|
## Delete abi jobs from db
|
||||||
|
|
||||||
|
try:
|
||||||
|
db_session.query(AbiJobs).filter(
|
||||||
|
AbiJobs.id.in_(db_session.query(not_connected_abi_jobs.c.id))
|
||||||
|
).delete(synchronize_session=False)
|
||||||
|
|
||||||
|
db_session.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error delete abi jobs from db: {str(e)}")
|
||||||
|
db_session.rollback()
|
||||||
|
|
||||||
|
|
||||||
def add_abi_to_db(
|
def add_abi_to_db(
|
||||||
|
@ -718,13 +741,27 @@ def add_abi_to_db(
|
||||||
try:
|
try:
|
||||||
# Insert ABI jobs
|
# Insert ABI jobs
|
||||||
if abis_to_insert:
|
if abis_to_insert:
|
||||||
db_session.bulk_save_objects(abis_to_insert)
|
|
||||||
db_session.flush() # Ensure the ABI job IDs are generated
|
insert_stmt = (
|
||||||
|
insert(AbiJobs)
|
||||||
|
.values([abi_job.__dict__ for abi_job in abis_to_insert])
|
||||||
|
.on_conflict_do_nothing(
|
||||||
|
index_elements=["address", "chain", "abi_selector"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
db_session.execute(insert_stmt)
|
||||||
|
|
||||||
# Insert corresponding subscriptions
|
# Insert corresponding subscriptions
|
||||||
if subscriptions_to_insert:
|
if subscriptions_to_insert:
|
||||||
|
|
||||||
db_session.bulk_insert_mappings(AbiSubscriptions, subscriptions_to_insert)
|
insert_stmt = (
|
||||||
|
insert(AbiSubscriptions)
|
||||||
|
.values(subscriptions_to_insert)
|
||||||
|
.on_conflict_do_nothing(
|
||||||
|
index_elements=["abi_job_id", "subscription_id"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
db_session.execute(insert_stmt)
|
||||||
|
|
||||||
db_session.commit()
|
db_session.commit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -302,7 +302,6 @@ async def delete_subscription_handler(
|
||||||
|
|
||||||
delete_seer_subscription(
|
delete_seer_subscription(
|
||||||
db_session=db_session,
|
db_session=db_session,
|
||||||
user_id=user.id,
|
|
||||||
subscription_id=subscription_id,
|
subscription_id=subscription_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Ładowanie…
Reference in New Issue