pull/820/head
Andrey 2023-06-19 00:05:45 +03:00
rodzic 4788a3d8a7
commit d473db954c
3 zmienionych plików z 102 dodań i 86 usunięć

Wyświetl plik

@ -818,3 +818,10 @@ tokenomics_orange_dao_queries = [
""",
},
]
"""
"""
template_queries = []

Wyświetl plik

@ -32,7 +32,7 @@ from ..settings import (
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_QUERIES_JOURNAL_ID,
)
from ..settings import bugout_client as bc
from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
logger = logging.getLogger(__name__)
@ -286,24 +286,26 @@ async def update_query_data_handler(
query_name_normalized = name_normalization(query_name)
# check in admin resources
params = {
"type": data.BUGOUT_RESOURCE_QUERY_RESOLVER,
"name": query_name_normalized,
}
# check in templates
try:
admin_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:template",
filters=[
f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}",
f"context_id:{query_name_normalized}",
],
limit=1,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(admin_resources.resources) == 0:
if len(entries.results) == 0:
try:
query_id = get_query_by_name(query_name, token)
except NameNormalizationException:
@ -314,60 +316,59 @@ async def update_query_data_handler(
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
else:
query_id = admin_resources.resources[0].resource_data["entry_id"]
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
)
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
else:
query_id = entries.results[0].id
s3_response = None
s3_response = None
if entries.results[0].content:
content = entries.results[0].content
if entries.results[0].content:
content = entries.results[0].content
tags = entries.results[0].tags
tags = entries.results[0].tags
file_type = "json"
file_type = "json"
if "ext:csv" in tags:
file_type = "csv"
if "ext:csv" in tags:
file_type = "csv"
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": request_update.blockchain
if request_update.blockchain
else None,
},
timeout=5,
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": request_update.blockchain
if request_update.blockchain
else None,
},
timeout=5,
)
if responce.status_code != 200:
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail=responce.text,
)
if responce.status_code != 200:
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail=responce.text,
)
s3_response = data.QueryPresignUrl(**responce.json())
except BugoutResponseException as e:
logger.error(f"Error in updating query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
s3_response = data.QueryPresignUrl(**responce.json())
return s3_response
@ -390,47 +391,54 @@ async def get_access_link_handler(
query_name_normalized = name_normalization(query_name)
params = {
"type": data.BUGOUT_RESOURCE_QUERY_RESOLVER,
"name": query_name_normalized,
}
try:
admin_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(admin_resources.resources) == 0:
try:
query_id = get_query_by_name(query_name, token)
except NameNormalizationException:
raise MoonstreamHTTPException(
status_code=403,
detail=f"Provided query name can't be normalize please select different.",
)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
else:
query_id = admin_resources.resources[0].resource_data["entry_id"]
# check in templattes
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
query=f"tag:template",
filters=[
f"context_type:{MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE}",
f"context_id:{query_name_normalized}",
],
limit=1,
timeout=5,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
query_id = get_query_by_name(query_name, token)
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
try:
s3_response = None
if entries.results and entries.results[0].content:
if entries.results[0].content:
passed_params = dict(request_update.params)
tags = entries.results[0].tags
@ -453,9 +461,6 @@ async def get_access_link_handler(
http_method="GET",
)
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
except BugoutResponseException as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)

Wyświetl plik

@ -129,3 +129,7 @@ if MOONSTREAM_S3_QUERIES_BUCKET_PREFIX == "":
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
### Moonstream queries
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE = "moonstream_query_template"