From 36a5be3feda21197df2b1727a64b0d1e519aba3c Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 00:36:32 +0200 Subject: [PATCH 1/5] Add queryAPI v3 and update leaderboard generator. --- crawlers/mooncrawl/mooncrawl/actions.py | 24 ++++++- crawlers/mooncrawl/mooncrawl/api.py | 6 ++ crawlers/mooncrawl/mooncrawl/data.py | 2 + .../mooncrawl/leaderboards_generator/cli.py | 8 +++ .../mooncrawl/leaderboards_generator/utils.py | 8 ++- crawlers/mooncrawl/mooncrawl/settings.py | 5 ++ .../mooncrawl/stats_worker/queries.py | 65 +++++++++++++------ crawlers/mooncrawl/mooncrawl/version.py | 2 +- moonstreamapi/moonstreamapi/data.py | 2 + moonstreamapi/moonstreamapi/routes/queries.py | 45 ++++++++++--- 10 files changed, 133 insertions(+), 34 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 3cf7e262..ba6cf868 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -18,7 +18,11 @@ from moonstream.client import ( # type: ignore ) from .middleware import MoonstreamHTTPException -from .settings import bugout_client as bc +from .settings import ( + bugout_client as bc, + MOONSTREAM_DB_V3_CONTROLLER_API, + MOONSTREAM_ADMIN_ACCESS_TOKEN, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -204,3 +208,21 @@ def get_all_entries_from_search( results.extend(existing_methods.results) # type: ignore return results + + +def get_customer_db_uri( + customer_id: str, + instance_id: str, + user: str, +) -> str: + + try: + response = requests.get( + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", + headers={"Authorization": f"Bearer {MOONSTREAM_ADMIN_ACCESS_TOKEN}"}, + ) + response.raise_for_status() + return response.text.replace('"', "") + except Exception as e: + logger.error(f"Error get customer db uri: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 54b8715d..5754a81d 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -232,6 +232,7 @@ async def queries_data_update_handler( requested_query = request_data.query + blockchain_table = None if request_data.blockchain: if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: logger.error(f"Unknown blockchain {request_data.blockchain}") @@ -254,6 +255,8 @@ async def queries_data_update_handler( ) ) + blockchain_table = get_label_model(blockchain).__tablename__ + # Check if it can transform to TextClause try: query = text(requested_query) @@ -296,6 +299,9 @@ async def queries_data_update_handler( query=query, params=passed_params, params_hash=params_hash, + customer_id=request_data.customer_id, + instance_id=request_data.instance_id, + blockchain_table=blockchain_table, ) except Exception as e: diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 3e76db46..54ae071f 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -50,6 +50,8 @@ class QueryDataUpdate(BaseModel): query: str params: Dict[str, Any] = Field(default_factory=dict) blockchain: Optional[str] = None + customer_id: Optional[str] = None + instance_id: Optional[str] = None class TokenURIs(BaseModel): diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 8cc2def2..07d2701b 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -91,6 +91,13 @@ def handle_leaderboards(args: argparse.Namespace) -> None: params = leaderboard_data["params"] blockchain = leaderboard_data.get("blockchain", None) + query_params = {} + + if leaderboard_data.get("customer_id", False): + query_params["customer_id"] = leaderboard_data["customer_id"] + + if leaderboard_data.get("instance_id", False): + query_params["instance_id"] = str(leaderboard_data["instance_id"]) ### execute query try: @@ -98,6 +105,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None: args.query_api_access_token, query_name, params, + query_params, blockchain, MOONSTREAM_API_URL, args.max_retries, diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index f1e9e87f..f0c5e833 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -22,6 +22,7 @@ def get_results_for_moonstream_query( moonstream_access_token: str, query_name: str, params: Dict[str, Any], + query_params: Dict[str, Any], blockchain: Optional[str] = None, api_url: str = MOONSTREAM_API_URL, max_retries: int = 100, @@ -70,8 +71,13 @@ def get_results_for_moonstream_query( attempts = 0 while not success and attempts < query_api_retries: + response = requests.post( - request_url, json=request_body, headers=headers, timeout=10 + request_url, + json=request_body, + headers=headers, + timeout=10, + params=query_params, ) attempts += 1 response.raise_for_status() diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index f9218296..9783ef93 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -481,3 +481,8 @@ if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "": raise ValueError( "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set" ) + + +MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( + "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" +) diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index 55052605..586da909 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -5,16 +5,24 @@ import logging import re from collections import OrderedDict from io import StringIO -from typing import Any, Dict +from typing import Any, Dict, Optional from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text from sqlalchemy.sql.expression import TextClause -from ..actions import push_data_to_bucket -from ..db import RO_pre_ping_query_engine +from ..actions import push_data_to_bucket, get_customer_db_uri +from ..db import ( + create_moonstream_engine, + MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + MOONSTREAM_DB_URI_READ_ONLY, +) from ..reporter import reporter -from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX +from ..settings import ( + MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, + CRAWLER_LABEL, + SEER_CRAWLER_LABEL, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -64,13 +72,31 @@ def data_generate( query: TextClause, params: Dict[str, Any], params_hash: str, + customer_id: Optional[str] = None, + instance_id: Optional[str] = None, + blockchain_table: Optional[str] = None, ): """ Generate query and push it to S3 """ + label = CRAWLER_LABEL + db_uri = MOONSTREAM_DB_URI_READ_ONLY + if customer_id is not None and instance_id is not None: + db_uri = get_customer_db_uri(customer_id, instance_id, "customer") + label = SEER_CRAWLER_LABEL - process_session = sessionmaker(bind=RO_pre_ping_query_engine) - db_session = process_session() + if db_uri is None: + logger.error("No DB URI provided") + return + + data_engine = create_moonstream_engine( + url=db_uri, + pool_size=1, + statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + pool_pre_ping=True, + ) + + db_session = sessionmaker(bind=data_engine)() metadata = { "source": "drone-query-generation", @@ -80,35 +106,32 @@ def data_generate( "params": json.dumps(params), } - try: - # TODO:(Andrey) Need optimization that information is usefull but incomplete - block_number, block_timestamp = db_session.execute( - text( - "SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;" - ), - ).one() + block_number = None + block_timestamp = None + try: + ### If blockchain is provided, we need to get the latest block number and timestamp + if blockchain_table is not None: + block_number, block_timestamp = db_session.execute( + text( + f"SELECT block_number, block_timestamp FROM {blockchain_table} WHERE label='{label}' ORDER BY block_number DESC LIMIT 1" + ), + ).one() if file_type == "csv": csv_buffer = StringIO() csv_writer = csv.writer(csv_buffer, delimiter=";") - # engine.execution_options(stream_results=True) query_instance = db_session.execute(query, params) # type: ignore csv_writer.writerow(query_instance.keys()) csv_writer.writerows(query_instance.fetchall()) - metadata["block_number"] = block_number - metadata["block_timestamp"] = block_timestamp + metadata["block_number"] = block_number # type: ignore + metadata["block_timestamp"] = block_timestamp # type: ignore data = csv_buffer.getvalue().encode("utf-8") else: - block_number, block_timestamp = db_session.execute( - text( - "SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;" - ), - ).one() data = json.dumps( { diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index d5b0894c..40646fd2 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.4.13" +MOONCRAWL_VERSION = "0.5.0" diff --git a/moonstreamapi/moonstreamapi/data.py b/moonstreamapi/moonstreamapi/data.py index 9848f1c9..65440c7e 100644 --- a/moonstreamapi/moonstreamapi/data.py +++ b/moonstreamapi/moonstreamapi/data.py @@ -304,6 +304,8 @@ class DashboardUpdate(BaseModel): class UpdateDataRequest(BaseModel): params: Dict[str, Any] = Field(default_factory=dict) blockchain: Optional[str] = None + customer_id: Optional[str] = None + instance_id: Optional[str] = None class UpdateQueryRequest(BaseModel): diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index cce6e9b2..20bd76ff 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -27,6 +27,7 @@ from ..actions import ( get_query_by_name, name_normalization, query_parameter_hash, + check_user_resource_access, ) from ..middleware import MoonstreamHTTPException from ..settings import ( @@ -370,6 +371,8 @@ async def update_query_data_handler( request: Request, query_name: str = Path(..., description="Query name"), request_update: data.UpdateDataRequest = Body(...), + customer_id: Optional[str] = Query(None), + instance_id: Optional[str] = Query(None), ) -> data.QueryPresignUrl: """ Request update data on S3 bucket @@ -386,6 +389,26 @@ async def update_query_data_handler( detail=f"Provided blockchain is not supported.", ) + json_payload = {} + + is_customer_database = customer_id is not None and instance_id is not None + + if is_customer_database: + + results = check_user_resource_access( + customer_id=customer_id, + user_token=token, + ) + + if results is None: + raise MoonstreamHTTPException( + status_code=404, + detail="Not found customer", + ) + + json_payload["customer_id"] = customer_id + json_payload["instance_id"] = instance_id + # normalize query name try: @@ -436,8 +459,9 @@ async def update_query_data_handler( raise MoonstreamHTTPException(status_code=500, internal_error=e) ### check tags - - if "preapprove" in entry.tags or "approved" not in entry.tags: + if ( + "preapprove" in entry.tags or "approved" not in entry.tags + ) and not is_customer_database: raise MoonstreamHTTPException( status_code=403, detail="Query not approved yet." ) @@ -456,17 +480,18 @@ async def update_query_data_handler( if "ext:csv" in tags: file_type = "csv" + + json_payload["query"] = content + json_payload["params"] = request_update.params + json_payload["file_type"] = file_type + json_payload["blockchain"] = ( + request_update.blockchain if request_update.blockchain else None + ) + try: responce = requests.post( f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", - json={ - "query": content, - "params": request_update.params, - "file_type": file_type, - "blockchain": ( - request_update.blockchain if request_update.blockchain else None - ), - }, + json=json_payload, timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS, ) except Exception as e: From bb1206829e527305cbd7ac0fd860a09efa3cd22a Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 16:09:07 +0200 Subject: [PATCH 2/5] Improvments. --- crawlers/mooncrawl/mooncrawl/db.py | 1 + crawlers/mooncrawl/mooncrawl/settings.py | 4 ++++ .../mooncrawl/stats_worker/queries.py | 23 ++++++++----------- moonstreamapi/moonstreamapi/routes/queries.py | 4 +--- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/db.py b/crawlers/mooncrawl/mooncrawl/db.py index 8eba88f6..cb22baf7 100644 --- a/crawlers/mooncrawl/mooncrawl/db.py +++ b/crawlers/mooncrawl/mooncrawl/db.py @@ -7,6 +7,7 @@ from moonstreamdb.db import ( MOONSTREAM_POOL_SIZE, create_moonstream_engine, ) +from moonstreamdbv3.db import MoonstreamCustomDBEngine from sqlalchemy.orm import Session, sessionmaker from .settings import ( diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 9783ef93..20cf9253 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -486,3 +486,7 @@ if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "": MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" ) + +MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( + "MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain" +) diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index 586da909..c3e23d2b 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -13,15 +13,16 @@ from sqlalchemy.sql.expression import TextClause from ..actions import push_data_to_bucket, get_customer_db_uri from ..db import ( - create_moonstream_engine, - MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + RO_pre_ping_query_engine, MOONSTREAM_DB_URI_READ_ONLY, + MoonstreamCustomDBEngine, ) from ..reporter import reporter from ..settings import ( MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, CRAWLER_LABEL, SEER_CRAWLER_LABEL, + MOONSTREAM_DB_V3_SCHEMA_NAME, ) logging.basicConfig(level=logging.INFO) @@ -85,18 +86,14 @@ def data_generate( db_uri = get_customer_db_uri(customer_id, instance_id, "customer") label = SEER_CRAWLER_LABEL - if db_uri is None: - logger.error("No DB URI provided") - return + engine = MoonstreamCustomDBEngine( + url=db_uri, schema=MOONSTREAM_DB_V3_SCHEMA_NAME + ) + else: + engine = RO_pre_ping_query_engine - data_engine = create_moonstream_engine( - url=db_uri, - pool_size=1, - statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, - pool_pre_ping=True, - ) - - db_session = sessionmaker(bind=data_engine)() + process_session = sessionmaker(bind=engine) + db_session = process_session() metadata = { "source": "drone-query-generation", diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index 20bd76ff..c1388beb 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -484,9 +484,7 @@ async def update_query_data_handler( json_payload["query"] = content json_payload["params"] = request_update.params json_payload["file_type"] = file_type - json_payload["blockchain"] = ( - request_update.blockchain if request_update.blockchain else None - ) + json_payload["blockchain"] = request_update.blockchain try: responce = requests.post( From 4ed54cc9b5797fecde2f0927420cbaff42bd7e73 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 16:12:24 +0200 Subject: [PATCH 3/5] Delete unused env. --- crawlers/mooncrawl/mooncrawl/stats_worker/queries.py | 1 - 1 file changed, 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index c3e23d2b..f2178d06 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -19,7 +19,6 @@ from ..db import ( ) from ..reporter import reporter from ..settings import ( - MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, CRAWLER_LABEL, SEER_CRAWLER_LABEL, MOONSTREAM_DB_V3_SCHEMA_NAME, From 6fbdc168a8aaa2c0820b13f92fe14cb708222a56 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 16:41:22 +0200 Subject: [PATCH 4/5] Add default labels table. --- crawlers/mooncrawl/mooncrawl/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 5754a81d..615d2805 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -232,7 +232,7 @@ async def queries_data_update_handler( requested_query = request_data.query - blockchain_table = None + blockchain_table = "polygon_labels" if request_data.blockchain: if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: logger.error(f"Unknown blockchain {request_data.blockchain}") From be908976d999d94ca0e729122bd3a5f867bf13c5 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 17:02:52 +0200 Subject: [PATCH 5/5] Update sample env. --- crawlers/mooncrawl/sample.env | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 5215576b..1887e9b2 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -71,4 +71,7 @@ export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID="