Merge pull request #831 from moonstream-to/biling-cli

Biling cli
pull/1046/head
Andrey Dolgolev 2024-04-01 16:35:56 +03:00 zatwierdzone przez GitHub
commit d235d3b9a7
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
4 zmienionych plików z 399 dodań i 2 usunięć

Wyświetl plik

@ -1,6 +1,7 @@
"""
Moonstream CLI
"""
import argparse
import json
import logging
@ -20,7 +21,7 @@ from ..settings import (
)
from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions, moonworm_tasks, queries
from . import subscription_types, subscriptions, moonworm_tasks, queries, usage
from .migrations import (
checksum_address,
update_dashboard_subscription_key,
@ -257,6 +258,25 @@ def moonworm_tasks_add_subscription_handler(args: argparse.Namespace) -> None:
moonworm_tasks.add_subscription(args.id)
def generate_usage_handler(args: argparse.Namespace) -> None:
usage_info = usage.collect_usage_information(
month=args.month,
user_id=args.user_id,
contracts=args.contracts,
)
if args.output is not None:
# create path if not exists
if not os.path.exists(os.path.dirname(args.output)):
os.makedirs(os.path.dirname(args.output))
with open(args.output, "w") as output_file:
output_file.write(json.dumps(usage_info, indent=4))
else:
logger.info(json.dumps(usage_info, indent=4))
def main() -> None:
cli_description = f"""Moonstream Admin CLI
@ -532,6 +552,48 @@ This CLI is configured to work with the following API URLs:
"-n", "--name", required=True, help="Name for the new query"
)
create_query_parser.set_defaults(func=queries.create_query_template)
usage_parser = subcommands.add_parser(
"usage", description="Manage Moonstream usage"
)
usage_parser.set_defaults(func=lambda _: usage_parser.print_help())
usage_subcommands = usage_parser.add_subparsers(description="Usage commands")
generate_usage_parser = usage_subcommands.add_parser(
"generate", description="Generate usage"
)
generate_usage_parser.add_argument(
"--month",
required=True,
type=str,
help="Month for which to generate usage in YYYY-MM format (e.g. 2021-10)",
)
generate_usage_parser.add_argument(
"--user-id",
required=False,
type=str,
help="User token for which to generate usage (not implemented yet - use user-token instead)",
)
generate_usage_parser.add_argument(
"--contracts",
required=False,
type=json.loads,
help="Contracts for which to generate usage Json format( { 'blockchain': ['contract_address',...] })",
)
generate_usage_parser.add_argument(
"--output",
required=False,
type=str,
help="Output file for usage",
)
generate_usage_parser.set_defaults(func=generate_usage_handler)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -0,0 +1,318 @@
from typing import Optional, Dict, Any, Union, List
from datetime import datetime
import json
import logging
import time
import uuid
import requests # type: ignore
import os
from ..actions import get_all_entries_from_search
from ..settings import bugout_client as bc
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN,
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
)
from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER
from bugout.data import BugoutResourceHolders, ResourcePermissions, HolderType
from web3 import Web3
from moonstream.client import (
Moonstream,
ENDPOINT_QUERIES,
MoonstreamQueryResultUrl,
)
logger = logging.getLogger(__name__)
def recive_S3_data_from_query(
client: Moonstream,
token: Union[str, uuid.UUID],
query_name: str,
params: Dict[str, Any] = {},
time_await: int = 2,
max_retries: int = 30,
custom_body: Optional[Dict[str, Any]] = None,
) -> Any:
"""
Await the query to be update data on S3 with if_modified_since and return new the data.
"""
keep_going = True
repeat = 0
if_modified_since_datetime = datetime.utcnow()
if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")
time.sleep(2)
if custom_body:
headers = {
"Authorization": f"Bearer {token}",
}
json = custom_body
response = requests.post(
url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
headers=headers,
json=json,
timeout=5,
)
data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
else:
data_url = client.exec_query(
token=token,
name=query_name,
params=params,
) # S3 presign_url
while keep_going:
time.sleep(time_await)
try:
data_response = requests.get(
data_url.url,
headers={"If-Modified-Since": if_modified_since},
timeout=5,
)
except Exception as e:
logger.error(e)
continue
if data_response.status_code == 200:
break
repeat += 1
if repeat > max_retries:
logger.info("Too many retries")
break
return data_response.json()
def generate_leaderboard_owners(
leaderboards: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Get list of all leaderboard and add owners to it.
"""
leaderboard_owners = []
for leaderboard in leaderboards:
# breakpoint()
leaderboard_id = leaderboard.resource_data["leaderboard_id"]
resource_id = leaderboard.id
holders: BugoutResourceHolders = bc.get_resource_holders(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=resource_id,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
try:
owner = [
holder.id
for holder in holders.holders
if holder.holder_type == HolderType.user
and ResourcePermissions.ADMIN in holder.permissions
][0]
except Exception as e:
logger.error(e)
breakpoint()
continue
leaderboard_owners.append(
{
"leaderboard_id": leaderboard_id,
"owner": str(owner),
"resource_id": str(resource_id),
"created_at": str(leaderboard.created_at),
"updated_at": str(leaderboard.updated_at),
}
)
return leaderboard_owners
def collect_usage_information(
month: str,
user_id: Optional[str] = None,
contracts: Optional[Dict[str, List[str]]] = None,
) -> Dict[str, Any]:
"""
Collect billing information for a user.
By user_id or token.
Collected info:
Resources:
- queries
- subscriptions
- leaderboards
Contracts:
- moonstream contracts
"""
subscription_resources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
params={
"user_id": user_id,
"type": "entity_subscription",
},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
logger.info(
"Found users collection resources: %s", len(subscription_resources.resources)
)
if len(subscription_resources.resources) == 0:
subscription_amount = 0
else:
collection_id = subscription_resources.resources[0].resource_data[
"collection_id"
]
### search in subscriptions collection
subscription_collection = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
journal_id=collection_id,
content=False,
query="",
limit=1000,
)
subscription_amount = subscription_collection.total_results
logger.info("Found users subscriptions: %s", subscription_amount)
### Get user's queries resources
query_resources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
params={"user_id": user_id, "type": BUGOUT_RESOURCE_QUERY_RESOLVER},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
query_amount = len(query_resources.resources)
logger.info("Found users queries: %s", query_amount)
### Get user's leaderboards resources
leaderboard_resources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore
params={"type": "leaderboard"},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
logger.info("Geneating leaderboards owners map")
leaderboards = generate_leaderboard_owners(
leaderboards=leaderboard_resources.resources
)
# Get user leaderboards
leaderboard_configs = get_all_entries_from_search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
search_query="",
content=True,
limit=1000,
)
leaderboard_configs_mapper = {}
for leaderboard_entry in leaderboard_configs:
tags = leaderboard_entry.tags
try:
content = json.loads(leaderboard_entry.content)
except Exception as e:
logger.error(e)
continue
leaderboard_configs_mapper[content["leaderboard_id"]] = {
"query_name": content["query_name"],
"update_activated": True if "status:active" in tags else False,
}
user_leaderboards = []
for leaderboard in leaderboards:
if leaderboard["owner"] != user_id:
continue
if leaderboard["leaderboard_id"] in leaderboard_configs_mapper:
leaderboard["query_name"] = leaderboard_configs_mapper[
leaderboard["leaderboard_id"]
]["query_name"]
leaderboard["update_activated"] = leaderboard_configs_mapper[
leaderboard["leaderboard_id"]
]["update_activated"]
# get leaderboard info
leaderboard_info = requests.get(
f"https://engineapi.moonstream.to/leaderboard/info?leaderboard_id={leaderboard['leaderboard_id']}",
).json()
try:
leaderboard["users_count"] = leaderboard_info["users_count"]
leaderboard["last_updated_at"] = leaderboard_info["last_updated_at"]
except Exception as e:
logger.error(e)
leaderboard["users_count"] = 0
leaderboard["last_updated_at"] = None
user_leaderboards.append(leaderboard)
logger.info("Found users leaderboards: %s", len(user_leaderboards))
### contracts events
contract_data = {}
if contracts is not None:
client = Moonstream()
### run query
for blockchain, addresses in contracts.items():
logger.info(
f"Collecting contracts events for {blockchain} for addresses: {addresses}"
)
contracts_events = recive_S3_data_from_query(
client=client,
token=MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, # type: ignore
query_name="template_contract_events_per_month",
params={},
time_await=2,
max_retries=30,
custom_body={
"blockchain": blockchain,
"params": {
"block_month": month,
"addresses": [
Web3.toChecksumAddress(addresses) for addresses in addresses
],
},
},
)["data"]
contract_data[blockchain] = contracts_events
return {
"subscriptions": subscription_amount,
"queries": query_amount,
"leaderboards": leaderboards,
"leaderboards_amount": len(leaderboards),
"contracts": contract_data,
}

Wyświetl plik

@ -303,3 +303,20 @@ except:
raise Exception(
f"Could not parse MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS as int: {MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS_RAW}"
)
MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN = os.environ.get(
"MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN", ""
)
if MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN == "":
raise ValueError(
"MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN environment variable must be set"
)
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID = os.environ.get(
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID", ""
)
if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
raise ValueError(
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
)

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.3.3"
MOONSTREAMAPI_VERSION = "0.3.4"