Add query task sending.

pull/550/head
Andrey Dolgolev 2022-02-16 00:55:20 +02:00
rodzic 210d60d519
commit 79eaed49f8
2 zmienionych plików z 65 dodań i 6 usunięć

Wyświetl plik

@ -0,0 +1,63 @@
"""
The Moonstream subscriptions HTTP API
"""
import logging
from typing import Any, Dict, List, Optional
from bugout.data import BugoutResource
from fastapi import APIRouter, Depends, Query, Request
import requests
from sqlalchemy.orm import Session
from moonstreamdb import db
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_QUERIES_JOURNAL_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
)
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/queries",)
@router.post("/{query_id}/update", tags=["queries"])
async def update_query_data_handler(
request: Request, query_id: str = Query(...),
) -> Optional[Dict[str, Any]]:
"""
Request update data on S3 bucket
"""
token = request.state.token
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"#approved #query:{query_id} #user_token:{token}",
limit=1,
)
if entries.results and entries.results[0].content:
content = entries.results[0].content
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/query_update",
json=content,
)
if responce.status_code != 200:
raise MoonstreamHTTPException(
status_code=responce.status_code,
detail="Task for start generate stats failed.",
)
return responce.json()
except:
return None

Wyświetl plik

@ -30,9 +30,7 @@ from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/streams",
)
router = APIRouter(prefix="/streams",)
def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]:
@ -41,9 +39,7 @@ def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]:
"""
response = bc.list_resources(
token=token,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
},
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)