Add initial metadata parser.

pull/660/head
Andrey 2022-08-31 19:26:30 +03:00
rodzic c2d790d08d
commit aaa6c95953
5 zmienionych plików z 284 dodań i 3 usunięć

Wyświetl plik

@ -49,3 +49,11 @@ class QueryDataUpdate(BaseModel):
file_type: str
query: str
params: Dict[str, Any] = Field(default_factory=dict)
class TokenURIs(BaseModel):
    """
    A single tokenURI view-call result: which token, the metadata URI it
    reported, and the block/address context the call was observed at.
    """

    # All fields arrive as strings from the raw label query; numeric fields
    # (block_number, block_timestamp) are presumably decimal strings — TODO confirm.
    token_id: str
    token_uri: str
    block_number: str
    block_timestamp: str
    address: str

Wyświetl plik

@ -0,0 +1,156 @@
import argparse
import json
import hashlib
import itertools
from pickle import TRUE
from pprint import pprint
import logging
from random import random
import requests
from typing import Dict, List, Any
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import (
MOONSTREAM_DB_URI_READ_ONLY,
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
from sqlalchemy.orm import sessionmaker
from .db import (
commit_session,
get_uris_of_tokens,
get_current_metadata_for_address,
metadata_to_label,
)
from ..settings import (
MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def crawl_uri(metadata_uri: str) -> Dict[str, Any]:
    """
    Fetch token metadata from the given URI.

    Retries up to 3 times on network errors or non-200 responses.

    :param metadata_uri: URI returned by the contract's tokenURI call.
    :return: parsed JSON payload of the metadata document, or None if all
        attempts failed.
    """
    retry = 0
    result = None
    while retry < 3:
        try:
            response = requests.get(metadata_uri)
            if response.status_code == 200:
                # BUG FIX: requests.Response has no `.data` attribute;
                # parse the JSON body instead.
                result = response.json()
                break
            retry += 1
        except Exception as err:
            # Covers connection errors and invalid-JSON bodies alike.
            logger.error(f"Failed to fetch metadata from {metadata_uri}: {err}")
            retry += 1
            continue
    return result
def parse_metadata(
    blockchain_type: AvailableBlockchainType,
    jobs: Any = None,
    block_number: Any = None,
) -> None:
    """
    Crawl metadata for all tokens whose tokenURI results have not been
    parsed yet, and store each result as a label row.

    :param blockchain_type: chain whose labels table is read and written.
        Moved to the first position so the call site
        ``parse_metadata(blockchain_type)`` works (the original signature
        ``(jobs, blockchain_type, block_number)`` raised TypeError there).
    :param jobs: unused; kept for compatibility.
    :param block_number: unused; kept for compatibility.
    """
    engine = create_moonstream_engine(
        MOONSTREAM_DB_URI_READ_ONLY,
        pool_pre_ping=True,
        pool_size=MOONSTREAM_POOL_SIZE,
        statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
    )
    process_session = sessionmaker(bind=engine)
    db_session = process_session()

    try:
        uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type)

        # Group tokenURI records by contract address so the set of
        # already-parsed token ids is fetched once per contract.
        tokens_uri_by_address: Dict[str, Any] = {}
        for token_uri_data in uris_of_tokens:
            tokens_uri_by_address.setdefault(token_uri_data.address, []).append(
                token_uri_data
            )

        for address, token_uris in tokens_uri_by_address.items():
            already_parsed = get_current_metadata_for_address(
                db_session=db_session,
                blockchain_type=blockchain_type,
                address=address,
            )
            for token_uri_data in token_uris:
                if token_uri_data.token_id in already_parsed:
                    continue
                # BUG FIX: pass the URI string, not the whole record object.
                metadata = crawl_uri(token_uri_data.token_uri)
                db_session.add(
                    metadata_to_label(
                        blockchain_type=blockchain_type,
                        metadata=metadata,
                        token_uri_data=token_uri_data,
                    )
                )
                # Commit per token so a late failure does not lose
                # everything crawled so far.
                commit_session(db_session)
    finally:
        db_session.close()
def handle_crawl(args: argparse.Namespace) -> None:
    """
    Parse all metadata of tokens.

    Entry point for the ``crawl`` subcommand: resolves the blockchain type
    from the CLI arguments and runs the metadata parser over it.
    """
    blockchain_type = AvailableBlockchainType(args.blockchain_type)
    # NOTE(review): parse_metadata is declared as (jobs, blockchain_type,
    # block_number) elsewhere in this file but is called here with a single
    # positional argument — as written this raises TypeError. Confirm the
    # intended signature.
    parse_metadata(blockchain_type)
def parse_abi(args: argparse.Namespace) -> None:
    """
    Parse the ABI of a contract and save only its view methods.

    Reads the JSON ABI at ``args.abi_file``, keeps the entries whose
    ``stateMutability`` is ``"view"``, and writes them to
    ``view+<abi_file>``.

    :param args: namespace with ``abi_file`` — path to the ABI JSON file.
    """
    with open(args.abi_file, "r") as f:
        abi = json.load(f)

    # `dict.get` already yields None for a missing key, so one equality
    # check replaces the original truthiness-plus-equality combination.
    output_json = [
        method for method in abi if method.get("stateMutability") == "view"
    ]

    with open(f"view+{args.abi_file}", "w") as f:
        json.dump(output_json, f)
def main() -> None:
    """
    CLI entry point: build the argument parser and dispatch subcommands.
    """
    parser = argparse.ArgumentParser()
    # With no subcommand, print usage instead of failing.
    parser.set_defaults(func=lambda _: parser.print_help())
    subparsers = parser.add_subparsers()

    metadata_crawler_parser = subparsers.add_parser(
        "crawl",
        # Fixed help text: this subcommand crawls token metadata, not
        # event/function-call jobs from a Bugout journal.
        help="Crawl and parse metadata for all token URIs",
    )
    metadata_crawler_parser.add_argument(
        "--blockchain-type",
        "-b",
        type=str,
        # Fixed typos in the user-facing help string.
        help="Type of blockchain which is written in the database",
        required=True,
    )
    metadata_crawler_parser.set_defaults(func=handle_crawl)

    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()

Wyświetl plik

@ -0,0 +1,120 @@
from cgitb import reset
from genericpath import exists
import logging
from typing import Any, Dict, List, Optional
from unittest import result

from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy import text
from sqlalchemy.orm import Session

from ..data import TokenURIs
from ..settings import VIEW_STATE_CRAWLER_LABEL, METADATA_CRAWLER_LABEL
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def metadata_to_label(
    blockchain_type: AvailableBlockchainType,
    metadata: Optional[Dict[str, Any]],
    token_uri_data: TokenURIs,
    label_name=METADATA_CRAWLER_LABEL,
):
    """
    Build an (un-persisted) label model row carrying a token's crawled metadata.

    :param blockchain_type: chain whose label table the row targets.
    :param metadata: parsed metadata JSON, or None when crawling failed.
    :param token_uri_data: TokenURIs record for the token being labeled.
    :param label_name: label value to store the row under.
    :return: a label model instance; caller is responsible for adding and
        committing it.
    """
    label_model = get_label_model(blockchain_type)
    label = label_model(
        label=label_name,
        label_data={
            "type": "metadata",
            "token_id": token_uri_data.token_id,
            "metadata": metadata,
        },
        # BUG FIX: token_uri_data is a pydantic model, not a dict — use
        # attribute access (consistent with token_id above); subscripting
        # raised TypeError.
        address=token_uri_data.address,
        block_number=token_uri_data.block_number,
        transaction_hash=None,
        block_timestamp=token_uri_data.block_timestamp,
    )
    return label
def commit_session(db_session: Session) -> None:
    """
    Flush the session's pending labels to the database.

    Rolls the transaction back and re-raises on any failure so the caller
    sees the original error.
    """
    try:
        logger.info("Committing session to database")
        db_session.commit()
    except Exception as e:
        logger.error(f"Failed to save labels: {e}")
        db_session.rollback()
        raise e
def get_uris_of_tokens(
    db_session: Session, blockchain_type: AvailableBlockchainType
) -> List[TokenURIs]:
    """
    Get metadata URIs of all tokens from 'tokenURI' view-call labels.

    For each token id only the most recent result (highest block number)
    is returned, via DISTINCT ON with a block-number-descending sort.

    :param db_session: open database session.
    :param blockchain_type: chain to read labels for.
    :return: list of TokenURIs records, one per token id.
    """
    # NOTE(review): the table name and label value are hard-coded for
    # Polygon / 'view-state-alpha'; blockchain_type is currently unused and
    # VIEW_STATE_CRAWLER_LABEL exists in settings — confirm and unify.
    #
    # BUG FIXES vs. the original: Session.query(<raw SQL string>).execute()
    # is not a valid SQLAlchemy API (use Session.execute); the SELECT list
    # had a trailing comma before FROM; the WHERE clause started with a
    # stray AND; the return annotation said Dict[str, str] but a list of
    # TokenURIs is built.
    metadata_for_parsing = db_session.execute(
        text(
            """ SELECT
        DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id,
        label_data -> 'result' as token_uri,
        block_number as block_number,
        block_timestamp as block_timestamp,
        address as address
    FROM
        polygon_labels
    WHERE
        label = 'view-state-alpha'
        AND label_data ->> 'name' = 'tokenURI'
    ORDER BY
        label_data -> 'inputs'-> 0 ASC,
        block_number :: INT DESC;
    """
        )
    )
    results = [
        TokenURIs(
            token_id=data[0],
            token_uri=data[1],
            block_number=data[2],
            block_timestamp=data[3],
            address=data[4],
        )
        for data in metadata_for_parsing
    ]
    return results
def get_current_metadata_for_address(
    db_session: Session, blockchain_type: AvailableBlockchainType, address: str
) -> List[str]:
    """
    Get token ids of the given address that already have metadata labels.

    :param db_session: open database session.
    :param blockchain_type: chain to read labels for (currently unused —
        the query is hard-coded to polygon_labels; confirm).
    :param address: contract address to look up.
    :return: list of token ids (as strings) with existing metadata rows.
    """
    # BUG FIXES vs. the original: Session.query(<raw SQL>).execute() is not
    # a valid SQLAlchemy API, and the :address bind parameter was never
    # supplied — pass it explicitly via Session.execute.
    current_metadata = db_session.execute(
        text(
            """ SELECT
        DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id
    FROM
        polygon_labels
    WHERE
        address = :address
        AND label = 'metadata-crawler'
    ORDER BY
        label_data ->> 'token_id' ASC,
        block_number :: INT DESC;
    """
        ),
        {"address": address},
    )
    result = [data[0] for data in current_metadata]
    return result

Wyświetl plik

@ -62,11 +62,8 @@ setup(
"moonworm-crawler=mooncrawl.moonworm_crawler.cli:main",
"nft=mooncrawl.nft.cli:main",
"statistics=mooncrawl.stats_worker.dashboard:main",
<<<<<<< Updated upstream
=======
"state-crawler=mooncrawl.state_crawler.cli:main",
"metadata-crawler=mooncrawl.metadata_crawler.cli:main",
>>>>>>> Stashed changes
]
},
)