Upgrade query processing.

pull/1163/head
Andrey 2025-01-13 22:31:42 +02:00
parent 8e0ce97989
commit ba95f3c88d
6 changed files with 135 additions and 93 deletions

View file

@@ -116,6 +116,7 @@ def recive_S3_data_from_query(
     client: Moonstream,
     token: Union[str, uuid.UUID],
     query_name: str,
+    query_params: Dict[str, Any] = {},
     params: Dict[str, Any] = {},
     time_await: int = 2,
     max_retries: int = 30,
@@ -136,15 +137,18 @@ def recive_S3_data_from_query(
     if custom_body:
         headers = {
             "Authorization": f"Bearer {token}",
+            "Content-Type": "application/json",
         }
         json = custom_body
 
         response = requests.post(
             url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
             headers=headers,
+            params=query_params,
             json=json,
+            timeout=5,
         )
         data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
     else:
         data_url = client.exec_query(
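Note on the hunk above: the new `query_params` travels as URL query parameters on the `update_data` POST, while `custom_body` becomes the JSON body; the pre-existing `params` argument is separate from both. A hedged usage sketch (the token value and query name are placeholders, not from this diff):

    from moonstream.client import Moonstream  # assumed import path for the Moonstream API client

    client = Moonstream()
    data = recive_S3_data_from_query(
        client=client,
        token="<moonstream access token>",                            # placeholder
        query_name="token_uris_to_crawl",                             # hypothetical query name
        query_params={"customer_id": "<id>", "instance_id": "<id>"},
        custom_body={"blockchain": "polygon", "params": {}},
    )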

View file

@@ -13,12 +13,7 @@ import boto3 # type: ignore
 from bugout.data import BugoutJournalEntity, BugoutResource
 from fastapi import BackgroundTasks, FastAPI
 from fastapi.middleware.cors import CORSMiddleware
-from moonstreamdb.blockchain import (
-    AvailableBlockchainType,
-    get_block_model,
-    get_label_model,
-    get_transaction_model,
-)
+from moonstreamtypes.blockchain import AvailableBlockchainType, get_block_model, get_label_model, get_transaction_model
 from sqlalchemy import text
 
 from . import data
@@ -232,6 +227,11 @@ async def queries_data_update_handler(
     requested_query = request_data.query
 
+    version = 2
+    if request_data.customer_id and request_data.instance_id:
+        version = 3
+
+    blockchain_table = "polygon_labels"
     if request_data.blockchain:
         if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
@@ -240,22 +240,23 @@
         blockchain = AvailableBlockchainType(request_data.blockchain)
 
-        requested_query = (
-            requested_query.replace(
-                "__transactions_table__",
-                get_transaction_model(blockchain).__tablename__,
-            )
-            .replace(
-                "__blocks_table__",
-                get_block_model(blockchain).__tablename__,
-            )
-            .replace(
-                "__labels_table__",
-                get_label_model(blockchain).__tablename__,
-            )
-        )
-        blockchain_table = get_label_model(blockchain).__tablename__
+        blockchain_table = get_label_model(blockchain, version).__tablename__
+        requested_query = requested_query.replace(
+            "__labels_table__",
+            blockchain_table,
+        )
+        if version == 2:
+            requested_query = (
+                requested_query.replace(
+                    "__transactions_table__",
+                    get_transaction_model(blockchain).__tablename__,
+                )
+                .replace(
+                    "__blocks_table__",
+                    get_block_model(blockchain).__tablename__,
+                )
+            )
 
     # Check if it can transform to TextClause
     try:
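Net effect of this hunk: v3 queries only get `__labels_table__` substituted, while v2 queries additionally get `__transactions_table__` and `__blocks_table__` resolved. A self-contained sketch of the substitution rule, with hardcoded table names standing in for the model lookups:

    def render_query_template(query: str, version: int) -> str:
        # Table names are illustrative; the handler resolves them via
        # get_label_model / get_transaction_model / get_block_model.
        query = query.replace("__labels_table__", "polygon_labels")
        if version == 2:
            query = query.replace(
                "__transactions_table__", "polygon_transactions"
            ).replace("__blocks_table__", "polygon_blocks")
        return query

    assert render_query_template("SELECT 1 FROM __labels_table__", 3) == "SELECT 1 FROM polygon_labels"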

View file

@@ -8,11 +8,13 @@ from sqlalchemy.orm import Session
 from typing import Any, Dict, List, Optional, Tuple
 from urllib.error import HTTPError
 
 from bugout.exceptions import BugoutResponseException
+from moonstreamtypes.blockchain import AvailableBlockchainType
+from moonstreamdb.blockchain import AvailableBlockchainType as AvailableBlockchainTypeV2
 
 from ..actions import get_all_entries_from_search, request_connection_string
-from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, METADATA_TASKS_JOURNAL_ID
+from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_METADATA_TASKS_JOURNAL, MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN
 from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx, create_moonstream_engine, sessionmaker
 from ..data import TokenURIs
 from .db import (
@@ -77,6 +79,8 @@ def crawl_uri(metadata_uri: str) -> Any:
         except HTTPError as error:
             logger.error(f"request end with error statuscode: {error.code}")
             retry += 1
+            if error.code == 404:
+                return None
             continue
         except Exception as err:
             logger.error(err)
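The added early return treats HTTP 404 as permanent and stops retrying instead of exhausting the retry budget (note it still increments `retry` first, matching the diff). The pattern in isolation, simplified to plain HTTP (the real `crawl_uri` handles more URI schemes):

    import logging
    from urllib.error import HTTPError
    from urllib.request import urlopen

    logger = logging.getLogger(__name__)

    def crawl_uri_sketch(metadata_uri: str, max_retries: int = 3):
        retry = 0
        while retry < max_retries:
            try:
                return urlopen(metadata_uri, timeout=10).read()
            except HTTPError as error:
                logger.error(f"request end with error statuscode: {error.code}")
                retry += 1
                if error.code == 404:  # permanent failure, no point retrying
                    return None
                continue
        return None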
@@ -233,12 +237,17 @@ def process_address_metadata(
                 v3=True
             )
             db_session.commit()
+
+            clean_labels_from_db(
+                db_session=db_session,
+                blockchain_type=blockchain_type,
+                address=address,
+                version=3
+            )
+            db_session.commit()
@@ -248,7 +257,6 @@ def parse_metadata(
     batch_size: int,
     max_recrawl: int,
     threads: int,
-    spire: bool = False,
     custom_db_uri: Optional[str] = None,
 ):
     """
@@ -257,62 +265,66 @@ def parse_metadata(
     logger.info("Starting metadata crawler")
     logger.info(f"Processing blockchain {blockchain_type.value}")
 
-    # Get tokens to crawl v2 flow
-    with yield_db_read_only_preping_session_ctx() as db_session_read_only:
-        tokens_uri_by_address = get_tokens_to_crawl(
-            db_session_read_only,
-            blockchain_type,
-            {},
-        )
-
-    # Process each address
-    for address, tokens in tokens_uri_by_address.items():
-        process_address_metadata_with_leak(
-            address=address,
-            blockchain_type=blockchain_type,
-            batch_size=batch_size,
-            max_recrawl=max_recrawl,
-            threads=threads,
-            tokens=tokens,
-        )
-
-    spire_jobs = []
-    if spire == True:
-        # Get all jobs for this blockchain from Spire
-        search_query = f"#metadata-job #{blockchain_type.value}"
-        try:
-            entries = get_all_entries_from_search(
-                journal_id=METADATA_TASKS_JOURNAL_ID,
-                search_query=search_query,
-                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
-                content=True,
-                limit=1000,
-            )
-            logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}")
-            for entry in entries:
-                try:
-                    if not entry.content:
-                        continue
-                    job = json.loads(entry.content)
-                    if job.get("blockchain") != blockchain_type.value:
-                        logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}")
-                        continue
-                    spire_jobs.append(job)
-                except Exception as err:
-                    id = entry.entry_url.split("/")[-1]
-                    logger.error(f"Error parsing job from entry {id}: {err}")
-                    continue
-        except Exception as err:
-            logger.error(f"Error fetching jobs from journal: {err}")
-            return
+    # Check if blockchain exists in v2 package
+    if blockchain_type.value in [chain.value for chain in AvailableBlockchainTypeV2]:
+        try:
+            logger.info(f"Processing v2 blockchain: {blockchain_type.value}")
+            # Get tokens to crawl v2 flow
+            with yield_db_read_only_preping_session_ctx() as db_session_read_only:
+                tokens_uri_by_address = get_tokens_to_crawl(
+                    db_session_read_only,
+                    blockchain_type,
+                    {},
+                )
+            # Process each address
+            for address, tokens in tokens_uri_by_address.items():
+                process_address_metadata_with_leak(
+                    address=address,
+                    blockchain_type=blockchain_type,
+                    batch_size=batch_size,
+                    max_recrawl=max_recrawl,
+                    threads=threads,
+                    tokens=tokens,
+                )
+        except Exception as err:
+            logger.error(f"V2 flow failed: {err}, continuing with Spire flow")
+
+    # Continue with Spire flow regardless of v2 result
+    spire_jobs = []
+
+    # Get all jobs for this blockchain from Spire
+    search_query = f"#metadata-job #{blockchain_type.value}"
+    try:
+        entries = get_all_entries_from_search(
+            journal_id=MOONSTREAM_METADATA_TASKS_JOURNAL,
+            search_query=search_query,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            content=True,
+            limit=1000,
+        )
+        logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}")
+        for entry in entries:
+            try:
+                if not entry.content:
+                    continue
+                job = json.loads(entry.content)
+                if job.get("blockchain") != blockchain_type.value:
+                    logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}")
+                    continue
+                spire_jobs.append(job)
+            except Exception as err:
+                id = entry.entry_url.split("/")[-1]
+                logger.error(f"Error parsing job from entry {id}: {err}")
+                continue
+    except BugoutResponseException as err:
+        logger.error(f"Bugout error fetching jobs from journal: {err.detail}")
+        return
+    except Exception as err:
+        logger.error(f"Error fetching jobs from journal: {err}")
+        return
 
     # Process each job
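Review note: with `--spire` gone, the function always runs both phases: the v2 DB-driven crawl is attempted only when the chain exists in the v2 package, and the Spire job search runs unconditionally afterwards. One subtlety worth flagging: `AvailableBlockchainType` and `AvailableBlockchainTypeV2` are distinct enum classes, so membership must compare `.value` strings, as the code does; comparing members directly would always fail:

    from enum import Enum

    class ChainV3(Enum):  # stand-in for moonstreamtypes' enum
        POLYGON = "polygon"

    class ChainV2(Enum):  # stand-in for moonstreamdb's enum
        POLYGON = "polygon"

    assert ChainV3.POLYGON not in list(ChainV2)                 # cross-class Enum equality is False
    assert ChainV3.POLYGON.value in [c.value for c in ChainV2]  # value comparison works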
@@ -369,7 +381,7 @@ def parse_metadata(
     except Exception as err:
         logger.error(f"Error processing jobs: {err}")
         raise err
     finally:
         for session in sessions_by_customer.values():
             try:
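The `finally` block guarantees the per-customer DB sessions opened during job processing are closed even when the `raise err` above propagates. The generic shape of that cleanup, assuming `sessions_by_customer` maps customer ids to SQLAlchemy sessions and that the truncated `try:` calls `session.close()`:

    import logging

    logger = logging.getLogger(__name__)

    def run_with_cleanup(sessions_by_customer: dict) -> None:
        try:
            ...  # process spire jobs against the per-customer sessions
        except Exception as err:
            logger.error(f"Error processing jobs: {err}")
            raise err
        finally:
            for session in sessions_by_customer.values():
                try:
                    session.close()
                except Exception as err:
                    logger.error(f"Error closing session: {err}")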
@@ -388,7 +400,6 @@ def handle_crawl(args: argparse.Namespace) -> None:
         args.commit_batch_size,
         args.max_recrawl,
         args.threads,
-        args.spire,
         args.custom_db_uri,
     )
@@ -431,12 +442,6 @@ def main() -> None:
         default=4,
         help="Amount of threads for crawling",
     )
-    metadata_crawler_parser.add_argument(
-        "--spire",
-        type=bool,
-        default=False,
-        help="If true, use spire jobs to crawl metadata",
-    )
     metadata_crawler_parser.add_argument(
         "--custom-db-uri",
         type=str,

View file

@@ -7,7 +7,6 @@ from sqlalchemy.dialects.postgresql import insert
 from datetime import datetime
 
-##from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
 from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model
 from sqlalchemy.orm import Session
 from sqlalchemy.sql import text
@@ -19,6 +18,7 @@ from ..settings import (
     METADATA_CRAWLER_LABEL,
     VIEW_STATE_CRAWLER_LABEL,
     MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN,
     bugout_client as bc,
     moonstream_client as mc,
 )
@@ -271,14 +271,14 @@ def get_tokens_id_wich_may_updated(
 def clean_labels_from_db(
-    db_session: Session, blockchain_type: AvailableBlockchainType, address: str
+    db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2
 ):
     """
     Remove existing labels.
     But keep the latest one for each token.
     """
-    label_model = get_label_model(blockchain_type)
+    label_model = get_label_model(blockchain_type, version=version)
 
     table = label_model.__tablename__
@@ -315,19 +315,34 @@ def clean_labels_from_db(
 def get_tokens_from_query_api(
     client: Moonstream,
+    blockchain_type: AvailableBlockchainType,
     query_name: str,
     params: dict,
     token: str,
+    customer_id: Optional[str] = None,
+    instance_id: Optional[str] = None,
 ) -> List[TokenURIs]:
     """
     Get token URIs from Query API results
     """
+    query_params = {}
+    if customer_id and instance_id:
+        query_params["customer_id"] = customer_id
+        query_params["instance_id"] = instance_id
+
     try:
         data = recive_S3_data_from_query(
             client=client,
             token=token,
             query_name=query_name,
-            params=params,
+            params={},
+            query_params=query_params,
+            custom_body={
+                "blockchain": blockchain_type.value,
+                "params": params,
+            }
         )
 
         # Convert query results to TokenURIs format
@@ -367,9 +382,12 @@ def get_tokens_to_crawl(
             tokens = get_tokens_from_query_api(
                 client=client,
+                blockchain_type=blockchain_type,
                 query_name=query_config["name"],
                 params=query_config["params"],
-                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                token=MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN,
+                customer_id=spire_job["customer_id"],
+                instance_id=spire_job["instance_id"],
             )
 
             # Group by address
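Putting the two hunks above together, a Query API call for a v3 job now looks roughly like this sketch (query name and params are placeholders; the `spire_job` shape is assumed from the fields the code reads):

    from moonstream.client import Moonstream  # assumed import path for the Moonstream API client

    client = Moonstream()
    spire_job = {"customer_id": "<uuid>", "instance_id": "<uuid>"}  # from a parsed journal entry

    tokens = get_tokens_from_query_api(
        client=client,
        blockchain_type=AvailableBlockchainType.POLYGON,
        query_name="token_uris_to_crawl",        # hypothetical query name
        params={"address": "0x..."},             # hypothetical query params
        token=MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN,
        customer_id=spire_job["customer_id"],
        instance_id=spire_job["instance_id"],
    )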
@@ -401,7 +419,6 @@ def upsert_metadata_labels(
     try:
         version = 3 if v3 else 2
         label_model = get_label_model(blockchain_type, version=version)
-
         # Prepare batch of labels
         labels_data = []
@@ -449,7 +466,11 @@ def upsert_metadata_labels(
         result_stmt = insert_stmt.on_conflict_do_nothing(
         )
-        db_session.execute(result_stmt)
+        result = db_session.execute(result_stmt)
+
+        engine = db_session.get_bind()
+        logger.info(f"Database URL: {engine.engine.url}")
+
         db_session.commit()
     except Exception as err:
         logger.error(f"Error batch upserting metadata labels: {err}")

View file

@@ -512,6 +512,17 @@ MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get(
 )
 
-METADATA_TASKS_JOURNAL_ID = os.environ.get(
-    "METADATA_TASKS_JOURNAL_ID", ""
+MOONSTREAM_METADATA_TASKS_JOURNAL = os.environ.get(
+    "MOONSTREAM_METADATA_TASKS_JOURNAL", ""
 )
+
+### MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN
+MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN = os.environ.get(
+    "MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN", ""
+)
+if MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN == "":
+    raise ValueError(
+        "MOONSTREAM_PUBLIC_QUERIES_USER_TOKEN environment variable must be set"
+    )

View file

@@ -16,7 +16,7 @@ from bugout.data import (
 )
 from bugout.exceptions import BugoutResponseException
 from fastapi import APIRouter, Body, Path, Query, Request
-from moonstreamdb.blockchain import AvailableBlockchainType
+from moonstreamtypes.blockchain import AvailableBlockchainType
 from sqlalchemy import text
 
 from .. import data