kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #778 from bugout-dev/query-parameters-endpoint
Extend get query endpoint.add-lands-tasks
commit
55cd64721c
|
@ -636,51 +636,93 @@ def get_entity_subscription_collection_id(
|
||||||
raise EntityCollectionNotFoundException(
|
raise EntityCollectionNotFoundException(
|
||||||
"Subscription collection not found."
|
"Subscription collection not found."
|
||||||
)
|
)
|
||||||
try:
|
collection_id = generate_collection_for_user(resource_type, token, user_id)
|
||||||
# try get collection
|
|
||||||
|
|
||||||
collections: EntityCollectionsResponse = ec.list_collections(token=token)
|
return collection_id
|
||||||
|
|
||||||
available_collections: Dict[str, str] = {
|
|
||||||
collection.name: collection.collection_id
|
|
||||||
for collection in collections.collections
|
|
||||||
}
|
|
||||||
|
|
||||||
if f"subscriptions_{user_id}" not in available_collections:
|
|
||||||
collection: EntityCollectionResponse = ec.add_collection(
|
|
||||||
token=token, name=f"subscriptions_{user_id}"
|
|
||||||
)
|
|
||||||
collection_id = collection.collection_id
|
|
||||||
else:
|
|
||||||
collection_id = available_collections[f"subscriptions_{user_id}"]
|
|
||||||
except EntityUnexpectedResponse as e:
|
|
||||||
logger.error(f"Error create collection, error: {str(e)}")
|
|
||||||
raise MoonstreamHTTPException(
|
|
||||||
status_code=500, detail="Can't create collection for subscriptions"
|
|
||||||
)
|
|
||||||
|
|
||||||
resource_data = {
|
|
||||||
"type": resource_type,
|
|
||||||
"user_id": str(user_id),
|
|
||||||
"collection_id": str(collection_id),
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
resource: BugoutResource = bc.create_resource(
|
|
||||||
token=token,
|
|
||||||
application_id=MOONSTREAM_APPLICATION_ID,
|
|
||||||
resource_data=resource_data,
|
|
||||||
)
|
|
||||||
except BugoutResponseException as e:
|
|
||||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error creating subscription resource: {str(e)}")
|
|
||||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
|
||||||
else:
|
else:
|
||||||
resource = resources.resources[0]
|
resource = resources.resources[0]
|
||||||
return resource.resource_data["collection_id"]
|
return resource.resource_data["collection_id"]
|
||||||
|
|
||||||
|
|
||||||
|
def generate_collection_for_user(
|
||||||
|
resource_type: str,
|
||||||
|
token: Union[uuid.UUID, str],
|
||||||
|
user_id: uuid.UUID,
|
||||||
|
) -> str:
|
||||||
|
try:
|
||||||
|
# try get collection
|
||||||
|
|
||||||
|
collections: EntityCollectionsResponse = ec.list_collections(token=token)
|
||||||
|
|
||||||
|
available_collections: Dict[str, str] = {
|
||||||
|
collection.name: collection.collection_id
|
||||||
|
for collection in collections.collections
|
||||||
|
}
|
||||||
|
|
||||||
|
subscription_collection_name = f"subscriptions_{user_id}"
|
||||||
|
|
||||||
|
if subscription_collection_name not in available_collections:
|
||||||
|
collection: EntityCollectionResponse = ec.add_collection(
|
||||||
|
token=token, name=subscription_collection_name
|
||||||
|
)
|
||||||
|
collection_id = collection.collection_id
|
||||||
|
else:
|
||||||
|
collection_id = available_collections[subscription_collection_name]
|
||||||
|
except EntityUnexpectedResponse as e:
|
||||||
|
logger.error(f"Error create collection, error: {str(e)}")
|
||||||
|
raise MoonstreamHTTPException(
|
||||||
|
status_code=500, detail="Can't create collection for subscriptions"
|
||||||
|
)
|
||||||
|
|
||||||
|
resource_data = {
|
||||||
|
"type": resource_type,
|
||||||
|
"user_id": str(user_id),
|
||||||
|
"collection_id": str(collection_id),
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
bc.create_resource(
|
||||||
|
token=token,
|
||||||
|
application_id=MOONSTREAM_APPLICATION_ID,
|
||||||
|
resource_data=resource_data,
|
||||||
|
)
|
||||||
|
except BugoutResponseException as e:
|
||||||
|
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating subscription resource: {str(e)}")
|
||||||
|
logger.error(
|
||||||
|
f"Required create resource data: {resource_data}, and grand access to journal: {collection_id}, for user: {user_id}"
|
||||||
|
)
|
||||||
|
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||||
|
|
||||||
|
try:
|
||||||
|
bc.update_journal_scopes(
|
||||||
|
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
|
journal_id=collection_id,
|
||||||
|
holder_type="user",
|
||||||
|
holder_id=user_id,
|
||||||
|
permission_list=[
|
||||||
|
"journals.read",
|
||||||
|
"journals.entries.read",
|
||||||
|
"journals.entries.create",
|
||||||
|
"journals.entries.update",
|
||||||
|
"journals.entries.delete",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"Grand access to journal: {collection_id}, for user: {user_id} successfully"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error updating journal scopes: {str(e)}")
|
||||||
|
logger.error(
|
||||||
|
f"Required grand access to journal: {collection_id}, for user: {user_id}"
|
||||||
|
)
|
||||||
|
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||||
|
|
||||||
|
return collection_id
|
||||||
|
|
||||||
|
|
||||||
def generate_s3_access_links(
|
def generate_s3_access_links(
|
||||||
method_name: str,
|
method_name: str,
|
||||||
bucket: str,
|
bucket: str,
|
||||||
|
|
|
@ -285,3 +285,14 @@ class PreapprovedQuery(BaseModel):
|
||||||
|
|
||||||
class QueryPresignUrl(BaseModel):
|
class QueryPresignUrl(BaseModel):
|
||||||
url: str
|
url: str
|
||||||
|
|
||||||
|
|
||||||
|
class QueryInfoResponse(BaseModel):
|
||||||
|
query: str
|
||||||
|
query_id: str
|
||||||
|
public: bool = False
|
||||||
|
preapprove: bool = False
|
||||||
|
approved: bool = False
|
||||||
|
parameters: Dict[str, Any] = Field(default_factory=dict)
|
||||||
|
created_at: Optional[datetime]
|
||||||
|
updated_at: Optional[datetime]
|
||||||
|
|
|
@ -10,6 +10,7 @@ from bugout.data import BugoutResources, BugoutJournalEntryContent, BugoutJourna
|
||||||
from bugout.exceptions import BugoutResponseException
|
from bugout.exceptions import BugoutResponseException
|
||||||
from fastapi import APIRouter, Body, Request
|
from fastapi import APIRouter, Body, Request
|
||||||
import requests # type: ignore
|
import requests # type: ignore
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
|
@ -154,7 +155,9 @@ async def create_query_handler(
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{query_name}/query", tags=["queries"])
|
@router.get("/{query_name}/query", tags=["queries"])
|
||||||
async def get_query_handler(request: Request, query_name: str) -> BugoutJournalEntry:
|
async def get_query_handler(
|
||||||
|
request: Request, query_name: str
|
||||||
|
) -> data.QueryInfoResponse:
|
||||||
token = request.state.token
|
token = request.state.token
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -180,7 +183,41 @@ async def get_query_handler(request: Request, query_name: str) -> BugoutJournalE
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||||
|
|
||||||
return entry
|
try:
|
||||||
|
if entry.content is None:
|
||||||
|
raise MoonstreamHTTPException(
|
||||||
|
status_code=403, detail=f"Query is empty. Please update it."
|
||||||
|
)
|
||||||
|
query = text(entry.content)
|
||||||
|
except Exception as e:
|
||||||
|
raise MoonstreamHTTPException(
|
||||||
|
status_code=500, internal_error=e, detail="Error in query parsing"
|
||||||
|
)
|
||||||
|
|
||||||
|
query_parameters_names = list(query._bindparams.keys())
|
||||||
|
|
||||||
|
tags_dict = {
|
||||||
|
tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True)
|
||||||
|
for tag in entry.tags
|
||||||
|
}
|
||||||
|
|
||||||
|
query_parameters: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
for param in query_parameters_names:
|
||||||
|
if param in tags_dict:
|
||||||
|
query_parameters[param] = tags_dict[param]
|
||||||
|
else:
|
||||||
|
query_parameters[param] = None
|
||||||
|
|
||||||
|
return data.QueryInfoResponse(
|
||||||
|
query=entry.content,
|
||||||
|
query_id=str(entry.id),
|
||||||
|
preapprove="preapprove" in tags_dict,
|
||||||
|
approved="approved" in tags_dict,
|
||||||
|
parameters=query_parameters,
|
||||||
|
created_at=entry.created_at,
|
||||||
|
updated_at=entry.updated_at,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{query_name}", tags=["queries"])
|
@router.put("/{query_name}", tags=["queries"])
|
||||||
|
|
|
@ -238,7 +238,11 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
|
||||||
|
|
||||||
|
|
||||||
@router.get("/", tags=["subscriptions"], response_model=data.SubscriptionsListResponse)
|
@router.get("/", tags=["subscriptions"], response_model=data.SubscriptionsListResponse)
|
||||||
async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListResponse:
|
async def get_subscriptions_handler(
|
||||||
|
request: Request,
|
||||||
|
limit: Optional[int] = 10,
|
||||||
|
offset: Optional[int] = 0,
|
||||||
|
) -> data.SubscriptionsListResponse:
|
||||||
"""
|
"""
|
||||||
Get user's subscriptions.
|
Get user's subscriptions.
|
||||||
"""
|
"""
|
||||||
|
@ -249,16 +253,17 @@ async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListR
|
||||||
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
|
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
|
||||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||||
user_id=user.id,
|
user_id=user.id,
|
||||||
|
create_if_not_exist=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
subscriprions_list = ec.search_entities(
|
subscriprions_list = ec.search_entities(
|
||||||
token=token,
|
token=token,
|
||||||
collection_id=collection_id,
|
collection_id=collection_id,
|
||||||
required_field=[f"type:subscription"],
|
required_field=[f"type:subscription"],
|
||||||
limit=1000,
|
limit=limit,
|
||||||
|
offset=offset,
|
||||||
)
|
)
|
||||||
|
|
||||||
# resources: BugoutResources = bc.list_resources(token=token, params=params)
|
|
||||||
except EntityCollectionNotFoundException as e:
|
except EntityCollectionNotFoundException as e:
|
||||||
raise MoonstreamHTTPException(
|
raise MoonstreamHTTPException(
|
||||||
status_code=404,
|
status_code=404,
|
||||||
|
|
|
@ -10,7 +10,7 @@ from bugout.exceptions import BugoutResponseException
|
||||||
from fastapi import APIRouter, Body, Form, Request
|
from fastapi import APIRouter, Body, Form, Request
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..actions import create_onboarding_resource
|
from ..actions import create_onboarding_resource, generate_collection_for_user
|
||||||
from ..middleware import MoonstreamHTTPException
|
from ..middleware import MoonstreamHTTPException
|
||||||
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
|
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
|
||||||
from ..settings import bugout_client as bc
|
from ..settings import bugout_client as bc
|
||||||
|
@ -35,6 +35,7 @@ async def create_user_handler(
|
||||||
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
raise MoonstreamHTTPException(status_code=500, internal_error=e)
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,4 +2,4 @@
|
||||||
Moonstream library and API version.
|
Moonstream library and API version.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MOONSTREAMAPI_VERSION = "0.2.4"
|
MOONSTREAMAPI_VERSION = "0.2.5"
|
||||||
|
|
Ładowanie…
Reference in New Issue