kopia lustrzana https://github.com/bugout-dev/moonstream
Merge branch 'main' into historical-crawl-fixes
commit
9246950293
|
@ -26,6 +26,9 @@ SCRIPT_DIR="$(realpath $(dirname $0))"
|
|||
|
||||
# Service files
|
||||
MOONCRAWL_SERVICE_FILE="mooncrawl.service"
|
||||
LEADERBOARDS_WORKER_SERVICE_FILE="leaderboards-worker.service"
|
||||
LEADERBOARDS_WORKER_TIMER_FILE="leaderboards-worker.timer"
|
||||
|
||||
|
||||
# Ethereum service files
|
||||
ETHEREUM_SYNCHRONIZE_SERVICE_FILE="ethereum-synchronize.service"
|
||||
|
@ -553,3 +556,14 @@ cp "${SCRIPT_DIR}/${ZKSYNC_ERA_TESTNET_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/
|
|||
cp "${SCRIPT_DIR}/${ZKSYNC_ERA_TESTNET_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ZKSYNC_ERA_TESTNET_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ZKSYNC_ERA_TESTNET_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
|
||||
|
||||
|
||||
|
||||
echo
|
||||
echo
|
||||
echo -e "${PREFIX_INFO} Replacing existing Leaderboards worker service and timer with: ${LEADERBOARDS_WORKER_SERVICE_FILE}, ${LEADERBOARDS_WORKER_TIMER_FILE}"
|
||||
chmod 644 "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_SERVICE_FILE}" "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_TIMER_FILE}"
|
||||
cp "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${LEADERBOARDS_WORKER_SERVICE_FILE}"
|
||||
cp "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${LEADERBOARDS_WORKER_TIMER_FILE}"
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${LEADERBOARDS_WORKER_TIMER_FILE}"
|
|
@ -0,0 +1,11 @@
|
|||
[Unit]
|
||||
Description=Runs leaderboards generator worker
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
|
||||
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.leaderboards_generator.cli leaderboards-generate --query-api-access-token "${MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN}"
|
||||
CPUWeight=60
|
||||
SyslogIdentifier=leaderboards-worker
|
|
@ -0,0 +1,9 @@
|
|||
[Unit]
|
||||
Description=Runs leaderboard update script every 10 minutes
|
||||
|
||||
[Timer]
|
||||
OnBootSec=60s
|
||||
OnUnitActiveSec=10m
|
||||
|
||||
[Install]
|
||||
WantedBy=timers.target
|
|
@ -0,0 +1,218 @@
|
|||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
import uuid
|
||||
|
||||
import requests # type: ignore
|
||||
|
||||
|
||||
from .utils import get_results_for_moonstream_query
|
||||
from ..settings import (
|
||||
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
|
||||
BUGOUT_REQUEST_TIMEOUT_SECONDS,
|
||||
MOONSTREAM_API_URL,
|
||||
MOONSTREAM_ENGINE_URL,
|
||||
)
|
||||
|
||||
from ..settings import bugout_client as bc
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
blue_c = "\033[94m"
|
||||
green_c = "\033[92m"
|
||||
end_c = "\033[0m"
|
||||
|
||||
|
||||
def handle_leaderboards(args: argparse.Namespace) -> None:
|
||||
"""
|
||||
Run the leaderboard generator.
|
||||
|
||||
Get query from journal and push results to leaderboard API.
|
||||
"""
|
||||
|
||||
### get leaderboard journal
|
||||
|
||||
query = "#leaderboard"
|
||||
|
||||
if args.leaderboard_id: # way to run only one leaderboard
|
||||
query += f" #leaderboard_id:{args.leaderboard_id}"
|
||||
try:
|
||||
leaderboards = bc.search(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
journal_id=MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
|
||||
query=query,
|
||||
limit=100,
|
||||
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Could not get leaderboards from journal: {e}")
|
||||
return
|
||||
|
||||
if len(leaderboards.results) == 0:
|
||||
logger.error("No leaderboard found")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(leaderboards.results)} leaderboards")
|
||||
|
||||
for leaderboard in leaderboards.results:
|
||||
logger.info(
|
||||
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
|
||||
)
|
||||
|
||||
if leaderboard.content is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
leaderboard_data = json.loads(leaderboard.content)
|
||||
except json.JSONDecodeError:
|
||||
logger.error(
|
||||
f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}"
|
||||
)
|
||||
continue
|
||||
|
||||
### get results from query API
|
||||
|
||||
leaderboard_id = leaderboard_data["leaderboard_id"]
|
||||
|
||||
query_name = leaderboard_data["query_name"]
|
||||
|
||||
if args.params:
|
||||
params = json.loads(args.params)
|
||||
else:
|
||||
params = leaderboard_data["params"]
|
||||
|
||||
blockchain = leaderboard_data.get("blockchain", None)
|
||||
|
||||
### execute query
|
||||
try:
|
||||
query_results = get_results_for_moonstream_query(
|
||||
args.query_api_access_token,
|
||||
query_name,
|
||||
params,
|
||||
blockchain,
|
||||
MOONSTREAM_API_URL,
|
||||
args.max_retries,
|
||||
args.interval,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Could not get results for query {query_name}: error: {e}")
|
||||
continue
|
||||
|
||||
### push results to leaderboard API
|
||||
|
||||
if query_results is None:
|
||||
logger.error(f"Could not get results for query {query_name} in time")
|
||||
continue
|
||||
|
||||
leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"
|
||||
|
||||
leaderboard_api_headers = {
|
||||
"Authorization": f"Bearer {args.query_api_access_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
leaderboard_api_response = requests.put(
|
||||
leaderboard_push_api_url,
|
||||
json=query_results["data"],
|
||||
headers=leaderboard_api_headers,
|
||||
timeout=10,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
leaderboard_api_response.raise_for_status()
|
||||
except requests.exceptions.HTTPError as http_error:
|
||||
logger.error(
|
||||
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||
)
|
||||
continue
|
||||
|
||||
### get leaderboard from leaderboard API
|
||||
|
||||
leaderboard_api_info_url = (
|
||||
f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}"
|
||||
)
|
||||
|
||||
leaderboard_api_response = requests.get(
|
||||
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
|
||||
)
|
||||
|
||||
try:
|
||||
leaderboard_api_response.raise_for_status()
|
||||
except requests.exceptions.HTTPError as http_error:
|
||||
logger.error(
|
||||
f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
|
||||
)
|
||||
continue
|
||||
|
||||
info = leaderboard_api_response.json()
|
||||
|
||||
logger.info(
|
||||
f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}"
|
||||
)
|
||||
logger.info(
|
||||
f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}"
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
CLI for generating leaderboards from Moonstream Query API
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser(description="The Judge: Generate leaderboards")
|
||||
parser.set_defaults(func=lambda _: parser.print_help())
|
||||
subparsers = parser.add_subparsers()
|
||||
|
||||
leaderboard_generator_parser = subparsers.add_parser(
|
||||
"leaderboards-generate", description="Generate Leaderboard"
|
||||
)
|
||||
leaderboard_generator_parser.add_argument(
|
||||
"--leaderboard-id",
|
||||
type=uuid.UUID,
|
||||
required=False,
|
||||
help="Leaderboard ID on Engine API",
|
||||
)
|
||||
leaderboard_generator_parser.add_argument(
|
||||
"--max-retries",
|
||||
type=int,
|
||||
default=100,
|
||||
help="Number of times to retry requests for Moonstream Query results",
|
||||
)
|
||||
leaderboard_generator_parser.add_argument(
|
||||
"--interval",
|
||||
type=float,
|
||||
default=30.0,
|
||||
help="Number of seconds to wait between attempts to get results from Moonstream Query API",
|
||||
)
|
||||
leaderboard_generator_parser.add_argument(
|
||||
"--params",
|
||||
type=json.loads,
|
||||
required=False,
|
||||
help="Parameters to pass to Moonstream Query API. Use together with --leaderboard-id",
|
||||
)
|
||||
leaderboard_generator_parser.add_argument(
|
||||
"--query-api-access-token",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Moonstream Access Token to use for Moonstream Query API requests",
|
||||
)
|
||||
|
||||
leaderboard_generator_parser.set_defaults(func=handle_leaderboards)
|
||||
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,102 @@
|
|||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
import requests # type: ignore
|
||||
|
||||
from ..settings import MOONSTREAM_API_URL
|
||||
|
||||
|
||||
logging.basicConfig()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_results_for_moonstream_query(
|
||||
moonstream_access_token: str,
|
||||
query_name: str,
|
||||
params: Dict[str, Any],
|
||||
blockchain: Optional[str] = None,
|
||||
api_url: str = MOONSTREAM_API_URL,
|
||||
max_retries: int = 100,
|
||||
interval: float = 30.0,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
|
||||
Run update of query data and waiting update of query result on S3.
|
||||
TODO: Move to moonstream-client.
|
||||
|
||||
:param moonstream_access_token: Moonstream access token.
|
||||
|
||||
:param query_name: Name of the query to run.
|
||||
|
||||
:param params: Parameters to pass to the query.
|
||||
|
||||
:param api_url: URL of the Moonstream API.
|
||||
|
||||
:param max_retries: Maximum number of times to retry getting results from the Moonstream Query API.
|
||||
|
||||
:param interval: Number of seconds to wait between attempts to get results from the Moonstream Query API.
|
||||
|
||||
:return: Results of the query.
|
||||
|
||||
"""
|
||||
|
||||
result: Optional[Dict[str, Any]] = None
|
||||
|
||||
api_url = api_url.rstrip("/")
|
||||
request_url = f"{api_url}/queries/{query_name}/update_data"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {moonstream_access_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
# Assume our clock is not drifting too much from AWS clocks.
|
||||
if_modified_since_datetime = datetime.datetime.utcnow()
|
||||
if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
|
||||
request_body: Dict[str, Any] = {"params": params}
|
||||
|
||||
if blockchain is not None:
|
||||
request_body["blockchain"] = blockchain
|
||||
|
||||
success = False
|
||||
attempts = 0
|
||||
|
||||
while not success and attempts < max_retries:
|
||||
attempts += 1
|
||||
response = requests.post(
|
||||
request_url, json=request_body, headers=headers, timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
response_body = response.json()
|
||||
data_url = response_body["url"]
|
||||
|
||||
keep_going = True
|
||||
num_retries = 0
|
||||
|
||||
logging.debug(f"If-Modified-Since: {if_modified_since}")
|
||||
while keep_going:
|
||||
time.sleep(interval)
|
||||
num_retries += 1
|
||||
try:
|
||||
data_response = requests.get(
|
||||
data_url,
|
||||
headers={"If-Modified-Since": if_modified_since},
|
||||
timeout=10,
|
||||
)
|
||||
except:
|
||||
logger.error(f"Failed to get data from {data_url}")
|
||||
continue
|
||||
logger.debug(f"Status code: {data_response.status_code}")
|
||||
logger.debug(f"Last-Modified: {data_response.headers['Last-Modified']}")
|
||||
if data_response.status_code == 200:
|
||||
result = data_response.json()
|
||||
keep_going = False
|
||||
success = True
|
||||
if keep_going and max_retries > 0:
|
||||
keep_going = num_retries <= max_retries
|
||||
|
||||
return result
|
|
@ -81,7 +81,7 @@ def get_uris_of_tokens(
|
|||
{}
|
||||
WHERE
|
||||
label = :label
|
||||
AND label_data ->> 'name' = :name
|
||||
AND label_data ->> 'name' in :names
|
||||
ORDER BY
|
||||
label_data -> 'inputs'-> 0 ASC,
|
||||
address ASC,
|
||||
|
@ -90,7 +90,11 @@ def get_uris_of_tokens(
|
|||
table
|
||||
)
|
||||
),
|
||||
{"table": table, "label": VIEW_STATE_CRAWLER_LABEL, "name": "tokenURI"},
|
||||
{
|
||||
"table": table,
|
||||
"label": VIEW_STATE_CRAWLER_LABEL,
|
||||
"names": ("tokenURI", "uri"),
|
||||
},
|
||||
)
|
||||
|
||||
results = [
|
||||
|
|
|
@ -23,6 +23,11 @@ if MOONSTREAM_ENTITY_URL == "":
|
|||
|
||||
entity_client = Entity(MOONSTREAM_ENTITY_URL)
|
||||
|
||||
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
|
||||
MOONSTREAM_ENGINE_URL = os.environ.get(
|
||||
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
|
||||
)
|
||||
|
||||
|
||||
BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
|
||||
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
|
||||
|
@ -310,3 +315,14 @@ HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
|
|||
"historical_crawl_status": "historical_crawl_status",
|
||||
"progress_status": "progress",
|
||||
}
|
||||
|
||||
|
||||
# Leaderboard generator
|
||||
|
||||
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID = os.environ.get(
|
||||
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID", ""
|
||||
)
|
||||
if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
|
||||
raise ValueError(
|
||||
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
|
||||
)
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream crawlers version.
|
||||
"""
|
||||
|
||||
MOONCRAWL_VERSION = "0.3.2"
|
||||
MOONCRAWL_VERSION = "0.3.3"
|
||||
|
|
|
@ -7,7 +7,11 @@ export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
|
|||
# Entity environment variables
|
||||
export MOONSTREAM_ENTITY_URL="https://api.moonstream.to/entity"
|
||||
|
||||
# Engine environment variables
|
||||
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"
|
||||
|
||||
# Moonstream environment variables
|
||||
export MOONSTREAM_API_URL="https://api.moonstream.to"
|
||||
export MOONSTREAM_BUGOUT_TIMEOUT_SECONDS=30
|
||||
export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to"
|
||||
export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"
|
||||
|
@ -45,3 +49,6 @@ export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="<public_bucket>"
|
|||
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev"
|
||||
export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="<access token for run queries for public dashboards>"
|
||||
export INFURA_PROJECT_ID="<infura_project_id>"
|
||||
|
||||
# Leaderboard worker
|
||||
export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID=<Bugout_journal_id_for_leaderboards>
|
|
@ -67,6 +67,7 @@ setup(
|
|||
"state-crawler=mooncrawl.state_crawler.cli:main",
|
||||
"metadata-crawler=mooncrawl.metadata_crawler.cli:main",
|
||||
"custom-crawler=mooncrawl.reports_crawler.cli:main",
|
||||
"leaderboards-generator=mooncrawl.leaderboards_generator.cli:main",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
|
Ładowanie…
Reference in New Issue