Add queryAPI v3 and update leaderboard generator.

pull/1143/head
Andrey 2024-11-27 00:36:32 +02:00
rodzic 87e73c97b7
commit 36a5be3fed
10 zmienionych plików z 133 dodań i 34 usunięć

Wyświetl plik

@ -18,7 +18,11 @@ from moonstream.client import ( # type: ignore
) )
from .middleware import MoonstreamHTTPException from .middleware import MoonstreamHTTPException
from .settings import bugout_client as bc from .settings import (
bugout_client as bc,
MOONSTREAM_DB_V3_CONTROLLER_API,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -204,3 +208,21 @@ def get_all_entries_from_search(
results.extend(existing_methods.results) # type: ignore results.extend(existing_methods.results) # type: ignore
return results return results
def get_customer_db_uri(
customer_id: str,
instance_id: str,
user: str,
) -> str:
try:
response = requests.get(
f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
headers={"Authorization": f"Bearer {MOONSTREAM_ADMIN_ACCESS_TOKEN}"},
)
response.raise_for_status()
return response.text.replace('"', "")
except Exception as e:
logger.error(f"Error get customer db uri: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)

Wyświetl plik

@ -232,6 +232,7 @@ async def queries_data_update_handler(
requested_query = request_data.query requested_query = request_data.query
blockchain_table = None
if request_data.blockchain: if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}") logger.error(f"Unknown blockchain {request_data.blockchain}")
@ -254,6 +255,8 @@ async def queries_data_update_handler(
) )
) )
blockchain_table = get_label_model(blockchain).__tablename__
# Check if it can transform to TextClause # Check if it can transform to TextClause
try: try:
query = text(requested_query) query = text(requested_query)
@ -296,6 +299,9 @@ async def queries_data_update_handler(
query=query, query=query,
params=passed_params, params=passed_params,
params_hash=params_hash, params_hash=params_hash,
customer_id=request_data.customer_id,
instance_id=request_data.instance_id,
blockchain_table=blockchain_table,
) )
except Exception as e: except Exception as e:

Wyświetl plik

@ -50,6 +50,8 @@ class QueryDataUpdate(BaseModel):
query: str query: str
params: Dict[str, Any] = Field(default_factory=dict) params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None blockchain: Optional[str] = None
customer_id: Optional[str] = None
instance_id: Optional[str] = None
class TokenURIs(BaseModel): class TokenURIs(BaseModel):

Wyświetl plik

@ -91,6 +91,13 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
params = leaderboard_data["params"] params = leaderboard_data["params"]
blockchain = leaderboard_data.get("blockchain", None) blockchain = leaderboard_data.get("blockchain", None)
query_params = {}
if leaderboard_data.get("customer_id", False):
query_params["customer_id"] = leaderboard_data["customer_id"]
if leaderboard_data.get("instance_id", False):
query_params["instance_id"] = str(leaderboard_data["instance_id"])
### execute query ### execute query
try: try:
@ -98,6 +105,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
args.query_api_access_token, args.query_api_access_token,
query_name, query_name,
params, params,
query_params,
blockchain, blockchain,
MOONSTREAM_API_URL, MOONSTREAM_API_URL,
args.max_retries, args.max_retries,

Wyświetl plik

@ -22,6 +22,7 @@ def get_results_for_moonstream_query(
moonstream_access_token: str, moonstream_access_token: str,
query_name: str, query_name: str,
params: Dict[str, Any], params: Dict[str, Any],
query_params: Dict[str, Any],
blockchain: Optional[str] = None, blockchain: Optional[str] = None,
api_url: str = MOONSTREAM_API_URL, api_url: str = MOONSTREAM_API_URL,
max_retries: int = 100, max_retries: int = 100,
@ -70,8 +71,13 @@ def get_results_for_moonstream_query(
attempts = 0 attempts = 0
while not success and attempts < query_api_retries: while not success and attempts < query_api_retries:
response = requests.post( response = requests.post(
request_url, json=request_body, headers=headers, timeout=10 request_url,
json=request_body,
headers=headers,
timeout=10,
params=query_params,
) )
attempts += 1 attempts += 1
response.raise_for_status() response.raise_for_status()

Wyświetl plik

@ -481,3 +481,8 @@ if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
raise ValueError( raise ValueError(
"MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set" "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
) )
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)

Wyświetl plik

@ -5,16 +5,24 @@ import logging
import re import re
from collections import OrderedDict from collections import OrderedDict
from io import StringIO from io import StringIO
from typing import Any, Dict from typing import Any, Dict, Optional
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextClause from sqlalchemy.sql.expression import TextClause
from ..actions import push_data_to_bucket from ..actions import push_data_to_bucket, get_customer_db_uri
from ..db import RO_pre_ping_query_engine from ..db import (
create_moonstream_engine,
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
MOONSTREAM_DB_URI_READ_ONLY,
)
from ..reporter import reporter from ..reporter import reporter
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX from ..settings import (
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
CRAWLER_LABEL,
SEER_CRAWLER_LABEL,
)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -64,13 +72,31 @@ def data_generate(
query: TextClause, query: TextClause,
params: Dict[str, Any], params: Dict[str, Any],
params_hash: str, params_hash: str,
customer_id: Optional[str] = None,
instance_id: Optional[str] = None,
blockchain_table: Optional[str] = None,
): ):
""" """
Generate query and push it to S3 Generate query and push it to S3
""" """
label = CRAWLER_LABEL
db_uri = MOONSTREAM_DB_URI_READ_ONLY
if customer_id is not None and instance_id is not None:
db_uri = get_customer_db_uri(customer_id, instance_id, "customer")
label = SEER_CRAWLER_LABEL
process_session = sessionmaker(bind=RO_pre_ping_query_engine) if db_uri is None:
db_session = process_session() logger.error("No DB URI provided")
return
data_engine = create_moonstream_engine(
url=db_uri,
pool_size=1,
statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
db_session = sessionmaker(bind=data_engine)()
metadata = { metadata = {
"source": "drone-query-generation", "source": "drone-query-generation",
@ -80,35 +106,32 @@ def data_generate(
"params": json.dumps(params), "params": json.dumps(params),
} }
try: block_number = None
# TODO:(Andrey) Need optimization that information is usefull but incomplete block_timestamp = None
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
try:
### If blockchain is provided, we need to get the latest block number and timestamp
if blockchain_table is not None:
block_number, block_timestamp = db_session.execute(
text(
f"SELECT block_number, block_timestamp FROM {blockchain_table} WHERE label='{label}' ORDER BY block_number DESC LIMIT 1"
),
).one()
if file_type == "csv": if file_type == "csv":
csv_buffer = StringIO() csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";") csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
query_instance = db_session.execute(query, params) # type: ignore query_instance = db_session.execute(query, params) # type: ignore
csv_writer.writerow(query_instance.keys()) csv_writer.writerow(query_instance.keys())
csv_writer.writerows(query_instance.fetchall()) csv_writer.writerows(query_instance.fetchall())
metadata["block_number"] = block_number metadata["block_number"] = block_number # type: ignore
metadata["block_timestamp"] = block_timestamp metadata["block_timestamp"] = block_timestamp # type: ignore
data = csv_buffer.getvalue().encode("utf-8") data = csv_buffer.getvalue().encode("utf-8")
else: else:
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
data = json.dumps( data = json.dumps(
{ {

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version. Moonstream crawlers version.
""" """
MOONCRAWL_VERSION = "0.4.13" MOONCRAWL_VERSION = "0.5.0"

Wyświetl plik

@ -304,6 +304,8 @@ class DashboardUpdate(BaseModel):
class UpdateDataRequest(BaseModel): class UpdateDataRequest(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict) params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None blockchain: Optional[str] = None
customer_id: Optional[str] = None
instance_id: Optional[str] = None
class UpdateQueryRequest(BaseModel): class UpdateQueryRequest(BaseModel):

Wyświetl plik

@ -27,6 +27,7 @@ from ..actions import (
get_query_by_name, get_query_by_name,
name_normalization, name_normalization,
query_parameter_hash, query_parameter_hash,
check_user_resource_access,
) )
from ..middleware import MoonstreamHTTPException from ..middleware import MoonstreamHTTPException
from ..settings import ( from ..settings import (
@ -370,6 +371,8 @@ async def update_query_data_handler(
request: Request, request: Request,
query_name: str = Path(..., description="Query name"), query_name: str = Path(..., description="Query name"),
request_update: data.UpdateDataRequest = Body(...), request_update: data.UpdateDataRequest = Body(...),
customer_id: Optional[str] = Query(None),
instance_id: Optional[str] = Query(None),
) -> data.QueryPresignUrl: ) -> data.QueryPresignUrl:
""" """
Request update data on S3 bucket Request update data on S3 bucket
@ -386,6 +389,26 @@ async def update_query_data_handler(
detail=f"Provided blockchain is not supported.", detail=f"Provided blockchain is not supported.",
) )
json_payload = {}
is_customer_database = customer_id is not None and instance_id is not None
if is_customer_database:
results = check_user_resource_access(
customer_id=customer_id,
user_token=token,
)
if results is None:
raise MoonstreamHTTPException(
status_code=404,
detail="Not found customer",
)
json_payload["customer_id"] = customer_id
json_payload["instance_id"] = instance_id
# normalize query name # normalize query name
try: try:
@ -436,8 +459,9 @@ async def update_query_data_handler(
raise MoonstreamHTTPException(status_code=500, internal_error=e) raise MoonstreamHTTPException(status_code=500, internal_error=e)
### check tags ### check tags
if (
if "preapprove" in entry.tags or "approved" not in entry.tags: "preapprove" in entry.tags or "approved" not in entry.tags
) and not is_customer_database:
raise MoonstreamHTTPException( raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet." status_code=403, detail="Query not approved yet."
) )
@ -456,17 +480,18 @@ async def update_query_data_handler(
if "ext:csv" in tags: if "ext:csv" in tags:
file_type = "csv" file_type = "csv"
json_payload["query"] = content
json_payload["params"] = request_update.params
json_payload["file_type"] = file_type
json_payload["blockchain"] = (
request_update.blockchain if request_update.blockchain else None
)
try: try:
responce = requests.post( responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={ json=json_payload,
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": (
request_update.blockchain if request_update.blockchain else None
),
},
timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS, timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS,
) )
except Exception as e: except Exception as e: