Add materialized view as cache of ranks calculation.

pull/1076/head
Andrey 2024-05-31 21:30:12 +03:00
rodzic 5405eb6929
commit cbf282a08f
1 zmienionych plików z 173 dodań i 141 usunięć
engineapi/engineapi

Wyświetl plik

@ -16,8 +16,9 @@ from eth_typing import Address
from hexbytes import HexBytes
import requests # type: ignore
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy import func, text, or_, and_, Subquery
from sqlalchemy.orm import Session, Query
from sqlalchemy import func, text, or_, and_, Subquery, literal_column
from sqlalchemy.sql import exists, select, column, table
from sqlalchemy.engine import Row
from web3 import Web3
from web3.types import ChecksumAddress
@ -1000,6 +1001,152 @@ def leaderboard_version_filter(
return latest_version
def mv_pg_name(leaderboard_id: uuid.UUID) -> str:
return f"mv_leaderboard_{leaderboard_id}".replace("-", "_")
def mv_check(db_session: Session, leaderboard_id: uuid.UUID) -> bool:
mv_name = mv_pg_name(leaderboard_id)
exists_query = text(
f"""
SELECT EXISTS (
SELECT FROM pg_matviews WHERE schemaname = 'public' AND matviewname = '{mv_name}'
);
"""
)
result = db_session.execute(exists_query).scalar().bool()
return result
def create_materialized_view(db_session, leaderboard_id):
# Safely format the view name using the UUID converted to string
mv_name = mv_pg_name(leaderboard_id)
sql_command = text(
f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} AS
SELECT
leaderboard_scores.address AS address,
leaderboard_scores.score AS score,
leaderboard_scores.points_data AS points_data,
rank() OVER (ORDER BY leaderboard_scores.score DESC, address) AS rank
FROM
leaderboard_scores
JOIN leaderboard_versions ON leaderboard_versions.leaderboard_id = leaderboard_scores.leaderboard_id
AND leaderboard_versions.version_number = leaderboard_scores.leaderboard_version_number
WHERE
leaderboard_scores.leaderboard_id = :leaderboard_id
AND leaderboard_versions.published = true
AND leaderboard_versions.version_number = (
SELECT
max(leaderboard_versions.version_number)
FROM
leaderboard_versions
WHERE
leaderboard_versions.leaderboard_id = :leaderboard_id
AND leaderboard_versions.published = true
)
ORDER BY leaderboard_scores.score DESC, address
"""
)
# Execute the command with parameters
db_session.execute(sql_command, {"leaderboard_id": str(leaderboard_id)})
db_session.commit()
def update_materialized_view(db_session: Session, leaderboard_id: uuid.UUID):
mv_name = mv_pg_name(leaderboard_id)
if not mv_check(db_session, leaderboard_id):
try:
create_materialized_view(db_session, leaderboard_id)
db_session.commit()
except Exception as e:
db_session.rollback()
logger.error(f"Error creating materialized view: {e}")
raise # Re-raise exception after logging
else:
db_session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {mv_name}"))
def get_leaderboard_materialized_view(db_session: Session, leaderboard_id: uuid.UUID):
### materialized view name
mv_name = mv_pg_name(leaderboard_id)
if not mv_check(db_session, leaderboard_id):
try:
create_materialized_view(db_session, leaderboard_id)
db_session.commit()
except Exception as e:
db_session.rollback()
logger.error(f"Error creating materialized view: {e}")
raise # Re-raise exception after logging
### construct the query
# Directly query the materialized view using text
# Define the materialized view as a table object
leaderboard_table = table(
mv_name,
column("address"),
column("score"),
column("points_data"),
column("rank"),
)
# Construct the select statement
query = db_session.query(leaderboard_table)
return query
def generate_ranking_query(
db_session: Session,
leaderboard_id: uuid.UUID,
version_number: Optional[int] = None,
) -> Query:
"""
Generate a query to get the ranking of the leaderboard
"""
if version_number is None:
query = get_leaderboard_materialized_view(db_session, leaderboard_id)
else:
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id
== LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
)
return query
def get_leaderboard_total_count(
db_session: Session, leaderboard_id, version_number: Optional[int] = None
) -> int:
@ -1178,36 +1325,7 @@ def get_position(
Return position by address with window size
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data.label("points_data"),
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
func.row_number()
.over(order_by=LeaderboardScores.score.desc())
.label("number"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
query = generate_ranking_query(db_session, leaderboard_id, version_number)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
@ -1297,33 +1415,7 @@ def get_leaderboard_positions(
# get public leaderboard scores with max version
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
# Main query
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.filter(LeaderboardVersion.published == True)
.filter(LeaderboardVersion.version_number == latest_version)
)
query = generate_ranking_query(db_session, leaderboard_id, version_number)
if len(poitns_data) > 0:
@ -1353,32 +1445,7 @@ def get_qurtiles(
https://docs.sqlalchemy.org/en/14/core/functions.html#sqlalchemy.sql.functions.percentile_disc
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.address,
LeaderboardScores.score,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
query = generate_ranking_query(db_session, leaderboard_id, version_number)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
@ -1409,34 +1476,7 @@ def get_ranks(
Get the leaderboard rank buckets(rank, size, score)
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
)
query = generate_ranking_query(db_session, leaderboard_id, version_number)
ranked_leaderboard = query.cte(name="ranked_leaderboard")
@ -1460,35 +1500,9 @@ def get_rank(
Get bucket in leaderboard by rank
"""
latest_version = leaderboard_version_filter(
db_session=db_session,
leaderboard_id=leaderboard_id,
version_number=version_number,
)
query = generate_ranking_query(db_session, leaderboard_id, version_number)
query = (
db_session.query(
LeaderboardScores.id,
LeaderboardScores.address,
LeaderboardScores.score,
LeaderboardScores.points_data,
func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"),
)
.join(
LeaderboardVersion,
and_(
LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id,
LeaderboardVersion.version_number
== LeaderboardScores.leaderboard_version_number,
),
)
.filter(
LeaderboardVersion.published == True,
LeaderboardVersion.version_number == latest_version,
)
.filter(LeaderboardScores.leaderboard_id == leaderboard_id)
.order_by(text("rank asc, id asc"))
)
query.order_by(text("rank asc, id asc"))
ranked_leaderboard = query.cte(name="ranked_leaderboard")
@ -1546,6 +1560,12 @@ def create_leaderboard(
user_id=str(user.id) if user is not None else None,
)
leaderboard.resource_id = resource.id
try:
create_materialized_view(db_session, leaderboard.id)
except Exception as e:
logger.error(f"Error creating materialized view: {e}")
raise LeaderboardCreateError(f"Error creating materialized view: {e}")
db_session.commit()
except Exception as e:
db_session.rollback()
@ -1712,12 +1732,18 @@ def add_scores(
updated_at=datetime.now(),
),
)
try:
db_session.execute(result_stmt)
db_session.commit()
except:
db_session.rollback()
try:
update_materialized_view(db_session, leaderboard_id)
except Exception as e:
logger.error(f"Error updating materialized view: {e}")
return leaderboard_scores
@ -2031,6 +2057,9 @@ def create_leaderboard_version(
db_session.add(leaderboard_version)
db_session.commit()
if publish:
update_materialized_view(db_session, leaderboard_id)
return leaderboard_version
@ -2051,6 +2080,9 @@ def change_publish_leaderboard_version_status(
db_session.commit()
if published:
update_materialized_view(db_session, leaderboard_id)
return leaderboard_version