from datetime import datetime
from collections import Counter
import json
from typing import List, Any, Optional, Dict, Union, Tuple, cast
import uuid
import logging
from bugout.data import (
BugoutResource,
BugoutSearchResult,
ResourcePermissions,
HolderType,
BugoutResourceHolder,
)
from eth_typing import Address
from hexbytes import HexBytes
import requests # type: ignore
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy import func, text, or_, and_, Subquery
from sqlalchemy.engine import Row
from web3 import Web3
from web3.types import ChecksumAddress
from .data import (
Score,
LeaderboardScore,
LeaderboardConfigUpdate,
LeaderboardConfig,
LeaderboardPosition,
ColumnsNames,
)
from .contracts import Dropper_interface, ERC20_interface, Terminus_interface
from .models import (
DropperClaimant,
DropperContract,
DropperClaim,
Leaderboard,
LeaderboardScores,
LeaderboardVersion,
)
from . import signatures
from .settings import (
bugout_client as bc,
BLOCKCHAIN_WEB3_PROVIDERS,
LEADERBOARD_RESOURCE_TYPE,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID,
)
class LeaderboardsResourcesNotFound(Exception):
pass
class AuthorizationError(Exception):
pass
class DropWithNotSettedBlockDeadline(Exception):
pass
class DublicateClaimantError(Exception):
pass
class DuplicateLeaderboardAddressError(Exception):
def __init__(self, message, duplicates):
super(DuplicateLeaderboardAddressError, self).__init__(message)
self.message = message
self.duplicates = duplicates
class LeaderboardIsEmpty(Exception):
pass
class LeaderboardDeleteScoresError(Exception):
pass
class LeaderboardCreateError(Exception):
pass
class LeaderboardUpdateError(Exception):
pass
class LeaderboardDeleteError(Exception):
pass
class LeaderboardConfigNotFound(Exception):
pass
class LeaderboardConfigAlreadyActive(Exception):
pass
class LeaderboardConfigAlreadyInactive(Exception):
pass
class LeaderboardVersionNotFound(Exception):
pass
class LeaderboardAssignResourceError(Exception):
pass
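# Page size used when refetching and signing claimant signatures in batches.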
BATCH_SIGNATURE_PAGE_SIZE = 500
logger = logging.getLogger(__name__)
def create_dropper_contract(
db_session: Session,
blockchain: Optional[str],
dropper_contract_address,
title,
description,
image_uri,
):
"""
Create a new dropper contract.
"""
dropper_contract = DropperContract(
blockchain=blockchain,
address=Web3.toChecksumAddress(dropper_contract_address),
title=title,
description=description,
image_uri=image_uri,
)
db_session.add(dropper_contract)
db_session.commit()
return dropper_contract
def delete_dropper_contract(
db_session: Session, blockchain: Optional[str], dropper_contract_address
):
dropper_contract = (
db_session.query(DropperContract)
.filter(
DropperContract.address == Web3.toChecksumAddress(dropper_contract_address)
)
.filter(DropperContract.blockchain == blockchain)
.one()
)
db_session.delete(dropper_contract)
db_session.commit()
return dropper_contract
def list_dropper_contracts(
db_session: Session, blockchain: Optional[str]
) -> List[Dict[str, Any]]:
"""
List all dropper contracts
"""
    dropper_contracts = db_session.query(DropperContract)
if blockchain:
dropper_contracts = dropper_contracts.filter(
DropperContract.blockchain == blockchain
)
return dropper_contracts
def get_dropper_contract_by_id(
db_session: Session, dropper_contract_id: uuid.UUID
) -> DropperContract:
"""
Get a dropper contract by its ID
"""
query = db_session.query(DropperContract).filter(
DropperContract.id == dropper_contract_id
)
return query.one()
def list_drops_terminus(db_session: Session, blockchain: Optional[str] = None):
"""
    List distinct Terminus addresses
"""
terminus = (
db_session.query(
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
DropperContract.blockchain,
)
.join(DropperContract)
.filter(DropperClaim.terminus_address.isnot(None))
.filter(DropperClaim.terminus_pool_id.isnot(None))
)
if blockchain:
terminus = terminus.filter(DropperContract.blockchain == blockchain)
terminus = terminus.distinct(
DropperClaim.terminus_address, DropperClaim.terminus_pool_id
)
return terminus
def list_drops_blockchains(db_session: Session):
"""
    List distinct blockchains
"""
blockchains = (
db_session.query(DropperContract.blockchain)
.filter(DropperContract.blockchain.isnot(None))
.distinct(DropperContract.blockchain)
)
return blockchains
def list_claims(db_session: Session, dropper_contract_id, active=True):
"""
List all claims
"""
claims = (
db_session.query(
DropperClaim.id,
DropperClaim.title,
DropperClaim.description,
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
DropperClaim.claim_block_deadline,
)
.filter(DropperClaim.dropper_contract_id == dropper_contract_id)
.filter(DropperClaim.active == active)
.all()
)
return claims
def delete_claim(db_session: Session, dropper_claim_id):
"""
Delete a claim
"""
claim = (
db_session.query(DropperClaim).filter(DropperClaim.id == dropper_claim_id).one() # type: ignore
)
db_session.delete(claim)
db_session.commit()
return claim
def create_claim(
db_session: Session,
dropper_contract_id: uuid.UUID,
claim_id: Optional[int] = None,
title: Optional[str] = None,
description: Optional[str] = None,
terminus_address: Optional[ChecksumAddress] = None,
terminus_pool_id: Optional[int] = None,
claim_block_deadline: Optional[int] = None,
):
"""
Create a new dropper claim.
"""
# get the dropper contract
dropper_contract = (
db_session.query(DropperContract)
.filter(DropperContract.id == dropper_contract_id)
.one()
)
dropper_claim = DropperClaim(
dropper_contract_id=dropper_contract.id,
claim_id=claim_id,
title=title,
description=description,
terminus_address=terminus_address,
terminus_pool_id=terminus_pool_id,
claim_block_deadline=claim_block_deadline,
)
db_session.add(dropper_claim)
db_session.commit()
db_session.refresh(dropper_claim) # refresh the object to get the id
return dropper_claim
def activate_drop(db_session: Session, dropper_claim_id: uuid.UUID):
"""
Activate a claim
"""
claim = (
db_session.query(DropperClaim).filter(DropperClaim.id == dropper_claim_id).one() # type: ignore
)
claim.active = True
db_session.commit()
return claim
def deactivate_drop(db_session: Session, dropper_claim_id: uuid.UUID):
"""
    Deactivate a claim
"""
claim = (
db_session.query(DropperClaim).filter(DropperClaim.id == dropper_claim_id).one() # type: ignore
)
claim.active = False
db_session.commit()
return claim
def update_drop(
db_session: Session,
dropper_claim_id: uuid.UUID,
title: Optional[str] = None,
description: Optional[str] = None,
terminus_address: Optional[str] = None,
terminus_pool_id: Optional[int] = None,
claim_block_deadline: Optional[int] = None,
claim_id: Optional[int] = None,
address: Optional[str] = None,
):
"""
Update a claim
"""
claim = (
db_session.query(DropperClaim).filter(DropperClaim.id == dropper_claim_id).one() # type: ignore
)
if title:
claim.title = title
if description:
claim.description = description
if terminus_address or terminus_pool_id:
ensure_dropper_contract_owner(db_session, claim.dropper_contract_id, address)
if terminus_address:
terminus_address = Web3.toChecksumAddress(terminus_address)
claim.terminus_address = terminus_address
if terminus_pool_id:
claim.terminus_pool_id = terminus_pool_id
if claim_block_deadline:
claim.claim_block_deadline = claim_block_deadline
if claim_id:
claim.claim_id = claim_id
db_session.commit()
return claim
def get_free_drop_number_in_range(
db_session: Session, dropper_contract_id: uuid.UUID, start: Optional[int], end: int
):
"""
    Return the free claim numbers (drop numbers) within the given range
"""
if start is None:
start = 0
drops = (
db_session.query(DropperClaim)
.filter(DropperClaim.dropper_contract_id == dropper_contract_id)
.filter(DropperClaim.claim_id >= start)
.filter(DropperClaim.claim_id <= end)
.all()
)
free_numbers = list(range(start, end + 1))
for drop in drops:
free_numbers.remove(drop.claim_id)
    return free_numbers
def add_claimants(db_session: Session, dropper_claim_id, claimants, added_by):
"""
Add a claimants to a claim
"""
# On conflict requirements https://stackoverflow.com/questions/42022362/no-unique-or-exclusion-constraint-matching-the-on-conflict
claimant_objects = []
addresses = [Web3.toChecksumAddress(claimant.address) for claimant in claimants]
if len(claimants) > len(set(addresses)):
raise DublicateClaimantError("Duplicate claimants")
for claimant in claimants:
claimant_objects.append(
{
"dropper_claim_id": dropper_claim_id,
"address": Web3.toChecksumAddress(claimant.address),
"amount": 0,
"raw_amount": str(claimant.amount),
"added_by": added_by,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
)
insert_statement = insert(DropperClaimant).values(claimant_objects)
result_stmt = insert_statement.on_conflict_do_update(
index_elements=[DropperClaimant.address, DropperClaimant.dropper_claim_id],
set_=dict(
amount=insert_statement.excluded.amount,
raw_amount=insert_statement.excluded.raw_amount,
added_by=insert_statement.excluded.added_by,
updated_at=datetime.now(),
),
)
db_session.execute(result_stmt)
db_session.commit()
return claimant_objects
def transform_claim_amount(
db_session: Session, dropper_claim_id: uuid.UUID, db_amount: int
) -> int:
claim = (
db_session.query(
DropperClaim.claim_id, DropperContract.address, DropperContract.blockchain
)
.join(DropperContract, DropperContract.id == DropperClaim.dropper_contract_id)
.filter(DropperClaim.id == dropper_claim_id)
.one()
)
dropper_contract = Dropper_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[claim.blockchain], claim.address
)
claim_info = dropper_contract.getClaim(claim.claim_id).call()
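    # getClaim returns the claim's token type as its first element; only ERC20 drops
    # (type 20) store human-readable amounts that need scaling by the token's decimals.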
if claim_info[0] != 20:
return db_amount
erc20_contract = ERC20_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[claim.blockchain], claim_info[1]
)
decimals = int(erc20_contract.decimals().call())
return db_amount * (10**decimals)
def batch_transform_claim_amounts(
db_session: Session, dropper_claim_id: uuid.UUID, db_amounts: List[int]
) -> List[int]:
claim = (
db_session.query(
DropperClaim.claim_id, DropperContract.address, DropperContract.blockchain
)
.join(DropperContract, DropperContract.id == DropperClaim.dropper_contract_id)
.filter(DropperClaim.id == dropper_claim_id)
.one()
)
dropper_contract = Dropper_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[claim.blockchain], claim.address
)
    claim_info = dropper_contract.getClaim(claim.claim_id).call()
    if claim_info[0] != 20:
        return db_amounts
    erc20_contract = ERC20_interface.Contract(
        BLOCKCHAIN_WEB3_PROVIDERS[claim.blockchain], claim_info[1]
    )
decimals = int(erc20_contract.decimals().call())
return [db_amount * (10**decimals) for db_amount in db_amounts]
def get_claimants(
db_session: Session,
dropper_claim_id: uuid.UUID,
amount: Optional[int] = None,
added_by: Optional[str] = None,
address: Optional[ChecksumAddress] = None,
limit=None,
offset=None,
):
"""
    List claimants of a claim, with optional filters
"""
claimants_query = db_session.query(
DropperClaimant.address,
DropperClaimant.amount,
DropperClaimant.added_by,
DropperClaimant.raw_amount,
).filter(DropperClaimant.dropper_claim_id == dropper_claim_id)
if amount:
claimants_query = claimants_query.filter(DropperClaimant.amount == amount)
if added_by:
claimants_query = claimants_query.filter(DropperClaimant.added_by == added_by)
if address:
claimants_query = claimants_query.filter(DropperClaimant.address == address)
if limit:
claimants_query = claimants_query.limit(limit)
if offset:
claimants_query = claimants_query.offset(offset)
return claimants_query.all()
def get_claimant(db_session: Session, dropper_claim_id, address):
"""
Search for a claimant by address
"""
claimant_query = (
db_session.query(
DropperClaimant.id.label("dropper_claimant_id"),
DropperClaimant.address,
DropperClaimant.amount,
DropperClaimant.raw_amount,
DropperClaimant.signature,
DropperClaim.id.label("dropper_claim_id"),
DropperClaim.claim_id,
DropperClaim.active,
DropperClaim.claim_block_deadline,
DropperClaim.title,
DropperClaim.description,
DropperContract.address.label("dropper_contract_address"),
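            # The stored signature is only considered current if the claimant row was
            # updated after the claim itself was last modified.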
(DropperClaim.updated_at < DropperClaimant.updated_at).label(
"is_recent_signature"
),
DropperContract.blockchain.label("blockchain"),
)
.join(DropperClaim, DropperClaimant.dropper_claim_id == DropperClaim.id)
.join(DropperContract, DropperClaim.dropper_contract_id == DropperContract.id)
.filter(DropperClaimant.dropper_claim_id == dropper_claim_id)
.filter(DropperClaimant.address == Web3.toChecksumAddress(address))
)
return claimant_query.one()
def get_claimant_drops(
db_session: Session,
blockchain: str,
address,
current_block_number=None,
limit=None,
offset=None,
):
"""
    List active drops available to a claimant address
"""
claimant_query = (
db_session.query(
DropperClaimant.id.label("dropper_claimant_id"),
DropperClaimant.address,
DropperClaimant.amount,
DropperClaimant.raw_amount,
DropperClaimant.signature,
DropperClaim.id.label("dropper_claim_id"),
DropperClaim.claim_id,
DropperClaim.active,
DropperClaim.claim_block_deadline,
DropperClaim.title,
DropperClaim.description,
DropperContract.address.label("dropper_contract_address"),
DropperContract.blockchain,
(DropperClaim.updated_at < DropperClaimant.updated_at).label(
"is_recent_signature"
),
)
.join(DropperClaim, DropperClaimant.dropper_claim_id == DropperClaim.id)
.join(DropperContract, DropperClaim.dropper_contract_id == DropperContract.id)
.filter(DropperClaim.active == True)
.filter(DropperClaimant.address == address)
.filter(DropperContract.blockchain == blockchain)
)
if current_block_number:
logger.info("Trying to filter block number " + str(current_block_number))
claimant_query = claimant_query.filter(
DropperClaim.claim_block_deadline > current_block_number
)
    claimant_query = claimant_query.order_by(DropperClaimant.created_at.asc())
if limit:
claimant_query = claimant_query.limit(limit)
if offset:
claimant_query = claimant_query.offset(offset)
return claimant_query.all()
def get_terminus_claims(
db_session: Session,
blockchain: str,
terminus_address: ChecksumAddress,
terminus_pool_id: int,
dropper_contract_address: Optional[ChecksumAddress] = None,
active: Optional[bool] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
):
"""
    List claims associated with a Terminus pool
"""
query = (
db_session.query(
DropperClaim.id,
DropperClaim.title,
DropperClaim.description,
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
DropperClaim.claim_block_deadline,
DropperClaim.claim_id,
DropperClaim.active,
DropperContract.address.label("dropper_contract_address"),
)
.join(DropperContract)
.filter(DropperClaim.terminus_address == terminus_address)
.filter(DropperClaim.terminus_pool_id == terminus_pool_id)
.filter(DropperContract.blockchain == blockchain)
)
if dropper_contract_address:
query = query.filter(DropperContract.address == dropper_contract_address)
    if active is not None:
        query = query.filter(DropperClaim.active == active)
# TODO: add ordering in all pagination queries
query = query.order_by(DropperClaim.created_at.asc())
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query
def get_drop(db_session: Session, dropper_claim_id: uuid.UUID):
"""
Return particular drop
"""
drop = (
db_session.query(DropperClaim).filter(DropperClaim.id == dropper_claim_id).one() # type: ignore
)
return drop
def get_claims(
db_session: Session,
blockchain: str,
dropper_contract_address: Optional[ChecksumAddress] = None,
claimant_address: Optional[ChecksumAddress] = None,
terminus_address: Optional[ChecksumAddress] = None,
terminus_pool_id: Optional[int] = None,
active: Optional[bool] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
):
"""
    List claims together with claimant amounts, filtered by the given criteria
"""
query = (
db_session.query(
DropperClaim.id,
DropperClaim.title,
DropperClaim.description,
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
DropperClaim.claim_block_deadline,
DropperClaim.claim_id,
DropperClaim.active,
DropperClaimant.amount,
DropperContract.address.label("dropper_contract_address"),
)
.join(DropperContract)
.join(DropperClaimant)
.filter(DropperContract.blockchain == blockchain)
)
if dropper_contract_address:
query = query.filter(DropperContract.address == dropper_contract_address)
if claimant_address:
query = query.filter(DropperClaimant.address == claimant_address)
if terminus_address:
query = query.filter(DropperClaim.terminus_address == terminus_address)
if terminus_pool_id:
query = query.filter(DropperClaim.terminus_pool_id == terminus_pool_id)
    if active is not None:
        query = query.filter(DropperClaim.active == active)
query = query.order_by(DropperClaim.created_at.asc())
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query
def get_drops(
db_session: Session,
blockchain: str,
dropper_contract_address: Optional[ChecksumAddress] = None,
drop_number: Optional[int] = None,
terminus_address: Optional[ChecksumAddress] = None,
terminus_pool_id: Optional[int] = None,
active: Optional[bool] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
):
"""
Get drops
"""
query = (
db_session.query(
DropperClaim.id,
DropperClaim.title,
DropperClaim.description,
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
DropperClaim.claim_block_deadline,
DropperClaim.claim_id.label("drop_number"),
DropperClaim.active,
DropperContract.address.label("dropper_contract_address"),
)
.join(DropperContract)
.filter(DropperContract.blockchain == blockchain)
)
if dropper_contract_address:
query = query.filter(DropperContract.address == dropper_contract_address)
if drop_number:
query = query.filter(DropperClaim.claim_id == drop_number)
if terminus_address:
query = query.filter(DropperClaim.terminus_address == terminus_address)
if terminus_pool_id:
query = query.filter(DropperClaim.terminus_pool_id == terminus_pool_id)
    if active is not None:
        query = query.filter(DropperClaim.active == active)
query = query.order_by(DropperClaim.created_at.desc())
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query
def get_claim_admin_pool(
db_session: Session,
dropper_claim_id: uuid.UUID,
) -> Any:
"""
    Return the blockchain and Terminus admin pool associated with a claim
"""
query = (
db_session.query(
DropperContract.blockchain,
DropperClaim.terminus_address,
DropperClaim.terminus_pool_id,
)
.join(DropperContract)
.filter(DropperClaim.id == dropper_claim_id)
)
return query.one()
def ensure_admin_token_holder(
db_session: Session, dropper_claim_id: uuid.UUID, address: ChecksumAddress
) -> bool:
blockchain, terminus_address, terminus_pool_id = get_claim_admin_pool(
db_session=db_session, dropper_claim_id=dropper_claim_id
)
terminus = Terminus_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[blockchain], terminus_address
)
balance = terminus.balanceOf(address, terminus_pool_id).call()
if balance == 0:
raise AuthorizationError(
f"Address has insufficient balance in Terminus pool: address={address}, blockchain={blockchain}, terminus_address={terminus_address}, terminus_pool_id={terminus_pool_id}"
)
return True
def ensure_dropper_contract_owner(
db_session: Session, dropper_contract_id: uuid.UUID, address: ChecksumAddress
) -> bool:
dropper_contract_info = get_dropper_contract_by_id(
db_session=db_session, dropper_contract_id=dropper_contract_id
)
dropper = Dropper_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[dropper_contract_info.blockchain],
dropper_contract_info.address,
)
dropper_owner_address = dropper.owner().call()
if address != Web3.toChecksumAddress(dropper_owner_address):
raise AuthorizationError(
f"Given address is not the owner of the given dropper contract: address={address}, blockchain={dropper_contract_info.blockchain}, dropper_address={dropper_contract_info.address}"
)
return True
def delete_claimants(db_session: Session, dropper_claim_id, addresses):
"""
    Delete the given claimant addresses from a claim
"""
normalize_addresses = [Web3.toChecksumAddress(address) for address in addresses]
was_deleted = []
deleted_addresses = (
db_session.query(DropperClaimant)
.filter(DropperClaimant.dropper_claim_id == dropper_claim_id)
.filter(DropperClaimant.address.in_(normalize_addresses))
)
for deleted_address in deleted_addresses:
was_deleted.append(deleted_address.address)
db_session.delete(deleted_address)
db_session.commit()
return was_deleted
def refetch_drop_signatures(
db_session: Session, dropper_claim_id: uuid.UUID, added_by: str
):
"""
Refetch signatures for drop
"""
claim = (
db_session.query(
DropperClaim.claim_id,
DropperClaim.claim_block_deadline,
DropperContract.address,
DropperContract.blockchain,
)
.join(DropperContract, DropperClaim.dropper_contract_id == DropperContract.id)
.filter(DropperClaim.id == dropper_claim_id)
).one() # type: ignore
if claim.claim_block_deadline is None:
raise DropWithNotSettedBlockDeadline(
f"Claim block deadline is not set for dropper claim: {dropper_claim_id}"
)
outdated_signatures = (
db_session.query(
DropperClaim.claim_id,
DropperClaimant.address,
DropperClaimant.amount,
)
.join(DropperClaimant, DropperClaimant.dropper_claim_id == DropperClaim.id)
.filter(DropperClaim.id == dropper_claim_id)
.filter(
or_(
DropperClaimant.updated_at < DropperClaim.updated_at,
DropperClaimant.signature == None,
)
)
).limit(BATCH_SIGNATURE_PAGE_SIZE)
current_offset = 0
users_hashes = {}
users_amount = {}
hashes_signature = {}
dropper_contract = Dropper_interface.Contract(
BLOCKCHAIN_WEB3_PROVIDERS[claim.blockchain], claim.address
)
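    # Page through outdated claimant rows, computing each claim message hash on-chain
    # and signing the hashes in batches of BATCH_SIGNATURE_PAGE_SIZE.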
while True:
signature_requests = []
page = outdated_signatures.offset(current_offset).all()
claim_amounts = [outdated_signature.amount for outdated_signature in page]
transformed_claim_amounts = batch_transform_claim_amounts(
db_session, dropper_claim_id, claim_amounts
)
for outdated_signature, transformed_claim_amount in zip(
page, transformed_claim_amounts
):
message_hash_raw = dropper_contract.claimMessageHash(
claim.claim_id,
outdated_signature.address,
claim.claim_block_deadline,
int(transformed_claim_amount),
).call()
message_hash = HexBytes(message_hash_raw).hex()
signature_requests.append(message_hash)
users_hashes[outdated_signature.address] = message_hash
users_amount[outdated_signature.address] = outdated_signature.amount
        message_hashes = signature_requests
signed_messages = signatures.DROP_SIGNER.batch_sign_message(message_hashes)
hashes_signature.update(signed_messages)
if len(page) == 0:
break
current_offset += BATCH_SIGNATURE_PAGE_SIZE
claimant_objects = []
for address, hash in users_hashes.items():
claimant_objects.append(
{
"dropper_claim_id": dropper_claim_id,
"address": address,
"signature": hashes_signature[hash],
"amount": users_amount[address],
"added_by": added_by,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
)
insert_statement = insert(DropperClaimant).values(claimant_objects)
result_stmt = insert_statement.on_conflict_do_update(
index_elements=[DropperClaimant.address, DropperClaimant.dropper_claim_id],
set_=dict(
signature=insert_statement.excluded.signature,
updated_at=datetime.now(),
),
)
db_session.execute(result_stmt)
db_session.commit()
return claimant_objects
def leaderboard_version_filter(
db_session: Session,
leaderboard_id: uuid.UUID,
version_number: Optional[int] = None,
) -> Union[Subquery, int]:
# Subquery to get the latest version number for the given leaderboard
if version_number is None:
latest_version = (
db_session.query(func.max(LeaderboardVersion.version_number)).filter(
LeaderboardVersion.leaderboard_id == leaderboard_id,
LeaderboardVersion.published == True,
)
).scalar_subquery()
else:
latest_version = version_number
return latest_version
def get_leaderboard_total_count(
db_session: Session, leaderboard_id, version_number: Optional[int] = None
) -> int:
    """
    Get the total number of positions in the leaderboard
    """
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
total_count = (
db_session.query(func.count(LeaderboardScores.id))
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
).scalar()
return total_count
def get_leaderboard_info(
db_session: Session, leaderboard_id: uuid.UUID, version_number: Optional[int] = None
) -> Row[Tuple[uuid.UUID, str, str, int, Optional[datetime]]]:
"""
Get the leaderboard from the database with users count
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
Leaderboard.id,
Leaderboard.title,
Leaderboard.description,
func.count(LeaderboardScores.id).label("users_count"),
func.max(LeaderboardScores.updated_at).label("last_update"),
)
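        # Outer joins keep leaderboards that have no published version or scores,
        # so they are still reported with a zero users_count.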
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == Leaderboard.id,
LeaderboardVersion.published == True,
),
isouter=True,
)
.join(
LeaderboardScores,
and_(
LeaderboardScores.leaderboard_id == Leaderboard.id,
LeaderboardScores.leaderboard_version_number
== LeaderboardVersion.version_number,
),
isouter=True,
)
.filter(
or_(
LeaderboardVersion.published == None,
LeaderboardVersion.version_number == latest_version,
)
)
.filter(Leaderboard.id == leaderboard_id)
.group_by(Leaderboard.id, Leaderboard.title, Leaderboard.description)
)
leaderboard = query.one()
return leaderboard
def get_leaderboard_scores_changes(
db_session: Session, leaderboard_id: uuid.UUID
) -> List[Row[Tuple[int, datetime]]]:
    """
    Return the timeline of leaderboard score changes
    """
leaderboard_scores_changes = (
db_session.query(
func.count(LeaderboardScores.address).label("players_count"),
# func.extract("epoch", LeaderboardScores.updated_at).label("timestamp"),
LeaderboardScores.updated_at.label("date"),
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.group_by(LeaderboardScores.updated_at)
.order_by(LeaderboardScores.updated_at.desc())
).all()
return leaderboard_scores_changes
def get_leaderboard_scores_by_timestamp(
db_session: Session,
leaderboard_id: uuid.UUID,
date: datetime,
limit: int,
offset: int,
) -> List[LeaderboardScores]:
"""
Return the leaderboard scores by timestamp
"""
leaderboard_scores = (
db_session.query(
LeaderboardScores.leaderboard_id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(LeaderboardScores.updated_at == date)
.order_by(LeaderboardScores.score.desc())
.limit(limit)
.offset(offset)
)
return leaderboard_scores
def get_leaderboards(
db_session: Session,
token: Union[str, uuid.UUID],
) -> List[Leaderboard]:
"""
    Get the leaderboards accessible to the token holder via leaderboard resources
"""
user_resources = bc.list_resources(
token=token,
params={"type": "leaderboard"},
)
if len(user_resources.resources) == 0:
        raise LeaderboardsResourcesNotFound("No leaderboard resources found for the given token")
leaderboards_ids = []
for resource in user_resources.resources:
leaderboard_id = resource.resource_data["leaderboard_id"]
leaderboards_ids.append(leaderboard_id)
leaderboards = (
db_session.query(Leaderboard).filter(Leaderboard.id.in_(leaderboards_ids)).all()
)
return leaderboards
def get_position(
db_session: Session,
leaderboard_id,
address,
window_size,
limit: int,
offset: int,
version_number: Optional[int] = None,
) -> List[Row[Tuple[str, int, int, int, Any]]]:
"""
    Return the positions surrounding the given address, within window_size rows
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data.label("points_data"),
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
func.row_number()
.over(order_by=LeaderboardScores.score.desc())
.label("number"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
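    # Locate the requested address in the ranked scores, then return the rows within
    # window_size positions on either side of it.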
query = db_session.query(
ranked_leaderboard.c.address,
ranked_leaderboard.c.score,
ranked_leaderboard.c.rank,
ranked_leaderboard.c.number,
).filter(
ranked_leaderboard.c.address == address,
)
my_position = query.cte(name="my_position")
# get the position with the window size
query = db_session.query(
ranked_leaderboard.c.address,
ranked_leaderboard.c.score,
ranked_leaderboard.c.rank,
ranked_leaderboard.c.number,
ranked_leaderboard.c.points_data,
).filter(
        ranked_leaderboard.c.number.between(  # rows within window_size of the target row
my_position.c.number - window_size,
my_position.c.number + window_size,
)
)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query.all()
def get_leaderboard_score(
db_session: Session,
leaderboard_id,
address,
version_number: Optional[int] = None,
) -> Optional[LeaderboardScores]:
"""
Return address score
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(LeaderboardScores)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(LeaderboardScores.address == address)
)
return query.one_or_none()
def get_leaderboard_positions(
db_session: Session,
leaderboard_id: uuid.UUID,
limit: int,
offset: int,
poitns_data: Dict[str, str],
version_number: Optional[int] = None,
) -> List[Row[Tuple[uuid.UUID, str, int, str, int]]]:
"""
Get the leaderboard positions
"""
# get public leaderboard scores with max version
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
# Main query
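    # Scores are joined to their version row and restricted to the latest published
    # version (or the explicitly requested one), so unpublished data is never returned.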
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.published == True)
.filter(LeaderboardVersion.version_number == latest_version)
)
if len(poitns_data) > 0:
query = query.filter(
or_(
*[
LeaderboardScores.points_data[point_key].astext == point_value
for point_key, point_value in poitns_data.items()
]
)
)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query
def get_qurtiles(
db_session: Session, leaderboard_id, version_number: Optional[int] = None
) -> Tuple[Row[Tuple[str, float, int]], ...]:
    """
    Get the leaderboard quartiles
https://docs.sqlalchemy.org/en/14/core/functions.html#sqlalchemy.sql.functions.percentile_disc
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.address,
LeaderboardScores.score,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
current_count = db_session.query(ranked_leaderboard).count()
if current_count == 0:
raise LeaderboardIsEmpty(f"Leaderboard {leaderboard_id} is empty")
index_75 = int(current_count / 4)
index_50 = int(current_count / 2)
index_25 = int(current_count * 3 / 4)
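    # Scores are ranked in descending order, so an offset of 3/4 of the count lands on
    # the 25th-percentile score, 1/2 on the median, and 1/4 on the 75th percentile.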
q1 = db_session.query(ranked_leaderboard).limit(1).offset(index_25).first()
q2 = db_session.query(ranked_leaderboard).limit(1).offset(index_50).first()
q3 = db_session.query(ranked_leaderboard).limit(1).offset(index_75).first()
return q1, q2, q3
def get_ranks(
db_session: Session, leaderboard_id, version_number: Optional[int] = None
) -> List[Row[Tuple[int, int, int]]]:
"""
    Get the leaderboard rank buckets (rank, size, score)
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
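    # Collapse equal scores into buckets: each rank appears once, with the number of
    # addresses sharing it and the score for that bucket.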
ranks = db_session.query(
ranked_leaderboard.c.rank,
func.count(ranked_leaderboard.c.id).label("size"),
ranked_leaderboard.c.score,
).group_by(ranked_leaderboard.c.rank, ranked_leaderboard.c.score)
return ranks
def get_rank(
db_session: Session,
leaderboard_id: uuid.UUID,
rank: int,
limit: Optional[int] = None,
offset: Optional[int] = None,
version_number: Optional[int] = None,
) -> List[Row[Tuple[uuid.UUID, str, int, str, int]]]:
"""
Get bucket in leaderboard by rank
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.order_by(text("rank asc, id asc"))
)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
positions = db_session.query(ranked_leaderboard).filter(
ranked_leaderboard.c.rank == rank
)
if limit:
positions = positions.limit(limit)
if offset:
positions = positions.offset(offset)
return positions
def create_leaderboard(
    db_session: Session,
    title: str,
    description: Optional[str],
    token: Optional[Union[uuid.UUID, str]] = None,
    wallet_connect: bool = False,
    blockchain_ids: List[int] = [],
    columns_names: Optional[ColumnsNames] = None,
) -> Leaderboard:
    """
    Create a leaderboard
    """
if columns_names is not None:
columns_names = columns_names.dict()
if not token:
token = uuid.UUID(MOONSTREAM_ADMIN_ACCESS_TOKEN)
try:
# deduplicate and sort
blockchain_ids = sorted(list(set(blockchain_ids)))
leaderboard = Leaderboard(
title=title,
description=description,
wallet_connect=wallet_connect,
blockchain_ids=blockchain_ids,
columns_names=columns_names,
)
db_session.add(leaderboard)
db_session.commit()
user = None
if token is not None:
user = bc.get_user(token=token)
resource = create_leaderboard_resource(
leaderboard_id=str(leaderboard.id),
user_id=str(user.id) if user is not None else None,
)
leaderboard.resource_id = resource.id
db_session.commit()
except Exception as e:
db_session.rollback()
logger.error(f"Error creating leaderboard: {e}")
raise LeaderboardCreateError(f"Error creating leaderboard: {e}")
return leaderboard
def delete_leaderboard(
db_session: Session, leaderboard_id: uuid.UUID, token: uuid.UUID
) -> Leaderboard:
"""
Delete a leaderboard
"""
try:
leaderboard = (
db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
)
if leaderboard.resource_id is not None:
try:
bc.delete_resource(
token=token,
resource_id=leaderboard.resource_id,
)
except Exception as e:
logger.error(f"Error deleting leaderboard resource: {e}")
else:
logger.error(
f"Leaderboard {leaderboard_id} has no resource id. Skipping. Better delete it manually."
)
db_session.delete(leaderboard)
db_session.commit()
except Exception as e:
db_session.rollback()
logger.error(e)
raise LeaderboardDeleteError(f"Error deleting leaderboard: {e}")
return leaderboard
def update_leaderboard(
db_session: Session,
leaderboard_id: uuid.UUID,
title: Optional[str],
description: Optional[str],
wallet_connect: Optional[bool],
blockchain_ids: Optional[List[int]],
columns_names: Optional[ColumnsNames],
) -> Leaderboard:
"""
Update a leaderboard
"""
leaderboard = (
db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
)
if title is not None:
leaderboard.title = title
if description is not None:
leaderboard.description = description
if wallet_connect is not None:
leaderboard.wallet_connect = wallet_connect
if blockchain_ids is not None:
# deduplicate and sort
blockchain_ids = sorted(list(set(blockchain_ids)))
leaderboard.blockchain_ids = blockchain_ids
if columns_names is not None:
if leaderboard.columns_names is not None:
current_columns_names = ColumnsNames(**leaderboard.columns_names)
for key, value in columns_names.dict(exclude_none=True).items():
setattr(current_columns_names, key, value)
else:
current_columns_names = columns_names
leaderboard.columns_names = current_columns_names.dict()
db_session.commit()
return leaderboard
def get_leaderboard_by_id(db_session: Session, leaderboard_id) -> Leaderboard:
"""
Get the leaderboard by id
"""
return db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
def get_leaderboard_by_title(db_session: Session, title) -> Leaderboard:
"""
Get the leaderboard by title
"""
return db_session.query(Leaderboard).filter(Leaderboard.title == title).one() # type: ignore
def list_leaderboards(
db_session: Session, limit: int, offset: int
) -> List[Row[Tuple[uuid.UUID, str, str]]]:
"""
List all leaderboards
"""
query = db_session.query(Leaderboard.id, Leaderboard.title, Leaderboard.description)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query.all()
def add_scores(
db_session: Session,
leaderboard_id: uuid.UUID,
scores: List[Score],
version_number: int,
normalize_addresses: bool = True,
):
"""
Add scores to the leaderboard
"""
leaderboard_scores = []
normalizer_fn = Web3.toChecksumAddress
if not normalize_addresses:
normalizer_fn = lambda x: x # type: ignore
addresses = [score.address for score in scores]
if len(addresses) != len(set(addresses)):
duplicates = [key for key, value in Counter(addresses).items() if value > 1]
raise DuplicateLeaderboardAddressError("Dublicated addresses", duplicates)
for score in scores:
leaderboard_scores.append(
{
"leaderboard_id": leaderboard_id,
"address": normalizer_fn(score.address),
"score": score.score,
"points_data": score.points_data,
"leaderboard_version_number": version_number,
}
)
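    # Upsert: an existing (address, leaderboard, version) row has its score and
    # points_data overwritten instead of raising a unique-constraint error.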
insert_statement = insert(LeaderboardScores).values(leaderboard_scores)
result_stmt = insert_statement.on_conflict_do_update(
index_elements=[
LeaderboardScores.address,
LeaderboardScores.leaderboard_id,
LeaderboardScores.leaderboard_version_number,
],
set_=dict(
score=insert_statement.excluded.score,
points_data=insert_statement.excluded.points_data,
updated_at=datetime.now(),
),
)
try:
db_session.execute(result_stmt)
db_session.commit()
    except Exception as err:
        db_session.rollback()
        logger.error(f"Failed to add leaderboard scores: {err}")
return leaderboard_scores
# leaderboard access actions
def create_leaderboard_resource(leaderboard_id: str, user_id: Optional[str] = None):
resource_data: Dict[str, Any] = {
"type": LEADERBOARD_RESOURCE_TYPE,
"leaderboard_id": leaderboard_id,
}
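    # Register the leaderboard as a Brood resource; the creating user (if known) is
    # granted full holder permissions on it below.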
try:
resource = bc.create_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
application_id=MOONSTREAM_APPLICATION_ID,
resource_data=resource_data,
timeout=10,
)
except Exception as e:
raise LeaderboardCreateError(f"Error creating leaderboard resource: {e}")
if user_id is not None:
try:
bc.add_resource_holder_permissions(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=resource.id,
holder_permissions=BugoutResourceHolder(
holder_type=HolderType.user,
holder_id=user_id,
permissions=[
ResourcePermissions.ADMIN,
ResourcePermissions.READ,
ResourcePermissions.UPDATE,
ResourcePermissions.DELETE,
],
),
)
except Exception as e:
raise LeaderboardCreateError(
f"Error adding resource holder permissions: {e}"
)
return resource
def assign_resource(
db_session: Session,
leaderboard_id: uuid.UUID,
user_token: Optional[Union[uuid.UUID, str]] = None,
2023-06-06 12:11:49 +00:00
resource_id: Optional[uuid.UUID] = None,
):
"""
Assign a resource handler to a leaderboard
"""
    # Resolve the Brood user from the provided token, if any
user = None
if user_token is not None:
user = bc.get_user(token=user_token)
leaderboard = (
db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
)
if resource_id is not None:
leaderboard.resource_id = resource_id
else:
resource = create_leaderboard_resource(
leaderboard_id=str(leaderboard_id),
user_id=user.id if user is not None else None,
)
leaderboard.resource_id = resource.id
db_session.commit()
db_session.flush()
return leaderboard.resource_id
def list_leaderboards_resources(
db_session: Session,
):
"""
List all leaderboards resources
"""
query = db_session.query(Leaderboard.id, Leaderboard.title, Leaderboard.resource_id)
return query.all()
def get_leaderboard_config_entry(
leaderboard_id: uuid.UUID,
) -> BugoutSearchResult:
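    # Leaderboard configs are stored as journal entries tagged with the leaderboard id;
    # the first matching entry is treated as the config for this leaderboard.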
query = f"#leaderboard_id:{leaderboard_id}"
configs = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID,
query=query,
limit=1,
)
results = cast(List[BugoutSearchResult], configs.results)
if len(configs.results) == 0 or results[0].content is None:
raise LeaderboardConfigNotFound(
f"Leaderboard config not found for {leaderboard_id}"
)
return results[0]
def get_leaderboard_config(
leaderboard_id: uuid.UUID,
) -> Dict[str, Any]:
"""
Return leaderboard config from leaderboard generator journal
"""
entry = get_leaderboard_config_entry(leaderboard_id)
content = json.loads(entry.content) # type: ignore
if "status:active" not in entry.tags:
content["leaderboard_auto_update_active"] = False
else:
content["leaderboard_auto_update_active"] = True
return content
def update_leaderboard_config(
leaderboard_id: uuid.UUID, config: LeaderboardConfigUpdate
) -> Dict[str, Any]:
"""
Update leaderboard config in leaderboard generator journal
"""
entry_config = get_leaderboard_config_entry(leaderboard_id)
current_config = LeaderboardConfig(**json.loads(entry_config.content)) # type: ignore
new_params = config.params
for key, value in new_params.items():
if key not in current_config.params:
continue
current_config.params[key] = value
    # Only parameters already present in the stored config are replaced
entry = bc.update_entry_content(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID,
title=entry_config.title,
entry_id=entry_config.entry_url.split("/")[-1],
content=json.dumps(current_config.dict()),
)
new_config = json.loads(entry.content)
if "status:active" not in entry.tags:
new_config["leaderboard_auto_update_active"] = False
else:
new_config["leaderboard_auto_update_active"] = True
return new_config
def activate_leaderboard_config(
leaderboard_id: uuid.UUID,
):
"""
Add tag status:active to leaderboard config journal entry
"""
entry_config = get_leaderboard_config_entry(leaderboard_id)
if "status:active" in entry_config.tags:
raise LeaderboardConfigAlreadyActive(
f"Leaderboard config {leaderboard_id} already active"
)
bc.create_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID,
entry_id=entry_config.entry_url.split("/")[-1],
tags=["status:active"],
)
def deactivate_leaderboard_config(
leaderboard_id: uuid.UUID,
):
"""
Remove tag status:active from leaderboard config journal entry
"""
entry_config = get_leaderboard_config_entry(leaderboard_id)
if "status:active" not in entry_config.tags:
raise LeaderboardConfigAlreadyInactive(
f"Leaderboard config {leaderboard_id} not active"
)
bc.delete_tag(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID,
entry_id=entry_config.entry_url.split("/")[-1],
tag="status:active",
)
def revoke_resource(
db_session: Session, leaderboard_id: uuid.UUID
) -> Optional[uuid.UUID]:
"""
Revoke a resource handler to a leaderboard
"""
# TODO(ANDREY): Delete resource via admin token
leaderboard = (
db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
)
if leaderboard.resource_id is None:
raise Exception("Leaderboard does not have a resource")
leaderboard.resource_id = None
db_session.commit()
db_session.flush()
return leaderboard.resource_id
def check_leaderboard_resource_permissions(
db_session: Session, leaderboard_id: uuid.UUID, token: uuid.UUID
) -> bool:
"""
Check if the user has permissions to access the leaderboard
"""
leaderboard = (
db_session.query(Leaderboard).filter(Leaderboard.id == leaderboard_id).one() # type: ignore
)
permission_url = f"{bc.brood_url}/resources/{leaderboard.resource_id}/holders"
headers = {
"Authorization": f"Bearer {token}",
}
    # If the user lacks at least read permission on the resource, Brood responds with a non-200 status and we return False
result = requests.get(url=permission_url, headers=headers, timeout=10)
if result.status_code == 200:
return True
return False
def get_leaderboard_version(
db_session: Session, leaderboard_id: uuid.UUID, version_number: int
) -> LeaderboardVersion:
"""
Get the leaderboard version by id
"""
return (
db_session.query(LeaderboardVersion)
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.version_number == version_number)
.one()
)
def create_leaderboard_version(
db_session: Session,
leaderboard_id: uuid.UUID,
version_number: Optional[int] = None,
publish: bool = False,
) -> LeaderboardVersion:
"""
Create a leaderboard version
"""
if version_number is None:
latest_version_result = (
db_session.query(func.max(LeaderboardVersion.version_number))
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.one()
)
latest_version = latest_version_result[0]
if latest_version is None:
version_number = 0
else:
version_number = latest_version + 1
leaderboard_version = LeaderboardVersion(
leaderboard_id=leaderboard_id,
version_number=version_number,
published=publish,
)
db_session.add(leaderboard_version)
db_session.commit()
return leaderboard_version
def change_publish_leaderboard_version_status(
db_session: Session, leaderboard_id: uuid.UUID, version_number: int, published: bool
) -> LeaderboardVersion:
"""
    Set the published status of a leaderboard version
"""
leaderboard_version = (
db_session.query(LeaderboardVersion)
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.version_number == version_number)
.one()
)
leaderboard_version.published = published
db_session.commit()
return leaderboard_version
def get_leaderboard_versions(
db_session: Session, leaderboard_id: uuid.UUID
) -> List[LeaderboardVersion]:
"""
Get all leaderboard versions
"""
return (
db_session.query(LeaderboardVersion)
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.all()
)
def delete_leaderboard_version(
db_session: Session, leaderboard_id: uuid.UUID, version_number: int
) -> LeaderboardVersion:
"""
Delete a leaderboard version
"""
leaderboard_version = (
db_session.query(LeaderboardVersion)
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.version_number == version_number)
.one()
)
db_session.delete(leaderboard_version)
db_session.commit()
return leaderboard_version
def get_leaderboard_version_scores(
db_session: Session,
leaderboard_id: uuid.UUID,
version_number: int,
limit: int,
offset: int,
) -> List[LeaderboardScores]:
"""
Get the leaderboard scores by version number
"""
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address.label("address"),
LeaderboardScores.score.label("score"),
LeaderboardScores.points_data.label("points_data"),
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(LeaderboardScores.leaderboard_version_number == version_number)
)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
return query
def delete_previous_versions(
db_session: Session,
leaderboard_id: uuid.UUID,
threshold_version_number: int,
) -> int:
"""
Delete old leaderboard versions
"""
versions_to_delete = (
db_session.query(LeaderboardVersion)
.filter(LeaderboardVersion.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.version_number < threshold_version_number)
)
num_deleted = versions_to_delete.delete(synchronize_session=False)
db_session.commit()
return num_deleted