Add update logic.

pull/665/head
Andrey 2022-09-14 19:49:07 +03:00
rodzic 9bb5fb4794
commit c11ddd51ce
3 zmienionych plików z 119 dodań i 70 usunięć

Wyświetl plik

@ -54,6 +54,6 @@ class QueryDataUpdate(BaseModel):
class TokenURIs(BaseModel): class TokenURIs(BaseModel):
token_id: str token_id: str
token_uri: str token_uri: str
block_number: str block_number: int
block_timestamp: str block_timestamp: str
address: str address: str

Wyświetl plik

@ -1,9 +1,11 @@
import argparse import argparse
import json import json
from time import time, sleep
from urllib.error import HTTPError from urllib.error import HTTPError
import urllib.request import urllib.request
import logging import logging
from typing import Dict, Any import requests
from typing import Any
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.blockchain import AvailableBlockchainType
@ -15,8 +17,8 @@ from moonstreamdb.db import (
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from .db import ( from .db import (
commit_session, commit_session,
get_uris_of_tokens, get_uri_addresses,
get_current_metadata_for_address, get_not_updated_metadata_for_address,
metadata_to_label, metadata_to_label,
) )
from ..settings import ( from ..settings import (
@ -41,8 +43,14 @@ def crawl_uri(token_uri_data: TokenURIs) -> Any:
result = None result = None
while retry < 3: while retry < 3:
try: try:
req = urllib.request.Request(
response = urllib.request.urlopen(token_uri_data.token_uri, timeout=5) token_uri_data.token_uri,
None,
{
"User-agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5"
},
)
response = urllib.request.urlopen(req, timeout=5)
if response.status == 200: if response.status == 200:
result = json.loads(response.read()) result = json.loads(response.read())
@ -51,12 +59,17 @@ def crawl_uri(token_uri_data: TokenURIs) -> Any:
except HTTPError as error: except HTTPError as error:
logger.error(f"request end with error statuscode: {error.code}") logger.error(f"request end with error statuscode: {error.code}")
logger.error(f"requested uri: {token_uri_data.token_uri}")
retry += 1 retry += 1
sleep(2)
continue continue
except Exception as err: except Exception as err:
logger.error(err) logger.error(err)
logger.error(f"requested uri: {token_uri_data.token_uri}")
retry += 1 retry += 1
sleep(2)
continue continue
sleep(0.5)
return result, token_uri_data return result, token_uri_data
@ -81,20 +94,23 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
# run crawling of levels # run crawling of levels
try: try:
uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type) meradata_addresses = get_uri_addresses(db_session, blockchain_type)
tokens_uri_by_address: Dict[str, Any] = {} for address in meradata_addresses:
for token_uri_data in uris_of_tokens: not_updated_tokens = get_not_updated_metadata_for_address(
if token_uri_data.address not in tokens_uri_by_address: db_session,
tokens_uri_by_address[token_uri_data.address] = [] blockchain_type,
tokens_uri_by_address[token_uri_data.address].append(token_uri_data) address=address,
)
for address in tokens_uri_by_address: logger.info(
f"Start crawling {len(not_updated_tokens)} tokens of address {address}"
)
for requests_chunk in [ for requests_chunk in [
tokens_uri_by_address[address][i : i + batch_size] not_updated_tokens[i : i + batch_size]
for i in range(0, len(tokens_uri_by_address[address]), batch_size) for i in range(0, len(not_updated_tokens), batch_size)
]: ]:
writed_labels = 0 writed_labels = 0

Wyświetl plik

@ -1,6 +1,7 @@
import logging import logging
import json import json
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
from unittest import result
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -60,9 +61,9 @@ def commit_session(db_session: Session) -> None:
raise e raise e
def get_uris_of_tokens( def get_uri_addresses(
db_session: Session, blockchain_type: AvailableBlockchainType db_session: Session, blockchain_type: AvailableBlockchainType
) -> List[TokenURIs]: ) -> List[str]:
""" """
Get meatadata URIs. Get meatadata URIs.
@ -70,47 +71,22 @@ def get_uris_of_tokens(
label_model = get_label_model(blockchain_type) label_model = get_label_model(blockchain_type)
table = label_model.__tablename__ addresses = (
db_session.query(label_model.address.distinct())
.filter(label_model.label == VIEW_STATE_CRAWLER_LABEL)
.filter(label_model.label_data["name"].astext == "tokenURI")
).all()
metadata_for_parsing = db_session.execute( result = [address[0] for address in addresses]
""" SELECT
DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id,
label_data -> 'result' as token_uri,
block_number as block_number,
block_timestamp as block_timestamp,
address as address
FROM return result
{}
WHERE
label = :label
AND label_data ->> 'name' = :name
ORDER BY
label_data -> 'inputs'-> 0 ASC,
block_number :: INT DESC;
""".format(
table
),
{"table": table, "label": VIEW_STATE_CRAWLER_LABEL, "name": "tokenURI"},
)
results = [
TokenURIs(
token_id=data[0],
token_uri=data[1],
block_number=data[2],
block_timestamp=data[3],
address=data[4],
)
for data in metadata_for_parsing
]
return results
def get_current_metadata_for_address( def get_not_updated_metadata_for_address(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str db_session: Session,
): blockchain_type: AvailableBlockchainType,
address: str,
) -> List[TokenURIs]:
""" """
Get existing metadata. Get existing metadata.
""" """
@ -120,22 +96,79 @@ def get_current_metadata_for_address(
table = label_model.__tablename__ table = label_model.__tablename__
current_metadata = db_session.execute( current_metadata = db_session.execute(
""" SELECT """ with current_tokens_uri as (
DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id SELECT
FROM DISTINCT ON((label_data -> 'inputs' -> 0) :: int) (label_data -> 'inputs' -> 0) :: text as token_id,
{} label_data ->> 'result' as token_uri,
WHERE block_number,
address = :address address,
AND label = :label block_timestamp
ORDER BY from
label_data ->> 'token_id' ASC, {}
block_number :: INT DESC; where
""".format( label = :view_state_label
table AND address = :address
and label_data ->> 'name' = 'tokenURI'
order by
(label_data -> 'inputs' -> 0) :: INT ASC,
block_number :: INT DESC
),
tokens_metadata as (
SELECT
DISTINCT ON((label_data ->> 'token_id') :: int) (label_data ->> 'token_id') :: text as token_id,
label_data ->>'token_uri' as token_uri,
block_number
from
{}
where
label = :metadata_label
AND address = :address
order by
(label_data ->> 'token_id') :: INT ASC,
block_number :: INT DESC
),
tokens_state as (
SELECT
current_tokens_uri.token_id,
current_tokens_uri.token_uri as state_token_uri,
current_tokens_uri.block_number as view_state_block_number,
current_tokens_uri.block_timestamp as block_timestamp,
current_tokens_uri.address as address,
tokens_metadata.block_number as metadata_block_number,
tokens_metadata.token_uri as metadata_token_uri
from
current_tokens_uri
left JOIN tokens_metadata ON current_tokens_uri.token_id = tokens_metadata.token_id
)
SELECT
token_id,
state_token_uri,
view_state_block_number,
block_timestamp,
address
from
tokens_state
where
view_state_block_number > metadata_block_number OR metadata_token_uri is null OR metadata_token_uri != state_token_uri;
""".format(
table, table
), ),
{"address": address, "label": METADATA_CRAWLER_LABEL}, {
) "metadata_label": METADATA_CRAWLER_LABEL,
"view_state_label": VIEW_STATE_CRAWLER_LABEL,
"address": address,
},
).all()
result = [data[0] for data in current_metadata] results = [
TokenURIs(
token_id=data[0],
token_uri=data[1],
block_number=data[2],
block_timestamp=data[3],
address=data[4],
)
for data in current_metadata
]
return result return results