diff --git a/crawlers/deploy/deploy-monitoring.bash b/crawlers/deploy/deploy-monitoring.bash new file mode 100755 index 00000000..12600b48 --- /dev/null +++ b/crawlers/deploy/deploy-monitoring.bash @@ -0,0 +1,77 @@ +#!/usr/bin/env bash + +# Deployment script of monitoring services - intended to run on Moonstream crawlers server + +# Colors +C_RESET='\033[0m' +C_RED='\033[1;31m' +C_GREEN='\033[1;32m' +C_YELLOW='\033[1;33m' + +# Logs +PREFIX_INFO="${C_GREEN}[INFO]${C_RESET} [$(date +%d-%m\ %T)]" +PREFIX_WARN="${C_YELLOW}[WARN]${C_RESET} [$(date +%d-%m\ %T)]" +PREFIX_CRIT="${C_RED}[CRIT]${C_RESET} [$(date +%d-%m\ %T)]" + +# Main +AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" +SECRETS_DIR="${SECRETS_DIR:-/home/ubuntu/moonstream-secrets}" +PARAMETERS_ENV_MONITORING_PATH="${SECRETS_DIR}/monitoring.env" +SCRIPT_DIR="$(realpath $(dirname $0))" + +# Service files +MONITORING_CRAWLERS_SERVICE_FILE="monitoring-crawlers.service" + +set -eu + +echo +echo +echo -e "${PREFIX_INFO} Install checkenv" +HOME=/home/ubuntu /usr/local/go/bin/go install github.com/bugout-dev/checkenv@latest + +echo +echo +echo -e "${PREFIX_INFO} Copy monitoring binary from AWS S3" +aws s3 cp s3://bugout-binaries/prod/monitoring/monitoring "/home/ubuntu/monitoring" +chmod +x "/home/ubuntu/monitoring" +chown ubuntu:ubuntu "/home/ubuntu/monitoring" + +echo +echo +echo -e "${PREFIX_INFO} Retrieving monitoring deployment parameters" +AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION}" /home/ubuntu/go/bin/checkenv show aws_ssm+crawlers:true,monitoring:true > "${PARAMETERS_ENV_MONITORING_PATH}" +chmod 0640 "${PARAMETERS_ENV_MONITORING_PATH}" + +echo +echo +echo -e "${PREFIX_INFO} Add instance local IP to monitoring parameters" +echo "AWS_LOCAL_IPV4=$(ec2metadata --local-ipv4)" >> "${PARAMETERS_ENV_MONITORING_PATH}" + +echo +echo +echo -e "${PREFIX_INFO} Add AWS default region to monitring parameters" +echo "AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}" >> "${PARAMETERS_ENV_MONITORING_PATH}" + +echo +echo +echo -e "${PREFIX_INFO} Prepare monitoring configuration" +if [ ! -d "/home/ubuntu/.monitoring" ]; then + mkdir -p /home/ubuntu/.monitoring + echo -e "${PREFIX_WARN} Created monitoring configuration directory" +fi +cp "${SCRIPT_DIR}/monitoring-crawlers-config.json" /home/ubuntu/.monitoring/monitoring-crawlers-config.json + +echo +echo +if [ ! -d "/home/ubuntu/.config/systemd/user/" ]; then + mkdir -p /home/ubuntu/.config/systemd/user/ + echo -e "${PREFIX_WARN} Created user systemd directory" +fi + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing systemd crawlers monitoring service definition with ${MONITORING_CRAWLERS_SERVICE_FILE}" +chmod 644 "${SCRIPT_DIR}/${MONITORING_CRAWLERS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${MONITORING_CRAWLERS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MONITORING_CRAWLERS_SERVICE_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart "${MONITORING_CRAWLERS_SERVICE_FILE}" diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash index fc32785a..99e8b889 100755 --- a/crawlers/deploy/deploy.bash +++ b/crawlers/deploy/deploy.bash @@ -29,12 +29,10 @@ MOONCRAWL_SERVICE_FILE="mooncrawl.service" LEADERBOARDS_WORKER_SERVICE_FILE="leaderboards-worker.service" LEADERBOARDS_WORKER_TIMER_FILE="leaderboards-worker.timer" - # Ethereum service files ETHEREUM_SYNCHRONIZE_SERVICE_FILE="ethereum-synchronize.service" ETHEREUM_TRENDING_SERVICE_FILE="ethereum-trending.service" ETHEREUM_TRENDING_TIMER_FILE="ethereum-trending.timer" -ETHEREUM_TXPOOL_SERVICE_FILE="ethereum-txpool.service" ETHEREUM_MISSING_SERVICE_FILE="ethereum-missing.service" ETHEREUM_MISSING_TIMER_FILE="ethereum-missing.timer" ETHEREUM_MOONWORM_CRAWLER_SERVICE_FILE="ethereum-moonworm-crawler.service" @@ -51,7 +49,6 @@ POLYGON_MISSING_SERVICE_FILE="polygon-missing.service" POLYGON_MISSING_TIMER_FILE="polygon-missing.timer" POLYGON_STATISTICS_SERVICE_FILE="polygon-statistics.service" POLYGON_STATISTICS_TIMER_FILE="polygon-statistics.timer" -POLYGON_TXPOOL_SERVICE_FILE="polygon-txpool.service" POLYGON_MOONWORM_CRAWLER_SERVICE_FILE="polygon-moonworm-crawler.service" POLYGON_STATE_SERVICE_FILE="polygon-state.service" POLYGON_STATE_TIMER_FILE="polygon-state.timer" @@ -131,14 +128,6 @@ ZKSYNC_ERA_TESTNET_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="zksync-era-testnet-histor set -eu -echo -echo -echo -e "${PREFIX_INFO} Building executable Ethereum transaction pool crawler script with Go" -EXEC_DIR=$(pwd) -cd "${APP_CRAWLERS_DIR}/txpool" -HOME=/home/ubuntu /usr/local/go/bin/go build -o "${APP_CRAWLERS_DIR}/txpool/txpool" "${APP_CRAWLERS_DIR}/txpool/main.go" -cd "${EXEC_DIR}" - echo echo echo -e "${PREFIX_INFO} Upgrading Python pip and setuptools" @@ -156,8 +145,11 @@ HOME=/home/ubuntu /usr/local/go/bin/go install github.com/bugout-dev/checkenv@la echo echo -echo -e "${PREFIX_INFO} Retrieving addition deployment parameters" -mkdir -p "${SECRETS_DIR}" +echo -e "${PREFIX_INFO} Retrieving deployment parameters" +if [ ! -d "${SECRETS_DIR}" ]; then + mkdir -p "${SECRETS_DIR}" + echo -e "${PREFIX_WARN} Created new secrets directory" +fi AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION}" /home/ubuntu/go/bin/checkenv show aws_ssm+moonstream:true > "${PARAMETERS_ENV_PATH}" chmod 0640 "${PARAMETERS_ENV_PATH}" @@ -166,6 +158,13 @@ echo echo -e "${PREFIX_INFO} Add instance local IP to parameters" echo "AWS_LOCAL_IPV4=$(ec2metadata --local-ipv4)" >> "${PARAMETERS_ENV_PATH}" +echo +echo +if [ ! -d "/home/ubuntu/.config/systemd/user/" ]; then + mkdir -p /home/ubuntu/.config/systemd/user/ + echo -e "${PREFIX_WARN} Created user systemd directory" +fi + echo echo echo -e "${PREFIX_INFO} Replacing existing Moonstream crawlers HTTP API server service definition with ${MOONCRAWL_SERVICE_FILE}" @@ -191,14 +190,6 @@ cp "${SCRIPT_DIR}/${ETHEREUM_TRENDING_TIMER_FILE}" "/home/ubuntu/.config/systemd XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ETHEREUM_TRENDING_TIMER_FILE}" -# echo -# echo -# echo -e "${PREFIX_INFO} Replacing existing Ethereum transaction pool crawler service definition with ${ETHEREUM_TXPOOL_SERVICE_FILE}" -# chmod 644 "${SCRIPT_DIR}/${ETHEREUM_TXPOOL_SERVICE_FILE}" -# cp "${SCRIPT_DIR}/${ETHEREUM_TXPOOL_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_TXPOOL_SERVICE_FILE}" -# XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload -# XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ETHEREUM_TXPOOL_SERVICE_FILE}" - echo echo echo -e "${PREFIX_INFO} Replacing existing Ethereum missing service and timer with: ${ETHEREUM_MISSING_SERVICE_FILE}, ${ETHEREUM_MISSING_TIMER_FILE}" @@ -270,14 +261,6 @@ cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_TIMER_FILE}" "/home/ubuntu/.config/system XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${POLYGON_STATISTICS_TIMER_FILE}" -# echo -# echo -# echo -e "${PREFIX_INFO} Replacing existing Polygon transaction pool crawler service definition with ${POLYGON_TXPOOL_SERVICE_FILE}" -# chmod 644 "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" -# cp "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_TXPOOL_SERVICE_FILE}" -# XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload -# XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${POLYGON_TXPOOL_SERVICE_FILE}" - echo echo echo -e "${PREFIX_INFO} Replacing existing Polygon moonworm crawler service definition with ${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}" @@ -620,4 +603,4 @@ chmod 644 "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_SERVICE_FILE}" "${SCRIPT_DIR}/${L cp "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${LEADERBOARDS_WORKER_SERVICE_FILE}" cp "${SCRIPT_DIR}/${LEADERBOARDS_WORKER_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${LEADERBOARDS_WORKER_TIMER_FILE}" XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload -XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${LEADERBOARDS_WORKER_TIMER_FILE}" \ No newline at end of file +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${LEADERBOARDS_WORKER_TIMER_FILE}" diff --git a/crawlers/deploy/ethereum-historical-crawl-events.service b/crawlers/deploy/ethereum-historical-crawl-events.service index 006a03c4..881d21b2 100644 --- a/crawlers/deploy/ethereum-historical-crawl-events.service +++ b/crawlers/deploy/ethereum-historical-crawl-events.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-events CPUWeight=70 -SyslogIdentifier=ethereum-historical-crawler-events +SyslogIdentifier=ethereum-historical-crawl-events [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/ethereum-historical-crawl-transactions.service b/crawlers/deploy/ethereum-historical-crawl-transactions.service index fa9fcd28..25e11793 100644 --- a/crawlers/deploy/ethereum-historical-crawl-transactions.service +++ b/crawlers/deploy/ethereum-historical-crawl-transactions.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-functions CPUWeight=70 -SyslogIdentifier=ethereum-historical-crawler-transactions +SyslogIdentifier=ethereum-historical-crawl-transactions [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/ethereum-txpool.service b/crawlers/deploy/ethereum-txpool.service deleted file mode 100644 index 39e6d280..00000000 --- a/crawlers/deploy/ethereum-txpool.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=Ethereum txpool crawler -After=network.target -StartLimitIntervalSec=300 -StartLimitBurst=3 - -[Service] -WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool -EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -Restart=on-failure -RestartSec=15s -ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain ethereum -access-id "${NB_CONTROLLER_ACCESS_ID}" -CPUWeight=30 -SyslogIdentifier=ethereum-txpool - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/monitoring-crawlers-config.json b/crawlers/deploy/monitoring-crawlers-config.json new file mode 100644 index 00000000..654139c5 --- /dev/null +++ b/crawlers/deploy/monitoring-crawlers-config.json @@ -0,0 +1,7 @@ +{ + "notification_pagerduty": true, + "notification_telegram": true, + "notification_sendgrid": true, + "notification_humbug": true, + "silent": [] +} \ No newline at end of file diff --git a/crawlers/deploy/monitoring-crawlers.service b/crawlers/deploy/monitoring-crawlers.service new file mode 100644 index 00000000..5a5aa09f --- /dev/null +++ b/crawlers/deploy/monitoring-crawlers.service @@ -0,0 +1,17 @@ +[Unit] +Description=Monitor crawlers systemd state +StartLimitIntervalSec=300 +StartLimitBurst=3 +After=network.target + +[Service] +Restart=on-failure +RestartSec=15s +WorkingDirectory=/home/ubuntu/ +EnvironmentFile=/home/ubuntu/moonstream-secrets/monitoring.env +ExecStart=/home/ubuntu/monitoring -plugin systemd -host "${AWS_LOCAL_IPV4}" -port 7171 -healthcheck -server -threshold 3 -config /home/ubuntu/.monitoring/monitoring-crawlers-config.json -service ethereum-moonworm-crawler.service -service mumbai-moonworm-crawler.service -service polygon-moonworm-crawler.service -service zksync-era-moonworm-crawler.service +CPUWeight=90 +SyslogIdentifier=monitoring-crawlers + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/moonworm-unicorns-mainnet.service b/crawlers/deploy/moonworm-unicorns-mainnet.service deleted file mode 100644 index 67448c67..00000000 --- a/crawlers/deploy/moonworm-unicorns-mainnet.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=Moonworm CryptoUnicorns watch custom systemd service -StartLimitIntervalSec=300 -StartLimitBurst=3 -After=network.target - -[Service] -Restart=on-failure -RestartSec=15s -WorkingDirectory=/home/ubuntu -EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonworm-env/bin/python -m moonworm.cli watch-cu -w "${MOONSTREAM_POLYGON_WEB3_PROVIDER_URI}?access_id=${NB_CONTROLLER_ACCESS_ID}&data_source=blockchain" -c 0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f -d 21418707 --confirmations 60 -CPUWeight=70 -SyslogIdentifier=moonworm-unicorns-mainnet - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/mumbai-historical-crawl-events.service b/crawlers/deploy/mumbai-historical-crawl-events.service index 488a1a94..cc56464e 100644 --- a/crawlers/deploy/mumbai-historical-crawl-events.service +++ b/crawlers/deploy/mumbai-historical-crawl-events.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-events CPUWeight=70 -SyslogIdentifier=mumbai-historical-crawler-events +SyslogIdentifier=mumbai-historical-crawl-events [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/mumbai-historical-crawl-transactions.service b/crawlers/deploy/mumbai-historical-crawl-transactions.service index 968d88bb..9a88ac2f 100644 --- a/crawlers/deploy/mumbai-historical-crawl-transactions.service +++ b/crawlers/deploy/mumbai-historical-crawl-transactions.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-functions CPUWeight=70 -SyslogIdentifier=mumbai-historical-crawler-transactions +SyslogIdentifier=mumbai-historical-crawl-transactions [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/parameters.py b/crawlers/deploy/parameters.py deleted file mode 100644 index c8df5797..00000000 --- a/crawlers/deploy/parameters.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -Collect secrets from AWS SSM Parameter Store and output as environment variable exports. -""" -import argparse -from dataclasses import dataclass -import sys -from typing import Any, Dict, Iterable, List, Optional - -import boto3 - - -@dataclass -class EnvironmentVariable: - name: str - value: str - - -def get_parameters(path: str) -> List[Dict[str, Any]]: - """ - Retrieve parameters from AWS SSM Parameter Store. Decrypts any encrypted parameters. - - Relies on the appropriate environment variables to authenticate against AWS: - https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html - """ - ssm = boto3.client("ssm") - next_token: Optional[bool] = True - parameters: List[Dict[str, Any]] = [] - while next_token is not None: - kwargs = {"Path": path, "Recursive": False, "WithDecryption": True} - if next_token is not True: - kwargs["NextToken"] = next_token - response = ssm.get_parameters_by_path(**kwargs) - new_parameters = response.get("Parameters", []) - parameters.extend(new_parameters) - next_token = response.get("NextToken") - - return parameters - - -def parameter_to_env(parameter_object: Dict[str, Any]) -> EnvironmentVariable: - """ - Transforms parameters returned by the AWS SSM API into EnvironmentVariables. - """ - parameter_path = parameter_object.get("Name") - if parameter_path is None: - raise ValueError('Did not find "Name" in parameter object') - name = parameter_path.split("/")[-1].upper() - - value = parameter_object.get("Value") - if value is None: - raise ValueError('Did not find "Value" in parameter object') - - return EnvironmentVariable(name, value) - - -def env_string(env_vars: Iterable[EnvironmentVariable], with_export: bool) -> str: - """ - Produces a string which, when executed in a shell, exports the desired environment variables as - specified by env_vars. - """ - prefix = "export " if with_export else "" - return "\n".join([f'{prefix}{var.name}="{var.value}"' for var in env_vars]) - - -def extract_handler(args: argparse.Namespace) -> None: - """ - Save environment variables to file. - """ - result = env_string(map(parameter_to_env, get_parameters(args.path)), args.export) - with args.outfile as ofp: - print(result, file=ofp) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Materialize environment variables from AWS SSM Parameter Store" - ) - parser.set_defaults(func=lambda _: parser.print_help()) - subcommands = parser.add_subparsers(description="Parameters commands") - - parser_extract = subcommands.add_parser( - "extract", description="Parameters extract commands" - ) - parser_extract.set_defaults(func=lambda _: parser_extract.print_help()) - parser_extract.add_argument( - "-o", "--outfile", type=argparse.FileType("w"), default=sys.stdout - ) - parser_extract.add_argument( - "--export", - action="store_true", - help="Set to output environment strings with export statements", - ) - parser_extract.add_argument( - "-p", - "--path", - default=None, - help="SSM path from which to pull environment variables (pull is NOT recursive)", - ) - parser_extract.set_defaults(func=extract_handler) - - args = parser.parse_args() - args.func(args) diff --git a/crawlers/deploy/polygon-historical-crawl-events.service b/crawlers/deploy/polygon-historical-crawl-events.service index c8017a93..965e6747 100644 --- a/crawlers/deploy/polygon-historical-crawl-events.service +++ b/crawlers/deploy/polygon-historical-crawl-events.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-events CPUWeight=70 -SyslogIdentifier=polygon-historical-crawler-events +SyslogIdentifier=polygon-historical-crawl-events [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.service b/crawlers/deploy/polygon-historical-crawl-transactions.service index d8229b02..36639e3e 100644 --- a/crawlers/deploy/polygon-historical-crawl-transactions.service +++ b/crawlers/deploy/polygon-historical-crawl-transactions.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-functions CPUWeight=70 -SyslogIdentifier=polygon-historical-crawler-transactions +SyslogIdentifier=polygon-historical-crawl-transactions [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/polygon-txpool.service b/crawlers/deploy/polygon-txpool.service deleted file mode 100644 index f4f871bc..00000000 --- a/crawlers/deploy/polygon-txpool.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=Polygon txpool crawler -After=network.target -StartLimitIntervalSec=300 -StartLimitBurst=3 - -[Service] -WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool -EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -Restart=on-failure -RestartSec=15s -ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain polygon -access-id "${NB_CONTROLLER_ACCESS_ID}" -CPUWeight=30 -SyslogIdentifier=polygon-txpool - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/wyrm-historical-crawl-events.service b/crawlers/deploy/wyrm-historical-crawl-events.service index cd73d8c0..7ecefc11 100644 --- a/crawlers/deploy/wyrm-historical-crawl-events.service +++ b/crawlers/deploy/wyrm-historical-crawl-events.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-events CPUWeight=70 -SyslogIdentifier=wyrm-historical-crawler-events +SyslogIdentifier=wyrm-historical-crawl-events [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/wyrm-historical-crawl-transactions.service b/crawlers/deploy/wyrm-historical-crawl-transactions.service index 0ef17a88..41186544 100644 --- a/crawlers/deploy/wyrm-historical-crawl-transactions.service +++ b/crawlers/deploy/wyrm-historical-crawl-transactions.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-functions CPUWeight=70 -SyslogIdentifier=wyrm-historical-crawler-transactions +SyslogIdentifier=wyrm-historical-crawl-transactions [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/xdai-historical-crawl-events.service b/crawlers/deploy/xdai-historical-crawl-events.service index 11b0fa5a..66d56405 100644 --- a/crawlers/deploy/xdai-historical-crawl-events.service +++ b/crawlers/deploy/xdai-historical-crawl-events.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-events CPUWeight=70 -SyslogIdentifier=xdai-historical-crawler-events +SyslogIdentifier=xdai-historical-crawl-events [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/xdai-historical-crawl-transactions.service b/crawlers/deploy/xdai-historical-crawl-transactions.service index 48b37eeb..20a3c616 100644 --- a/crawlers/deploy/xdai-historical-crawl-transactions.service +++ b/crawlers/deploy/xdai-historical-crawl-transactions.service @@ -11,7 +11,7 @@ Restart=on-failure RestartSec=15s ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-functions CPUWeight=70 -SyslogIdentifier=xdai-historical-crawler-transactions +SyslogIdentifier=xdai-historical-crawl-transactions [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 0767042f..39b8f0f7 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -97,6 +97,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None: MOONSTREAM_API_URL, args.max_retries, args.interval, + args.query_api_retries, ) except Exception as e: logger.error(f"Could not get results for query {query_name}: error: {e}") @@ -185,13 +186,19 @@ def main(): leaderboard_generator_parser.add_argument( "--max-retries", type=int, - default=100, + default=12, help="Number of times to retry requests for Moonstream Query results", ) + leaderboard_generator_parser.add_argument( + "--query-api-retries", + type=int, + default=3, + help="Number of times to retry updating Moonstream Query data", + ) leaderboard_generator_parser.add_argument( "--interval", type=float, - default=30.0, + default=10.0, help="Number of seconds to wait between attempts to get results from Moonstream Query API", ) leaderboard_generator_parser.add_argument( diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index fa235cca..fc4d1e01 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -23,6 +23,7 @@ def get_results_for_moonstream_query( api_url: str = MOONSTREAM_API_URL, max_retries: int = 100, interval: float = 30.0, + query_api_retries: int = 3, ) -> Optional[Dict[str, Any]]: """ @@ -65,11 +66,11 @@ def get_results_for_moonstream_query( success = False attempts = 0 - while not success and attempts < max_retries: - attempts += 1 + while not success and attempts < query_api_retries: response = requests.post( request_url, json=request_body, headers=headers, timeout=10 ) + attempts += 1 response.raise_for_status() response_body = response.json() data_url = response_body["url"] diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index a2c5f741..73f14f1b 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -158,12 +158,13 @@ def _autoscale_crawl_events( all_events = [] for job in jobs: raw_events, batch_size = moonworm_autoscale_crawl_events( - web3, - job.event_abi, - from_block, - to_block, - batch_size, - job.contracts[0], + web3=web3, + event_abi=job.event_abi, + from_block=from_block, + to_block=to_block, + batch_size=batch_size, + contract_address=job.contracts[0], + max_blocks_batch=3000, ) for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 5ff280e0..f1eae777 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -90,7 +90,10 @@ def execute_query(query: Dict[str, Any], token: str): result = [] for item in data: - result.append(tuple([item[key] for key in keys])) + if len(keys) == 1: + result.append(item[keys[0]]) + else: + result.append(tuple([item[key] for key in keys])) return result @@ -193,7 +196,7 @@ def crawl_calls_level( block_number, blockchain_type, block_timestamp, - max_batch_size=5000, + max_batch_size=3000, min_batch_size=4, ): calls_of_level = [] @@ -203,8 +206,6 @@ def crawl_calls_level( continue parameters = [] - logger.info(f"Call: {json.dumps(call, indent=4)}") - for input in call["inputs"]: if type(input["value"]) in (str, int): if input["value"] not in responces: @@ -260,9 +261,6 @@ def crawl_calls_level( block_number, ) make_multicall_result = future.result(timeout=20) - logger.info( - f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}" - ) retry = 0 calls_of_level = calls_of_level[batch_size:] logger.info(f"lenght of task left {len(calls_of_level)}.") @@ -274,7 +272,7 @@ def crawl_calls_level( time.sleep(4) if retry > 5: raise (e) - batch_size = max(batch_size // 3, min_batch_size) + batch_size = max(batch_size // 4, min_batch_size) except TimeoutError as e: # timeout logger.error(f"TimeoutError: {e}, retrying") retry += 1 @@ -285,7 +283,7 @@ def crawl_calls_level( logger.error(f"Exception: {e}") raise (e) time.sleep(2) - logger.info(f"Retry: {retry}") + logger.debug(f"Retry: {retry}") # results parsing and writing to database add_to_session_count = 0 for result in make_multicall_result: @@ -471,8 +469,7 @@ def parse_jobs( # run crawling of levels try: # initial call of level 0 all call without subcalls directly moved there - logger.info("Crawl level: 0") - logger.info(f"Jobs amount: {len(calls[0])}") + logger.info(f"Crawl level: 0. Jobs amount: {len(calls[0])}") logger.info(f"call_tree_levels: {call_tree_levels}") batch_size = crawl_calls_level( @@ -490,8 +487,7 @@ def parse_jobs( ) for level in call_tree_levels: - logger.info(f"Crawl level: {level}") - logger.info(f"Jobs amount: {len(calls[level])}") + logger.info(f"Crawl level: {level}. Jobs amount: {len(calls[level])}") batch_size = crawl_calls_level( web3_client, diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json index 6de0ccc9..e255672d 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json @@ -62,5 +62,95 @@ } ], "address": "0x230E4e85d4549343A460F5dE0a7035130F62d74C" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "seasonId", + "type": "uint16", + "value": { + "type": "queryAPI", + "query_url": "twilight_seasons", + "blockchain": "mumbai", + "params": { + "address": "0x665B8Db5b9E3b396e2Ccb0Bd768dc74fC47Ec20D" + }, + "keys": [ + "season_id" + ] + } + }, + { + "internalType": "address", + "name": "user", + "type": "address", + "value": { + "type": "queryAPI", + "query_url": "twilight_tactics_players", + "blockchain": "mumbai", + "params": { + "address": "0x665B8Db5b9E3b396e2Ccb0Bd768dc74fC47Ec20D" + }, + "keys": [ + "player" + ] + } + } + ], + "name": "twtGetSeasonalDominationPointsByAccount", + "address": "0x665B8Db5b9E3b396e2Ccb0Bd768dc74fC47Ec20D", + "outputs": [ + { + "internalType": "uint56[5]", + "name": "shadowcornDominationPoints", + "type": "uint56[5]" + }, + { + "internalType": "uint56[5]", + "name": "unicornDominationPoints", + "type": "uint56[5]" + } + ], + "stateMutability": "view", + "type": "function", + "selector": "0x0b4ef829" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "seasonId", + "type": "uint16", + "value": { + "type": "queryAPI", + "query_url": "twilight_seasons", + "blockchain": "mumbai", + "params": { + "address": "0x665B8Db5b9E3b396e2Ccb0Bd768dc74fC47Ec20D" + }, + "keys": [ + "season_id" + ] + } + } + ], + "name": "twtGetSeasonalDominationPointsForAllRegions", + "address": "0x665B8Db5b9E3b396e2Ccb0Bd768dc74fC47Ec20D", + "outputs": [ + { + "internalType": "uint56[5]", + "name": "shadowcornDominationPoints", + "type": "uint56[5]" + }, + { + "internalType": "uint56[5]", + "name": "unicornDominationPoints", + "type": "uint56[5]" + } + ], + "stateMutability": "view", + "type": "function", + "selector": "0xbddb218c" } ] \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json index 3f230278..c25a9a09 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json @@ -1,223 +1,313 @@ [ - { - "type": "function", - "stateMutability": "view", - "inputs": [ + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "function", + "name": "totalSupply", + "outputs": [ { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "function", - "name": "totalSupply", - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f", - "inputs": [] - } + "internalType": "uint256", + "name": "", + "type": "uint256" } - ], - "name": "tokenURI", - "outputs": [ + ], + "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f", + "inputs": [] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "polygon", + "params": { + "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" + }, + "keys": [ + "token_id" + ] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "function", + "name": "totalSupply", + "outputs": [ { - "internalType": "string", - "name": "", - "type": "string" + "internalType": "uint256", + "name": "", + "type": "uint256" } - ], - "address": "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "queryAPI", - "query_url": "template_erc721_minting", - "blockchain": "polygon", - "params": { - "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" - }, - "keys": [ - "token_id" - ] - } - } - ], - "name": "tokenURI", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0xA2a13cE1824F3916fC84C65e559391fc6674e6e8" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "function", - "name": "totalSupply", - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "address": "0x20B807b9AF56977EF475C089A0e7977540743560", - "inputs": [] - } - } - ], - "name": "tokenURI", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0x20B807b9AF56977EF475C089A0e7977540743560" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "queryAPI", - "query_url": "template_erc1155_token_ids", - "blockchain": "polygon", - "params": { - "address": "0xd4D53d8D61adc3B8114C1cd17B89393640db9733" - }, - "keys": [ - "token_id" - ] - } - } - ], - "name": "uri", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0xd4D53d8D61adc3B8114C1cd17B89393640db9733" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "queryAPI", - "query_url": "template_erc1155_token_ids", - "blockchain": "polygon", - "params": { - "address": "0x74d4567fd8B0b873B61FA180618a82183012F369" - }, - "keys": [ - "token_id" - ] - } - } - ], - "name": "uri", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0x74d4567fd8B0b873B61FA180618a82183012F369" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "queryAPI", - "query_url": "template_erc1155_token_ids", - "blockchain": "polygon", - "params": { - "address": "0x44b3f42e2BF34F62868Ff9e9dAb7C2F807ba97Cb" - }, - "keys": [ - "token_id" - ] - } - } - ], - "name": "uri", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0x44b3f42e2BF34F62868Ff9e9dAb7C2F807ba97Cb" - }, - { - "type": "function", - "stateMutability": "view", - "inputs": [ - { - "internalType": "uint256", - "name": "tokenId", - "type": "uint256", - "value": { - "type": "queryAPI", - "query_url": "template_erc721_minting", - "blockchain": "polygon", - "params": { - "address": "0xa7D50EE3D7485288107664cf758E877a0D351725" - }, - "keys": [ - "token_id" - ] - } - } - ], - "name": "tokenURI", - "outputs": [ - { - "internalType": "string", - "name": "", - "type": "string" - } - ], - "address": "0xa7D50EE3D7485288107664cf758E877a0D351725" - } + ], + "address": "0x20B807b9AF56977EF475C089A0e7977540743560", + "inputs": [] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0x20B807b9AF56977EF475C089A0e7977540743560" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "queryAPI", + "query_url": "template_erc1155_token_ids", + "blockchain": "polygon", + "params": { + "address": "0xd4D53d8D61adc3B8114C1cd17B89393640db9733" + }, + "keys": [ + "token_id" + ] + } + } + ], + "name": "uri", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0xd4D53d8D61adc3B8114C1cd17B89393640db9733" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "queryAPI", + "query_url": "template_erc1155_token_ids", + "blockchain": "polygon", + "params": { + "address": "0x74d4567fd8B0b873B61FA180618a82183012F369" + }, + "keys": [ + "token_id" + ] + } + } + ], + "name": "uri", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0x74d4567fd8B0b873B61FA180618a82183012F369" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "queryAPI", + "query_url": "template_erc1155_token_ids", + "blockchain": "polygon", + "params": { + "address": "0x44b3f42e2BF34F62868Ff9e9dAb7C2F807ba97Cb" + }, + "keys": [ + "token_id" + ] + } + } + ], + "name": "uri", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0x44b3f42e2BF34F62868Ff9e9dAb7C2F807ba97Cb" + }, + { + "type": "function", + "stateMutability": "view", + "inputs": [ + { + "internalType": "uint256", + "name": "tokenId", + "type": "uint256", + "value": { + "type": "queryAPI", + "query_url": "template_erc721_minting", + "blockchain": "polygon", + "params": { + "address": "0xa7D50EE3D7485288107664cf758E877a0D351725" + }, + "keys": [ + "token_id" + ] + } + } + ], + "name": "tokenURI", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "address": "0xa7D50EE3D7485288107664cf758E877a0D351725" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "seasonId", + "type": "uint16", + "value": { + "type": "queryAPI", + "query_url": "twilight_seasons", + "blockchain": "polygon", + "params": { + "address": "0xe570fAC6513A8018145aB398Ea71988C688F22C4" + }, + "keys": [ + "season_id" + ] + } + }, + { + "internalType": "address", + "name": "user", + "type": "address", + "value": { + "type": "queryAPI", + "query_url": "twilight_tactics_players", + "blockchain": "polygon", + "params": { + "address": "0xe570fAC6513A8018145aB398Ea71988C688F22C4" + }, + "keys": [ + "player" + ] + } + } + ], + "name": "twtGetSeasonalDominationPointsByAccount", + "address": "0xe570fAC6513A8018145aB398Ea71988C688F22C4", + "outputs": [ + { + "internalType": "uint56[5]", + "name": "shadowcornDominationPoints", + "type": "uint56[5]" + }, + { + "internalType": "uint56[5]", + "name": "unicornDominationPoints", + "type": "uint56[5]" + } + ], + "stateMutability": "view", + "type": "function", + "selector": "0x0b4ef829" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "seasonId", + "type": "uint16", + "value": { + "type": "queryAPI", + "query_url": "twilight_seasons", + "blockchain": "polygon", + "params": { + "address": "0xe570fAC6513A8018145aB398Ea71988C688F22C4" + }, + "keys": [ + "season_id" + ] + } + } + ], + "name": "twtGetSeasonalDominationPointsForAllRegions", + "address": "0xe570fAC6513A8018145aB398Ea71988C688F22C4", + "outputs": [ + { + "internalType": "uint56[5]", + "name": "shadowcornDominationPoints", + "type": "uint56[5]" + }, + { + "internalType": "uint56[5]", + "name": "unicornDominationPoints", + "type": "uint56[5]" + } + ], + "stateMutability": "view", + "type": "function", + "selector": "0xbddb218c" + } ] \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index a5d954c2..4c9b9453 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -23,7 +23,7 @@ from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -QUERY_REGEX = re.compile("[\[\]@#$%^&?;`/]") +QUERY_REGEX = re.compile(r"[\[\]@#$%^&?;`]|/\*|\*/") class QueryNotValid(Exception): diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py index a71911d7..c3c76d34 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py @@ -62,4 +62,4 @@ class TestQueries(unittest.TestCase): queries.query_validation("OR(1=1)#") with self.assertRaises(queries.QueryNotValid): - queries.query_validation("/etc/hosts") + queries.query_validation("0/**/or/**/1") diff --git a/engineapi/README.md b/engineapi/README.md index 77e6257c..5c9c0c37 100644 --- a/engineapi/README.md +++ b/engineapi/README.md @@ -1,43 +1,9 @@ -# lootbox +## `client` -Use lootboxes in your game economy with ready to use contracts +This repository contains a lightweight Python client for the Engine API. -## Deployment - -Deployment with local signer server +To use, for example, with Leaderboard API: ```bash -MOONSTREAM_SIGNING_SERVER_IP=127.0.0.1 ./dev.sh -``` - -## Run frontend - -Do from root directory workspace directory: - -Engine: - -Run dev - -``` -yarn workspace engine run dev -``` - -Build - -``` -yarn workspace engine run build -``` - -Player: - -Run dev - -``` -yarn workspace player run dev -``` - -Build - -``` -yarn workspace player run build +python -m client.leaderboards -h ``` diff --git a/engineapi/alembic/versions/4f05d212ea49_live_at_for_metatx.py b/engineapi/alembic/versions/6d07739cb13e_live_at_for_metatx.py similarity index 67% rename from engineapi/alembic/versions/4f05d212ea49_live_at_for_metatx.py rename to engineapi/alembic/versions/6d07739cb13e_live_at_for_metatx.py index 04224d0d..5db963d4 100644 --- a/engineapi/alembic/versions/4f05d212ea49_live_at_for_metatx.py +++ b/engineapi/alembic/versions/6d07739cb13e_live_at_for_metatx.py @@ -1,8 +1,8 @@ """Live at for metatx -Revision ID: 4f05d212ea49 -Revises: 040f2dfde5a5 -Create Date: 2023-10-03 10:00:09.730620 +Revision ID: 6d07739cb13e +Revises: cc80e886e153 +Create Date: 2023-12-06 14:33:04.814144 """ from alembic import op @@ -10,15 +10,15 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '4f05d212ea49' -down_revision = '040f2dfde5a5' +revision = '6d07739cb13e' +down_revision = 'cc80e886e153' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.add_column('call_requests', sa.Column('live_at', sa.DateTime(timezone=True), server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), nullable=False)) + op.add_column('call_requests', sa.Column('live_at', sa.DateTime(timezone=True), nullable=True)) # ### end Alembic commands ### diff --git a/engineapi/alembic/versions/cc80e886e153_added_leaderboard_versions_table_and_.py b/engineapi/alembic/versions/cc80e886e153_added_leaderboard_versions_table_and_.py new file mode 100644 index 00000000..b6dff83e --- /dev/null +++ b/engineapi/alembic/versions/cc80e886e153_added_leaderboard_versions_table_and_.py @@ -0,0 +1,154 @@ +"""Added leaderboard_versions table and corresponding constraints + +Revision ID: cc80e886e153 +Revises: 040f2dfde5a5 +Create Date: 2023-11-08 16:16:39.265150 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "cc80e886e153" +down_revision = "040f2dfde5a5" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "leaderboard_versions", + sa.Column("leaderboard_id", sa.UUID(), nullable=False), + sa.Column("version_number", sa.DECIMAL(), nullable=False), + sa.Column("published", sa.Boolean(), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), + nullable=False, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), + nullable=False, + ), + sa.ForeignKeyConstraint( + ["leaderboard_id"], + ["leaderboards.id"], + name=op.f("fk_leaderboard_versions_leaderboard_id_leaderboards"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint( + "leaderboard_id", "version_number", name=op.f("pk_leaderboard_versions") + ), + sa.UniqueConstraint( + "leaderboard_id", + "version_number", + name=op.f("uq_leaderboard_versions_leaderboard_id"), + ), + ) + op.create_index( + op.f("ix_leaderboard_versions_created_at"), + "leaderboard_versions", + ["created_at"], + unique=False, + ) + op.add_column( + "leaderboard_scores", + sa.Column("leaderboard_version_number", sa.DECIMAL(), nullable=True), + ) + op.drop_constraint( + "uq_leaderboard_scores_leaderboard_id", "leaderboard_scores", type_="unique" + ) + op.create_unique_constraint( + op.f("uq_leaderboard_scores_leaderboard_id"), + "leaderboard_scores", + ["leaderboard_id", "address", "leaderboard_version_number"], + ) + op.drop_constraint( + "fk_leaderboard_scores_leaderboard_id_leaderboards", + "leaderboard_scores", + type_="foreignkey", + ) + op.create_foreign_key( + op.f("fk_leaderboard_scores_leaderboard_id_leaderboard_versions"), + "leaderboard_scores", + "leaderboard_versions", + ["leaderboard_id", "leaderboard_version_number"], + ["leaderboard_id", "version_number"], + ondelete="CASCADE", + ) + # ### end Alembic commands ### + + # Insert version 0 for all existing leaderboards + op.execute( + """ + INSERT INTO leaderboard_versions (leaderboard_id, version_number, published) + SELECT id, 0, true FROM leaderboards + """ + ) + # Set the leaderboard_version_number for all existing scores to the version 0 + op.execute( + """ + UPDATE leaderboard_scores SET leaderboard_version_number = 0 + """ + ) + # Alter leaderboard_scores to make leaderboard_version_number non-nullable + op.alter_column( + "leaderboard_scores", + "leaderboard_version_number", + nullable=False, + ) + + +def downgrade(): + op.execute( + """ + WITH latest_version_for_leaderboard AS ( + SELECT leaderboard_id, MAX(version_number) AS latest_version + FROM leaderboard_versions WHERE published = true + GROUP BY leaderboard_id + ) + DELETE FROM leaderboard_scores WHERE + (leaderboard_id, leaderboard_version_number) NOT IN ( + SELECT + leaderboard_id, + latest_version AS leaderboard_version_number + FROM + latest_version_for_leaderboard + ) + """ + ) + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint( + op.f("fk_leaderboard_scores_leaderboard_id_leaderboard_versions"), + "leaderboard_scores", + type_="foreignkey", + ) + op.create_foreign_key( + "fk_leaderboard_scores_leaderboard_id_leaderboards", + "leaderboard_scores", + "leaderboards", + ["leaderboard_id"], + ["id"], + ondelete="CASCADE", + ) + op.drop_constraint( + op.f("uq_leaderboard_scores_leaderboard_id"), + "leaderboard_scores", + type_="unique", + ) + op.create_unique_constraint( + "uq_leaderboard_scores_leaderboard_id", + "leaderboard_scores", + ["leaderboard_id", "address"], + ) + op.drop_column("leaderboard_scores", "leaderboard_version_number") + op.drop_index( + op.f("ix_leaderboard_versions_created_at"), table_name="leaderboard_versions" + ) + op.drop_table("leaderboard_versions") + # ### end Alembic commands ### diff --git a/engineapi/client/__init__.py b/engineapi/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/engineapi/client/leaderboards.py b/engineapi/client/leaderboards.py new file mode 100644 index 00000000..59e439ed --- /dev/null +++ b/engineapi/client/leaderboards.py @@ -0,0 +1,234 @@ +import argparse +import json +import os +import sys +from typing import Optional +import uuid + +import requests + +LEADERBOARD_API_URL = os.environ.get( + "LEADERBOARD_API_URL", "http://localhost:7191/leaderboard/" +) + + +def moonstream_access_token(value: Optional[str]) -> uuid.UUID: + if value is None: + value = os.environ.get("MOONSTREAM_ACCESS_TOKEN") + + if value is None: + raise ValueError( + "Moonstream access token is required: either via -A/--authorization, or via the MOONSTREAM_ACCESS_TOKEN environment variable" + ) + + try: + value_uuid = uuid.UUID(value) + except Exception: + raise ValueError("Moonstream access token must be a valid UUID") + + return value_uuid + + +def requires_authorization(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "-A", + "--authorization", + type=moonstream_access_token, + required=False, + default=os.environ.get("MOONSTREAM_ACCESS_TOKEN"), + help="Moonstream API access token (if not provided, must be specified using the MOONSTREAM_ACCESS_TOKEN environment variable)", + ) + + +def handle_get(args: argparse.Namespace) -> None: + url = LEADERBOARD_API_URL + params = { + "leaderboard_id": str(args.id), + "limit": str(args.limit), + "offset": str(args.offset), + } + if args.version is not None: + params["version"] = str(args.version) + + response = requests.get(url, params=params) + response.raise_for_status() + + print(json.dumps(response.json())) + + +def handle_create(args: argparse.Namespace) -> None: + url = LEADERBOARD_API_URL + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {str(args.authorization)}", + } + + body = { + "title": args.title, + "description": args.description, + } + + response = requests.post(url, headers=headers, json=body) + response.raise_for_status() + print(json.dumps(response.json())) + + +def handle_versions(args: argparse.Namespace) -> None: + url = f"{LEADERBOARD_API_URL}{args.id}/versions" + + headers = { + "Authorization": f"Bearer {str(args.authorization)}", + } + + response = requests.get(url, headers=headers) + response.raise_for_status() + print(json.dumps(response.json())) + + +def handle_create_version(args: argparse.Namespace) -> None: + url = f"{LEADERBOARD_API_URL}{args.id}/versions" + + headers = { + "Authorization": f"Bearer {str(args.authorization)}", + "Content-Type": "application/json", + } + + body = { + "publish": args.publish, + } + + response = requests.post(url, headers=headers, json=body) + response.raise_for_status() + print(json.dumps(response.json())) + + +def handle_publish(args: argparse.Namespace) -> None: + url = f"{LEADERBOARD_API_URL}{args.id}/versions/{args.version}" + + headers = { + "Authorization": f"Bearer {str(args.authorization)}", + "Content-Type": "application/json", + } + + body = { + "publish": args.publish, + } + + response = requests.put(url, headers=headers, json=body) + response.raise_for_status() + print(json.dumps(response.json())) + + +def handle_upload_scores(args: argparse.Namespace) -> None: + url = f"{LEADERBOARD_API_URL}{args.id}/scores" + if args.version is not None: + url = f"{LEADERBOARD_API_URL}{args.id}/versions/{args.version}/scores" + + params = { + "overwrite": "true", + "normalize_addresses": "false", + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {str(args.authorization)}", + } + + if args.scores is None: + args.scores = sys.stdin + + with args.scores as ifp: + body = json.load(ifp) + + response = requests.put(url, headers=headers, params=params, json=body) + response.raise_for_status() + print(json.dumps(response.json())) + + +def generate_cli() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="HTTP client for Leaderboard API") + parser.set_defaults(func=lambda _: parser.print_help()) + + subparsers = parser.add_subparsers() + + # GET /leaderboard/?leaderboard_id=&limit=&offset=&version= + get_parser = subparsers.add_parser("get") + get_parser.add_argument("-i", "--id", type=uuid.UUID, required=True) + get_parser.add_argument("-l", "--limit", type=int, default=10) + get_parser.add_argument("-o", "--offset", type=int, default=0) + get_parser.add_argument("-v", "--version", type=int, default=None) + get_parser.set_defaults(func=handle_get) + + # POST /leaderboard/ + create_parser = subparsers.add_parser("create") + create_parser.add_argument( + "-t", "--title", type=str, required=True, help="Title for leaderboard" + ) + create_parser.add_argument( + "-d", + "--description", + type=str, + required=False, + default="", + help="Description for leaderboard", + ) + requires_authorization(create_parser) + create_parser.set_defaults(func=handle_create) + + # GET /leaderboard//versions + versions_parser = subparsers.add_parser("versions") + versions_parser.add_argument("-i", "--id", type=uuid.UUID, required=True) + requires_authorization(versions_parser) + versions_parser.set_defaults(func=handle_versions) + + # POST /leaderboard//versions + create_version_parser = subparsers.add_parser("create-version") + create_version_parser.add_argument("-i", "--id", type=uuid.UUID, required=True) + create_version_parser.add_argument( + "--publish", + action="store_true", + help="Set this flag to publish the version immediately upon creation", + ) + requires_authorization(create_version_parser) + create_version_parser.set_defaults(func=handle_create_version) + + # PUT /leaderboard//versions/ + publish_parser = subparsers.add_parser("publish") + publish_parser.add_argument("-i", "--id", type=uuid.UUID, required=True) + publish_parser.add_argument("-v", "--version", type=int, required=True) + publish_parser.add_argument( + "--publish", action="store_true", help="Set to publish, leave to unpublish" + ) + requires_authorization(publish_parser) + publish_parser.set_defaults(func=handle_publish) + + # PUT /leaderboard//scores and PUT /leaderboard//versions//scores + upload_scores_parser = subparsers.add_parser("upload-scores") + upload_scores_parser.add_argument("-i", "--id", type=uuid.UUID, required=True) + upload_scores_parser.add_argument( + "-v", + "--version", + type=int, + required=False, + default=None, + help="Specify a version to upload scores to (if not specified a new version is created)", + ) + upload_scores_parser.add_argument( + "-s", + "--scores", + type=argparse.FileType("r"), + required=False, + default=None, + help="Path to scores file. If not provided, reads from stdin.", + ) + upload_scores_parser.set_defaults(func=handle_upload_scores) + requires_authorization(upload_scores_parser) + + return parser + + +if __name__ == "__main__": + parser = generate_cli() + args = parser.parse_args() + args.func(args) diff --git a/engineapi/client/sample-score.json b/engineapi/client/sample-score.json new file mode 100644 index 00000000..3523a324 --- /dev/null +++ b/engineapi/client/sample-score.json @@ -0,0 +1,10 @@ +[ + { + "address": "0x0000000000000000000000000000000000000000", + "score": 19, + "points_data": { + "secondary_score_1": 7, + "secondary_score_2": 29 + } + } +] diff --git a/engineapi/engineapi/actions.py b/engineapi/engineapi/actions.py index 23c3e20f..ef7f79b6 100644 --- a/engineapi/engineapi/actions.py +++ b/engineapi/engineapi/actions.py @@ -11,7 +11,7 @@ from hexbytes import HexBytes import requests # type: ignore from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from sqlalchemy import func, text, or_ +from sqlalchemy import func, text, or_, and_, Subquery from sqlalchemy.engine import Row from web3 import Web3 from web3.types import ChecksumAddress @@ -24,6 +24,7 @@ from .models import ( DropperClaim, Leaderboard, LeaderboardScores, + LeaderboardVersion, ) from . import signatures from .settings import ( @@ -91,6 +92,10 @@ class LeaderboardConfigAlreadyInactive(Exception): pass +class LeaderboardVersionNotFound(Exception): + pass + + BATCH_SIGNATURE_PAGE_SIZE = 500 logger = logging.getLogger(__name__) @@ -959,24 +964,71 @@ def refetch_drop_signatures( return claimant_objects -def get_leaderboard_total_count(db_session: Session, leaderboard_id) -> int: +def leaderboard_version_filter( + db_session: Session, + leaderboard_id: uuid.UUID, + version_number: Optional[int] = None, +) -> Union[Subquery, int]: + # Subquery to get the latest version number for the given leaderboard + if version_number is None: + latest_version = ( + db_session.query(func.max(LeaderboardVersion.version_number)).filter( + LeaderboardVersion.leaderboard_id == leaderboard_id, + LeaderboardVersion.published == True, + ) + ).scalar_subquery() + else: + latest_version = version_number + + return latest_version + + +def get_leaderboard_total_count( + db_session: Session, leaderboard_id, version_number: Optional[int] = None +) -> int: """ - Get the total number of claimants in the leaderboard + Get the total number of position in the leaderboard """ - return ( - db_session.query(LeaderboardScores) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - .count() + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, ) + total_count = ( + db_session.query(func.count(LeaderboardScores.id)) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ).scalar() + + return total_count + def get_leaderboard_info( - db_session: Session, leaderboard_id: uuid.UUID + db_session: Session, leaderboard_id: uuid.UUID, version_number: Optional[int] = None ) -> Row[Tuple[uuid.UUID, str, str, int, Optional[datetime]]]: """ Get the leaderboard from the database with users count """ + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + leaderboard = ( db_session.query( Leaderboard.id, @@ -990,10 +1042,22 @@ def get_leaderboard_info( LeaderboardScores.leaderboard_id == Leaderboard.id, isouter=True, ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + isouter=True, + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) .filter(Leaderboard.id == leaderboard_id) .group_by(Leaderboard.id, Leaderboard.title, Leaderboard.description) - .one() - ) + ).one() return leaderboard @@ -1078,19 +1142,48 @@ def get_leaderboards( def get_position( - db_session: Session, leaderboard_id, address, window_size, limit: int, offset: int + db_session: Session, + leaderboard_id, + address, + window_size, + limit: int, + offset: int, + version_number: Optional[int] = None, ) -> List[Row[Tuple[str, int, int, int, Any]]]: """ - Return position by address with window size """ - query = db_session.query( - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data.label("points_data"), - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - func.row_number().over(order_by=LeaderboardScores.score.desc()).label("number"), - ).filter(LeaderboardScores.leaderboard_id == leaderboard_id) + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + query = ( + db_session.query( + LeaderboardScores.address, + LeaderboardScores.score, + LeaderboardScores.points_data.label("points_data"), + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + func.row_number() + .over(order_by=LeaderboardScores.score.desc()) + .label("number"), + ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1130,11 +1223,25 @@ def get_position( def get_leaderboard_positions( - db_session: Session, leaderboard_id, limit: int, offset: int + db_session: Session, + leaderboard_id, + limit: int, + offset: int, + version_number: Optional[int] = None, ) -> List[Row[Tuple[uuid.UUID, str, int, str, int]]]: """ Get the leaderboard positions """ + + # get public leaderboard scores with max version + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + # Main query query = ( db_session.query( LeaderboardScores.id, @@ -1143,8 +1250,17 @@ def get_leaderboard_positions( LeaderboardScores.points_data, func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - .order_by(text("rank asc, id asc")) + .filter(LeaderboardVersion.published == True) + .filter(LeaderboardVersion.version_number == latest_version) ) if limit: @@ -1157,18 +1273,39 @@ def get_leaderboard_positions( def get_qurtiles( - db_session: Session, leaderboard_id + db_session: Session, leaderboard_id, version_number: Optional[int] = None ) -> Tuple[Row[Tuple[str, float, int]], ...]: """ Get the leaderboard qurtiles https://docs.sqlalchemy.org/en/14/core/functions.html#sqlalchemy.sql.functions.percentile_disc """ - query = db_session.query( - LeaderboardScores.address, - LeaderboardScores.score, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ).filter(LeaderboardScores.leaderboard_id == leaderboard_id) + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + query = ( + db_session.query( + LeaderboardScores.address, + LeaderboardScores.score, + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1192,17 +1329,41 @@ def get_qurtiles( return q1, q2, q3 -def get_ranks(db_session: Session, leaderboard_id) -> List[Row[Tuple[int, int, int]]]: +def get_ranks( + db_session: Session, leaderboard_id, version_number: Optional[int] = None +) -> List[Row[Tuple[int, int, int]]]: """ Get the leaderboard rank buckets(rank, size, score) """ - query = db_session.query( - LeaderboardScores.id, - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ).filter(LeaderboardScores.leaderboard_id == leaderboard_id) + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + query = ( + db_session.query( + LeaderboardScores.id, + LeaderboardScores.address, + LeaderboardScores.score, + LeaderboardScores.points_data, + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1220,10 +1381,18 @@ def get_rank( rank: int, limit: Optional[int] = None, offset: Optional[int] = None, + version_number: Optional[int] = None, ) -> List[Row[Tuple[uuid.UUID, str, int, str, int]]]: """ Get bucket in leaderboard by rank """ + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + query = ( db_session.query( LeaderboardScores.id, @@ -1232,6 +1401,18 @@ def get_rank( LeaderboardScores.points_data, func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) .filter(LeaderboardScores.leaderboard_id == leaderboard_id) .order_by(text("rank asc, id asc")) ) @@ -1377,7 +1558,7 @@ def add_scores( db_session: Session, leaderboard_id: uuid.UUID, scores: List[Score], - overwrite: bool = False, + version_number: int, normalize_addresses: bool = True, ): """ @@ -1397,16 +1578,6 @@ def add_scores( raise DuplicateLeaderboardAddressError("Dublicated addresses", duplicates) - if overwrite: - db_session.query(LeaderboardScores).filter( - LeaderboardScores.leaderboard_id == leaderboard_id - ).delete() - try: - db_session.commit() - except: - db_session.rollback() - raise LeaderboardDeleteScoresError("Error deleting leaderboard scores") - for score in scores: leaderboard_scores.append( { @@ -1414,13 +1585,18 @@ def add_scores( "address": normalizer_fn(score.address), "score": score.score, "points_data": score.points_data, + "leaderboard_version_number": version_number, } ) insert_statement = insert(LeaderboardScores).values(leaderboard_scores) result_stmt = insert_statement.on_conflict_do_update( - index_elements=[LeaderboardScores.address, LeaderboardScores.leaderboard_id], + index_elements=[ + LeaderboardScores.address, + LeaderboardScores.leaderboard_id, + LeaderboardScores.leaderboard_version_number, + ], set_=dict( score=insert_statement.excluded.score, points_data=insert_statement.excluded.points_data, @@ -1436,7 +1612,7 @@ def add_scores( return leaderboard_scores -# leadrboard access actions +# leaderboard access actions def create_leaderboard_resource( @@ -1675,3 +1851,159 @@ def check_leaderboard_resource_permissions( return True return False + + +def get_leaderboard_version( + db_session: Session, leaderboard_id: uuid.UUID, version_number: int +) -> LeaderboardVersion: + """ + Get the leaderboard version by id + """ + return ( + db_session.query(LeaderboardVersion) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .filter(LeaderboardVersion.version_number == version_number) + .one() + ) + + +def create_leaderboard_version( + db_session: Session, + leaderboard_id: uuid.UUID, + version_number: Optional[int] = None, + publish: bool = False, +) -> LeaderboardVersion: + """ + Create a leaderboard version + """ + + if version_number is None: + latest_version_result = ( + db_session.query(func.max(LeaderboardVersion.version_number)) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .one() + ) + + latest_version = latest_version_result[0] + + if latest_version is None: + version_number = 0 + else: + version_number = latest_version + 1 + + leaderboard_version = LeaderboardVersion( + leaderboard_id=leaderboard_id, + version_number=version_number, + published=publish, + ) + + db_session.add(leaderboard_version) + db_session.commit() + + return leaderboard_version + + +def change_publish_leaderboard_version_status( + db_session: Session, leaderboard_id: uuid.UUID, version_number: int, published: bool +) -> LeaderboardVersion: + """ + Publish a leaderboard version + """ + leaderboard_version = ( + db_session.query(LeaderboardVersion) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .filter(LeaderboardVersion.version_number == version_number) + .one() + ) + + leaderboard_version.published = published + + db_session.commit() + + return leaderboard_version + + +def get_leaderboard_versions( + db_session: Session, leaderboard_id: uuid.UUID +) -> List[LeaderboardVersion]: + """ + Get all leaderboard versions + """ + return ( + db_session.query(LeaderboardVersion) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .all() + ) + + +def delete_leaderboard_version( + db_session: Session, leaderboard_id: uuid.UUID, version_number: int +) -> LeaderboardVersion: + """ + Delete a leaderboard version + """ + leaderboard_version = ( + db_session.query(LeaderboardVersion) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .filter(LeaderboardVersion.version_number == version_number) + .one() + ) + + db_session.delete(leaderboard_version) + db_session.commit() + + return leaderboard_version + + +def get_leaderboard_version_scores( + db_session: Session, + leaderboard_id: uuid.UUID, + version_number: int, + limit: int, + offset: int, +) -> List[LeaderboardScores]: + """ + Get the leaderboard scores by version number + """ + + query = ( + db_session.query( + LeaderboardScores.id, + LeaderboardScores.address.label("address"), + LeaderboardScores.score.label("score"), + LeaderboardScores.points_data.label("points_data"), + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + .filter(LeaderboardScores.leaderboard_version_number == version_number) + ) + + if limit: + query = query.limit(limit) + + if offset: + query = query.offset(offset) + + return query + + +def delete_previous_versions( + db_session: Session, + leaderboard_id: uuid.UUID, + threshold_version_number: int, +) -> int: + """ + Delete old leaderboard versions + """ + + versions_to_delete = ( + db_session.query(LeaderboardVersion) + .filter(LeaderboardVersion.leaderboard_id == leaderboard_id) + .filter(LeaderboardVersion.version_number < threshold_version_number) + ) + + num_deleted = versions_to_delete.delete(synchronize_session=False) + + db_session.commit() + + return num_deleted diff --git a/engineapi/engineapi/contracts_actions.py b/engineapi/engineapi/contracts_actions.py index f3378410..4d9f6ca8 100644 --- a/engineapi/engineapi/contracts_actions.py +++ b/engineapi/engineapi/contracts_actions.py @@ -5,7 +5,7 @@ import uuid from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple -from sqlalchemy import func, text +from sqlalchemy import func, or_, text from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Row from sqlalchemy.exc import IntegrityError, NoResultFound @@ -414,7 +414,7 @@ def create_request_calls( request_id=specification.request_id, parameters=specification.parameters, expires_at=expires_at, - live_at=datetime.fromtimestamp(live_at), + live_at=datetime.fromtimestamp(live_at) if live_at is not None else None, ) db_session.add(request) @@ -485,7 +485,10 @@ def list_call_requests( metatx_requester_id: Optional[uuid.UUID] = None, ) -> List[Row[Tuple[CallRequest, RegisteredContract, CallRequestType]]]: """ - List call requests for the given moonstream_user_id + List call requests. + + Argument moonstream_user_id took from authorization workflow. And if it is specified + then user has access to call_requests before live_at param. """ if caller is None: raise ValueError("caller must be specified") @@ -526,11 +529,11 @@ def list_call_requests( ) if not show_before_live_at: query = query.filter( - CallRequest.live_at < func.now(), + or_(CallRequest.live_at < func.now(), CallRequest.live_at == None) ) else: query = query.filter( - CallRequest.live_at < func.now(), + or_(CallRequest.live_at < func.now(), CallRequest.live_at == None) ) if offset is not None: diff --git a/engineapi/engineapi/data.py b/engineapi/engineapi/data.py index dd601efe..c1b01beb 100644 --- a/engineapi/engineapi/data.py +++ b/engineapi/engineapi/data.py @@ -308,7 +308,7 @@ class CallRequestResponse(BaseModel): parameters: Dict[str, Any] tx_hash: Optional[str] = None expires_at: Optional[datetime] = None - live_at: datetime + live_at: Optional[datetime] = None created_at: datetime updated_at: datetime @@ -445,3 +445,15 @@ class LeaderboardConfigUpdate(BaseModel): query_name: Optional[str] = None params: Dict[str, int] normalize_addresses: Optional[bool] = None + + +class LeaderboardVersion(BaseModel): + leaderboard_id: UUID + version: int + published: bool + created_at: datetime + updated_at: datetime + + +class LeaderboardVersionRequest(BaseModel): + publish: bool diff --git a/engineapi/engineapi/models.py b/engineapi/engineapi/models.py index f763fea9..3cf4ba28 100644 --- a/engineapi/engineapi/models.py +++ b/engineapi/engineapi/models.py @@ -13,6 +13,7 @@ from sqlalchemy import ( MetaData, String, UniqueConstraint, + ForeignKeyConstraint, ) from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.ext.compiler import compiles @@ -316,7 +317,7 @@ class CallRequest(Base): tx_hash = Column(VARCHAR(256), unique=True, nullable=True) expires_at = Column(DateTime(timezone=True), nullable=True, index=True) - live_at = Column(DateTime(timezone=True), server_default=utcnow(), nullable=False) + live_at = Column(DateTime(timezone=True), nullable=True) created_at = Column( DateTime(timezone=True), server_default=utcnow(), nullable=False @@ -359,9 +360,45 @@ class Leaderboard(Base): # type: ignore ) +class LeaderboardVersion(Base): # type: ignore + __tablename__ = "leaderboard_versions" + __table_args__ = (UniqueConstraint("leaderboard_id", "version_number"),) + + leaderboard_id = Column( + UUID(as_uuid=True), + ForeignKey("leaderboards.id", ondelete="CASCADE"), + primary_key=True, + nullable=False, + ) + version_number = Column(DECIMAL, primary_key=True, nullable=False) + published = Column(Boolean, default=False, nullable=False) + created_at = Column( + DateTime(timezone=True), + server_default=utcnow(), + nullable=False, + index=True, + ) + updated_at = Column( + DateTime(timezone=True), + server_default=utcnow(), + onupdate=utcnow(), + nullable=False, + ) + + class LeaderboardScores(Base): # type: ignore __tablename__ = "leaderboard_scores" - __table_args__ = (UniqueConstraint("leaderboard_id", "address"),) + __table_args__ = ( + UniqueConstraint("leaderboard_id", "address", "leaderboard_version_number"), + ForeignKeyConstraint( + ["leaderboard_id", "leaderboard_version_number"], + [ + "leaderboard_versions.leaderboard_id", + "leaderboard_versions.version_number", + ], + ondelete="CASCADE", + ), + ) id = Column( UUID(as_uuid=True), @@ -372,7 +409,10 @@ class LeaderboardScores(Base): # type: ignore ) leaderboard_id = Column( UUID(as_uuid=True), - ForeignKey("leaderboards.id", ondelete="CASCADE"), + nullable=False, + ) + leaderboard_version_number = Column( + DECIMAL, nullable=False, ) address = Column(VARCHAR(256), nullable=False, index=True) diff --git a/engineapi/engineapi/routes/leaderboard.py b/engineapi/engineapi/routes/leaderboard.py index 15050f31..20c6e0dd 100644 --- a/engineapi/engineapi/routes/leaderboard.py +++ b/engineapi/engineapi/routes/leaderboard.py @@ -48,7 +48,7 @@ AuthHeader = Header( ) -leaderboad_whitelist = { +leaderboard_whitelist = { f"/leaderboard/{DOCS_TARGET_PATH}": "GET", "/leaderboard/openapi.json": "GET", "/leaderboard/info": "GET", @@ -76,7 +76,7 @@ app = FastAPI( ) -app.add_middleware(ExtractBearerTokenMiddleware, whitelist=leaderboad_whitelist) +app.add_middleware(ExtractBearerTokenMiddleware, whitelist=leaderboard_whitelist) app.add_middleware( CORSMiddleware, @@ -87,13 +87,19 @@ app.add_middleware( ) -@app.get("", response_model=List[data.LeaderboardPosition], tags=["Public Endpoints"]) +@app.get( + "", + response_model=List[data.LeaderboardPosition], + tags=["Public Endpoints"], + include_in_schema=False, +) @app.get("/", response_model=List[data.LeaderboardPosition], tags=["Public Endpoints"]) async def leaderboard( leaderboard_id: UUID = Query(..., description="Leaderboard ID"), limit: int = Query(10), offset: int = Query(0), db_session: Session = Depends(db.yield_db_session), + version: Optional[str] = Query(None, description="Version of the leaderboard."), ) -> List[data.LeaderboardPosition]: """ Returns the leaderboard positions. @@ -112,7 +118,7 @@ async def leaderboard( raise EngineHTTPException(status_code=500, detail="Internal server error") leaderboard_positions = actions.get_leaderboard_positions( - db_session, leaderboard_id, limit, offset + db_session, leaderboard_id, limit, offset, version ) result = [ data.LeaderboardPosition( @@ -128,7 +134,10 @@ async def leaderboard( @app.post( - "", response_model=data.LeaderboardCreatedResponse, tags=["Authorized Endpoints"] + "", + response_model=data.LeaderboardCreatedResponse, + tags=["Authorized Endpoints"], + include_in_schema=False, ) @app.post( "/", response_model=data.LeaderboardCreatedResponse, tags=["Authorized Endpoints"] @@ -346,6 +355,7 @@ async def get_leaderboards( ) async def count_addresses( leaderboard_id: UUID = Query(..., description="Leaderboard ID"), + version: Optional[int] = Query(None, description="Version of the leaderboard."), db_session: Session = Depends(db.yield_db_session), ) -> data.CountAddressesResponse: """ @@ -364,7 +374,7 @@ async def count_addresses( logger.error(f"Error while getting leaderboard: {e}") raise EngineHTTPException(status_code=500, detail="Internal server error") - count = actions.get_leaderboard_total_count(db_session, leaderboard_id) + count = actions.get_leaderboard_total_count(db_session, leaderboard_id, version) return data.CountAddressesResponse(count=count) @@ -375,12 +385,13 @@ async def count_addresses( async def leadeboard_info( leaderboard_id: UUID = Query(..., description="Leaderboard ID"), db_session: Session = Depends(db.yield_db_session), + version: Optional[int] = Query(None, description="Version of the leaderboard."), ) -> data.LeaderboardInfoResponse: """ Returns leaderboard info. """ try: - leaderboard = actions.get_leaderboard_info(db_session, leaderboard_id) + leaderboard = actions.get_leaderboard_info(db_session, leaderboard_id, version) except NoResultFound as e: raise EngineHTTPException( status_code=404, @@ -434,6 +445,7 @@ async def get_scores_changes( async def quartiles( leaderboard_id: UUID = Query(..., description="Leaderboard ID"), db_session: Session = Depends(db.yield_db_session), + version: Optional[int] = Query(None, description="Version of the leaderboard."), ) -> data.QuartilesResponse: """ Returns the quartiles of the leaderboard. @@ -451,7 +463,7 @@ async def quartiles( raise EngineHTTPException(status_code=500, detail="Internal server error") try: - q1, q2, q3 = actions.get_qurtiles(db_session, leaderboard_id) + q1, q2, q3 = actions.get_qurtiles(db_session, leaderboard_id, version) except actions.LeaderboardIsEmpty: raise EngineHTTPException(status_code=204, detail="Leaderboard is empty.") @@ -480,6 +492,7 @@ async def position( normalize_addresses: bool = Query( True, description="Normalize addresses to checksum." ), + version: Optional[int] = Query(None, description="Version of the leaderboard."), db_session: Session = Depends(db.yield_db_session), ) -> List[data.LeaderboardPosition]: """ @@ -503,7 +516,13 @@ async def position( address = Web3.toChecksumAddress(address) positions = actions.get_position( - db_session, leaderboard_id, address, window_size, limit, offset + db_session, + leaderboard_id, + address, + window_size, + limit, + offset, + version, ) results = [ @@ -527,6 +546,7 @@ async def rank( rank: int = Query(1, description="Rank to get."), limit: Optional[int] = Query(None), offset: Optional[int] = Query(None), + version: Optional[int] = Query(None, description="Version of the leaderboard."), db_session: Session = Depends(db.yield_db_session), ) -> List[data.LeaderboardPosition]: """ @@ -546,7 +566,12 @@ async def rank( raise EngineHTTPException(status_code=500, detail="Internal server error") leaderboard_rank = actions.get_rank( - db_session, leaderboard_id, rank, limit=limit, offset=offset + db_session, + leaderboard_id, + rank, + limit=limit, + offset=offset, + version_number=version, ) results = [ data.LeaderboardPosition( @@ -563,6 +588,7 @@ async def rank( @app.get("/ranks", response_model=List[data.RanksResponse], tags=["Public Endpoints"]) async def ranks( leaderboard_id: UUID = Query(..., description="Leaderboard ID"), + version: Optional[int] = Query(None, description="Version of the leaderboard."), db_session: Session = Depends(db.yield_db_session), ) -> List[data.RanksResponse]: """ @@ -581,7 +607,7 @@ async def ranks( logger.error(f"Error while getting leaderboard: {e}") raise EngineHTTPException(status_code=500, detail="Internal server error") - ranks = actions.get_ranks(db_session, leaderboard_id) + ranks = actions.get_ranks(db_session, leaderboard_id, version) results = [ data.RanksResponse( score=rank.score, @@ -604,10 +630,6 @@ async def leaderboard_push_scores( scores: List[data.Score] = Body( ..., description="Scores to put to the leaderboard." ), - overwrite: bool = Query( - False, - description="If enabled, this will delete all current scores and replace them with the new scores provided.", - ), normalize_addresses: bool = Query( True, description="Normalize addresses to checksum." ), @@ -635,13 +657,22 @@ async def leaderboard_push_scores( status_code=403, detail="You don't have access to this leaderboard." ) + try: + new_version = actions.create_leaderboard_version( + db_session=db_session, + leaderboard_id=leaderboard_id, + ) + except Exception as e: + logger.error(f"Error while creating leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + try: leaderboard_points = actions.add_scores( db_session=db_session, leaderboard_id=leaderboard_id, scores=scores, - overwrite=overwrite, normalize_addresses=normalize_addresses, + version_number=new_version.version_number, ) except actions.DuplicateLeaderboardAddressError as e: raise EngineHTTPException( @@ -658,6 +689,27 @@ async def leaderboard_push_scores( logger.error(f"Score update failed with error: {e}") raise EngineHTTPException(status_code=500, detail="Score update failed.") + try: + actions.change_publish_leaderboard_version_status( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=new_version.version_number, + published=True, + ) + except Exception as e: + logger.error(f"Error while updating leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + try: + actions.delete_previous_versions( + db_session=db_session, + leaderboard_id=leaderboard_id, + threshold_version_number=new_version.version_number, + ) + except Exception as e: + logger.error(f"Error while deleting leaderboard versions: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + result = [ data.LeaderboardScore( leaderboard_id=score["leaderboard_id"], @@ -881,3 +933,422 @@ async def leaderboard_config_deactivate( raise EngineHTTPException(status_code=500, detail="Internal server error") return True + + +@app.get( + "/{leaderboard_id}/versions", + response_model=List[data.LeaderboardVersion], + tags=["Authorized Endpoints"], +) +async def leaderboard_versions_list( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> List[data.LeaderboardVersion]: + """ + Get leaderboard versions list. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, + leaderboard_id=leaderboard_id, + token=token, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard version." + ) + + try: + leaderboard_versions = actions.get_leaderboard_versions( + db_session=db_session, + leaderboard_id=leaderboard_id, + ) + except Exception as e: + logger.error(f"Error while getting leaderboard versions list: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + result = [ + data.LeaderboardVersion( + leaderboard_id=version.leaderboard_id, + version=version.version_number, + published=version.published, + created_at=version.created_at, + updated_at=version.updated_at, + ) + for version in leaderboard_versions + ] + + return result + + +@app.get( + "/{leaderboard_id}/versions/{version}", + response_model=data.LeaderboardVersion, + tags=["Authorized Endpoints"], +) +async def leaderboard_version_handler( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + version: int = Path(..., description="Version of the leaderboard."), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> data.LeaderboardVersion: + """ + Get leaderboard version. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, + leaderboard_id=leaderboard_id, + token=token, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard." + ) + + try: + leaderboard_version = actions.get_leaderboard_version( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + except Exception as e: + logger.error(f"Error while getting leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + return data.LeaderboardVersion( + leaderboard_id=leaderboard_version.leaderboard_id, + version=leaderboard_version.version_number, + published=leaderboard_version.published, + created_at=leaderboard_version.created_at, + updated_at=leaderboard_version.updated_at, + ) + + +@app.post( + "/{leaderboard_id}/versions", + response_model=data.LeaderboardVersion, + tags=["Authorized Endpoints"], +) +async def create_leaderboard_version( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + db_session: Session = Depends(db.yield_db_session), + request_body: data.LeaderboardVersionRequest = Body( + ..., + description="JSON object specifying whether to publish or unpublish version.", + ), + Authorization: str = AuthHeader, +) -> data.LeaderboardVersion: + """ + Create leaderboard version. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, + leaderboard_id=leaderboard_id, + token=token, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard." + ) + + try: + new_version = actions.create_leaderboard_version( + db_session=db_session, + leaderboard_id=leaderboard_id, + publish=request_body.publish, + ) + except Exception as e: + logger.error(f"Error while creating leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + return data.LeaderboardVersion( + leaderboard_id=new_version.leaderboard_id, + version=new_version.version_number, + published=new_version.published, + created_at=new_version.created_at, + updated_at=new_version.updated_at, + ) + + +@app.put( + "/{leaderboard_id}/versions/{version}", + response_model=data.LeaderboardVersion, + tags=["Authorized Endpoints"], +) +async def update_leaderboard_version_handler( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + version: int = Path(..., description="Version of the leaderboard."), + request_body: data.LeaderboardVersionRequest = Body( + ..., + description="JSON object specifying whether to publish or unpublish version.", + ), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> data.LeaderboardVersion: + """ + Update leaderboard version. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, + leaderboard_id=leaderboard_id, + token=token, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard version." + ) + + try: + leaderboard_version = actions.change_publish_leaderboard_version_status( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version, + published=request_body.publish, + ) + except Exception as e: + logger.error(f"Error while updating leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + return data.LeaderboardVersion( + leaderboard_id=leaderboard_version.leaderboard_id, + version=leaderboard_version.version_number, + published=leaderboard_version.published, + created_at=leaderboard_version.created_at, + updated_at=leaderboard_version.updated_at, + ) + + +@app.delete( + "/{leaderboard_id}/versions/{version}", + response_model=data.LeaderboardVersion, + tags=["Authorized Endpoints"], +) +async def delete_leaderboard_version_handler( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + version: int = Path(..., description="Version of the leaderboard."), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> data.LeaderboardVersion: + """ + Delete leaderboard version. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, leaderboard_id=leaderboard_id, token=token + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard version." + ) + + try: + leaderboard_version = actions.delete_leaderboard_version( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + except Exception as e: + logger.error(f"Error while deleting leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + return data.LeaderboardVersion( + leaderboard_id=leaderboard_version.leaderboard_id, + version=leaderboard_version.version_number, + published=leaderboard_version.published, + created_at=leaderboard_version.created_at, + updated_at=leaderboard_version.updated_at, + ) + + +@app.get( + "/{leaderboard_id}/versions/{version}/scores", + response_model=List[data.LeaderboardPosition], + tags=["Authorized Endpoints"], +) +async def leaderboard_version_scores_handler( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + version: int = Path(..., description="Version of the leaderboard."), + limit: int = Query(10), + offset: int = Query(0), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> List[data.LeaderboardPosition]: + """ + Get leaderboard version scores. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, leaderboard_id=leaderboard_id, token=token + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard version." + ) + + try: + leaderboard_version_scores = actions.get_leaderboard_version_scores( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version, + limit=limit, + offset=offset, + ) + except Exception as e: + logger.error(f"Error while getting leaderboard version scores: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + result = [ + data.LeaderboardPosition( + address=score.address, + score=score.score, + rank=score.rank, + points_data=score.points_data, + ) + for score in leaderboard_version_scores + ] + + return result + + +@app.put( + "/{leaderboard_id}/versions/{version}/scores", + response_model=List[data.LeaderboardScore], + tags=["Authorized Endpoints"], +) +async def leaderboard_version_push_scores_handler( + request: Request, + leaderboard_id: UUID = Path(..., description="Leaderboard ID"), + version: int = Path(..., description="Version of the leaderboard."), + scores: List[data.Score] = Body( + ..., description="Scores to put to the leaderboard version." + ), + normalize_addresses: bool = Query( + True, description="Normalize addresses to checksum." + ), + db_session: Session = Depends(db.yield_db_session), + Authorization: str = AuthHeader, +) -> List[data.LeaderboardScore]: + """ + Put the leaderboard version to the database. + """ + token = request.state.token + try: + access = actions.check_leaderboard_resource_permissions( + db_session=db_session, leaderboard_id=leaderboard_id, token=token + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + + if not access: + raise EngineHTTPException( + status_code=403, detail="You don't have access to this leaderboard version." + ) + + try: + leaderboard_version = actions.get_leaderboard_version( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version, + ) + except NoResultFound as e: + raise EngineHTTPException( + status_code=404, + detail="Leaderboard version not found.", + ) + except Exception as e: + logger.error(f"Error while getting leaderboard version: {e}") + raise EngineHTTPException(status_code=500, detail="Internal server error") + + try: + leaderboard_points = actions.add_scores( + db_session=db_session, + leaderboard_id=leaderboard_id, + scores=scores, + normalize_addresses=normalize_addresses, + version_number=leaderboard_version.version_number, + ) + except actions.DuplicateLeaderboardAddressError as e: + raise EngineHTTPException( + status_code=409, + detail=f"Duplicates in push to database is disallowed.\n List of duplicates:{e.duplicates}.\n Please handle duplicates manualy.", + ) + except Exception as e: + logger.error(f"Score update failed with error: {e}") + raise EngineHTTPException(status_code=500, detail="Score update failed.") + + result = [ + data.LeaderboardScore( + leaderboard_id=score["leaderboard_id"], + address=score["address"], + score=score["score"], + points_data=score["points_data"], + ) + for score in leaderboard_points + ] + + return result diff --git a/moonstreamapi/configs/sample.env b/moonstreamapi/configs/sample.env index 7da28466..1d816578 100644 --- a/moonstreamapi/configs/sample.env +++ b/moonstreamapi/configs/sample.env @@ -4,6 +4,7 @@ export MOONSTREAM_DB_URI_READ_ONLY="postgresql://:@ export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to" export BUGOUT_BROOD_URL="https://auth.bugout.dev" export BUGOUT_SPIRE_URL="https://spire.bugout.dev" +export BUGOUT_SPIRE_EXTERNAL_URL="https://spire.bugout.dev" export MOONSTREAM_APPLICATION_ID="" export MOONSTREAM_ADMIN_ACCESS_TOKEN="" export MOONSTREAM_POOL_SIZE=0 diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index e0f5f035..5a1a0189 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -14,7 +14,7 @@ from bugout.data import ( BugoutSearchResult, ) from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Body, Path, Request +from fastapi import APIRouter, Body, Path, Request, Query from moonstreamdb.blockchain import AvailableBlockchainType from sqlalchemy import text @@ -36,6 +36,7 @@ from ..settings import ( MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE, MOONSTREAM_S3_QUERIES_BUCKET, MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, + BUGOUT_REQUEST_TIMEOUT_SECONDS, ) from ..settings import bugout_client as bc @@ -48,6 +49,10 @@ router = APIRouter( @router.get("/list", tags=["queries"]) async def get_list_of_queries_handler(request: Request) -> List[Dict[str, Any]]: + """ + Return list of queries which user own + """ + token = request.state.token # Check already existed queries @@ -73,7 +78,7 @@ async def create_query_handler( request: Request, query_applied: data.PreapprovedQuery = Body(...) ) -> BugoutJournalEntry: """ - Create query in bugout journal + Create query in bugout journal with preapprove status required approval from moonstream team """ token = request.state.token @@ -117,6 +122,7 @@ async def create_query_handler( title=f"Query:{query_name}", tags=["type:query"], content=query_applied.query, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS * 2, ) except BugoutResponseException as e: raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) @@ -161,10 +167,15 @@ async def create_query_handler( @router.get("/templates", tags=["queries"]) def get_suggested_queries( - supported_interfaces: Optional[List[str]] = None, - address: Optional[str] = None, - title: Optional[str] = None, - limit: int = 10, + supported_interfaces: Optional[List[str]] = Query( + None, description="Supported interfaces in format: d73f4e3a erc1155" + ), + address: Optional[str] = Query( + None, + description="Query address for search if template applied to particular address", + ), + title: Optional[str] = Query(None, description="Query title for search"), + limit: int = Query(10), ) -> data.SuggestedQueriesResponse: """ Return set of suggested queries for user @@ -191,9 +202,10 @@ def get_suggested_queries( journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, query=query, limit=limit, - timeout=5, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) except BugoutResponseException as e: + logger.error(f"Error in get suggested queries templates: {str(e)}") raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -222,7 +234,7 @@ def get_suggested_queries( @router.get("/{query_name}/query", tags=["queries"]) async def get_query_handler( - request: Request, query_name: str + request: Request, query_name: str = Path(..., description="Query name") ) -> data.QueryInfoResponse: token = request.state.token @@ -248,7 +260,7 @@ async def get_query_handler( limit=1, ) except BugoutResponseException as e: - logger.error(f"Error in get query: {str(e)}") + logger.error(f"Error in search template: {str(e)}") raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -263,12 +275,10 @@ async def get_query_handler( ) try: - entries = bc.search( + entry = bc.get_entry( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, - query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", - limit=1, - timeout=5, + entry_id=query_id, ) except BugoutResponseException as e: logger.error(f"Error in get query: {str(e)}") @@ -276,23 +286,23 @@ async def get_query_handler( except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) - if len(entries.results) == 0: - raise MoonstreamHTTPException( - status_code=403, detail="Query not approved yet." - ) else: entries_results = cast(List[BugoutSearchResult], entries.results) query_id = entries_results[0].entry_url.split("/")[-1] + entry = entries_results[0] - entries_results = cast(List[BugoutSearchResult], entries.results) - entry = entries_results[0] + content = entry.content + tags = entry.tags + created_at = entry.created_at + updated_at = entry.updated_at + + if content is None: + raise MoonstreamHTTPException( + status_code=403, detail=f"Query is empty. Please update it." + ) try: - if entry.content is None: - raise MoonstreamHTTPException( - status_code=403, detail=f"Query is empty. Please update it." - ) - query = text(entry.content) + query = text(content) except Exception as e: raise MoonstreamHTTPException( status_code=500, internal_error=e, detail="Error in query parsing" @@ -301,8 +311,7 @@ async def get_query_handler( query_parameters_names = list(query._bindparams.keys()) tags_dict = { - tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True) - for tag in entry.tags + tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True) for tag in tags } query_parameters: Dict[str, Any] = {} @@ -313,23 +322,21 @@ async def get_query_handler( else: query_parameters[param] = None - print(type(entry.created_at)) - return data.QueryInfoResponse( - query=entry.content, + query=content, query_id=str(query_id), preapprove="preapprove" in tags_dict, approved="approved" in tags_dict, parameters=query_parameters, - created_at=entry.created_at, # type: ignore - updated_at=entry.updated_at, # type: ignore + created_at=created_at, # type: ignore + updated_at=updated_at, # type: ignore ) @router.put("/{query_name}", tags=["queries"]) async def update_query_handler( request: Request, - query_name: str, + query_name: str = Path(..., description="Query name"), request_update: data.UpdateQueryRequest = Body(...), ) -> BugoutJournalEntryContent: token = request.state.token @@ -367,9 +374,9 @@ async def update_query_handler( ) async def update_query_data_handler( request: Request, - query_name: str, + query_name: str = Path(..., description="Query name"), request_update: data.UpdateDataRequest = Body(...), -) -> Optional[data.QueryPresignUrl]: +) -> data.QueryPresignUrl: """ Request update data on S3 bucket """ @@ -407,7 +414,7 @@ async def update_query_data_handler( limit=1, ) except BugoutResponseException as e: - logger.error(f"Error in get query: {str(e)}") + logger.error(f"Error in search template: {str(e)}") raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -422,52 +429,55 @@ async def update_query_data_handler( ) try: - entries = bc.search( + entry = bc.get_entry( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, - query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", - limit=1, - timeout=5, + entry_id=query_id, ) + except BugoutResponseException as e: logger.error(f"Error in get query: {str(e)}") raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) - if len(entries.results) == 0: + ### check tags + + if "preapprove" in entry.tags or "approved" not in entry.tags: raise MoonstreamHTTPException( status_code=403, detail="Query not approved yet." ) + + content = entry.content + tags = entry.tags + else: entries_results = cast(List[BugoutSearchResult], entries.results) query_id = entries_results[0].entry_url.split("/")[-1] - - s3_response = None - - entries_results = cast(List[BugoutSearchResult], entries.results) - if entries_results[0].content: content = entries_results[0].content - tags = entries_results[0].tags + if content: file_type = "json" if "ext:csv" in tags: file_type = "csv" - - responce = requests.post( - f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", - json={ - "query": content, - "params": request_update.params, - "file_type": file_type, - "blockchain": request_update.blockchain - if request_update.blockchain - else None, - }, - timeout=5, - ) + try: + responce = requests.post( + f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", + json={ + "query": content, + "params": request_update.params, + "file_type": file_type, + "blockchain": request_update.blockchain + if request_update.blockchain + else None, + }, + timeout=5, + ) + except Exception as e: + logger.error(f"Error interaction with crawlers: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) if responce.status_code != 200: raise MoonstreamHTTPException( @@ -476,6 +486,10 @@ async def update_query_data_handler( ) s3_response = data.QueryPresignUrl(**responce.json()) + else: + raise MoonstreamHTTPException( + status_code=403, detail=f"Query is empty. Please update it." + ) return s3_response @@ -483,7 +497,7 @@ async def update_query_data_handler( @router.post("/{query_name}", tags=["queries"]) async def get_access_link_handler( request: Request, - query_name: str, + query_name: str = Path(..., description="Query name"), request_update: data.UpdateDataRequest = Body(...), ) -> Optional[data.QueryPresignUrl]: """ @@ -513,7 +527,7 @@ async def get_access_link_handler( limit=1, ) except BugoutResponseException as e: - logger.error(f"Error in get query: {str(e)}") + logger.error(f"Error in search template: {str(e)}") raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -528,12 +542,10 @@ async def get_access_link_handler( ) try: - entries = bc.search( + entry = bc.get_entry( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, - query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", - limit=1, - timeout=5, + entry_id=query_id, ) except BugoutResponseException as e: logger.error(f"Error in get query: {str(e)}") @@ -541,38 +553,37 @@ async def get_access_link_handler( except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) - if len(entries.results) == 0: - raise MoonstreamHTTPException( - status_code=403, detail="Query not approved yet." - ) + else: + entry = cast(BugoutJournalEntry, entries.results[0]) - entries_results = cast(List[BugoutSearchResult], entries.results) + content = entry.content + tags = entry.tags + + if not content: + raise MoonstreamHTTPException( + status_code=403, detail=f"Query is empty. Please update it." + ) try: - s3_response = None + passed_params = dict(request_update.params) - if entries_results[0].content: - passed_params = dict(request_update.params) + file_type = "json" - tags = entries_results[0].tags + if "ext:csv" in tags: + file_type = "csv" - file_type = "json" + params_hash = query_parameter_hash(passed_params) - if "ext:csv" in tags: - file_type = "csv" + bucket = MOONSTREAM_S3_QUERIES_BUCKET + key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}" - params_hash = query_parameter_hash(passed_params) - - bucket = MOONSTREAM_S3_QUERIES_BUCKET - key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}" - - stats_presigned_url = generate_s3_access_links( - method_name="get_object", - bucket=bucket, - key=key, - expiration=300000, - http_method="GET", - ) - s3_response = data.QueryPresignUrl(url=stats_presigned_url) + stats_presigned_url = generate_s3_access_links( + method_name="get_object", + bucket=bucket, + key=key, + expiration=300000, + http_method="GET", + ) + s3_response = data.QueryPresignUrl(url=stats_presigned_url) except Exception as e: logger.error(f"Error in get access link: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -582,8 +593,7 @@ async def get_access_link_handler( @router.delete("/{query_name}", tags=["queries"]) async def remove_query_handler( - request: Request, - query_name: str, + request: Request, query_name: str = Path(..., description="Query name") ) -> BugoutJournalEntry: """ Request delete query from journal diff --git a/moonstreamapi/moonstreamapi/settings.py b/moonstreamapi/moonstreamapi/settings.py index d6035877..95f7491a 100644 --- a/moonstreamapi/moonstreamapi/settings.py +++ b/moonstreamapi/moonstreamapi/settings.py @@ -7,9 +7,14 @@ from moonstreamdb.blockchain import AvailableBlockchainType # Bugout BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") +BUGOUT_SPIRE_EXTERNAL_URL = os.environ.get( + "BUGOUT_SPIRE_EXTERNAL_URL", "https://spire.bugout.dev" +) -bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) +bugout_client = Bugout( + brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_EXTERNAL_URL +) BUGOUT_REQUEST_TIMEOUT_SECONDS = 5 diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 0928d039..b6d3c966 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.3.0" +MOONSTREAMAPI_VERSION = "0.3.2" diff --git a/nodebalancer/cmd/nodebalancer/balancer.go b/nodebalancer/cmd/nodebalancer/balancer.go index ff43a45a..d8dea3c4 100644 --- a/nodebalancer/cmd/nodebalancer/balancer.go +++ b/nodebalancer/cmd/nodebalancer/balancer.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "sync/atomic" + "time" ) // Main variable of pool of blockchains which contains pool of nodes @@ -50,7 +51,8 @@ type BlockchainPool struct { // Node status response struct for HealthCheck type NodeStatusResultResponse struct { - Number string `json:"number"` + BlockNumber uint64 `json:"block_number"` + Number string `json:"number"` } type NodeStatusResponse struct { @@ -194,14 +196,21 @@ func (bpool *BlockchainPool) StatusLog() { // HealthCheck fetch the node latest block func (bpool *BlockchainPool) HealthCheck() { for _, b := range bpool.Blockchains { + var timeout time.Duration + getLatestBlockReq := `{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}` + if b.Blockchain == "starknet" { + getLatestBlockReq = `{"jsonrpc":"2.0","method":"starknet_getBlockWithTxHashes","params":["latest"],"id":"0"}` + timeout = NB_HEALTH_CHECK_CALL_TIMEOUT * 2 + } + for _, n := range b.Nodes { alive := false - httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} + httpClient := http.Client{Timeout: timeout} resp, err := httpClient.Post( n.Endpoint.String(), "application/json", - bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), + bytes.NewBuffer([]byte(getLatestBlockReq)), ) if err != nil { n.UpdateNodeState(0, alive) @@ -231,12 +240,17 @@ func (bpool *BlockchainPool) HealthCheck() { continue } - blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) - blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) - if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to parse block number from hex to string, err: %v", err) - continue + var blockNumber uint64 + if b.Blockchain == "starknet" { + blockNumber = statusResponse.Result.BlockNumber + } else { + blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) + blockNumber, err = strconv.ParseUint(blockNumberHex, 16, 64) + if err != nil { + n.UpdateNodeState(0, alive) + log.Printf("Unable to parse block number from hex to string, err: %v", err) + continue + } } // Mark node in list of pool as alive and update current block diff --git a/nodebalancer/cmd/nodebalancer/blockchain.go b/nodebalancer/cmd/nodebalancer/blockchain.go index b0d8c57e..9f073fdf 100644 --- a/nodebalancer/cmd/nodebalancer/blockchain.go +++ b/nodebalancer/cmd/nodebalancer/blockchain.go @@ -27,9 +27,9 @@ var ( "eth_getUncleCountByBlockNumber": true, "eth_getWork": true, "eth_mining": true, - // "eth_sendRawTransaction": true, - "eth_protocolVersion": true, - "eth_syncing": true, + "eth_sendRawTransaction": true, + "eth_protocolVersion": true, + "eth_syncing": true, "net_listening": true, "net_peerCount": true, @@ -38,33 +38,64 @@ var ( "web3_clientVersion": true, // zksync methods - "zks_estimateFee": true, - "zks_estimateGasL1ToL2": true, - "zks_getAllAccountBalances": true, - "zks_getBlockDetails": true, - "zks_getBridgeContracts": true, - "zks_getBytecodeByHash": true, - "zks_getConfirmedTokens": true, - "zks_getL1BatchBlockRange": true, - "zks_getL1BatchDetails": true, - "zks_getL2ToL1LogProof": true, - "zks_getL2ToL1MsgProof": true, - "zks_getMainContract": true, + "zks_estimateFee": true, + "zks_estimateGasL1ToL2": true, + "zks_getAllAccountBalances": true, + "zks_getBlockDetails": true, + "zks_getBridgeContracts": true, + "zks_getBytecodeByHash": true, + "zks_getConfirmedTokens": true, + "zks_getL1BatchBlockRange": true, + "zks_getL1BatchDetails": true, + "zks_getL2ToL1LogProof": true, + "zks_getL2ToL1MsgProof": true, + "zks_getMainContract": true, "zks_getRawBlockTransactions": true, - "zks_getTestnetPaymaster": true, - "zks_getTokenPrice": true, - "zks_getTransactionDetails": true, - "zks_L1BatchNumber": true, - "zks_L1ChainId": true, + "zks_getTestnetPaymaster": true, + "zks_getTokenPrice": true, + "zks_getTransactionDetails": true, + "zks_L1BatchNumber": true, + "zks_L1ChainId": true, + // starknet methods + "starknet_specVersion": true, + "starknet_getBlockWithTxHashes": true, + "starknet_getBlockWithTxs": true, + "starknet_getStateUpdate": true, + "starknet_getStorageAt": true, + "starknet_getTransactionStatus": true, + "starknet_getTransactionByHash": true, + "starknet_getTransactionByBlockIdAndIndex": true, + "starknet_getTransactionReceipt": true, + "starknet_getClass": true, + "starknet_getClassHashAt": true, + "starknet_getClassAt": true, + "starknet_getBlockTransactionCount": true, + "starknet_call": true, + "starknet_estimateFee": true, + "starknet_estimateMessageFee": true, + "starknet_blockNumber": true, + "starknet_blockHashAndNumber": true, + "starknet_chainId": true, + "starknet_syncing": true, + "starknet_getEvents": true, + "starknet_getNonce": true, + + "starknet_traceTransaction": true, + "starknet_simulateTransactions": true, + "starknet_traceBlockTransactions": true, + + "starknet_addInvokeTransaction": true, + "starknet_addDeclareTransaction": true, + "starknet_addDeployAccountTransaction": true, } ) type JSONRPCRequest struct { - Jsonrpc string `json:"jsonrpc"` - Method string `json:"method"` - Params []interface{} `json:"params"` - ID uint64 `json:"id"` + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params"` + ID interface{} `json:"id"` // According to the JSON-RPC specification, the id can be a string, number, or null } type BlockchainConfig struct { diff --git a/nodebalancer/cmd/nodebalancer/middleware.go b/nodebalancer/cmd/nodebalancer/middleware.go index e5068909..e99ec037 100644 --- a/nodebalancer/cmd/nodebalancer/middleware.go +++ b/nodebalancer/cmd/nodebalancer/middleware.go @@ -390,6 +390,7 @@ func jsonrpcRequestParser(body []byte) ([]JSONRPCRequest, error) { var jsonrpcRequest []JSONRPCRequest firstByte := bytes.TrimLeft(body, " \t\r\n") + switch { case len(firstByte) > 0 && firstByte[0] == '[': err := json.Unmarshal(body, &jsonrpcRequest) @@ -407,6 +408,17 @@ func jsonrpcRequestParser(body []byte) ([]JSONRPCRequest, error) { return nil, fmt.Errorf("incorrect first byte in JSON RPC request") } + for _, req := range jsonrpcRequest { + switch v := req.ID.(type) { + case float64: + req.ID = uint64(v) + case string: + case nil: + default: + return nil, fmt.Errorf("unexpected type for id: %T", v) + } + } + return jsonrpcRequest, nil } diff --git a/nodebalancer/cmd/nodebalancer/server.go b/nodebalancer/cmd/nodebalancer/server.go index c3c4c0fd..648fa3da 100644 --- a/nodebalancer/cmd/nodebalancer/server.go +++ b/nodebalancer/cmd/nodebalancer/server.go @@ -201,6 +201,11 @@ func Server() { r.URL.RawQuery = "" r.Header.Del(strings.Title(NB_ACCESS_ID_HEADER)) r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER)) + + r.URL.Scheme = endpoint.Scheme + r.URL.Host = endpoint.Host + r.URL.Path = endpoint.Path + // Change r.Host from nodebalancer's to end host so TLS check will be passed r.Host = r.URL.Host } diff --git a/nodebalancer/cmd/nodebalancer/version.go b/nodebalancer/cmd/nodebalancer/version.go index 013fae97..dd47a51c 100644 --- a/nodebalancer/cmd/nodebalancer/version.go +++ b/nodebalancer/cmd/nodebalancer/version.go @@ -1,3 +1,3 @@ package main -var NB_VERSION = "0.2.4" +var NB_VERSION = "0.2.5"