import json import logging import textwrap import time import uuid from datetime import datetime from typing import Any, Dict, List, Optional, Union import requests # type: ignore from bugout.data import BugoutResourceHolders, HolderType, ResourcePermissions from moonstream.client import ENDPOINT_QUERIES, Moonstream, MoonstreamQueryResultUrl from web3 import Web3 from ..actions import get_all_entries_from_search from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER from ..settings import ( BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID, MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, MOONSTREAM_USAGE_REPORTS_JOURNAL_ID, ) from ..settings import bugout_client as bc logger = logging.getLogger(__name__) def recive_S3_data_from_query( client: Moonstream, token: Union[str, uuid.UUID], query_name: str, params: Dict[str, Any] = {}, time_await: int = 2, max_retries: int = 30, custom_body: Optional[Dict[str, Any]] = None, ) -> Any: """ Await the query to be update data on S3 with if_modified_since and return new the data. """ keep_going = True repeat = 0 if_modified_since_datetime = datetime.utcnow() if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") time.sleep(2) if custom_body: headers = { "Authorization": f"Bearer {token}", } json = custom_body response = requests.post( url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", headers=headers, json=json, timeout=5, ) data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) else: data_url = client.exec_query( token=token, name=query_name, params=params, ) # S3 presign_url while keep_going: time.sleep(time_await) try: data_response = requests.get( data_url.url, headers={"If-Modified-Since": if_modified_since}, timeout=5, ) except Exception as e: logger.error(e) continue if data_response.status_code == 200: break repeat += 1 if repeat > max_retries: logger.info("Too many retries") break return data_response.json() def generate_leaderboard_owners( leaderboards: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: """ Get list of all leaderboard and add owners to it. """ leaderboards_cache = {} leaderboard_owners = [] ### Get leaderboard owners cache entry entries = [] if MOONSTREAM_USAGE_REPORTS_JOURNAL_ID is not None: try: entries = get_all_entries_from_search( journal_id=MOONSTREAM_USAGE_REPORTS_JOURNAL_ID, search_query=f"tag:leaderboard_owners tag:cache", limit=100, token=MOONSTREAM_ADMIN_ACCESS_TOKEN, content=True, ) except Exception as e: logger.error(f"Error getting leaderboard_owners_cache entry: {e}") if len(entries) > 0: try: leaderboards_cache = json.loads(entries[0].content) except Exception as e: logger.error(f"Error loading leaderboard_owners_cache: {e}") for leaderboard in leaderboards: leaderboard_id = leaderboard.resource_data["leaderboard_id"] resource_id = leaderboard.id if leaderboard_id in leaderboards_cache: leaderboard_owners.append(leaderboards_cache[leaderboard_id]) continue holders: BugoutResourceHolders = bc.get_resource_holders( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, resource_id=resource_id, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) try: owner = [ holder.id for holder in holders.holders if holder.holder_type == HolderType.user and ResourcePermissions.ADMIN in holder.permissions ][0] except Exception as e: logger.error(e) breakpoint() continue leaderboard_owners.append( { "leaderboard_id": leaderboard_id, "owner": str(owner), "resource_id": str(resource_id), "created_at": str(leaderboard.created_at), "updated_at": str(leaderboard.updated_at), } ) leaderboards_cache[leaderboard_id] = { "leaderboard_id": leaderboard_id, "owner": str(owner), "resource_id": str(resource_id), "created_at": str(leaderboard.created_at), "updated_at": str(leaderboard.updated_at), } ### update cache if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID is not None: if len(entries) > 0: try: bc.update_entry_content( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_USAGE_REPORTS_JOURNAL_ID, entry_id=entries[0].entry_url.split("/")[-1], content=textwrap.indent( json.dumps(leaderboards_cache, indent=4), " " ), title=entries[0].title, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) except Exception as e: logger.error( f"Error updating leaderboard_owners_cache entry: {e} continue..." ) else: title = "leaderboard_owners_cache" tags = [ "leaderboard_owners", "cache", ] try: report_entry = bc.create_entry( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_USAGE_REPORTS_JOURNAL_ID, title=title, content=textwrap.indent( json.dumps(leaderboards_cache, indent=4), " " ), tags=tags, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) except Exception as e: logger.error( f"Error creating leaderboard_owners_cache entry: {e} continue..." ) return leaderboard_owners def collect_usage_information( month: str, user_id: Optional[str] = None, contracts: Optional[Dict[str, List[str]]] = None, ) -> Dict[str, Any]: """ Collect billing information for a user. By user_id or token. Collected info: Resources: - queries - subscriptions - leaderboards Contracts: - moonstream contracts """ subscription_resources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore params={ "user_id": user_id, "type": "entity_subscription", }, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) logger.info( "Found users collection resources: %s", len(subscription_resources.resources) ) if len(subscription_resources.resources) == 0: subscription_amount = 0 else: collection_id = subscription_resources.resources[0].resource_data[ "collection_id" ] ### search in subscriptions collection subscription_collection = bc.search( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore journal_id=collection_id, content=False, query="", limit=1000, ) subscription_amount = subscription_collection.total_results logger.info("Found users subscriptions: %s", subscription_amount) ### Get user's queries resources query_resources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore params={"user_id": user_id, "type": BUGOUT_RESOURCE_QUERY_RESOLVER}, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) query_amount = len(query_resources.resources) logger.info("Found users queries: %s", query_amount) ### Get user's leaderboards resources leaderboard_resources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, # type: ignore params={"type": "leaderboard"}, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) logger.info("Geneating leaderboards owners map") leaderboards = generate_leaderboard_owners( leaderboards=leaderboard_resources.resources ) # Get user leaderboards leaderboard_configs = get_all_entries_from_search( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID, search_query="", content=True, limit=1000, ) leaderboard_configs_mapper = {} for leaderboard_entry in leaderboard_configs: tags = leaderboard_entry.tags try: content = json.loads(leaderboard_entry.content) except Exception as e: logger.error(e) continue leaderboard_configs_mapper[content["leaderboard_id"]] = { "query_name": content["query_name"], "update_activated": True if "status:active" in tags else False, } logger.info("Found leaderboards: %s", len(leaderboards)) logger.info("Fill leaderboards with users data") user_leaderboards = [] for leaderboard in leaderboards: if leaderboard["owner"] != user_id: continue if leaderboard["leaderboard_id"] in leaderboard_configs_mapper: leaderboard["query_name"] = leaderboard_configs_mapper[ leaderboard["leaderboard_id"] ]["query_name"] leaderboard["update_activated"] = leaderboard_configs_mapper[ leaderboard["leaderboard_id"] ]["update_activated"] # get leaderboard info leaderboard_info = requests.get( f"https://engineapi.moonstream.to/leaderboard/info?leaderboard_id={leaderboard['leaderboard_id']}", ).json() try: leaderboard["users_count"] = leaderboard_info["users_count"] leaderboard["last_updated_at"] = leaderboard_info["last_updated_at"] except Exception as e: logger.error(e) leaderboard["users_count"] = 0 leaderboard["last_updated_at"] = None user_leaderboards.append(leaderboard) logger.info("Found users leaderboards: %s", len(user_leaderboards)) ### contracts events contract_data = {} if contracts is not None: client = Moonstream() ### run query for blockchain, addresses in contracts.items(): logger.info( f"Collecting contracts events for {blockchain} for addresses: {addresses}" ) contracts_events = recive_S3_data_from_query( client=client, token=MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN, # type: ignore query_name="template_contract_events_per_month", params={}, time_await=2, max_retries=30, custom_body={ "blockchain": blockchain, "params": { "block_month": month, "addresses": [ Web3.toChecksumAddress(addresses) for addresses in addresses ], }, }, )["data"] contract_data[blockchain] = contracts_events return { "subscriptions": subscription_amount, "queries": query_amount, "leaderboards": leaderboards, "leaderboards_amount": len(leaderboards), "contracts": contract_data, } def push_report_to_bugout_journal( name: str, user_id: str, month: str, journal_id: str, report: Dict[str, Any], token: str, ) -> None: """ Push report to bugout journal. """ ### search by month if entry already exists entries = get_all_entries_from_search( journal_id=journal_id, search_query=f"tag:month:{month} tag:report tag:user_id:{user_id}", limit=100, token=token, ) if len(entries) > 0: entry = entries[0] ### ensure additional tags tags = entry.tags if f"customer:{name}" not in tags: tags.append(f"customer:{name}") if "moonstream" not in tags: tags.append("moonstream") bc.update_entry_content( token=token, journal_id=journal_id, entry_id=entry.entry_url.split("/")[-1], content=textwrap.indent(json.dumps(report, indent=4), " "), title=entry.title, tags=tags, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) logger.info("Report entry updated: %s", entry.entry_url) else: title = f"{name} - {month} - {user_id}" tags = [ f"month:{month}", "report", f"user_id:{user_id}", "moonstream", f"customer:{name}", ] report_entry = bc.create_entry( token=token, journal_id=journal_id, title=title, content=textwrap.indent(json.dumps(report, indent=4), " "), tags=tags, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) logger.info("Report entry created: %s", report_entry.id)