Merge pull request #1163 from moonstream-to/metadata-crawler-update

Metadata crawler update
pull/1164/head
Andrey Dolgolev 2025-01-15 12:21:00 +02:00 committed by GitHub
commit d8c20bd503
No key found in the database for this signature
GPG key ID: B5690EEEBB952194
17 changed files with 902 additions and 191 deletions

View file

@ -63,6 +63,15 @@ XAI_SEPOLIA_STATE_CLEAN_TIMER_FILE="xai-sepolia-state-clean.timer"
XAI_SEPOLIA_METADATA_SERVICE_FILE="xai-sepolia-metadata.service"
XAI_SEPOLIA_METADATA_TIMER_FILE="xai-sepolia-metadata.timer"
# Game7
GAME7_METADATA_SERVICE_FILE="game7-metadata.service"
GAME7_METADATA_TIMER_FILE="game7-metadata.timer"
# Game7 testnet
GAME7_TESTNET_METADATA_SERVICE_FILE="game7-testnet-metadata.service"
GAME7_TESTNET_METADATA_TIMER_FILE="game7-testnet-metadata.timer"
set -eu
echo
@ -229,4 +238,22 @@ chmod 644 "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${
cp "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XAI_SEPOLIA_METADATA_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${XAI_SEPOLIA_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XAI_SEPOLIA_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XAI_SEPOLIA_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XAI_SEPOLIA_METADATA_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Game7 metadata service and timer with: ${GAME7_METADATA_SERVICE_FILE}, ${GAME7_METADATA_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}"
cp "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_METADATA_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Game7 testnet metadata service and timer with: ${GAME7_TESTNET_METADATA_SERVICE_FILE}, ${GAME7_TESTNET_METADATA_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}"
cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_TESTNET_METADATA_TIMER_FILE}"

View file

@ -217,6 +217,14 @@ MANTLE_SEPOLIA_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="mantle-sepolia-historical-cra
MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="mantle-sepolia-historical-crawl-transactions.service"
MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="mantle-sepolia-historical-crawl-transactions.timer"
# Game7
GAME7_METADATA_SERVICE_FILE="game7-metadata.service"
GAME7_METADATA_TIMER_FILE="game7-metadata.timer"
# Game7 testnet
GAME7_TESTNET_METADATA_SERVICE_FILE="game7-testnet-metadata.service"
GAME7_TESTNET_METADATA_TIMER_FILE="game7-testnet-metadata.timer"
set -eu
echo
@ -1109,3 +1117,24 @@ cp "${SCRIPT_DIR}/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${MANTLE_SEPOLIA_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
# Game7
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Game7 metadata service and timer with: ${GAME7_METADATA_SERVICE_FILE}, ${GAME7_METADATA_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}"
cp "${SCRIPT_DIR}/${GAME7_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${GAME7_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_METADATA_TIMER_FILE}"
# Game7 testnet
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Game7 testnet metadata service and timer with: ${GAME7_TESTNET_METADATA_SERVICE_FILE}, ${GAME7_TESTNET_METADATA_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}"
cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${GAME7_TESTNET_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${GAME7_TESTNET_METADATA_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${GAME7_TESTNET_METADATA_TIMER_FILE}"

View file

@ -0,0 +1,11 @@
[Unit]
Description=Execute metadata crawler
After=network.target
[Service]
Type=oneshot
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.metadata_crawler.cli crawl --blockchain game7
CPUWeight=60
SyslogIdentifier=game7-metadata

View file

@ -0,0 +1,9 @@
[Unit]
Description=Execute Game7 metadata crawler every 60m
[Timer]
OnBootSec=20s
OnUnitActiveSec=60m
[Install]
WantedBy=timers.target

View file

@ -0,0 +1,11 @@
[Unit]
Description=Execute metadata crawler
After=network.target
[Service]
Type=oneshot
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.metadata_crawler.cli crawl --blockchain game7_testnet
CPUWeight=60
SyslogIdentifier=game7-testnet-metadata

View file

@ -0,0 +1,9 @@
[Unit]
Description=Execute Game7 testnet metadata crawler every 60m
[Timer]
OnBootSec=20s
OnUnitActiveSec=60m
[Install]
WantedBy=timers.target

View file

@ -116,6 +116,7 @@ def recive_S3_data_from_query(
client: Moonstream,
token: Union[str, uuid.UUID],
query_name: str,
query_params: Dict[str, Any] = {},
params: Dict[str, Any] = {},
time_await: int = 2,
max_retries: int = 30,
@ -136,15 +137,18 @@ def recive_S3_data_from_query(
if custom_body:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
json = custom_body
response = requests.post(
url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
headers=headers,
params=query_params,
json=json,
timeout=5,
)
data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
else:
data_url = client.exec_query(
@ -226,3 +230,27 @@ def get_customer_db_uri(
except Exception as e:
logger.error(f"Error get customer db uri: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
## DB V3
def request_connection_string(
customer_id: str,
instance_id: int,
token: str,
user: str = "seer", # token with write access
) -> str:
"""
Request connection string from the Moonstream DB V3 Controller API.
Default user is seer with write access
"""
response = requests.get(
f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
return response.text.replace('"', "")

View file

@ -13,12 +13,7 @@ import boto3 # type: ignore
from bugout.data import BugoutJournalEntity, BugoutResource
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_block_model,
get_label_model,
get_transaction_model,
)
from moonstreamtypes.blockchain import AvailableBlockchainType, get_block_model, get_label_model, get_transaction_model
from sqlalchemy import text
from . import data
@ -232,6 +227,11 @@ async def queries_data_update_handler(
requested_query = request_data.query
version = 2
if request_data.customer_id and request_data.instance_id:
version = 3
blockchain_table = "polygon_labels"
if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
@ -240,22 +240,23 @@ async def queries_data_update_handler(
blockchain = AvailableBlockchainType(request_data.blockchain)
requested_query = (
requested_query.replace(
"__transactions_table__",
get_transaction_model(blockchain).__tablename__,
)
.replace(
"__blocks_table__",
get_block_model(blockchain).__tablename__,
)
.replace(
blockchain_table = get_label_model(blockchain, version).__tablename__
requested_query = requested_query.replace(
"__labels_table__",
get_label_model(blockchain).__tablename__,
)
blockchain_table
)
if version == 2:
(
requested_query.replace(
"__transactions_table__",
get_transaction_model(blockchain).__tablename__,
)
.replace(
"__blocks_table__",
get_block_model(blockchain).__tablename__,
)
blockchain_table = get_label_model(blockchain).__tablename__
)
# Check if it can transform to TextClause
try:

View file

@ -60,6 +60,7 @@ class TokenURIs(BaseModel):
block_number: str
block_timestamp: str
address: str
block_hash: Optional[str] = None # for v3 only
class ViewTasks(BaseModel):

View file

@ -0,0 +1,180 @@
# Metadata Crawler Architecture
## Overview
The metadata crawler fetches and stores metadata for NFTs (non-fungible tokens) across multiple blockchains. It supports both the traditional database flow, driven by TokenURI view-method queries, and Spire journal-based job configurations, and it can write to both v2 and v3 database structures.
## Core Components
### 1. Update Strategies
#### Leak-Based Strategy (Legacy v2)
- Uses a probabilistic approach to determine which tokens to update
- Controlled by the `max_recrawl` parameter
- Suitable for large collections with infrequent updates
#### SQL-Based Strategy (v3)
- Uses SQL queries to determine which tokens need updates
- More precise tracking of token updates
- Better suited for active collections
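The legacy strategy derives a leak rate from how many recrawl slots remain after the tokens that must be re-checked; a minimal sketch of that calculation, mirroring `process_address_metadata_with_leak` in `metadata_crawler/cli.py` (the helper name `compute_leak_rate` is illustrative):
```python
def compute_leak_rate(already_parsed: list, maybe_updated: list, max_recrawl: int) -> float:
    # Tokens that must be recrawled anyway consume part of the max_recrawl budget;
    # whatever is left over is spread probabilistically across already-parsed tokens.
    free_spots = max(0, max_recrawl - len(maybe_updated))
    if already_parsed and free_spots > 0:
        return free_spots / len(already_parsed)
    return 0.0
```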
### 2. Database Connections
The crawler supports multiple database connection strategies:
- Default Moonstream database connection
- Custom database URI via `--custom-db-uri`
- Per-customer instance connections (v3)
```json
{
"customer_id": "...",
"instance_id": "...",
"blockchain": "ethereum",
"v3": true
}
```
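For v3 jobs the crawler resolves a per-customer connection string before opening a session. A rough sketch of that selection, using `request_connection_string` added to `mooncrawl/actions.py` in this change (the wrapper function here is illustrative):
```python
from typing import Optional

from mooncrawl.actions import request_connection_string


def resolve_connection_string(job: dict, token: str, custom_db_uri: Optional[str] = None) -> str:
    # Prefer an operator-supplied --custom-db-uri; otherwise ask the
    # Moonstream DB V3 Controller API for this customer's instance credentials.
    if custom_db_uri:
        return custom_db_uri
    return request_connection_string(
        customer_id=job["customer_id"],
        instance_id=job["instance_id"],
        token=token,
    )
```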
### 3. Job Configuration
Jobs can be configured in two ways:
- Through Spire journal entries with tags `#metadata-job #{blockchain}`
- Direct database queries (legacy mode) using the TokenURI view method
Example Spire journal entry:
```json
{
"type": "metadata-job",
"query_api": {
"name": "new_tokens_to_crawl",
"params": {
"address": "0x...",
"blockchain": "ethereum"
}
},
"contract_address": "0x...",
"blockchain": "ethereum",
"update_existing": false,
"v3": true,
"customer_id": "...", // Optional, for custom database
"instance_id": "..." // Optional, for custom database
}
```
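Jobs are discovered by searching the tasks journal for the tag pair above and parsing each entry body as JSON; a condensed sketch of that step as it appears in `parse_metadata` (error handling trimmed):
```python
import json

from mooncrawl.actions import get_all_entries_from_search
from mooncrawl.settings import (
    MOONSTREAM_ADMIN_ACCESS_TOKEN,
    MOONSTREAM_METADATA_TASKS_JOURNAL,
)


def load_metadata_jobs(blockchain: str) -> list:
    # Pull all journal entries tagged for this blockchain.
    entries = get_all_entries_from_search(
        journal_id=MOONSTREAM_METADATA_TASKS_JOURNAL,
        search_query=f"#metadata-job #{blockchain}",
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        content=True,
        limit=1000,
    )
    jobs = []
    for entry in entries:
        if not entry.content:
            continue
        job = json.loads(entry.content)
        # Ignore jobs written for a different chain.
        if job.get("blockchain") == blockchain:
            jobs.append(job)
    return jobs
```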
### 4. Data Flow
1. **Token Discovery**
- Query API integration for dynamic token discovery
- Database queries for existing tokens
- Support for multiple addresses per job
2. **Metadata Fetching** (see the sketch after this list)
- Parallel processing with ThreadPoolExecutor
- IPFS gateway support
- Automatic retry mechanism
- Rate limiting and batch processing
3. **Storage**
- Supports both v2 and v3 database structures
- Batch upsert operations
- Efficient cleaning of old labels
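A trimmed sketch of the parallel fetching in step 2, following `process_address_metadata` in `metadata_crawler/cli.py` (per-token error handling omitted):
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from mooncrawl.metadata_crawler.cli import crawl_uri


def fetch_metadata_batch(tokens: list, threads: int) -> list:
    # Resolve each token URI concurrently and pair the token with its metadata.
    metadata_batch = []
    with ThreadPoolExecutor(max_workers=threads) as executor:
        future_to_token = {
            executor.submit(crawl_uri, token.token_uri): token for token in tokens
        }
        for future in as_completed(future_to_token):
            token = future_to_token[future]
            metadata = future.result(timeout=10)
            metadata_batch.append((token, metadata))
    return metadata_batch
```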
### 5. Database Structures
v2:
```python
{
"label": METADATA_CRAWLER_LABEL,
"label_data": {
"type": "metadata",
"token_id": "...",
"metadata": {...}
},
"block_number": 1234567890
"block_timestamp": 456
}
```
v3:
```python
{
"label": METADATA_CRAWLER_LABEL,
"label_type": "metadata",
"label_data": {
"token_id": "...",
"metadata": {...}
},
"address": "0x...",
"block_number": 123,
"block_timestamp": 456,
"block_hash": "0x..."
}
```
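Rows in either shape are written with a single batched insert that skips conflicting rows, as in `upsert_metadata_labels` from `metadata_crawler/db.py`; a condensed sketch:
```python
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session


def write_labels(db_session: Session, label_model, labels_data: list) -> None:
    # Batch insert of prepared label dicts; rows that already exist are left untouched.
    if not labels_data:
        return
    stmt = insert(label_model).values(labels_data).on_conflict_do_nothing()
    db_session.execute(stmt)
    db_session.commit()
```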
## Key Features
1. **Flexible Token Selection**
- Query API integration
- Support for multiple addresses
- Configurable update strategies
2. **Efficient Processing**
- Batch processing
- Parallel metadata fetching
- Optimized database operations
3. **Error Handling**
- Retry mechanism for failed requests
- Transaction management
- Detailed logging
4. **Database Management**
- Efficient upsert operations
- Label cleaning
- Version compatibility (v2/v3)
## Usage
### CLI Options
```bash
metadata-crawler crawl \
--blockchain ethereum \
--commit-batch-size 50 \
--max-recrawl 300 \
--threads 4 \
--spire true \
--custom-db-uri "postgresql://..." # Optional
```
### Environment Variables
- `MOONSTREAM_ADMIN_ACCESS_TOKEN`: Required for API access
- `METADATA_CRAWLER_LABEL`: Label for database entries
- `MOONSTREAM_METADATA_TASKS_JOURNAL`: Journal ID for metadata tasks
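These are read at import time in `mooncrawl/settings.py`; a minimal sketch of the pattern used there (the defaults shown are illustrative):
```python
import os

# Token used for journal and Query API access.
MOONSTREAM_ADMIN_ACCESS_TOKEN = os.environ.get("MOONSTREAM_ADMIN_ACCESS_TOKEN", "")
# Label attached to rows written by the metadata crawler.
METADATA_CRAWLER_LABEL = os.environ.get("METADATA_CRAWLER_LABEL", "metadata-crawler")
# Journal holding #metadata-job entries.
MOONSTREAM_METADATA_TASKS_JOURNAL = os.environ.get("MOONSTREAM_METADATA_TASKS_JOURNAL", "")

if MOONSTREAM_ADMIN_ACCESS_TOKEN == "":
    raise ValueError("MOONSTREAM_ADMIN_ACCESS_TOKEN environment variable must be set")
```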
### Database Modes
1. **Legacy Mode (v2)**
- Uses leak-based update strategy
- Single database connection
- Simple metadata structure
2. **Modern Mode (v3)**
- SQL-based update tracking
- Support for multiple database instances
- Enhanced metadata structure
- Per-customer database isolation
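Both modes share the same write path; the mode only changes which label model is resolved, as in `metadata_to_label` from `metadata_crawler/db.py`:
```python
from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model


def resolve_label_model(blockchain_type: AvailableBlockchainType, v3: bool):
    # v3 jobs target the new per-customer schema; v2 keeps the legacy label tables.
    version = 3 if v3 else 2
    return get_label_model(blockchain_type, version=version)
```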
## Best Practices
1. **Job Configuration**
- Use descriptive job names
- Group related addresses
- Set appropriate update intervals
2. **Performance Optimization**
- Adjust batch sizes based on network conditions
- Monitor thread count vs. performance
- Use appropriate IPFS gateways
3. **Maintenance**
- Regular cleaning of old labels
- Monitor database size
- Check for failed metadata fetches

View file

@ -3,21 +3,32 @@ import json
import logging
import random
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy.orm import Session
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from moonstreamdb.blockchain import AvailableBlockchainType
from bugout.exceptions import BugoutResponseException
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.blockchain import AvailableBlockchainType as AvailableBlockchainTypeV2
from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx
from ..actions import get_all_entries_from_search, request_connection_string
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_METADATA_TASKS_JOURNAL, MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN
from ..db import yield_db_preping_session_ctx, yield_db_read_only_preping_session_ctx, create_moonstream_engine, sessionmaker
from ..data import TokenURIs
from .db import (
clean_labels_from_db,
get_current_metadata_for_address,
get_tokens_id_wich_may_updated,
get_uris_of_tokens,
metadata_to_label,
get_tokens_to_crawl,
upsert_metadata_labels,
)
from ..settings import moonstream_client as mc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -50,7 +61,6 @@ def crawl_uri(metadata_uri: str) -> Any:
result = None
while retry < 3:
try:
if metadata_uri.startswith("ipfs://"):
metadata_uri = metadata_uri.replace(
"ipfs://", "https://ipfs.io/ipfs/", 1
@ -61,10 +71,7 @@ def crawl_uri(metadata_uri: str) -> Any:
response = urllib.request.urlopen(req, timeout=10)
if (
metadata_uri.startswith("data:application/json")
or response.status == 200
):
if metadata_uri.startswith("data:application/json") or response.status == 200:
result = json.loads(response.read())
break
retry += 1
@ -72,6 +79,8 @@ def crawl_uri(metadata_uri: str) -> Any:
except HTTPError as error:
logger.error(f"request end with error statuscode: {error.code}")
retry += 1
if error.code == 404:
return None
continue
except Exception as err:
logger.error(err)
@ -81,167 +90,329 @@ def crawl_uri(metadata_uri: str) -> Any:
return result
def process_address_metadata_with_leak(
address: str,
blockchain_type: AvailableBlockchainType,
batch_size: int,
max_recrawl: int,
threads: int,
tokens: List[TokenURIs],
) -> None:
"""
Process metadata for a single address with v3 support
"""
with yield_db_read_only_preping_session_ctx() as db_session_read_only:
try:
already_parsed = get_current_metadata_for_address(
db_session=db_session_read_only,
blockchain_type=blockchain_type,
address=address,
)
maybe_updated = get_tokens_id_wich_may_updated(
db_session=db_session_read_only,
blockchain_type=blockchain_type,
address=address,
)
except Exception as err:
logger.warning(f"Error while getting metadata state for address {address}: {err}")
return
with yield_db_preping_session_ctx() as db_session:
try:
logger.info(f"Starting to crawl metadata for address: {address}")
logger.info(f"Maybe updated: {len(maybe_updated)}")
# Calculate how many tokens we can 'leak' so total recrawled (maybe_updated + leaked) <= max_recrawl
num_already_parsed = len(already_parsed)
num_maybe_updated = len(maybe_updated)
free_spots = max(0, max_recrawl - num_maybe_updated)
if num_already_parsed > 0 and free_spots > 0:
leak_rate = free_spots / num_already_parsed
else:
leak_rate = 0
logger.info(
f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}"
)
# TODO: Fully random leak is not correct, we should leak based on created_at
parsed_with_leak = leak_of_crawled_uri(
already_parsed, leak_rate, maybe_updated
)
logger.info(f"Already parsed: {len(already_parsed)} for {address}")
logger.info(f"Amount of tokens to parse: {len(tokens)} for {address}")
# Remove already parsed tokens
new_tokens = [
token for token in tokens
if token.token_id not in parsed_with_leak
]
for requests_chunk in [
new_tokens[i : i + batch_size]
for i in range(0, len(new_tokens), batch_size)
]:
metadata_batch = []
try:
# Gather all metadata in parallel
with ThreadPoolExecutor(max_workers=threads) as executor:
future_to_token = {
executor.submit(crawl_uri, token.token_uri): token
for token in requests_chunk
}
for future in as_completed(future_to_token):
token = future_to_token[future]
try:
metadata = future.result(timeout=10)
if metadata:
metadata_batch.append((token, metadata))
except Exception as e:
logger.error(f"Error fetching metadata for token {token.token_id}: {e}")
continue
if metadata_batch:
# Batch upsert all metadata
upsert_metadata_labels(
db_session=db_session,
blockchain_type=blockchain_type,
metadata_batch=metadata_batch,
v3=False
)
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
logger.info(f"Write {len(metadata_batch)} labels for {address}")
except Exception as err:
logger.warning(f"Error while writing labels for address {address}: {err}")
db_session.rollback()
except Exception as err:
logger.warning(f"Error while crawling metadata for address {address}: {err}")
db_session.rollback()
def process_address_metadata(
address: str,
blockchain_type: AvailableBlockchainType,
db_session: Session,
batch_size: int,
max_recrawl: int,
threads: int,
tokens: List[TokenURIs],
) -> None:
"""
Process metadata for a single address with v3 support
Leak logic is implemented in sql statement
"""
logger.info(f"Processing address {address} with {len(tokens)} tokens")
total_tokens = len(tokens)
total_chunks = (total_tokens + batch_size - 1) // batch_size
for chunk_index, requests_chunk in enumerate([
tokens[i : i + batch_size]
for i in range(0, len(tokens), batch_size)
]):
logger.info(
f"Processing chunk {chunk_index + 1}/{total_chunks} "
f"({len(requests_chunk)} tokens) for address {address}"
)
metadata_batch = []
with ThreadPoolExecutor(max_workers=threads) as executor:
future_to_token = {
executor.submit(crawl_uri, token.token_uri): token
for token in requests_chunk
}
for future in as_completed(future_to_token):
token = future_to_token[future]
metadata = future.result(timeout=10)
metadata_batch.append((token, metadata))
upsert_metadata_labels(
db_session=db_session,
blockchain_type=blockchain_type,
metadata_batch=metadata_batch,
v3=True
)
logger.info(f"Wrote {len(metadata_batch)} labels for {address}")
db_session.commit()
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
version=3
)
db_session.commit()
def parse_metadata(
blockchain_type: AvailableBlockchainType,
batch_size: int,
max_recrawl: int,
threads: int,
custom_db_uri: Optional[str] = None,
):
"""
Parse all metadata of tokens.
"""
logger.info("Starting metadata crawler")
logger.info(f"Processing blockchain {blockchain_type.value}")
# run crawling of levels
with yield_db_read_only_preping_session_ctx() as db_session_read_only:
# Check if blockchain exists in v2 package
if blockchain_type.value in [chain.value for chain in AvailableBlockchainTypeV2]:
try:
# get all tokens with uri
logger.info("Requesting all tokens with uri from database")
uris_of_tokens = get_uris_of_tokens(db_session_read_only, blockchain_type)
tokens_uri_by_address: Dict[str, Any] = {}
for token_uri_data in uris_of_tokens:
if token_uri_data.address not in tokens_uri_by_address:
tokens_uri_by_address[token_uri_data.address] = []
tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
logger.info(f"Processing v2 blockchain: {blockchain_type.value}")
# Get tokens to crawl v2 flow
with yield_db_read_only_preping_session_ctx() as db_session_read_only:
tokens_uri_by_address = get_tokens_to_crawl(
db_session_read_only,
blockchain_type,
{},
)
# Process each address
for address, tokens in tokens_uri_by_address.items():
process_address_metadata_with_leak(
address=address,
blockchain_type=blockchain_type,
batch_size=batch_size,
max_recrawl=max_recrawl,
threads=threads,
tokens=tokens,
)
except Exception as err:
logger.error(f"Error while requesting tokens with uri from database: {err}")
return
logger.error(f"V2 flow failed: {err}, continuing with Spire flow")
for address in tokens_uri_by_address:
with yield_db_read_only_preping_session_ctx() as db_session_read_only:
# Continue with Spire flow regardless of v2 result
spire_jobs = []
# Get all jobs for this blockchain from Spire
search_query = f"#metadata-job #{blockchain_type.value}"
try:
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_METADATA_TASKS_JOURNAL,
search_query=search_query,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
content=True,
limit=1000,
)
logger.info(f"Found {len(entries)} metadata jobs for blockchain {blockchain_type.value}")
for entry in entries:
try:
already_parsed = get_current_metadata_for_address(
db_session=db_session_read_only,
blockchain_type=blockchain_type,
address=address,
)
maybe_updated = get_tokens_id_wich_may_updated(
db_session=db_session_read_only,
blockchain_type=blockchain_type,
address=address,
)
if not entry.content:
continue
job = json.loads(entry.content)
if job.get("blockchain") != blockchain_type.value:
logger.warning(f"Skipping job with mismatched blockchain: {job.get('blockchain')} != {blockchain_type.value}")
continue
spire_jobs.append(job)
except Exception as err:
logger.warning(err)
logger.warning(
f"Error while requesting metadata for address: {address}"
)
id = entry.entry_url.split("/")[-1]
logger.error(f"Error parsing job from entry {id}: {err}")
continue
except BugoutResponseException as err:
logger.error(f"Bugout error fetching jobs from journal: {err.detail}")
except Exception as err:
logger.error(f"Error fetching jobs from journal: {err}")
return
with yield_db_preping_session_ctx() as db_session:
# Process each job
# sessions list for each customer and instance
sessions_by_customer: Dict[Tuple[str, str], Session] = {}
# all sessions in one try block
try:
for job in spire_jobs:
try:
logger.info(f"Starting to crawl metadata for address: {address}")
customer_id = job.get("customer_id")
instance_id = job.get("instance_id")
leak_rate = 0.0
if len(maybe_updated) > 0:
free_spots = len(maybe_updated) / max_recrawl
if free_spots > 1:
leak_rate = 0
if (customer_id, instance_id) not in sessions_by_customer:
# Create session
# Assume request_connection_string fetches the connection string
if custom_db_uri:
connection_string = custom_db_uri
else:
leak_rate = 1 - (
len(already_parsed) - max_recrawl + len(maybe_updated)
) / len(already_parsed)
parsed_with_leak = leak_of_crawled_uri(
already_parsed, leak_rate, maybe_updated
)
logger.info(
f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}"
)
logger.info(f"Already parsed: {len(already_parsed)} for {address}")
logger.info(
f"Amount of state in database: {len(tokens_uri_by_address[address])} for {address}"
)
logger.info(
f"Amount of tokens parsed with leak: {len(parsed_with_leak)} for {address}"
)
# Remove already parsed tokens
new_tokens_uri_by_address = [
token_uri_data
for token_uri_data in tokens_uri_by_address[address]
if token_uri_data.token_id not in parsed_with_leak
]
logger.info(
f"Amount of tokens to parse: {len(new_tokens_uri_by_address)} for {address}"
)
for requests_chunk in [
new_tokens_uri_by_address[i : i + batch_size]
for i in range(0, len(new_tokens_uri_by_address), batch_size)
]:
writed_labels = 0
db_session.commit()
try:
with db_session.begin():
for token_uri_data in requests_chunk:
with ThreadPoolExecutor(
max_workers=threads
) as executor:
future = executor.submit(
crawl_uri, token_uri_data.token_uri
)
metadata = future.result(timeout=10)
db_session.add(
metadata_to_label(
blockchain_type=blockchain_type,
metadata=metadata,
token_uri_data=token_uri_data,
)
)
writed_labels += 1
if writed_labels > 0:
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
logger.info(
f"Write {writed_labels} labels for {address}"
)
# transaction is committed here
except Exception as err:
logger.warning(err)
logger.warning(
f"Error while writing labels for address: {address}"
connection_string = request_connection_string(
customer_id=customer_id,
instance_id=instance_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
db_session.rollback()
engine = create_moonstream_engine(connection_string, 2, 100000)
session = sessionmaker(bind=engine)
try:
sessions_by_customer[(customer_id, instance_id)] = session()
except Exception as e:
logger.error(f"Connection to {engine} failed: {e}")
continue
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
# Get tokens to crawl
tokens_uri_by_address = get_tokens_to_crawl(
sessions_by_customer[(customer_id, instance_id)],
blockchain_type,
job,
)
for address, tokens in tokens_uri_by_address.items():
process_address_metadata(
address=address,
blockchain_type=blockchain_type,
db_session=sessions_by_customer[(customer_id, instance_id)],
batch_size=batch_size,
max_recrawl=max_recrawl,
threads=threads,
tokens=tokens,
)
except Exception as err:
logger.warning(err)
logger.warning(f"Error while crawling metadata for address: {address}")
db_session.rollback()
logger.error(f"Error processing job: {err}")
continue
except Exception as err:
logger.error(f"Error processing jobs: {err}")
raise err
finally:
for session in sessions_by_customer.values():
try:
session.close()
except Exception as err:
logger.error(f"Error closing session: {err}")
def handle_crawl(args: argparse.Namespace) -> None:
"""
Parse all metadata of tokens.
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
parse_metadata(
blockchain_type, args.commit_batch_size, args.max_recrawl, args.threads
blockchain_type,
args.commit_batch_size,
args.max_recrawl,
args.threads,
args.custom_db_uri,
)
@ -259,7 +430,7 @@ def main() -> None:
"--blockchain",
"-b",
type=str,
help="Type of blockchain wich writng in database",
help="Type of blockchain which writing in database",
required=True,
)
metadata_crawler_parser.add_argument(
@ -283,6 +454,11 @@ def main() -> None:
default=4,
help="Amount of threads for crawling",
)
metadata_crawler_parser.add_argument(
"--custom-db-uri",
type=str,
help="Custom db uri to use for crawling",
)
metadata_crawler_parser.set_defaults(func=handle_crawl)
args = parser.parse_args()

View file

@ -1,13 +1,29 @@
import json
import logging
from typing import Any, Dict, List, Optional
from hexbytes import HexBytes
from typing import Any, Dict, List, Optional, Tuple
###from sqlalchemy import
from sqlalchemy.dialects.postgresql import insert
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from datetime import datetime
from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy.orm import Session
from sqlalchemy.sql import text
from ..actions import recive_S3_data_from_query
from ..data import TokenURIs
from ..settings import CRAWLER_LABEL, METADATA_CRAWLER_LABEL, VIEW_STATE_CRAWLER_LABEL
from ..settings import (
CRAWLER_LABEL,
METADATA_CRAWLER_LABEL,
VIEW_STATE_CRAWLER_LABEL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN,
bugout_client as bc,
moonstream_client as mc,
)
from moonstream.client import Moonstream # type: ignore
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -18,11 +34,13 @@ def metadata_to_label(
metadata: Optional[Dict[str, Any]],
token_uri_data: TokenURIs,
label_name=METADATA_CRAWLER_LABEL,
v3: bool = False,
):
"""
Creates a label model.
Creates a label model with support for v2 and v3 database structures.
"""
label_model = get_label_model(blockchain_type)
version = 3 if v3 else 2
label_model = get_label_model(blockchain_type, version=version)
sanityzed_label_data = json.loads(
json.dumps(
@ -34,14 +52,34 @@ def metadata_to_label(
).replace(r"\u0000", "")
)
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=token_uri_data.address,
block_number=token_uri_data.block_number,
transaction_hash=None,
block_timestamp=token_uri_data.block_timestamp,
)
if v3:
# V3 structure similar to state crawler
label_data = {
"token_id": token_uri_data.token_id,
"metadata": metadata,
}
label = label_model(
label=label_name,
label_name="metadata", # Fixed name for metadata labels
label_type="metadata",
label_data=label_data,
address=HexBytes(token_uri_data.address),
block_number=token_uri_data.block_number,
# Use a fixed tx hash for metadata since it's not from a transaction
block_timestamp=token_uri_data.block_timestamp,
block_hash=token_uri_data.block_hash if hasattr(token_uri_data, 'block_hash') else None,
)
else:
# Original v2 structure
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=token_uri_data.address,
block_number=token_uri_data.block_number,
transaction_hash=None,
block_timestamp=token_uri_data.block_timestamp,
)
return label
@ -60,13 +98,13 @@ def commit_session(db_session: Session) -> None:
def get_uris_of_tokens(
db_session: Session, blockchain_type: AvailableBlockchainType
db_session: Session, blockchain_type: AvailableBlockchainType, version: int = 2
) -> List[TokenURIs]:
"""
Get metadata URIs.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=version)
table = label_model.__tablename__
@ -113,13 +151,13 @@ def get_uris_of_tokens(
def get_current_metadata_for_address(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2
):
"""
Get existing metadata.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=version)
table = label_model.__tablename__
@ -149,7 +187,7 @@ def get_current_metadata_for_address(
def get_tokens_id_wich_may_updated(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2
):
"""
Returns a list of tokens which may have updated information.
@ -163,7 +201,7 @@ def get_tokens_id_wich_may_updated(
Required integration with entity API and opcodes crawler.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=version)
table = label_model.__tablename__
@ -233,14 +271,14 @@ def get_tokens_id_wich_may_updated(
def clean_labels_from_db(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
db_session: Session, blockchain_type: AvailableBlockchainType, address: str, version: int = 2
):
"""
Remove existing labels.
But keep the latest one for each token.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=version)
table = label_model.__tablename__
@ -273,3 +311,165 @@ def clean_labels_from_db(
),
{"address": address, "label": METADATA_CRAWLER_LABEL},
)
def get_tokens_from_query_api(
client: Moonstream,
blockchain_type: AvailableBlockchainType,
query_name: str,
params: dict,
token: str,
customer_id: Optional[str] = None,
instance_id: Optional[str] = None,
) -> List[TokenURIs]:
"""
Get token URIs from Query API results
"""
query_params = {}
if customer_id and instance_id:
query_params["customer_id"] = customer_id
query_params["instance_id"] = instance_id
try:
data = recive_S3_data_from_query(
client=client,
token=token,
query_name=query_name,
params={},
query_params=query_params,
custom_body={
"blockchain": blockchain_type.value,
"params": params,
}
)
# Convert query results to TokenURIs format
results = []
for item in data.get("data", []):
results.append(
TokenURIs(
token_id=str(item.get("token_id")),
address=item.get("address"),
token_uri=item.get("token_uri"),
block_number=item.get("block_number"),
block_timestamp=item.get("block_timestamp"),
)
)
return results
except Exception as err:
logger.error(f"Error fetching data from Query API: {err}")
return []
def get_tokens_to_crawl(
db_session: Session,
blockchain_type: AvailableBlockchainType,
spire_job: Optional[dict] = None,
) -> Dict[str, List[TokenURIs]]:
"""`
Get tokens to crawl either from Query API (if specified in Spire job) or database
"""
tokens_uri_by_address = {}
if spire_job:
if "query_api" not in spire_job:
raise ValueError("Query API is not specified in Spire job")
# Get tokens from Query API
query_config = spire_job["query_api"]
client = Moonstream()
tokens = get_tokens_from_query_api(
client=client,
blockchain_type=blockchain_type,
query_name=query_config["name"],
params=query_config["params"],
token=MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN,
customer_id=spire_job["customer_id"],
instance_id=spire_job["instance_id"],
)
# Group by address
for token in tokens:
if token.address not in tokens_uri_by_address:
tokens_uri_by_address[token.address] = []
tokens_uri_by_address[token.address].append(token)
else:
# Get tokens from database (existing logic)
uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type)
for token_uri_data in uris_of_tokens:
if token_uri_data.address not in tokens_uri_by_address:
tokens_uri_by_address[token_uri_data.address] = []
tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
return tokens_uri_by_address
def upsert_metadata_labels(
db_session: Session,
blockchain_type: AvailableBlockchainType,
metadata_batch: List[Tuple[TokenURIs, Optional[Dict[str, Any]]]],
v3: bool = False,
db_batch_size: int = 100,
) -> None:
"""
Batch upsert metadata labels - update if exists, insert if not.
"""
try:
version = 3 if v3 else 2
label_model = get_label_model(blockchain_type, version=version)
# Prepare batch of labels
labels_data = []
for token_uri_data, metadata in metadata_batch:
if v3:
# V3 structure
label_data = {
"token_id": token_uri_data.token_id,
"metadata": metadata,
}
labels_data.append({
"label": METADATA_CRAWLER_LABEL,
"label_name": "metadata",
"label_type": "metadata",
"label_data": label_data,
"address": HexBytes(token_uri_data.address),
"block_number": token_uri_data.block_number,
"block_timestamp": token_uri_data.block_timestamp,
"block_hash": getattr(token_uri_data, 'block_hash', None),
})
else:
# V2 structure
label_data = {
"type": "metadata",
"token_id": token_uri_data.token_id,
"metadata": metadata,
}
labels_data.append({
"label": METADATA_CRAWLER_LABEL,
"label_data": label_data,
"address": token_uri_data.address,
"block_number": token_uri_data.block_number,
"transaction_hash": None,
"block_timestamp": token_uri_data.block_timestamp,
})
if not labels_data:
return
# Create insert statement
insert_stmt = insert(label_model).values(labels_data)
result_stmt = insert_stmt.on_conflict_do_nothing()
db_session.execute(result_stmt)
db_session.commit()
except Exception as err:
logger.error(f"Error batch upserting metadata labels: {err}")
raise

View file

@ -3,21 +3,16 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType # type: ignore
from moonstream.client import Moonstream # type: ignore
# Bugout
# APIs
## Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)
BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
)
@ -31,6 +26,24 @@ except:
HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN")
## Moonstream
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
moonstream_client = Moonstream()
## Moonstream Engine
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)
## Moonstream DB
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)
# Origin
RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS")
if RAW_ORIGINS is None:
@ -490,3 +503,19 @@ MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get(
"MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain"
)
MOONSTREAM_METADATA_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_METADATA_TASKS_JOURNAL", ""
)
### MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN
MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN = os.environ.get(
"MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN", ""
)
if MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN == "":
raise ValueError(
"MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN environment variable must be set"
)

View file

@ -824,4 +824,4 @@ def main() -> None:
if __name__ == "__main__":
main()
main()

View file

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.5.1"
MOONCRAWL_VERSION = "0.5.3"

View file

@ -38,8 +38,8 @@ setup(
"chardet",
"fastapi",
"moonstreamdb>=0.4.6",
"moonstreamdb-v3>=0.1.3",
"moonstream-types>=0.0.10",
"moonstreamdb-v3>=0.1.4",
"moonstream-types>=0.0.11",
"moonstream>=0.1.2",
"moonworm[moonstream]>=0.9.3",
"humbug",

View file

@ -16,7 +16,7 @@ from bugout.data import (
)
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Path, Query, Request
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from sqlalchemy import text
from .. import data