Merge branch 'main' into v3-state-crawler

pull/1142/head
Andrey 2024-11-27 17:41:47 +02:00
commit 6ccb0594d1
11 zmienionych plików z 134 dodań i 33 usunięć

Wyświetl plik

@ -18,7 +18,11 @@ from moonstream.client import ( # type: ignore
)
from .middleware import MoonstreamHTTPException
from .settings import bugout_client as bc
from .settings import (
bugout_client as bc,
MOONSTREAM_DB_V3_CONTROLLER_API,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -204,3 +208,21 @@ def get_all_entries_from_search(
results.extend(existing_methods.results) # type: ignore
return results
def get_customer_db_uri(
    customer_id: str,
    instance_id: str,
    user: str,
) -> str:
    """
    Fetch a database connection URI from the Moonstream DB v3 controller API.

    Issues a GET request to
    ``/customers/{customer_id}/instances/{instance_id}/creds/{user}/url``
    on ``MOONSTREAM_DB_V3_CONTROLLER_API``, authenticated with
    ``MOONSTREAM_ADMIN_ACCESS_TOKEN`` as a bearer token.

    Args:
        customer_id: Customer identifier placed in the request path.
        instance_id: Instance identifier placed in the request path.
        user: Credential/user name placed in the request path.

    Returns:
        The response body with every double-quote character removed.

    Raises:
        MoonstreamHTTPException: With status code 500 on any failure
            (connection error, timeout, or non-success HTTP status).
    """
    try:
        response = requests.get(
            f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
            headers={"Authorization": f"Bearer {MOONSTREAM_ADMIN_ACCESS_TOKEN}"},
            # requests applies no timeout by default; bound the wait so a
            # stalled controller API cannot hang the caller indefinitely.
            timeout=10,
        )
        response.raise_for_status()
        # NOTE(review): presumably the API returns the URI as a quoted string;
        # all double quotes are stripped from the body — confirm the format.
        return response.text.replace('"', "")
    except Exception as e:
        logger.error(f"Error get customer db uri: {str(e)}")
        # Chain the original exception so the root cause stays in the traceback.
        raise MoonstreamHTTPException(status_code=500, internal_error=e) from e

Wyświetl plik

@ -232,6 +232,7 @@ async def queries_data_update_handler(
requested_query = request_data.query
blockchain_table = "polygon_labels"
if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}")
@ -254,6 +255,8 @@ async def queries_data_update_handler(
)
)
blockchain_table = get_label_model(blockchain).__tablename__
# Check if it can transform to TextClause
try:
query = text(requested_query)
@ -296,6 +299,9 @@ async def queries_data_update_handler(
query=query,
params=passed_params,
params_hash=params_hash,
customer_id=request_data.customer_id,
instance_id=request_data.instance_id,
blockchain_table=blockchain_table,
)
except Exception as e:

Wyświetl plik

@ -50,6 +50,8 @@ class QueryDataUpdate(BaseModel):
query: str
params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None
customer_id: Optional[str] = None
instance_id: Optional[str] = None
class TokenURIs(BaseModel):

Wyświetl plik

@ -7,6 +7,7 @@ from moonstreamdb.db import (
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
from moonstreamdbv3.db import MoonstreamCustomDBEngine
from sqlalchemy.orm import Session, sessionmaker
from .settings import (

Wyświetl plik

@ -91,6 +91,13 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
params = leaderboard_data["params"]
blockchain = leaderboard_data.get("blockchain", None)
query_params = {}
if leaderboard_data.get("customer_id", False):
query_params["customer_id"] = leaderboard_data["customer_id"]
if leaderboard_data.get("instance_id", False):
query_params["instance_id"] = str(leaderboard_data["instance_id"])
### execute query
try:
@ -98,6 +105,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
args.query_api_access_token,
query_name,
params,
query_params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,

Wyświetl plik

@ -22,6 +22,7 @@ def get_results_for_moonstream_query(
moonstream_access_token: str,
query_name: str,
params: Dict[str, Any],
query_params: Dict[str, Any],
blockchain: Optional[str] = None,
api_url: str = MOONSTREAM_API_URL,
max_retries: int = 100,
@ -70,8 +71,13 @@ def get_results_for_moonstream_query(
attempts = 0
while not success and attempts < query_api_retries:
response = requests.post(
request_url, json=request_body, headers=headers, timeout=10
request_url,
json=request_body,
headers=headers,
timeout=10,
params=query_params,
)
attempts += 1
response.raise_for_status()

Wyświetl plik

@ -481,3 +481,12 @@ if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
raise ValueError(
"MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
)
# Base URL of the DB v3 controller API; overridable through the environment.
MOONSTREAM_DB_V3_CONTROLLER_API = os.getenv(
    "MOONSTREAM_DB_V3_CONTROLLER_API",
    "https://mdb-v3-api.moonstream.to",
)

# Schema name used for Moonstream DB v3; overridable through the environment.
MOONSTREAM_DB_V3_SCHEMA_NAME = os.getenv("MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain")

Wyświetl plik

@ -5,16 +5,24 @@ import logging
import re
from collections import OrderedDict
from io import StringIO
from typing import Any, Dict
from typing import Any, Dict, Optional
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextClause
from ..actions import push_data_to_bucket
from ..db import RO_pre_ping_query_engine
from ..actions import push_data_to_bucket, get_customer_db_uri
from ..db import (
RO_pre_ping_query_engine,
MOONSTREAM_DB_URI_READ_ONLY,
MoonstreamCustomDBEngine,
)
from ..reporter import reporter
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX
from ..settings import (
CRAWLER_LABEL,
SEER_CRAWLER_LABEL,
MOONSTREAM_DB_V3_SCHEMA_NAME,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -64,12 +72,26 @@ def data_generate(
query: TextClause,
params: Dict[str, Any],
params_hash: str,
customer_id: Optional[str] = None,
instance_id: Optional[str] = None,
blockchain_table: Optional[str] = None,
):
"""
Generate query and push it to S3
"""
label = CRAWLER_LABEL
db_uri = MOONSTREAM_DB_URI_READ_ONLY
if customer_id is not None and instance_id is not None:
db_uri = get_customer_db_uri(customer_id, instance_id, "customer")
label = SEER_CRAWLER_LABEL
process_session = sessionmaker(bind=RO_pre_ping_query_engine)
engine = MoonstreamCustomDBEngine(
url=db_uri, schema=MOONSTREAM_DB_V3_SCHEMA_NAME
)
else:
engine = RO_pre_ping_query_engine
process_session = sessionmaker(bind=engine)
db_session = process_session()
metadata = {
@ -80,35 +102,32 @@ def data_generate(
"params": json.dumps(params),
}
try:
# TODO(Andrey): Needs optimization; this information is useful but incomplete.
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
block_number = None
block_timestamp = None
try:
### If blockchain is provided, we need to get the latest block number and timestamp
if blockchain_table is not None:
block_number, block_timestamp = db_session.execute(
text(
f"SELECT block_number, block_timestamp FROM {blockchain_table} WHERE label='{label}' ORDER BY block_number DESC LIMIT 1"
),
).one()
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
query_instance = db_session.execute(query, params) # type: ignore
csv_writer.writerow(query_instance.keys())
csv_writer.writerows(query_instance.fetchall())
metadata["block_number"] = block_number
metadata["block_timestamp"] = block_timestamp
metadata["block_number"] = block_number # type: ignore
metadata["block_timestamp"] = block_timestamp # type: ignore
data = csv_buffer.getvalue().encode("utf-8")
else:
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
data = json.dumps(
{

Wyświetl plik

@ -71,4 +71,7 @@ export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID="<Bugout_journal_id_for_leade
# DB v3 controller
export MOONSTREAM_DB_V3_CONTROLLER_API="https://mdb-v3-api.moonstream.to"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
# Moonstream DB v3
export MOONSTREAM_DB_V3_SCHEMA_NAME="blockchain"

Wyświetl plik

@ -304,6 +304,8 @@ class DashboardUpdate(BaseModel):
# Request body model for the query-data update endpoint.
class UpdateDataRequest(BaseModel):
# Parameters forwarded along with the query; defaults to an empty dict.
params: Dict[str, Any] = Field(default_factory=dict)
# Optional blockchain name; None when the caller does not specify one.
blockchain: Optional[str] = None
# Optional customer / instance identifiers.
# NOTE(review): presumably these select a customer-specific database — confirm against the handler.
customer_id: Optional[str] = None
instance_id: Optional[str] = None
class UpdateQueryRequest(BaseModel):

Wyświetl plik

@ -27,6 +27,7 @@ from ..actions import (
get_query_by_name,
name_normalization,
query_parameter_hash,
check_user_resource_access,
)
from ..middleware import MoonstreamHTTPException
from ..settings import (
@ -370,6 +371,8 @@ async def update_query_data_handler(
request: Request,
query_name: str = Path(..., description="Query name"),
request_update: data.UpdateDataRequest = Body(...),
customer_id: Optional[str] = Query(None),
instance_id: Optional[str] = Query(None),
) -> data.QueryPresignUrl:
"""
Request update data on S3 bucket
@ -386,6 +389,26 @@ async def update_query_data_handler(
detail=f"Provided blockchain is not supported.",
)
json_payload = {}
is_customer_database = customer_id is not None and instance_id is not None
if is_customer_database:
results = check_user_resource_access(
customer_id=customer_id,
user_token=token,
)
if results is None:
raise MoonstreamHTTPException(
status_code=404,
detail="Not found customer",
)
json_payload["customer_id"] = customer_id
json_payload["instance_id"] = instance_id
# normalize query name
try:
@ -436,8 +459,9 @@ async def update_query_data_handler(
raise MoonstreamHTTPException(status_code=500, internal_error=e)
### check tags
if "preapprove" in entry.tags or "approved" not in entry.tags:
if (
"preapprove" in entry.tags or "approved" not in entry.tags
) and not is_customer_database:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
@ -456,17 +480,16 @@ async def update_query_data_handler(
if "ext:csv" in tags:
file_type = "csv"
json_payload["query"] = content
json_payload["params"] = request_update.params
json_payload["file_type"] = file_type
json_payload["blockchain"] = request_update.blockchain
try:
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": (
request_update.blockchain if request_update.blockchain else None
),
},
json=json_payload,
timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e: