diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 8cc2def2..fdec37de 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -16,7 +16,21 @@ from ..settings import ( MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS, ) from ..settings import bugout_client as bc -from .utils import get_results_for_moonstream_query, leaderboard_push_batch +from ..db import ( + create_moonstream_engine, + MOONSTREAM_DB_URI_READ_ONLY, + MOONSTREAM_POOL_SIZE, + sessionmaker, + contextmanager, + Session, + Generator, +) +from .utils import ( + get_results_for_moonstream_query, + leaderboard_push_batch, + get_query_by_name, +) +from sqlalchemy import text logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -26,6 +40,15 @@ green_c = "\033[92m" end_c = "\033[0m" +def to_json_types(value): + if isinstance(value, (str, int, tuple, dict, list)): + return value + elif isinstance(value, set): + return list(value) + else: + return str(value) + + def handle_leaderboards(args: argparse.Namespace) -> None: """ Run the leaderboard generator. @@ -63,6 +86,24 @@ def handle_leaderboards(args: argparse.Namespace) -> None: logger.info(f"Found {len(leaderboards_results)} leaderboards") + if args.execute_over_db: + RO_custom_engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI_READ_ONLY, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=420 * 1000, # 7 minutes + ) + + RO_SessionLocal = sessionmaker(bind=RO_custom_engine) + + def yield_db_read_only_session() -> Generator[Session, None, None]: + session = RO_SessionLocal() + try: + yield session + finally: + session.close() + + yield_db_read_only_session_ctx = contextmanager(yield_db_read_only_session) + for leaderboard in leaderboards_results: logger.info( f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}" @@ -79,8 +120,6 @@ def handle_leaderboards(args: argparse.Namespace) -> None: ) continue - ### get results from query API - leaderboard_id = leaderboard_data["leaderboard_id"] query_name = leaderboard_data["query_name"] @@ -90,29 +129,70 @@ def handle_leaderboards(args: argparse.Namespace) -> None: else: params = leaderboard_data["params"] - blockchain = leaderboard_data.get("blockchain", None) + if args.execute_over_db is False: - ### execute query - try: - query_results = get_results_for_moonstream_query( - args.query_api_access_token, - query_name, - params, - blockchain, - MOONSTREAM_API_URL, - args.max_retries, - args.interval, - args.query_api_retries, - ) - except Exception as e: - logger.error(f"Could not get results for query {query_name}: error: {e}") - continue + ### get results from query API - ### push results to leaderboard API + blockchain = leaderboard_data.get("blockchain", None) - if query_results is None: - logger.error(f"Could not get results for query {query_name} in time") - continue + ### execute query + try: + query_results = get_results_for_moonstream_query( + args.query_api_access_token, + query_name, + params, + blockchain, + MOONSTREAM_API_URL, + args.max_retries, + args.interval, + args.query_api_retries, + ) + except Exception as e: + logger.error( + f"Could not get results for query {query_name}: error: {e}" + ) + continue + + ### push results to leaderboard API + + if query_results is None: + logger.error(f"Could not get results for query {query_name} in time") + continue + + records = query_results["data"] + + else: + + logger.info(f"Executing query {query_name} over database") + ### get query + + try: + query = get_query_by_name( + args.query_api_access_token, query_name, MOONSTREAM_API_URL + ) + except Exception as e: + logger.error(f"Could not get queries from Moonstream API: {e}") + continue + + ### query content + + print(query) + + if query["query"] is None: + logger.error(f"Query {query_name} has no content") + continue + + with yield_db_read_only_session_ctx() as db_session: + try: + results = db_session.execute(text(query["query"]), params) + except Exception as e: + logger.error(f"Could not execute query {query_name}: {e}") + continue + + records = [ + {key: to_json_types(value) for key, value in row._asdict().items()} + for row in db_session.execute(text(query["query"]), params).all() + ] leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true" @@ -121,14 +201,14 @@ def handle_leaderboards(args: argparse.Namespace) -> None: "Content-Type": "application/json", } - if len(query_results["data"]) > leaderboard_push_batch_size: + if len(records) > leaderboard_push_batch_size: logger.info( - f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}" + f"Pushing {len(records)} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}" ) leaderboard_push_batch( leaderboard_id, leaderboard_data, - query_results["data"], + records, leaderboard_api_headers, leaderboard_push_batch_size, timeout=leaderboard_push_timeout_seconds, @@ -138,7 +218,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None: try: leaderboard_api_response = requests.put( leaderboard_push_api_url, - json=query_results["data"], + json=records, headers=leaderboard_api_headers, timeout=leaderboard_push_timeout_seconds, ) @@ -156,7 +236,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None: ) leaderboard_api_response = requests.get( - leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10 + leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=30 ) try: @@ -225,6 +305,11 @@ def main(): required=True, help="Moonstream Access Token to use for Moonstream Query API requests", ) + leaderboard_generator_parser.add_argument( + "--execute-over-db", + action="store_true", + help="Execute query over database instead of Moonstream Query API", + ) leaderboard_generator_parser.add_argument( "--leaderboard-push-batch-size", type=int, diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index f1e9e87f..78cc4c91 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -106,6 +106,48 @@ def get_results_for_moonstream_query( return result +def list_queries( + moonstream_access_token: str, + api_url: str = MOONSTREAM_API_URL, +) -> List[Dict[str, Any]]: + """ + Return a list of queries available in account. + """ + + api_url = api_url.rstrip("/") + request_url = f"{api_url}/queries/list" + headers = { + "Authorization": f"Bearer {moonstream_access_token}", + "Content-Type": "application/json", + } + + response = requests.get(request_url, headers=headers, timeout=10) + response.raise_for_status() + return response.json() + + +def get_query_by_name( + moonstream_access_token: str, + query_name: str, + api_url: str = MOONSTREAM_API_URL, +) -> Dict[str, Any]: + """ + Return a query by name. + """ + + api_url = api_url.rstrip("/") + request_url = f"{api_url}/queries/{query_name}/query" + headers = { + "Authorization": f"Bearer {moonstream_access_token}", + "Content-Type": "application/json", + } + + response = requests.get(request_url, headers=headers, timeout=10) + + response.raise_for_status() + return response.json() + + def get_data_from_url(url): response = requests.get(url) if response.status_code == 200: diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index f9a16b04..bc304043 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -383,5 +383,5 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "": ) -MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000 +MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 10000 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60