kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #892 from moonstream-to/db-request-id-constr
Request ID unique constrpull/896/head
commit
fbee1a068c
|
@ -0,0 +1,75 @@
|
|||
"""Request ID decimal column
|
||||
|
||||
Revision ID: 040f2dfde5a5
|
||||
Revises: b4257b10daaf
|
||||
Create Date: 2023-08-10 08:58:22.052336
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '040f2dfde5a5'
|
||||
down_revision = 'b4257b10daaf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('call_requests', sa.Column('request_id', sa.DECIMAL(), nullable=True))
|
||||
op.create_index(op.f('ix_call_requests_request_id'), 'call_requests', ['request_id'], unique=False)
|
||||
op.create_unique_constraint(op.f('uq_call_requests_registered_contract_id'), 'call_requests', ['registered_contract_id', 'request_id'])
|
||||
|
||||
# Manual
|
||||
# Fetch IDs of duplicates for 'dropper-v0.2.0' call_request_type and delete it
|
||||
op.execute("""WITH Duplicates AS (
|
||||
SELECT
|
||||
id,
|
||||
registered_contract_id,
|
||||
call_request_type_name,
|
||||
parameters->'requestID' AS request_id,
|
||||
created_at,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY
|
||||
registered_contract_id,
|
||||
call_request_type_name,
|
||||
parameters->'requestID'
|
||||
ORDER BY created_at ASC
|
||||
) AS row_num
|
||||
FROM call_requests
|
||||
WHERE call_request_type_name = 'dropper-v0.2.0'
|
||||
),
|
||||
DeleteDuplicates AS (
|
||||
SELECT id
|
||||
FROM
|
||||
Duplicates
|
||||
WHERE
|
||||
row_num < (
|
||||
SELECT COUNT(*) FROM Duplicates d2
|
||||
WHERE d2.registered_contract_id = Duplicates.registered_contract_id
|
||||
AND d2.call_request_type_name = Duplicates.call_request_type_name
|
||||
AND d2.request_id = Duplicates.request_id
|
||||
)
|
||||
)
|
||||
DELETE FROM call_requests WHERE id IN (SELECT id FROM DeleteDuplicates);""")
|
||||
|
||||
# Fulfill not empty requestID values
|
||||
op.execute("UPDATE call_requests SET request_id = CAST(parameters->>'requestID' AS DECIMAL) WHERE parameters->>'requestID' IS NOT NULL;")
|
||||
# Fulfill raw types with random requestID
|
||||
op.execute("UPDATE call_requests SET request_id = FLOOR(RANDOM()* 120500600 + 120400600) WHERE parameters->>'requestID' IS NULL;")
|
||||
op.alter_column("call_requests", "request_id", nullable=False)
|
||||
|
||||
# Other
|
||||
op.create_unique_constraint(op.f('uq_blockchains_id'), 'blockchains', ['id'])
|
||||
op.create_unique_constraint(op.f('uq_call_request_types_name'), 'call_request_types', ['name'])
|
||||
op.create_unique_constraint(op.f('uq_metatx_requesters_id'), 'metatx_requesters', ['id'])
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index(op.f('ix_call_requests_request_id'), table_name='call_requests')
|
||||
op.drop_column('call_requests', 'request_id')
|
||||
# ### end Alembic commands ###
|
|
@ -65,6 +65,12 @@ class ContractAlreadyRegistered(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class CallRequestAlreadyRegistered(Exception):
|
||||
"""
|
||||
Raised when call request with same parameters registered.
|
||||
"""
|
||||
|
||||
|
||||
def parse_registered_contract_response(
|
||||
obj: Tuple[RegisteredContract, Blockchain]
|
||||
) -> data.RegisteredContractResponse:
|
||||
|
@ -92,6 +98,7 @@ def parse_call_request_response(
|
|||
call_request_type=obj[0].call_request_type_name,
|
||||
caller=obj[0].caller,
|
||||
method=obj[0].method,
|
||||
request_id=str(obj[0].request_id),
|
||||
parameters=obj[0].parameters,
|
||||
expires_at=obj[0].expires_at,
|
||||
created_at=obj[0].created_at,
|
||||
|
@ -112,7 +119,6 @@ def validate_method_and_params(
|
|||
)
|
||||
required_params = {
|
||||
"dropId",
|
||||
"requestID",
|
||||
"blockDeadline",
|
||||
"amount",
|
||||
"signer",
|
||||
|
@ -397,6 +403,7 @@ def request_calls(
|
|||
metatx_requester_id=metatx_requester_id,
|
||||
caller=normalized_caller,
|
||||
method=specification.method,
|
||||
request_id=specification.request_id,
|
||||
parameters=specification.parameters,
|
||||
expires_at=expires_at,
|
||||
)
|
||||
|
@ -405,6 +412,9 @@ def request_calls(
|
|||
# Insert the new rows into the database in a single transaction
|
||||
try:
|
||||
db_session.commit()
|
||||
except IntegrityError as err:
|
||||
db_session.rollback()
|
||||
raise CallRequestAlreadyRegistered()
|
||||
except Exception as e:
|
||||
db_session.rollback()
|
||||
raise e
|
||||
|
|
|
@ -271,6 +271,7 @@ class CallSpecification(BaseModel):
|
|||
caller: str
|
||||
method: str
|
||||
call_request_type: str = "dropper-v0.2.0"
|
||||
request_id: str
|
||||
parameters: Dict[str, Any]
|
||||
|
||||
@validator("caller")
|
||||
|
@ -302,6 +303,7 @@ class CallRequestResponse(BaseModel):
|
|||
call_request_type: Optional[str] = None
|
||||
caller: str
|
||||
method: str
|
||||
request_id: str
|
||||
parameters: Dict[str, Any]
|
||||
expires_at: Optional[datetime] = None
|
||||
created_at: datetime
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import uuid
|
||||
|
||||
from sqlalchemy import (
|
||||
DECIMAL,
|
||||
VARCHAR,
|
||||
BigInteger,
|
||||
Boolean,
|
||||
|
@ -8,15 +9,15 @@ from sqlalchemy import (
|
|||
DateTime,
|
||||
ForeignKey,
|
||||
Index,
|
||||
Integer,
|
||||
MetaData,
|
||||
String,
|
||||
UniqueConstraint,
|
||||
Integer,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB, UUID, ARRAY
|
||||
from sqlalchemy.dialects.postgresql import JSONB, UUID
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import and_, expression
|
||||
|
||||
"""
|
||||
|
@ -278,6 +279,12 @@ class RegisteredContract(Base): # type: ignore
|
|||
|
||||
class CallRequest(Base):
|
||||
__tablename__ = "call_requests"
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"registered_contract_id",
|
||||
"request_id",
|
||||
),
|
||||
)
|
||||
|
||||
id = Column(
|
||||
UUID(as_uuid=True),
|
||||
|
@ -304,7 +311,7 @@ class CallRequest(Base):
|
|||
|
||||
caller = Column(VARCHAR(256), nullable=False, index=True)
|
||||
method = Column(String, nullable=False, index=True)
|
||||
# TODO(zomglings): Should we conditional indices on parameters depending on the contract type?
|
||||
request_id = Column(DECIMAL, nullable=False, index=True)
|
||||
parameters = Column(JSONB, nullable=False)
|
||||
|
||||
expires_at = Column(DateTime(timezone=True), nullable=True, index=True)
|
||||
|
|
|
@ -382,6 +382,11 @@ async def create_requests(
|
|||
status_code=400,
|
||||
detail=f"Unacceptable call request required params specified, err: {err}",
|
||||
)
|
||||
except contracts_actions.CallRequestAlreadyRegistered:
|
||||
raise EngineHTTPException(
|
||||
status_code=409,
|
||||
detail="Call request with same request_id already registered",
|
||||
)
|
||||
except Exception as err:
|
||||
logger.error(repr(err))
|
||||
raise EngineHTTPException(status_code=500)
|
||||
|
|
Ładowanie…
Reference in New Issue