Merge pull request #715 from bugout-dev/add-parameters-in-s3-path

Add parameters in s3 path
pull/768/head
Andrey Dolgolev 2023-03-09 20:02:19 +02:00 zatwierdzone przez GitHub
commit fcc80ac619
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
8 zmienionych plików z 210 dodań i 77 usunięć

Wyświetl plik

@ -1,3 +1,4 @@
from collections import OrderedDict
import hashlib
import json
from itertools import chain
@ -425,14 +426,14 @@ def upload_abi_to_s3(
"""
s3_client = boto3.client("s3")
s3 = boto3.client("s3")
bucket = MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET
result_bytes = abi.encode("utf-8")
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[resource.resource_data['subscription_type_id']]}/abi/{resource.resource_data['address']}/{resource.id}/abi.json"
s3_client.put_object(
s3.put_object(
Body=result_bytes,
Bucket=bucket,
Key=result_key,
@ -595,3 +596,36 @@ def get_query_by_name(query_name: str, token: uuid.UUID) -> str:
query_id = available_queries[query_name]
return query_id
def generate_s3_access_links(
    method_name: str,
    bucket: str,
    key: str,
    http_method: str,
    expiration: int = 300,
) -> str:
    """
    Build a presigned S3 URL for the given bucket/key pair.

    method_name: boto3 client operation to presign (e.g. "get_object").
    bucket: target S3 bucket name.
    key: object key within the bucket.
    http_method: HTTP verb the generated link will be used with.
    expiration: link lifetime in seconds (default 300).
    """
    s3_client = boto3.client("s3")
    presigned_url = s3_client.generate_presigned_url(
        method_name,
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expiration,
        HttpMethod=http_method,
    )
    return presigned_url
def query_parameter_hash(params: Dict[str, Any]) -> str:
    """
    Generate a deterministic MD5 fingerprint of the query parameters.

    Parameters are serialized to JSON with sorted keys, so two dicts with
    the same key/value pairs hash identically regardless of insertion
    order. MD5 is used here only as a cache-key fingerprint, not for
    security purposes.
    """
    # Fix: the original bound this to a local named `hash`, shadowing the
    # builtin. The OrderedDict wrapping is kept for serialization
    # compatibility even though sort_keys already canonicalizes ordering.
    params_hash = hashlib.md5(
        json.dumps(OrderedDict(params), sort_keys=True).encode("utf-8")
    ).hexdigest()
    return params_hash

Wyświetl plik

@ -5,15 +5,21 @@ import logging
from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResources, BugoutJournalEntryContent, BugoutJournalEntry
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Request
import requests
import requests # type: ignore
from .. import data
from ..actions import get_query_by_name, name_normalization, NameNormalizationException
from ..actions import (
get_query_by_name,
name_normalization,
NameNormalizationException,
query_parameter_hash,
generate_s3_access_links,
)
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -291,10 +297,11 @@ async def update_query_data_handler(
return s3_response
@router.get("/{query_name}", tags=["queries"])
@router.post("/{query_name}", tags=["queries"])
async def get_access_link_handler(
request: Request,
query_name: str,
request_update: data.UpdateDataRequest = Body(...),
) -> Optional[data.QueryPresignUrl]:
"""
Request S3 presign url
@ -312,10 +319,9 @@ async def get_access_link_handler(
detail=f"Provided query name can't be normalize please select different.",
)
except Exception as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
s3 = boto3.client("s3")
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -328,6 +334,8 @@ async def get_access_link_handler(
s3_response = None
if entries.results and entries.results[0].content:
passed_params = dict(request_update.params)
tags = entries.results[0].tags
file_type = "json"
@ -335,20 +343,24 @@ async def get_access_link_handler(
if "ext:csv" in tags:
file_type = "csv"
stats_presigned_url = s3.generate_presigned_url(
"get_object",
Params={
"Bucket": MOONSTREAM_S3_QUERIES_BUCKET,
"Key": f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
},
ExpiresIn=300000,
HttpMethod="GET",
params_hash = query_parameter_hash(passed_params)
bucket = MOONSTREAM_S3_QUERIES_BUCKET
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}"
stats_presigned_url = generate_s3_access_links(
method_name="get_object",
bucket=bucket,
key=key,
expiration=300000,
http_method="GET",
)
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
except BugoutResponseException as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return s3_response

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.2.2"
MOONSTREAMAPI_VERSION = "0.2.3"

Wyświetl plik

@ -0,0 +1,58 @@
from collections import OrderedDict
import hashlib
import json
import logging
from typing import Any, Dict
import boto3 # type: ignore
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def push_data_to_bucket(
    data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {}
) -> None:
    """
    Upload a payload to S3 as application/json under the given key.

    data: body bytes/str handed to S3 put_object.
    key: destination object key.
    bucket: destination bucket name.
    metadata: optional object metadata attached to the upload.
    """
    # NOTE: the default dict is never mutated here, so sharing it across
    # calls is safe; it is kept for signature compatibility.
    s3_client = boto3.client("s3")
    s3_client.put_object(
        Body=data,
        Bucket=bucket,
        Key=key,
        ContentType="application/json",
        Metadata=metadata,
    )
    logger.info(f"Data pushed to bucket: s3://{bucket}/{key}")
def generate_s3_access_links(
    method_name: str,
    bucket: str,
    key: str,
    http_method: str,
    expiration: int = 300,
) -> str:
    """
    Return a presigned URL granting temporary access to an S3 object.

    method_name: the boto3 operation to presign (e.g. "get_object").
    bucket: S3 bucket holding the object.
    key: object key within the bucket.
    http_method: HTTP verb clients will use with the link.
    expiration: validity window in seconds (defaults to 5 minutes).
    """
    request_params = {"Bucket": bucket, "Key": key}
    url: str = boto3.client("s3").generate_presigned_url(
        method_name,
        Params=request_params,
        ExpiresIn=expiration,
        HttpMethod=http_method,
    )
    return url
def query_parameter_hash(params: Dict[str, Any]) -> str:
    """
    Generate a deterministic MD5 fingerprint of the query parameters.

    JSON serialization with sorted keys guarantees that equal dicts hash
    to the same value regardless of key insertion order. MD5 is used as a
    cheap fingerprint for building S3 key paths, not for security.
    """
    # Fix: avoid naming the result `hash`, which shadows the builtin.
    params_hash = hashlib.md5(
        json.dumps(OrderedDict(params), sort_keys=True).encode("utf-8")
    ).hexdigest()
    return params_hash

Wyświetl plik

@ -4,8 +4,7 @@ The Mooncrawl HTTP API
import logging
import time
from cgi import test
from datetime import datetime, timedelta
from os import times
from datetime import timedelta
from typing import Any, Dict, List
from uuid import UUID
@ -15,6 +14,7 @@ from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from .actions import generate_s3_access_links, query_parameter_hash
from . import data
from .middleware import MoonstreamHTTPException
from .settings import (
@ -25,6 +25,7 @@ from .settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
ORIGINS,
LINKS_EXPIRATION_TIME,
)
from .settings import bugout_client as bc
from .stats_worker import dashboard, queries
@ -178,9 +179,30 @@ async def queries_data_update_handler(
request_data: data.QueryDataUpdate,
background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
s3_client = boto3.client("s3")
# Check if query is valid
try:
queries.query_validation(request_data.query)
except queries.QueryNotValid:
logger.error(f"Query not pass validation check query id: {query_id}")
raise MoonstreamHTTPException(
status_code=401,
detail="Incorrect query is not valid with current restrictions",
)
except Exception as e:
logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)
expected_query_parameters = text(request_data.query)._bindparams.keys()
# Check if it can transform to TextClause
try:
query = text(request_data.query)
except Exception as e:
logger.error(
f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}"
)
raise MoonstreamHTTPException(status_code=500, detail="Can't parse query")
# Get required keys for the query
expected_query_parameters = query._bindparams.keys()
# request.params validations
passed_params = {
@ -197,39 +219,33 @@ async def queries_data_update_handler(
status_code=500, detail="Unmatched amount of applying query parameters"
)
try:
valid_query = queries.query_validation(request_data.query)
except queries.QueryNotValid:
logger.error(f"Incorrect query provided with id: {query_id}")
raise MoonstreamHTTPException(
status_code=401, detail="Incorrect query provided"
)
except Exception as e:
logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)
params_hash = query_parameter_hash(passed_params)
bucket = MOONSTREAM_S3_QUERIES_BUCKET
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{request_data.file_type}"
try:
background_tasks.add_task(
queries.data_generate,
bucket=MOONSTREAM_S3_QUERIES_BUCKET,
query_id=f"{query_id}",
file_type=request_data.file_type,
query=valid_query,
bucket=bucket,
key=key,
query=query,
params=passed_params,
params_hash=params_hash,
)
except Exception as e:
logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": MOONSTREAM_S3_QUERIES_BUCKET,
"Key": f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{request_data.file_type}",
},
ExpiresIn=43200, # 12 hours
HttpMethod="GET",
stats_presigned_url = generate_s3_access_links(
method_name="get_object",
bucket=bucket,
key=key,
expiration=LINKS_EXPIRATION_TIME,
http_method="GET",
)
return {"url": stats_presigned_url}

Wyświetl plik

@ -149,6 +149,9 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
# queries
LINKS_EXPIRATION_TIME = 60 * 60 * 12 # 12 hours
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS = 30000
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get(
"MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS"

Wyświetl plik

@ -1,13 +1,20 @@
import csv
from collections import OrderedDict
import hashlib
import json
import logging
import re
from io import StringIO
from typing import Any, Dict, Optional
from typing import Any, Dict
import boto3 # type: ignore
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextClause
from ..actions import push_data_to_bucket
from ..reporter import reporter
from ..db import RO_pre_ping_query_engine
from ..reporter import reporter
@ -25,18 +32,6 @@ class QueryNotValid(Exception):
"""
def push_statistics(s3: Any, data: Any, key: str, bucket: str) -> None:
    """
    Upload generated statistics to S3 as a JSON object.

    s3: an initialized boto3 S3 client.
    data: body bytes/str to store.
    key: destination object key.
    bucket: destination bucket name.
    """
    upload_kwargs = {
        "Body": data,
        "Bucket": bucket,
        "Key": key,
        "ContentType": "application/json",
        # Tag the object so its origin is identifiable from S3 metadata.
        "Metadata": {"drone_query": "data"},
    }
    s3.put_object(**upload_kwargs)
    logger.info(f"Statistics push to bucket: s3://{bucket}/{key}")
def query_validation(query: str) -> str:
"""
Sanitize provided query.
@ -48,7 +43,7 @@ def query_validation(query: str) -> str:
def to_json_types(value):
if isinstance(value, (str, int, tuple, list, dict)):
if isinstance(value, (str, int, tuple, dict, list)):
return value
elif isinstance(value, set):
return list(value)
@ -66,37 +61,52 @@ def from_json_types(value):
def data_generate(
bucket: str,
query_id: str,
file_type: str,
query: str,
params: Optional[Dict[str, Any]],
bucket: str,
key: str,
query: TextClause,
params: Dict[str, Any],
params_hash: str,
):
"""
Generate query and push it to S3
"""
s3 = boto3.client("s3")
process_session = sessionmaker(bind=RO_pre_ping_query_engine)
db_session = process_session()
metadata = {
"source": "drone-query-generation",
"query_id": query_id,
"file_type": file_type,
"params_hash": params_hash,
"params": json.dumps(params),
}
try:
# TODO(Andrey): needs optimization — this information is useful but incomplete
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys() # type: ignore
query_instance = db_session.execute(query, params) # type: ignore
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
csv_writer.writerow(query_instance.keys())
csv_writer.writerows(query_instance.fetchall())
metadata["block_number"] = block_number
metadata["block_timestamp"] = block_timestamp
data = csv_buffer.getvalue().encode("utf-8")
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
block_number, block_timestamp = db_session.execute(
text(
@ -113,16 +123,16 @@ def data_generate(
key: to_json_types(value)
for key, value in row._asdict().items()
}
for row in db_session.execute(text(query), params).all()
for row in db_session.execute(query, params).all()
],
}
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
bucket=bucket,
)
push_data_to_bucket(
data=data,
key=key,
bucket=bucket,
metadata=metadata,
)
except Exception as err:
logger.error(f"Error while generating data: {err}")
db_session.rollback()

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.2.8"
MOONCRAWL_VERSION = "0.2.9"