Merge pull request #984 from moonstream-to/leaderboard-push-batching

Add batching leaderboard generator.
pull/993/head
Andrey Dolgolev 2023-12-13 22:03:23 +02:00 zatwierdzone przez GitHub
commit b7ea3445ce
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
3 zmienionych plików z 169 dodań i 22 usunięć

Wyświetl plik

@ -7,12 +7,14 @@ import uuid
import requests # type: ignore
from bugout.data import BugoutSearchResult
from .utils import get_results_for_moonstream_query
from .utils import get_results_for_moonstream_query, leaderboard_push_batch
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
MOONSTREAM_API_URL,
MOONSTREAM_ENGINE_URL,
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
)
from ..settings import bugout_client as bc
@ -35,10 +37,15 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
### get leaderboard journal
leaderboard_push_batch_size = args.leaderboard_push_batch_size
leaderboard_push_timeout_seconds = args.leaderboard_push_timeout_seconds
query = "#leaderboard #status:active"
if args.leaderboard_id: # way to run only one leaderboard
query += f" #leaderboard_id:{args.leaderboard_id}"
if args.leaderboard_id: # way to run only one leaderboard without status:active
query = f"#leaderboard #leaderboard_id:{args.leaderboard_id}"
try:
leaderboards = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -116,26 +123,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
"Content-Type": "application/json",
}
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
if len(query_results["data"]) > leaderboard_push_batch_size:
logger.info(
f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}"
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
leaderboard_push_batch(
leaderboard_id,
leaderboard_data,
query_results["data"],
leaderboard_api_headers,
leaderboard_push_batch_size,
timeout=leaderboard_push_timeout_seconds,
)
continue
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue
else:
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=leaderboard_push_timeout_seconds,
)
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue
### get leaderboard from leaderboard API
@ -213,6 +227,18 @@ def main():
required=True,
help="Moonstream Access Token to use for Moonstream Query API requests",
)
leaderboard_generator_parser.add_argument(
"--leaderboard-push-batch-size",
type=int,
default=MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
help="Number of scores to push to leaderboard API at once",
)
leaderboard_generator_parser.add_argument(
"--leaderboard-push-timeout-seconds",
type=int,
default=MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
help="Timeout for leaderboard API requests",
)
leaderboard_generator_parser.set_defaults(func=handle_leaderboards)

Wyświetl plik

@ -3,12 +3,17 @@ import json
import logging
import os
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
import requests # type: ignore
from ..settings import MOONSTREAM_API_URL
from ..settings import (
MOONSTREAM_API_URL,
MOONSTREAM_ENGINE_URL,
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
)
logging.basicConfig()
@ -101,3 +106,115 @@ def get_results_for_moonstream_query(
keep_going = num_retries <= max_retries
return result
def get_data_from_url(url):
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get data: HTTP {response.status_code}")
def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10):
for index, chunk in enumerate(chunks):
try:
logger.info(f"Pushing chunk {index} to leaderboard API")
response = requests.put(
endpoint_url, headers=headers, json=chunk, timeout=timeout
)
response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
raise http_error
def leaderboard_push_batch(
leaderboard_id: str,
leaderboard_config: Dict[str, Any],
data: List[Dict[str, Any]],
headers: Dict[str, str],
batch_size: int = MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
timeout: int = 10,
) -> None:
"""
Push leaderboard data to the leaderboard API in batches.
"""
## first step create leaderboard version
leaderboard_version_api_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions"
)
json_data = {
"publish": False,
}
leaderboard_api_response = requests.post(
leaderboard_version_api_url, json=json_data, headers=headers, timeout=10
)
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not create leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return
leaderboard_version_id = leaderboard_api_response.json()["version"]
## second step push data to leaderboard version
leaderboard_version_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}/scores?normalize_addresses={leaderboard_config['normalize_addresses']}&overwrite=false"
chunks = [data[x : x + batch_size] for x in range(0, len(data), batch_size)]
send_data_to_endpoint(
chunks, leaderboard_version_push_api_url, headers, timeout=timeout
)
## third step publish leaderboard version
leaderboard_version_publish_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}"
json_data = {
"publish": True,
}
try:
leaderboard_api_response = requests.put(
leaderboard_version_publish_api_url,
json=json_data,
headers=headers,
timeout=10,
)
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not publish leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return
## delete leaderboard version -1
try:
leaderboard_version_delete_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id - 1}"
leaderboard_api_response = requests.delete(
leaderboard_version_delete_api_url,
headers=headers,
timeout=timeout,
)
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not delete leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return

Wyświetl plik

@ -322,3 +322,7 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
raise ValueError(
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
)
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60