kopia lustrzana https://github.com/bugout-dev/moonstream
Add batching leaderboard generator.
rodzic
29c8c2c4a0
commit
21fb0ca0d7
|
@ -7,12 +7,14 @@ import uuid
|
||||||
import requests # type: ignore
|
import requests # type: ignore
|
||||||
from bugout.data import BugoutSearchResult
|
from bugout.data import BugoutSearchResult
|
||||||
|
|
||||||
from .utils import get_results_for_moonstream_query
|
from .utils import get_results_for_moonstream_query, leaderboard_push_batch
|
||||||
from ..settings import (
|
from ..settings import (
|
||||||
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
|
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
|
||||||
MOONSTREAM_API_URL,
|
MOONSTREAM_API_URL,
|
||||||
MOONSTREAM_ENGINE_URL,
|
MOONSTREAM_ENGINE_URL,
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ..settings import bugout_client as bc
|
from ..settings import bugout_client as bc
|
||||||
|
@ -35,10 +37,15 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
|
||||||
|
|
||||||
### get leaderboard journal
|
### get leaderboard journal
|
||||||
|
|
||||||
|
leaderboard_push_batch_size = args.leaderboard_push_batch_size
|
||||||
|
|
||||||
|
leaderboard_push_timeout_seconds = args.leaderboard_push_timeout_seconds
|
||||||
|
|
||||||
query = "#leaderboard #status:active"
|
query = "#leaderboard #status:active"
|
||||||
|
|
||||||
if args.leaderboard_id: # way to run only one leaderboard
|
if args.leaderboard_id: # way to run only one leaderboard without status:active
|
||||||
query += f" #leaderboard_id:{args.leaderboard_id}"
|
query = f"#leaderboard #leaderboard_id:{args.leaderboard_id}"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
leaderboards = bc.search(
|
leaderboards = bc.search(
|
||||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
|
@ -116,26 +123,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
if len(query_results["data"]) > leaderboard_push_batch_size:
|
||||||
leaderboard_api_response = requests.put(
|
logger.info(
|
||||||
leaderboard_push_api_url,
|
f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}"
|
||||||
json=query_results["data"],
|
|
||||||
headers=leaderboard_api_headers,
|
|
||||||
timeout=10,
|
|
||||||
)
|
)
|
||||||
except Exception as e:
|
leaderboard_push_batch(
|
||||||
logger.error(
|
leaderboard_id,
|
||||||
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
|
leaderboard_data,
|
||||||
|
query_results["data"],
|
||||||
|
leaderboard_api_headers,
|
||||||
|
leaderboard_push_batch_size,
|
||||||
|
timeout=leaderboard_push_timeout_seconds,
|
||||||
)
|
)
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
else:
|
||||||
leaderboard_api_response.raise_for_status()
|
try:
|
||||||
except requests.exceptions.HTTPError as http_error:
|
leaderboard_api_response = requests.put(
|
||||||
logger.error(
|
leaderboard_push_api_url,
|
||||||
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
|
json=query_results["data"],
|
||||||
)
|
headers=leaderboard_api_headers,
|
||||||
continue
|
timeout=leaderboard_push_timeout_seconds,
|
||||||
|
)
|
||||||
|
leaderboard_api_response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as http_error:
|
||||||
|
logger.error(
|
||||||
|
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
### get leaderboard from leaderboard API
|
### get leaderboard from leaderboard API
|
||||||
|
|
||||||
|
@ -213,6 +227,18 @@ def main():
|
||||||
required=True,
|
required=True,
|
||||||
help="Moonstream Access Token to use for Moonstream Query API requests",
|
help="Moonstream Access Token to use for Moonstream Query API requests",
|
||||||
)
|
)
|
||||||
|
leaderboard_generator_parser.add_argument(
|
||||||
|
"--leaderboard-push-batch-size",
|
||||||
|
type=int,
|
||||||
|
default=MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
|
||||||
|
help="Number of scores to push to leaderboard API at once",
|
||||||
|
)
|
||||||
|
leaderboard_generator_parser.add_argument(
|
||||||
|
"--leaderboard-push-timeout-seconds",
|
||||||
|
type=int,
|
||||||
|
default=MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
|
||||||
|
help="Timeout for leaderboard API requests",
|
||||||
|
)
|
||||||
|
|
||||||
leaderboard_generator_parser.set_defaults(func=handle_leaderboards)
|
leaderboard_generator_parser.set_defaults(func=handle_leaderboards)
|
||||||
|
|
||||||
|
|
|
@ -3,12 +3,17 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional, List
|
||||||
|
|
||||||
|
|
||||||
import requests # type: ignore
|
import requests # type: ignore
|
||||||
|
|
||||||
from ..settings import MOONSTREAM_API_URL
|
from ..settings import (
|
||||||
|
MOONSTREAM_API_URL,
|
||||||
|
MOONSTREAM_ENGINE_URL,
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
|
@ -101,3 +106,120 @@ def get_results_for_moonstream_query(
|
||||||
keep_going = num_retries <= max_retries
|
keep_going = num_retries <= max_retries
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def get_data_from_url(url):
|
||||||
|
response = requests.get(url)
|
||||||
|
if response.status_code == 200:
|
||||||
|
return response.json()
|
||||||
|
else:
|
||||||
|
raise Exception(f"Failed to get data: HTTP {response.status_code}")
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_data(data, chunk_size=100000):
|
||||||
|
for i in range(0, len(data), chunk_size):
|
||||||
|
yield data[i : i + chunk_size]
|
||||||
|
|
||||||
|
|
||||||
|
def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10):
|
||||||
|
for index, chunk in enumerate(chunks):
|
||||||
|
try:
|
||||||
|
logger.info(f"Pushing chunk {index} to leaderboard API")
|
||||||
|
response = requests.put(
|
||||||
|
endpoint_url, headers=headers, json=chunk, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as http_error:
|
||||||
|
logger.error(
|
||||||
|
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
|
||||||
|
def leaderboard_push_batch(
|
||||||
|
leaderboard_id: str,
|
||||||
|
leaderboard_config: Dict[str, Any],
|
||||||
|
data: List[Dict[str, Any]],
|
||||||
|
headers: Dict[str, str],
|
||||||
|
batch_size: int = MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
|
||||||
|
timeout: int = 10,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Push leaderboard data to the leaderboard API in batches.
|
||||||
|
"""
|
||||||
|
|
||||||
|
## first step create leaderboard version
|
||||||
|
|
||||||
|
leaderboard_version_api_url = (
|
||||||
|
f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions"
|
||||||
|
)
|
||||||
|
|
||||||
|
json_data = {
|
||||||
|
"publish": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderboard_api_response = requests.post(
|
||||||
|
leaderboard_version_api_url, json=json_data, headers=headers, timeout=5
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
leaderboard_api_response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as http_error:
|
||||||
|
logger.error(
|
||||||
|
f"Could not create leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
leaderboard_version_id = leaderboard_api_response.json()["version"]
|
||||||
|
|
||||||
|
## second step push data to leaderboard version
|
||||||
|
|
||||||
|
leaderboard_version_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}/scores?normalize_addresses={leaderboard_config['normalize_addresses']}&overwrite=false"
|
||||||
|
|
||||||
|
chunks = chunk_data(data, chunk_size=batch_size)
|
||||||
|
|
||||||
|
send_data_to_endpoint(
|
||||||
|
chunks, leaderboard_version_push_api_url, headers, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
## third step publish leaderboard version
|
||||||
|
|
||||||
|
leaderboard_version_publish_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}"
|
||||||
|
|
||||||
|
json_data = {
|
||||||
|
"publish": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
leaderboard_api_response = requests.put(
|
||||||
|
leaderboard_version_publish_api_url,
|
||||||
|
json=json_data,
|
||||||
|
headers=headers,
|
||||||
|
timeout=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
leaderboard_api_response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as http_error:
|
||||||
|
logger.error(
|
||||||
|
f"Could not publish leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
## delete leaderboard version -1
|
||||||
|
|
||||||
|
try:
|
||||||
|
leaderboard_version_delete_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id - 1}"
|
||||||
|
|
||||||
|
leaderboard_api_response = requests.delete(
|
||||||
|
leaderboard_version_delete_api_url,
|
||||||
|
headers=headers,
|
||||||
|
timeout=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
leaderboard_api_response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as http_error:
|
||||||
|
logger.error(
|
||||||
|
f"Could not delete leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
|
@ -321,3 +321,7 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
|
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 20000
|
||||||
|
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
|
||||||
|
|
Ładowanie…
Reference in New Issue