Merge pull request #820 from moonstream-to/queries-template

Queries template
pull/801/merge
Andrey Dolgolev 2023-06-19 16:16:39 +03:00 zatwierdzone przez GitHub
commit d3b2e350f5
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
9 zmienionych plików z 337 dodań i 61 usunięć

Wyświetl plik

@ -13,6 +13,13 @@ from bugout.data import BugoutResource
from entity.data import EntityResponse # type: ignore from entity.data import EntityResponse # type: ignore
from fastapi import BackgroundTasks, FastAPI from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
get_transaction_model,
)
from sqlalchemy import text from sqlalchemy import text
from .actions import ( from .actions import (
@ -229,9 +236,33 @@ async def queries_data_update_handler(
logger.error(f"Unhandled query execute exception, error: {e}") logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500) raise MoonstreamHTTPException(status_code=500)
requested_query = request_data.query
if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}")
raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain")
blockchain = AvailableBlockchainType(request_data.blockchain)
requested_query = (
requested_query.replace(
"__transactions_table__",
get_transaction_model(blockchain).__tablename__,
)
.replace(
"__blocks_table__",
get_block_model(blockchain).__tablename__,
)
.replace(
"__labels_table__",
get_label_model(blockchain).__tablename__,
)
)
# Check if it can transform to TextClause # Check if it can transform to TextClause
try: try:
query = text(request_data.query) query = text(requested_query)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}" f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}"

Wyświetl plik

@ -1,7 +1,7 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@ -49,6 +49,7 @@ class QueryDataUpdate(BaseModel):
file_type: str file_type: str
query: str query: str
params: Dict[str, Any] = Field(default_factory=dict) params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None
class TokenURIs(BaseModel): class TokenURIs(BaseModel):

Wyświetl plik

@ -15,7 +15,7 @@ from moonstreamdb.db import SessionLocal
from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..web3_provider import yield_web3_provider from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions, moonworm_tasks from . import subscription_types, subscriptions, moonworm_tasks, queries
from .migrations import ( from .migrations import (
checksum_address, checksum_address,
update_dashboard_subscription_key, update_dashboard_subscription_key,
@ -478,6 +478,33 @@ This CLI is configured to work with the following API URLs:
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
queries_parser = subcommands.add_parser(
"queries", description="Manage Moonstream queries"
)
queries_parser.set_defaults(func=lambda _: queries_parser.print_help())
queries_subcommands = queries_parser.add_subparsers(
description="Query commands"
)
create_query_parser = queries_subcommands.add_parser(
"create-template", description="Create query template"
)
create_query_parser.add_argument(
"--query-file",
required=True,
type=argparse.FileType("r"),
help="File containing the query to add",
)
create_query_parser.add_argument(
"-n", "--name", required=True, help="Name for the new query"
)
create_query_parser.set_defaults(func=queries.create_query_template)
args = parser.parse_args() args = parser.parse_args()
args.func(args) args.func(args)

Wyświetl plik

@ -0,0 +1,84 @@
import argparse
from collections import Counter
import json
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from moonstream.client import Moonstream # type: ignore
import logging
from typing import Dict, Any
import textwrap
from sqlalchemy import text
from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_QUERIES_JOURNAL_ID,
)
from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
from ..actions import get_all_entries_from_search, name_normalization
logger = logging.getLogger(__name__)
def create_query_template(args: argparse.Namespace) -> None:
"""
Create query template for all queries resources.
"""
query = ""
with args.query_file:
query = textwrap.indent(args.query_file.read(), " ")
### Create query template
name = f"template_{name_normalization(args.name)}"
try:
entry = bc.create_entry(
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
title=args.name,
content=query,
tags=["query_template", f"query_url:{name}"],
context_id=name,
context_type=MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
context_url=name,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as err:
logger.error(f"Failed to create query template: {err}")
return
except Exception as err:
logger.error(f"Failed to create query template: {err}")
return
logger.info(f"Query template created: {entry.id}")
logger.info(f"Query template created url name: {name}")
### Add query id
try:
bc.create_tags(
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
entry_id=entry.id,
tags=[f"query_id:{entry.id}"],
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as err:
logger.error(f"Failed to add query id: {err}")
return
except Exception as err:
logger.error(f"Failed to add query id: {err}")
return
logger.info(f"Query created: {json.dumps(entry.dict(), indent=4)}")

Wyświetl plik

@ -271,6 +271,7 @@ class DashboardUpdate(BaseModel):
class UpdateDataRequest(BaseModel): class UpdateDataRequest(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict) params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None
class UpdateQueryRequest(BaseModel): class UpdateQueryRequest(BaseModel):
@ -296,3 +297,7 @@ class QueryInfoResponse(BaseModel):
parameters: Dict[str, Any] = Field(default_factory=dict) parameters: Dict[str, Any] = Field(default_factory=dict)
created_at: Optional[datetime] created_at: Optional[datetime]
updated_at: Optional[datetime] updated_at: Optional[datetime]
class SuggestedQueriesResponse(BaseModel):
interfaces: Dict[str, Any] = Field(default_factory=dict)
queries: List[Any] = Field(default_factory=list)

Wyświetl plik

@ -10,6 +10,7 @@ from bugout.data import BugoutResources, BugoutJournalEntryContent, BugoutJourna
from bugout.exceptions import BugoutResponseException from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Request from fastapi import APIRouter, Body, Request
import requests # type: ignore import requests # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
from sqlalchemy import text from sqlalchemy import text
@ -31,7 +32,7 @@ from ..settings import (
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_QUERIES_JOURNAL_ID, MOONSTREAM_QUERIES_JOURNAL_ID,
) )
from ..settings import bugout_client as bc from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -154,6 +155,66 @@ async def create_query_handler(
return entry return entry
@router.get("/templates", tags=["queries"])
def get_suggested_queries(
supported_interfaces: Optional[List[str]] = None,
address: Optional[str] = None,
title: Optional[str] = None,
limit: int = 10,
) -> data.SuggestedQueriesResponse:
"""
Return set of suggested queries for user
"""
filters = ["tag:approved", "tag:query_template"]
if supported_interfaces:
filters.extend(
[f"?#interface:{interface}" for interface in supported_interfaces]
)
if address:
filters.append(f"?#address:{address}")
if title:
filters.append(title)
query = " ".join(filters)
try:
queries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=query,
limit=limit,
timeout=5,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# make split by interfaces
interfaces: Dict[str, Any] = {}
for entry in queries.results:
for tag in entry.tags:
if tag.startswith("interface:"):
interface = tag.split(":")[1]
if interface not in interfaces:
interfaces[interface] = []
interfaces[interface].append(entry)
return data.SuggestedQueriesResponse(
queries=queries.results,
interfaces=interfaces,
)
@router.get("/{query_name}/query", tags=["queries"]) @router.get("/{query_name}/query", tags=["queries"])
async def get_query_handler( async def get_query_handler(
request: Request, query_name: str request: Request, query_name: str
@ -272,64 +333,102 @@ async def update_query_data_handler(
token = request.state.token token = request.state.token
try: if request_update.blockchain:
query_id = get_query_by_name(query_name, token) try:
except NameNormalizationException: AvailableBlockchainType(request_update.blockchain)
raise MoonstreamHTTPException( except ValueError:
status_code=403, raise MoonstreamHTTPException(
detail=f"Provided query name can't be normalize please select different.", status_code=400,
) detail=f"Provided blockchain is not supported.",
except Exception as e: )
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# normalize query name
query_name_normalized = name_normalization(query_name)
# check in templates
try: try:
entries = bc.search( entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", query=f"tag:query_template tag:query_url:{query_name_normalized}",
filters=[
f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}",
],
limit=1, limit=1,
timeout=5,
) )
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
try:
query_id = get_query_by_name(query_name, token)
except NameNormalizationException:
raise MoonstreamHTTPException(
status_code=403,
detail=f"Provided query name can't be normalize please select different.",
)
except Exception as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0: if len(entries.results) == 0:
raise MoonstreamHTTPException( raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet." status_code=403, detail="Query not approved yet."
) )
else:
query_id = entries.results[0].entry_url.split("/")[-1]
s3_response = None s3_response = None
if entries.results[0].content: if entries.results[0].content:
content = entries.results[0].content content = entries.results[0].content
tags = entries.results[0].tags tags = entries.results[0].tags
file_type = "json" file_type = "json"
if "ext:csv" in tags: if "ext:csv" in tags:
file_type = "csv" file_type = "csv"
responce = requests.post( responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={ json={
"query": content, "query": content,
"params": request_update.params, "params": request_update.params,
"file_type": file_type, "file_type": file_type,
}, "blockchain": request_update.blockchain
timeout=5, if request_update.blockchain
else None,
},
timeout=5,
)
if responce.status_code != 200:
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail=responce.text,
) )
if responce.status_code != 200: s3_response = data.QueryPresignUrl(**responce.json())
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail=responce.text,
)
s3_response = data.QueryPresignUrl(**responce.json())
except BugoutResponseException as e:
logger.error(f"Error in updating query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return s3_response return s3_response
@ -348,29 +447,57 @@ async def get_access_link_handler(
token = request.state.token token = request.state.token
try: # normalize query name
query_id = get_query_by_name(query_name, token)
except NameNormalizationException: query_name_normalized = name_normalization(query_name)
raise MoonstreamHTTPException(
status_code=403, # check in templattes
detail=f"Provided query name can't be normalize please select different.",
)
except Exception as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try: try:
entries = bc.search( entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove", query=f"tag:query_template tag:query_url:{query_name_normalized}",
filters=[
f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}"
],
limit=1, limit=1,
timeout=5,
) )
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
query_id = get_query_by_name(query_name, token)
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
try:
s3_response = None s3_response = None
if entries.results and entries.results[0].content: if entries.results[0].content:
passed_params = dict(request_update.params) passed_params = dict(request_update.params)
tags = entries.results[0].tags tags = entries.results[0].tags
@ -393,9 +520,6 @@ async def get_access_link_handler(
http_method="GET", http_method="GET",
) )
s3_response = data.QueryPresignUrl(url=stats_presigned_url) s3_response = data.QueryPresignUrl(url=stats_presigned_url)
except BugoutResponseException as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e: except Exception as e:
logger.error(f"Error in get access link: {str(e)}") logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e) raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -449,4 +573,4 @@ async def remove_query_handler(
except Exception as e: except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e) raise MoonstreamHTTPException(status_code=500, internal_error=e)
return entry return entry

Wyświetl plik

@ -129,3 +129,7 @@ if MOONSTREAM_S3_QUERIES_BUCKET_PREFIX == "":
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription" BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
### Moonstream queries
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE = "moonstream_query_template"

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version. Moonstream library and API version.
""" """
MOONSTREAMAPI_VERSION = "0.2.5" MOONSTREAMAPI_VERSION = "0.2.6"

Wyświetl plik

@ -13,7 +13,7 @@ setup(
install_requires=[ install_requires=[
"appdirs", "appdirs",
"boto3", "boto3",
"bugout>=0.1.19", "bugout>=0.2.9",
"moonstream-entity>=0.0.5", "moonstream-entity>=0.0.5",
"fastapi", "fastapi",
"moonstreamdb>=0.3.3", "moonstreamdb>=0.3.3",