pull/550/head
Andrey Dolgolev 2022-02-16 02:55:14 +02:00
rodzic c41f8d1616
commit 0cbd177442
4 zmienionych plików z 13 dodań i 56 usunięć

Wyświetl plik

@ -12,8 +12,8 @@ from . import actions, data
from .middleware import BroodAuthMiddleware, MoonstreamHTTPException
from .routes.address_info import router as addressinfo_router
from .routes.dashboards import router as dashboards_router
from .routes.queries import router as whales_router
from .routes.streams import router as queries_router
from .routes.queries import router as queries_router
from .routes.streams import router as streams_router
from .routes.subscriptions import router as subscriptions_router
from .routes.txinfo import router as txinfo_router
from .routes.users import router as users_router

Wyświetl plik

@ -23,15 +23,12 @@ from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/queries",
)
router = APIRouter(prefix="/queries",)
@router.post("/{query_id}/update", tags=["queries"])
async def update_query_data_handler(
request: Request,
query_id: str = Query(...),
request: Request, query_id: str = Query(...),
) -> Optional[Dict[str, Any]]:
"""
Request update data on S3 bucket
@ -62,5 +59,7 @@ async def update_query_data_handler(
)
return responce.json()
except:
return None
except Exception as e:
logger.error("Unable to get events")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return None

Wyświetl plik

@ -80,17 +80,14 @@ async def now_handler() -> data.NowResponse:
@app.post("/jobs/stats_update", tags=["jobs"])
async def status_handler(
stats_update: data.StatsUpdateRequest,
background_tasks: BackgroundTasks,
stats_update: data.StatsUpdateRequest, background_tasks: BackgroundTasks,
):
"""
Update dashboard endpoint create are tasks for update.
"""
dashboard_resource: BugoutResource = bc.get_resource(
token=stats_update.token,
resource_id=stats_update.dashboard_id,
timeout=10,
token=stats_update.token, resource_id=stats_update.dashboard_id, timeout=10,
)
# get all user subscriptions
@ -170,10 +167,8 @@ async def status_handler(
@app.post("/jobs/{query_id}/query_update", tags=["jobs"])
async def queries_data_update_handler(
query_id: str,
query: Any,
background_tasks: BackgroundTasks,
) -> str:
query_id: str, query: Any, background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
s3_client = boto3.client("s3")
@ -192,10 +187,7 @@ async def queries_data_update_handler(
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": "queries_bucket",
"Key": f"queries/{query_id}/data.json",
},
Params={"Bucket": "queries_bucket", "Key": f"queries/{query_id}/data.json",},
ExpiresIn=300000,
HttpMethod="GET",
)

Wyświetl plik

@ -1,34 +0,0 @@
import json
import logging
from typing import Any
import boto3 # type: ignore
from moonstreamdb.db import yield_db_session_ctx
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def push_statistics(data: Any, key: str, bucket: str) -> None:
    """Serialize *data* as JSON and upload it to ``s3://bucket/key``.

    The object is stored with an ``application/json`` content type and a
    ``drone_query`` metadata marker so downstream consumers can identify it.
    """
    payload = json.dumps(data).encode("utf-8")
    client = boto3.client("s3")
    client.put_object(
        Body=payload,
        Bucket=bucket,
        Key=key,
        ContentType="application/json",
        Metadata={"drone_query": "data"},
    )
    logger.info(f"Statistics push to bucket: s3://{bucket}/{key}")
def data_generate(bucket: str, key: str, query: str):
    """Execute *query* against the database and publish the result set to S3.

    Opens a scoped DB session, fetches all rows for the query, and hands
    them to ``push_statistics`` for JSON serialization and upload.
    """
    with yield_db_session_ctx() as session:
        rows = session.execute(query).all()
        push_statistics(data=rows, key=key, bucket=bucket)