diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 970c43df..08bee314 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -13,6 +13,13 @@ from bugout.data import BugoutResource from entity.data import EntityResponse # type: ignore from fastapi import BackgroundTasks, FastAPI from fastapi.middleware.cors import CORSMiddleware +from moonstreamdb.blockchain import ( + AvailableBlockchainType, + get_label_model, + get_block_model, + get_transaction_model, +) + from sqlalchemy import text from .actions import ( @@ -229,9 +236,33 @@ async def queries_data_update_handler( logger.error(f"Unhandled query execute exception, error: {e}") raise MoonstreamHTTPException(status_code=500) + requested_query = request_data.query + + if request_data.blockchain: + if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: + logger.error(f"Unknown blockchain {request_data.blockchain}") + raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain") + + blockchain = AvailableBlockchainType(request_data.blockchain) + + requested_query = ( + requested_query.replace( + "__transactions_table__", + get_transaction_model(blockchain).__tablename__, + ) + .replace( + "__blocks_table__", + get_block_model(blockchain).__tablename__, + ) + .replace( + "__labels_table__", + get_label_model(blockchain).__tablename__, + ) + ) + # Check if it can transform to TextClause try: - query = text(request_data.query) + query = text(requested_query) except Exception as e: logger.error( f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}" diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 7a923225..3cef9a75 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field @@ -49,6 +49,7 @@ class QueryDataUpdate(BaseModel): file_type: str query: str params: Dict[str, Any] = Field(default_factory=dict) + blockchain: Optional[str] = None class TokenURIs(BaseModel): diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index a765e820..5ede37dd 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -15,7 +15,7 @@ from moonstreamdb.db import SessionLocal from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID from ..web3_provider import yield_web3_provider -from . import subscription_types, subscriptions, moonworm_tasks +from . import subscription_types, subscriptions, moonworm_tasks, queries from .migrations import ( checksum_address, update_dashboard_subscription_key, @@ -478,6 +478,33 @@ This CLI is configured to work with the following API URLs: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) + + + queries_parser = subcommands.add_parser( + "queries", description="Manage Moonstream queries" + ) + queries_parser.set_defaults(func=lambda _: queries_parser.print_help()) + + + + queries_subcommands = queries_parser.add_subparsers( + description="Query commands" + ) + + create_query_parser = queries_subcommands.add_parser( + "create-template", description="Create query template" + ) + + create_query_parser.add_argument( + "--query-file", + required=True, + type=argparse.FileType("r"), + help="File containing the query to add", + ) + create_query_parser.add_argument( + "-n", "--name", required=True, help="Name for the new query" + ) + create_query_parser.set_defaults(func=queries.create_query_template) args = parser.parse_args() args.func(args) diff --git a/moonstreamapi/moonstreamapi/admin/queries.py b/moonstreamapi/moonstreamapi/admin/queries.py new file mode 100644 index 00000000..9828a0fa --- /dev/null +++ b/moonstreamapi/moonstreamapi/admin/queries.py @@ -0,0 +1,84 @@ +import argparse +from collections import Counter +import json + +from bugout.data import BugoutResources +from bugout.exceptions import BugoutResponseException +from moonstream.client import Moonstream # type: ignore +import logging +from typing import Dict, Any +import textwrap +from sqlalchemy import text + + +from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER +from ..settings import ( + BUGOUT_REQUEST_TIMEOUT_SECONDS, + MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_QUERIES_JOURNAL_ID, +) +from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE +from ..actions import get_all_entries_from_search, name_normalization + + +logger = logging.getLogger(__name__) + + + +def create_query_template(args: argparse.Namespace) -> None: + """ + Create query template for all queries resources. + """ + + query = "" + with args.query_file: + query = textwrap.indent(args.query_file.read(), " ") + + ### Create query template + + name = f"template_{name_normalization(args.name)}" + + try: + + entry = bc.create_entry( + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + title=args.name, + content=query, + tags=["query_template", f"query_url:{name}"], + context_id=name, + context_type=MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE, + context_url=name, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + except BugoutResponseException as err: + logger.error(f"Failed to create query template: {err}") + return + except Exception as err: + logger.error(f"Failed to create query template: {err}") + return + + logger.info(f"Query template created: {entry.id}") + logger.info(f"Query template created url name: {name}") + + + ### Add query id + + try: + bc.create_tags( + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + entry_id=entry.id, + tags=[f"query_id:{entry.id}"], + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + except BugoutResponseException as err: + logger.error(f"Failed to add query id: {err}") + return + except Exception as err: + logger.error(f"Failed to add query id: {err}") + return + + logger.info(f"Query created: {json.dumps(entry.dict(), indent=4)}") \ No newline at end of file diff --git a/moonstreamapi/moonstreamapi/data.py b/moonstreamapi/moonstreamapi/data.py index ac67968e..e61e86c0 100644 --- a/moonstreamapi/moonstreamapi/data.py +++ b/moonstreamapi/moonstreamapi/data.py @@ -271,6 +271,7 @@ class DashboardUpdate(BaseModel): class UpdateDataRequest(BaseModel): params: Dict[str, Any] = Field(default_factory=dict) + blockchain: Optional[str] = None class UpdateQueryRequest(BaseModel): @@ -296,3 +297,7 @@ class QueryInfoResponse(BaseModel): parameters: Dict[str, Any] = Field(default_factory=dict) created_at: Optional[datetime] updated_at: Optional[datetime] + +class SuggestedQueriesResponse(BaseModel): + interfaces: Dict[str, Any] = Field(default_factory=dict) + queries: List[Any] = Field(default_factory=list) \ No newline at end of file diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index 97c71de7..ec9f3e34 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -10,6 +10,7 @@ from bugout.data import BugoutResources, BugoutJournalEntryContent, BugoutJourna from bugout.exceptions import BugoutResponseException from fastapi import APIRouter, Body, Request import requests # type: ignore +from moonstreamdb.blockchain import AvailableBlockchainType from sqlalchemy import text @@ -31,7 +32,7 @@ from ..settings import ( MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, MOONSTREAM_QUERIES_JOURNAL_ID, ) -from ..settings import bugout_client as bc +from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE logger = logging.getLogger(__name__) @@ -154,6 +155,66 @@ async def create_query_handler( return entry + +@router.get("/templates", tags=["queries"]) +def get_suggested_queries( + supported_interfaces: Optional[List[str]] = None, + address: Optional[str] = None, + title: Optional[str] = None, + limit: int = 10, +) -> data.SuggestedQueriesResponse: + """ + Return set of suggested queries for user + """ + + filters = ["tag:approved", "tag:query_template"] + + if supported_interfaces: + filters.extend( + [f"?#interface:{interface}" for interface in supported_interfaces] + ) + + if address: + filters.append(f"?#address:{address}") + + if title: + filters.append(title) + + query = " ".join(filters) + + try: + queries = bc.search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + query=query, + limit=limit, + timeout=5, + ) + except BugoutResponseException as e: + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + # make split by interfaces + + interfaces: Dict[str, Any] = {} + + for entry in queries.results: + for tag in entry.tags: + if tag.startswith("interface:"): + interface = tag.split(":")[1] + + if interface not in interfaces: + interfaces[interface] = [] + + interfaces[interface].append(entry) + + return data.SuggestedQueriesResponse( + queries=queries.results, + interfaces=interfaces, + ) + + @router.get("/{query_name}/query", tags=["queries"]) async def get_query_handler( request: Request, query_name: str @@ -272,64 +333,102 @@ async def update_query_data_handler( token = request.state.token - try: - query_id = get_query_by_name(query_name, token) - except NameNormalizationException: - raise MoonstreamHTTPException( - status_code=403, - detail=f"Provided query name can't be normalize please select different.", - ) - except Exception as e: - raise MoonstreamHTTPException(status_code=500, internal_error=e) + if request_update.blockchain: + try: + AvailableBlockchainType(request_update.blockchain) + except ValueError: + raise MoonstreamHTTPException( + status_code=400, + detail=f"Provided blockchain is not supported.", + ) + + # normalize query name + + query_name_normalized = name_normalization(query_name) + + # check in templates try: entries = bc.search( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, - query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", + query=f"tag:query_template tag:query_url:{query_name_normalized}", + filters=[ + f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}", + ], limit=1, - timeout=5, ) + except BugoutResponseException as e: + logger.error(f"Error in get query: {str(e)}") + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + if len(entries.results) == 0: + try: + query_id = get_query_by_name(query_name, token) + except NameNormalizationException: + raise MoonstreamHTTPException( + status_code=403, + detail=f"Provided query name can't be normalize please select different.", + ) + except Exception as e: + logger.error(f"Error in get query: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + try: + entries = bc.search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", + limit=1, + timeout=5, + ) + except BugoutResponseException as e: + logger.error(f"Error in get query: {str(e)}") + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise MoonstreamHTTPException(status_code=500, internal_error=e) if len(entries.results) == 0: raise MoonstreamHTTPException( status_code=403, detail="Query not approved yet." ) + else: + query_id = entries.results[0].entry_url.split("/")[-1] - s3_response = None + s3_response = None - if entries.results[0].content: - content = entries.results[0].content + if entries.results[0].content: + content = entries.results[0].content - tags = entries.results[0].tags + tags = entries.results[0].tags - file_type = "json" + file_type = "json" - if "ext:csv" in tags: - file_type = "csv" + if "ext:csv" in tags: + file_type = "csv" - responce = requests.post( - f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", - json={ - "query": content, - "params": request_update.params, - "file_type": file_type, - }, - timeout=5, + responce = requests.post( + f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", + json={ + "query": content, + "params": request_update.params, + "file_type": file_type, + "blockchain": request_update.blockchain + if request_update.blockchain + else None, + }, + timeout=5, + ) + + if responce.status_code != 200: + raise MoonstreamHTTPException( + status_code=responce.status_code, + detail=responce.text, ) - if responce.status_code != 200: - raise MoonstreamHTTPException( - status_code=responce.status_code, - detail=responce.text, - ) - - s3_response = data.QueryPresignUrl(**responce.json()) - except BugoutResponseException as e: - logger.error(f"Error in updating query: {str(e)}") - raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) - except Exception as e: - raise MoonstreamHTTPException(status_code=500, internal_error=e) + s3_response = data.QueryPresignUrl(**responce.json()) return s3_response @@ -348,29 +447,57 @@ async def get_access_link_handler( token = request.state.token - try: - query_id = get_query_by_name(query_name, token) - except NameNormalizationException: - raise MoonstreamHTTPException( - status_code=403, - detail=f"Provided query name can't be normalize please select different.", - ) - except Exception as e: - logger.error(f"Error in get query: {str(e)}") - raise MoonstreamHTTPException(status_code=500, internal_error=e) + # normalize query name + + query_name_normalized = name_normalization(query_name) + + # check in templattes try: entries = bc.search( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, - query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", + query=f"tag:query_template tag:query_url:{query_name_normalized}", + filters=[ + f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}" + ], limit=1, - timeout=5, ) + except BugoutResponseException as e: + logger.error(f"Error in get query: {str(e)}") + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + if len(entries.results) == 0: + + query_id = get_query_by_name(query_name, token) + + try: + + entries = bc.search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", + limit=1, + timeout=5, + ) + except BugoutResponseException as e: + logger.error(f"Error in get query: {str(e)}") + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + if len(entries.results) == 0: + raise MoonstreamHTTPException( + status_code=403, detail="Query not approved yet." + ) + + try: s3_response = None - if entries.results and entries.results[0].content: + if entries.results[0].content: passed_params = dict(request_update.params) tags = entries.results[0].tags @@ -393,9 +520,6 @@ async def get_access_link_handler( http_method="GET", ) s3_response = data.QueryPresignUrl(url=stats_presigned_url) - except BugoutResponseException as e: - logger.error(f"Error in get access link: {str(e)}") - raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: logger.error(f"Error in get access link: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -449,4 +573,4 @@ async def remove_query_handler( except Exception as e: raise MoonstreamHTTPException(status_code=500, internal_error=e) - return entry + return entry \ No newline at end of file diff --git a/moonstreamapi/moonstreamapi/settings.py b/moonstreamapi/moonstreamapi/settings.py index b988a85c..052cbb4b 100644 --- a/moonstreamapi/moonstreamapi/settings.py +++ b/moonstreamapi/moonstreamapi/settings.py @@ -129,3 +129,7 @@ if MOONSTREAM_S3_QUERIES_BUCKET_PREFIX == "": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" + +### Moonstream queries + +MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE = "moonstream_query_template" diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 286247fc..fbc7f516 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.2.5" +MOONSTREAMAPI_VERSION = "0.2.6" diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index dc940b79..9602d99a 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -13,7 +13,7 @@ setup( install_requires=[ "appdirs", "boto3", - "bugout>=0.1.19", + "bugout>=0.2.9", "moonstream-entity>=0.0.5", "fastapi", "moonstreamdb>=0.3.3",