kopia lustrzana https://github.com/bugout-dev/moonstream
Fix mypy. Add logger. Query use table name.
rodzic
42ebb66e4d
commit
31aa042dfa
|
@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
|
|||
batch_size = 50
|
||||
|
||||
|
||||
def crawl_uri(metadata_uri: str) -> Dict[str, Any]:
|
||||
def crawl_uri(metadata_uri: str) -> Any:
|
||||
|
||||
"""
|
||||
Get metadata from URI
|
||||
|
@ -59,6 +59,13 @@ def crawl_uri(metadata_uri: str) -> Dict[str, Any]:
|
|||
|
||||
def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
|
||||
|
||||
"""
|
||||
Parse all metadata of tokens.
|
||||
"""
|
||||
|
||||
logger.info("Starting metadata crawler")
|
||||
logger.info(f"Connecting to blockchain {blockchain_type.value}")
|
||||
|
||||
engine = create_moonstream_engine(
|
||||
MOONSTREAM_DB_URI_READ_ONLY,
|
||||
pool_pre_ping=True,
|
||||
|
@ -73,7 +80,7 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
|
|||
|
||||
uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type)
|
||||
|
||||
tokens_uri_by_address = {}
|
||||
tokens_uri_by_address: Dict[str, Any] = {}
|
||||
|
||||
for token_uri_data in uris_of_tokens:
|
||||
if token_uri_data.address not in tokens_uri_by_address:
|
||||
|
@ -90,7 +97,7 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
|
|||
tokens_uri_by_address[address][i : i + batch_size]
|
||||
for i in range(0, len(tokens_uri_by_address[address]), batch_size)
|
||||
]:
|
||||
|
||||
writed_labels = 0
|
||||
for token_uri_data in requests_chunk:
|
||||
|
||||
if token_uri_data.token_id not in already_parsed:
|
||||
|
@ -103,7 +110,9 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
|
|||
token_uri_data=token_uri_data,
|
||||
)
|
||||
)
|
||||
writed_labels += 1
|
||||
commit_session(db_session)
|
||||
logger.info(f"Write {writed_labels} labels for {address}")
|
||||
|
||||
finally:
|
||||
db_session.close()
|
||||
|
@ -115,7 +124,7 @@ def handle_crawl(args: argparse.Namespace) -> None:
|
|||
Parse all metadata of tokens.
|
||||
"""
|
||||
|
||||
blockchain_type = AvailableBlockchainType(args.blockchain_type)
|
||||
blockchain_type = AvailableBlockchainType(args.blockchain)
|
||||
|
||||
parse_metadata(blockchain_type, args.batch_size)
|
||||
|
||||
|
@ -131,7 +140,7 @@ def main() -> None:
|
|||
help="Crawler of tokens metadata.",
|
||||
)
|
||||
metadata_crawler_parser.add_argument(
|
||||
"--blockchain-type",
|
||||
"--blockchain",
|
||||
"-b",
|
||||
type=str,
|
||||
help="Type of blockchain wich writng in database",
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
from cgitb import reset
|
||||
from genericpath import exists
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
from unittest import result
|
||||
import json
|
||||
from typing import Dict, Any, Optional, List
|
||||
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
|
||||
from sqlalchemy.orm import Session
|
||||
|
@ -17,7 +15,7 @@ logger = logging.getLogger(__name__)
|
|||
def metadata_to_label(
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
metadata: Optional[Dict[str, Any]],
|
||||
token_uri_data: Dict[str, Any],
|
||||
token_uri_data: TokenURIs,
|
||||
label_name=METADATA_CRAWLER_LABEL,
|
||||
):
|
||||
|
||||
|
@ -25,13 +23,20 @@ def metadata_to_label(
|
|||
Creates a label model.
|
||||
"""
|
||||
label_model = get_label_model(blockchain_type)
|
||||
|
||||
sanityzed_label_data = json.loads(
|
||||
json.dumps(
|
||||
{
|
||||
"type": "metadata",
|
||||
"token_id": token_uri_data.token_id,
|
||||
"metadata": metadata,
|
||||
}
|
||||
).replace(r"\u0000", "")
|
||||
)
|
||||
|
||||
label = label_model(
|
||||
label=label_name,
|
||||
label_data={
|
||||
"type": "metadata",
|
||||
"token_id": token_uri_data.token_id,
|
||||
"metadata": metadata,
|
||||
},
|
||||
label_data=sanityzed_label_data,
|
||||
address=token_uri_data.address,
|
||||
block_number=token_uri_data.block_number,
|
||||
transaction_hash=None,
|
||||
|
@ -56,11 +61,16 @@ def commit_session(db_session: Session) -> None:
|
|||
|
||||
def get_uris_of_tokens(
|
||||
db_session: Session, blockchain_type: AvailableBlockchainType
|
||||
) -> Dict[str, str]:
|
||||
) -> List[TokenURIs]:
|
||||
|
||||
"""
|
||||
Get meatadata URIs.
|
||||
"""
|
||||
|
||||
label_model = get_label_model(blockchain_type)
|
||||
|
||||
table = label_model.__tablename__
|
||||
|
||||
metadata_for_parsing = db_session.execute(
|
||||
""" SELECT
|
||||
DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id,
|
||||
|
@ -70,14 +80,15 @@ def get_uris_of_tokens(
|
|||
address as address
|
||||
|
||||
FROM
|
||||
polygon_labels
|
||||
:table
|
||||
WHERE
|
||||
label = 'view-state-alpha'
|
||||
AND label_data ->> 'name' = 'tokenURI'
|
||||
label = :label
|
||||
AND label_data ->> 'name' = :name
|
||||
ORDER BY
|
||||
label_data -> 'inputs'-> 0 ASC,
|
||||
block_number :: INT DESC;
|
||||
"""
|
||||
""",
|
||||
{"table": table, "label": VIEW_STATE_CRAWLER_LABEL, "name": "tokenURI"},
|
||||
)
|
||||
|
||||
results = [
|
||||
|
@ -91,7 +102,6 @@ def get_uris_of_tokens(
|
|||
for data in metadata_for_parsing
|
||||
]
|
||||
|
||||
# results = {key: value for key, value in metadata_for_parsing}
|
||||
return results
|
||||
|
||||
|
||||
|
@ -108,12 +118,12 @@ def get_current_metadata_for_address(
|
|||
polygon_labels
|
||||
WHERE
|
||||
address = :address
|
||||
AND label = 'metadata-crawler'
|
||||
AND label = :label
|
||||
ORDER BY
|
||||
label_data ->> 'token_id' ASC,
|
||||
block_number :: INT DESC;
|
||||
""",
|
||||
{"address": address},
|
||||
{"address": address, "label": METADATA_CRAWLER_LABEL},
|
||||
)
|
||||
|
||||
result = [data[0] for data in current_metadata]
|
||||
|
|
Ładowanie…
Reference in New Issue