diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 12a96d4d..16bf60f5 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -1,6 +1,7 @@ """ Moonstream CLI """ + import argparse import json import logging @@ -20,7 +21,7 @@ from ..settings import ( ) from ..web3_provider import yield_web3_provider -from . import subscription_types, subscriptions, moonworm_tasks, queries +from . import subscription_types, subscriptions, moonworm_tasks, queries, usage from .migrations import ( checksum_address, update_dashboard_subscription_key, @@ -257,6 +258,25 @@ def moonworm_tasks_add_subscription_handler(args: argparse.Namespace) -> None: moonworm_tasks.add_subscription(args.id) +def generate_usage_handler(args: argparse.Namespace) -> None: + usage_info = usage.collect_usage_information( + month=args.month, + user_id=args.user_id, + contracts=args.contracts, + ) + + if args.output is not None: + # create path if not exists + + if not os.path.exists(os.path.dirname(args.output)): + os.makedirs(os.path.dirname(args.output)) + + with open(args.output, "w") as output_file: + output_file.write(json.dumps(usage_info, indent=4)) + else: + logger.info(json.dumps(usage_info, indent=4)) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -532,6 +552,48 @@ This CLI is configured to work with the following API URLs: "-n", "--name", required=True, help="Name for the new query" ) create_query_parser.set_defaults(func=queries.create_query_template) + + usage_parser = subcommands.add_parser( + "usage", description="Manage Moonstream usage" + ) + + usage_parser.set_defaults(func=lambda _: usage_parser.print_help()) + + usage_subcommands = usage_parser.add_subparsers(description="Usage commands") + + generate_usage_parser = usage_subcommands.add_parser( + "generate", description="Generate usage" + ) + + generate_usage_parser.add_argument( + "--month", + required=True, + type=str, + help="Month for which to generate usage in YYYY-MM format (e.g. 2021-10)", + ) + + generate_usage_parser.add_argument( + "--user-id", + required=False, + type=str, + help="User token for which to generate usage (not implemented yet - use user-token instead)", + ) + generate_usage_parser.add_argument( + "--contracts", + required=False, + type=json.loads, + help="Contracts for which to generate usage Json format( { 'blockchain': ['contract_address',...] })", + ) + + generate_usage_parser.add_argument( + "--output", + required=False, + type=str, + help="Output file for usage", + ) + + generate_usage_parser.set_defaults(func=generate_usage_handler) + args = parser.parse_args() args.func(args) diff --git a/moonstreamapi/moonstreamapi/admin/usage.py b/moonstreamapi/moonstreamapi/admin/usage.py new file mode 100644 index 00000000..372fbf35 --- /dev/null +++ b/moonstreamapi/moonstreamapi/admin/usage.py @@ -0,0 +1,318 @@ +from typing import Optional, Dict, Any, Union, List +from datetime import datetime +import json +import logging +import time +import uuid +import requests # type: ignore +import os + +from ..actions import get_all_entries_from_search +from ..settings import bugout_client as bc +from ..settings import ( + MOONSTREAM_ADMIN_ACCESS_TOKEN, + BUGOUT_REQUEST_TIMEOUT_SECONDS, + MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, + MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID, +) +from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER + + +from bugout.data import BugoutResourceHolders, ResourcePermissions, HolderType +from web3 import Web3 +from moonstream.client import ( + Moonstream, + ENDPOINT_QUERIES, + MoonstreamQueryResultUrl, +) + + +logger = logging.getLogger(__name__) + + +def recive_S3_data_from_query( + client: Moonstream, + token: Union[str, uuid.UUID], + query_name: str, + params: Dict[str, Any] = {}, + time_await: int = 2, + max_retries: int = 30, + custom_body: Optional[Dict[str, Any]] = None, +) -> Any: + """ + Await the query to be update data on S3 with if_modified_since and return new the data. + """ + + keep_going = True + + repeat = 0 + + if_modified_since_datetime = datetime.utcnow() + if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") + + time.sleep(2) + if custom_body: + headers = { + "Authorization": f"Bearer {token}", + } + json = custom_body + + response = requests.post( + url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", + headers=headers, + json=json, + timeout=5, + ) + data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) + else: + data_url = client.exec_query( + token=token, + name=query_name, + params=params, + ) # S3 presign_url + + while keep_going: + time.sleep(time_await) + try: + data_response = requests.get( + data_url.url, + headers={"If-Modified-Since": if_modified_since}, + timeout=5, + ) + except Exception as e: + logger.error(e) + continue + + if data_response.status_code == 200: + break + + repeat += 1 + + if repeat > max_retries: + logger.info("Too many retries") + break + return data_response.json() + + +def generate_leaderboard_owners( + leaderboards: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """ + Get list of all leaderboard and add owners to it. + """ + + leaderboard_owners = [] + + for leaderboard in leaderboards: + # breakpoint() + leaderboard_id = leaderboard.resource_data["leaderboard_id"] + resource_id = leaderboard.id + + holders: BugoutResourceHolders = bc.get_resource_holders( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + resource_id=resource_id, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + try: + owner = [ + holder.id + for holder in holders.holders + if holder.holder_type == HolderType.user + and ResourcePermissions.ADMIN in holder.permissions + ][0] + except Exception as e: + logger.error(e) + breakpoint() + continue + + leaderboard_owners.append( + { + "leaderboard_id": leaderboard_id, + "owner": str(owner), + "resource_id": str(resource_id), + "created_at": str(leaderboard.created_at), + "updated_at": str(leaderboard.updated_at), + } + ) + + return leaderboard_owners + + +def collect_usage_information( + month: str, + user_id: Optional[str] = None, + contracts: Optional[Dict[str, List[str]]] = None, +) -> Dict[str, Any]: + """ + Collect billing information for a user. + + By user_id or token. + + Collected info: + + Resources: + - queries + - subscriptions + - leaderboards + + Contracts: + - moonstream contracts + """ + + subscription_resources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + params={ + "user_id": user_id, + "type": "entity_subscription", + }, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + logger.info( + "Found users collection resources: %s", len(subscription_resources.resources) + ) + + if len(subscription_resources.resources) == 0: + subscription_amount = 0 + else: + collection_id = subscription_resources.resources[0].resource_data[ + "collection_id" + ] + + ### search in subscriptions collection + + subscription_collection = bc.search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + journal_id=collection_id, + content=False, + query="", + limit=1000, + ) + + subscription_amount = subscription_collection.total_results + + logger.info("Found users subscriptions: %s", subscription_amount) + + ### Get user's queries resources + + query_resources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + params={"user_id": user_id, "type": BUGOUT_RESOURCE_QUERY_RESOLVER}, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + query_amount = len(query_resources.resources) + + logger.info("Found users queries: %s", query_amount) + + ### Get user's leaderboards resources + leaderboard_resources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore + params={"type": "leaderboard"}, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + logger.info("Geneating leaderboards owners map") + + leaderboards = generate_leaderboard_owners( + leaderboards=leaderboard_resources.resources + ) + + # Get user leaderboards + + leaderboard_configs = get_all_entries_from_search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID, + search_query="", + content=True, + limit=1000, + ) + + leaderboard_configs_mapper = {} + + for leaderboard_entry in leaderboard_configs: + tags = leaderboard_entry.tags + try: + content = json.loads(leaderboard_entry.content) + except Exception as e: + logger.error(e) + continue + + leaderboard_configs_mapper[content["leaderboard_id"]] = { + "query_name": content["query_name"], + "update_activated": True if "status:active" in tags else False, + } + + user_leaderboards = [] + + for leaderboard in leaderboards: + + if leaderboard["owner"] != user_id: + continue + + if leaderboard["leaderboard_id"] in leaderboard_configs_mapper: + leaderboard["query_name"] = leaderboard_configs_mapper[ + leaderboard["leaderboard_id"] + ]["query_name"] + leaderboard["update_activated"] = leaderboard_configs_mapper[ + leaderboard["leaderboard_id"] + ]["update_activated"] + + # get leaderboard info + + leaderboard_info = requests.get( + f"https://engineapi.moonstream.to/leaderboard/info?leaderboard_id={leaderboard['leaderboard_id']}", + ).json() + try: + leaderboard["users_count"] = leaderboard_info["users_count"] + leaderboard["last_updated_at"] = leaderboard_info["last_updated_at"] + except Exception as e: + logger.error(e) + leaderboard["users_count"] = 0 + leaderboard["last_updated_at"] = None + + user_leaderboards.append(leaderboard) + + logger.info("Found users leaderboards: %s", len(user_leaderboards)) + + ### contracts events + + contract_data = {} + + if contracts is not None: + client = Moonstream() + + ### run query + + for blockchain, addresses in contracts.items(): + logger.info( + f"Collecting contracts events for {blockchain} for addresses: {addresses}" + ) + contracts_events = recive_S3_data_from_query( + client=client, + token=MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, # type: ignore + query_name="template_contract_events_per_month", + params={}, + time_await=2, + max_retries=30, + custom_body={ + "blockchain": blockchain, + "params": { + "block_month": month, + "addresses": [ + Web3.toChecksumAddress(addresses) for addresses in addresses + ], + }, + }, + )["data"] + + contract_data[blockchain] = contracts_events + + return { + "subscriptions": subscription_amount, + "queries": query_amount, + "leaderboards": leaderboards, + "leaderboards_amount": len(leaderboards), + "contracts": contract_data, + } diff --git a/moonstreamapi/moonstreamapi/settings.py b/moonstreamapi/moonstreamapi/settings.py index 37ca5c58..612a7584 100644 --- a/moonstreamapi/moonstreamapi/settings.py +++ b/moonstreamapi/moonstreamapi/settings.py @@ -303,3 +303,20 @@ except: raise Exception( f"Could not parse MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS as int: {MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS_RAW}" ) + +MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN = os.environ.get( + "MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN", "" +) +if MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN == "": + raise ValueError( + "MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN environment variable must be set" + ) + + +MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID = os.environ.get( + "MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID", "" +) +if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "": + raise ValueError( + "MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set" + ) diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index bb58c7f4..d450487b 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.3.3" +MOONSTREAMAPI_VERSION = "0.3.4"