kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #898 from moonstream-to/replace-search-in-query-api
Replace search in query apipull/941/head
commit
6e35ab9927
|
@ -14,7 +14,7 @@ from bugout.data import (
|
|||
BugoutSearchResult,
|
||||
)
|
||||
from bugout.exceptions import BugoutResponseException
|
||||
from fastapi import APIRouter, Body, Path, Request
|
||||
from fastapi import APIRouter, Body, Path, Request, Query
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType
|
||||
from sqlalchemy import text
|
||||
|
||||
|
@ -36,6 +36,7 @@ from ..settings import (
|
|||
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
|
||||
MOONSTREAM_S3_QUERIES_BUCKET,
|
||||
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
|
||||
BUGOUT_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
from ..settings import bugout_client as bc
|
||||
|
||||
|
@ -48,6 +49,10 @@ router = APIRouter(
|
|||
|
||||
@router.get("/list", tags=["queries"])
|
||||
async def get_list_of_queries_handler(request: Request) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Return list of queries which user own
|
||||
"""
|
||||
|
||||
token = request.state.token
|
||||
|
||||
# Check already existed queries
|
||||
|
@ -73,7 +78,7 @@ async def create_query_handler(
|
|||
request: Request, query_applied: data.PreapprovedQuery = Body(...)
|
||||
) -> BugoutJournalEntry:
|
||||
"""
|
||||
Create query in bugout journal
|
||||
Create query in bugout journal with preapprove status required approval from moonstream team
|
||||
"""
|
||||
|
||||
token = request.state.token
|
||||
|
@ -117,6 +122,7 @@ async def create_query_handler(
|
|||
title=f"Query:{query_name}",
|
||||
tags=["type:query"],
|
||||
content=query_applied.query,
|
||||
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS * 2,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
|
@ -161,10 +167,15 @@ async def create_query_handler(
|
|||
|
||||
@router.get("/templates", tags=["queries"])
|
||||
def get_suggested_queries(
|
||||
supported_interfaces: Optional[List[str]] = None,
|
||||
address: Optional[str] = None,
|
||||
title: Optional[str] = None,
|
||||
limit: int = 10,
|
||||
supported_interfaces: Optional[List[str]] = Query(
|
||||
None, description="Supported interfaces in format: d73f4e3a erc1155"
|
||||
),
|
||||
address: Optional[str] = Query(
|
||||
None,
|
||||
description="Query address for search if template applied to particular address",
|
||||
),
|
||||
title: Optional[str] = Query(None, description="Query title for search"),
|
||||
limit: int = Query(10),
|
||||
) -> data.SuggestedQueriesResponse:
|
||||
"""
|
||||
Return set of suggested queries for user
|
||||
|
@ -191,9 +202,10 @@ def get_suggested_queries(
|
|||
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
|
||||
query=query,
|
||||
limit=limit,
|
||||
timeout=5,
|
||||
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get suggested queries templates: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
@ -222,7 +234,7 @@ def get_suggested_queries(
|
|||
|
||||
@router.get("/{query_name}/query", tags=["queries"])
|
||||
async def get_query_handler(
|
||||
request: Request, query_name: str
|
||||
request: Request, query_name: str = Path(..., description="Query name")
|
||||
) -> data.QueryInfoResponse:
|
||||
token = request.state.token
|
||||
|
||||
|
@ -248,7 +260,7 @@ async def get_query_handler(
|
|||
limit=1,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
logger.error(f"Error in search template: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
@ -263,12 +275,10 @@ async def get_query_handler(
|
|||
)
|
||||
|
||||
try:
|
||||
entries = bc.search(
|
||||
entry = bc.get_entry(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
|
||||
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
|
||||
limit=1,
|
||||
timeout=5,
|
||||
entry_id=query_id,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
|
@ -276,23 +286,23 @@ async def get_query_handler(
|
|||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
||||
if len(entries.results) == 0:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail="Query not approved yet."
|
||||
)
|
||||
else:
|
||||
entries_results = cast(List[BugoutSearchResult], entries.results)
|
||||
query_id = entries_results[0].entry_url.split("/")[-1]
|
||||
entry = entries_results[0]
|
||||
|
||||
entries_results = cast(List[BugoutSearchResult], entries.results)
|
||||
entry = entries_results[0]
|
||||
content = entry.content
|
||||
tags = entry.tags
|
||||
created_at = entry.created_at
|
||||
updated_at = entry.updated_at
|
||||
|
||||
if content is None:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail=f"Query is empty. Please update it."
|
||||
)
|
||||
|
||||
try:
|
||||
if entry.content is None:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail=f"Query is empty. Please update it."
|
||||
)
|
||||
query = text(entry.content)
|
||||
query = text(content)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=500, internal_error=e, detail="Error in query parsing"
|
||||
|
@ -301,8 +311,7 @@ async def get_query_handler(
|
|||
query_parameters_names = list(query._bindparams.keys())
|
||||
|
||||
tags_dict = {
|
||||
tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True)
|
||||
for tag in entry.tags
|
||||
tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True) for tag in tags
|
||||
}
|
||||
|
||||
query_parameters: Dict[str, Any] = {}
|
||||
|
@ -313,23 +322,21 @@ async def get_query_handler(
|
|||
else:
|
||||
query_parameters[param] = None
|
||||
|
||||
print(type(entry.created_at))
|
||||
|
||||
return data.QueryInfoResponse(
|
||||
query=entry.content,
|
||||
query=content,
|
||||
query_id=str(query_id),
|
||||
preapprove="preapprove" in tags_dict,
|
||||
approved="approved" in tags_dict,
|
||||
parameters=query_parameters,
|
||||
created_at=entry.created_at, # type: ignore
|
||||
updated_at=entry.updated_at, # type: ignore
|
||||
created_at=created_at, # type: ignore
|
||||
updated_at=updated_at, # type: ignore
|
||||
)
|
||||
|
||||
|
||||
@router.put("/{query_name}", tags=["queries"])
|
||||
async def update_query_handler(
|
||||
request: Request,
|
||||
query_name: str,
|
||||
query_name: str = Path(..., description="Query name"),
|
||||
request_update: data.UpdateQueryRequest = Body(...),
|
||||
) -> BugoutJournalEntryContent:
|
||||
token = request.state.token
|
||||
|
@ -367,9 +374,9 @@ async def update_query_handler(
|
|||
)
|
||||
async def update_query_data_handler(
|
||||
request: Request,
|
||||
query_name: str,
|
||||
query_name: str = Path(..., description="Query name"),
|
||||
request_update: data.UpdateDataRequest = Body(...),
|
||||
) -> Optional[data.QueryPresignUrl]:
|
||||
) -> data.QueryPresignUrl:
|
||||
"""
|
||||
Request update data on S3 bucket
|
||||
"""
|
||||
|
@ -407,7 +414,7 @@ async def update_query_data_handler(
|
|||
limit=1,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
logger.error(f"Error in search template: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
@ -422,52 +429,55 @@ async def update_query_data_handler(
|
|||
)
|
||||
|
||||
try:
|
||||
entries = bc.search(
|
||||
entry = bc.get_entry(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
|
||||
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
|
||||
limit=1,
|
||||
timeout=5,
|
||||
entry_id=query_id,
|
||||
)
|
||||
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
||||
if len(entries.results) == 0:
|
||||
### check tags
|
||||
|
||||
if "preapprove" in entry.tags or "approved" not in entry.tags:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail="Query not approved yet."
|
||||
)
|
||||
|
||||
content = entry.content
|
||||
tags = entry.tags
|
||||
|
||||
else:
|
||||
entries_results = cast(List[BugoutSearchResult], entries.results)
|
||||
query_id = entries_results[0].entry_url.split("/")[-1]
|
||||
|
||||
s3_response = None
|
||||
|
||||
entries_results = cast(List[BugoutSearchResult], entries.results)
|
||||
if entries_results[0].content:
|
||||
content = entries_results[0].content
|
||||
|
||||
tags = entries_results[0].tags
|
||||
|
||||
if content:
|
||||
file_type = "json"
|
||||
|
||||
if "ext:csv" in tags:
|
||||
file_type = "csv"
|
||||
|
||||
responce = requests.post(
|
||||
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
|
||||
json={
|
||||
"query": content,
|
||||
"params": request_update.params,
|
||||
"file_type": file_type,
|
||||
"blockchain": request_update.blockchain
|
||||
if request_update.blockchain
|
||||
else None,
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
try:
|
||||
responce = requests.post(
|
||||
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
|
||||
json={
|
||||
"query": content,
|
||||
"params": request_update.params,
|
||||
"file_type": file_type,
|
||||
"blockchain": request_update.blockchain
|
||||
if request_update.blockchain
|
||||
else None,
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error interaction with crawlers: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
||||
if responce.status_code != 200:
|
||||
raise MoonstreamHTTPException(
|
||||
|
@ -476,6 +486,10 @@ async def update_query_data_handler(
|
|||
)
|
||||
|
||||
s3_response = data.QueryPresignUrl(**responce.json())
|
||||
else:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail=f"Query is empty. Please update it."
|
||||
)
|
||||
|
||||
return s3_response
|
||||
|
||||
|
@ -483,7 +497,7 @@ async def update_query_data_handler(
|
|||
@router.post("/{query_name}", tags=["queries"])
|
||||
async def get_access_link_handler(
|
||||
request: Request,
|
||||
query_name: str,
|
||||
query_name: str = Path(..., description="Query name"),
|
||||
request_update: data.UpdateDataRequest = Body(...),
|
||||
) -> Optional[data.QueryPresignUrl]:
|
||||
"""
|
||||
|
@ -513,7 +527,7 @@ async def get_access_link_handler(
|
|||
limit=1,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
logger.error(f"Error in search template: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
@ -528,12 +542,10 @@ async def get_access_link_handler(
|
|||
)
|
||||
|
||||
try:
|
||||
entries = bc.search(
|
||||
entry = bc.get_entry(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
|
||||
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
|
||||
limit=1,
|
||||
timeout=5,
|
||||
entry_id=query_id,
|
||||
)
|
||||
except BugoutResponseException as e:
|
||||
logger.error(f"Error in get query: {str(e)}")
|
||||
|
@ -541,38 +553,37 @@ async def get_access_link_handler(
|
|||
except Exception as e:
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
||||
if len(entries.results) == 0:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail="Query not approved yet."
|
||||
)
|
||||
else:
|
||||
entry = cast(BugoutJournalEntry, entries.results[0])
|
||||
|
||||
entries_results = cast(List[BugoutSearchResult], entries.results)
|
||||
content = entry.content
|
||||
tags = entry.tags
|
||||
|
||||
if not content:
|
||||
raise MoonstreamHTTPException(
|
||||
status_code=403, detail=f"Query is empty. Please update it."
|
||||
)
|
||||
try:
|
||||
s3_response = None
|
||||
passed_params = dict(request_update.params)
|
||||
|
||||
if entries_results[0].content:
|
||||
passed_params = dict(request_update.params)
|
||||
file_type = "json"
|
||||
|
||||
tags = entries_results[0].tags
|
||||
if "ext:csv" in tags:
|
||||
file_type = "csv"
|
||||
|
||||
file_type = "json"
|
||||
params_hash = query_parameter_hash(passed_params)
|
||||
|
||||
if "ext:csv" in tags:
|
||||
file_type = "csv"
|
||||
bucket = MOONSTREAM_S3_QUERIES_BUCKET
|
||||
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}"
|
||||
|
||||
params_hash = query_parameter_hash(passed_params)
|
||||
|
||||
bucket = MOONSTREAM_S3_QUERIES_BUCKET
|
||||
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}"
|
||||
|
||||
stats_presigned_url = generate_s3_access_links(
|
||||
method_name="get_object",
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
expiration=300000,
|
||||
http_method="GET",
|
||||
)
|
||||
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
|
||||
stats_presigned_url = generate_s3_access_links(
|
||||
method_name="get_object",
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
expiration=300000,
|
||||
http_method="GET",
|
||||
)
|
||||
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get access link: {str(e)}")
|
||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||
|
@ -582,8 +593,7 @@ async def get_access_link_handler(
|
|||
|
||||
@router.delete("/{query_name}", tags=["queries"])
|
||||
async def remove_query_handler(
|
||||
request: Request,
|
||||
query_name: str,
|
||||
request: Request, query_name: str = Path(..., description="Query name")
|
||||
) -> BugoutJournalEntry:
|
||||
"""
|
||||
Request delete query from journal
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream library and API version.
|
||||
"""
|
||||
|
||||
MOONSTREAMAPI_VERSION = "0.3.0"
|
||||
MOONSTREAMAPI_VERSION = "0.3.1"
|
||||
|
|
Ładowanie…
Reference in New Issue