Merge pull request #851 from moonstream-to/metadata-crawler-refactor

Refactor metadata crawler.
pull/854/head
Andrey Dolgolev 2023-07-13 21:29:52 +03:00 committed by GitHub
commit 147819e1ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 22 deletions

View file

@@ -48,9 +48,7 @@ def leak_of_crawled_uri(
     result = []

     for id in ids:
-        if id not in maybe_updated:
-            result.append(id)
-        elif random.random() > leak_rate:
+        if id not in maybe_updated and random.random() > leak_rate:
             result.append(id)

     return result
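The consolidated condition changes the helper's semantics, not just its shape: previously an id absent from maybe_updated was always kept (never re-crawled) and the random draw only thinned the ids that were in maybe_updated; now every id in maybe_updated is re-crawled, and the draw additionally leaks a ~leak_rate fraction of the remaining ids back into the crawl. A minimal standalone sketch of the new helper, with an illustrative seeded run (the seed and sample values are not from the crawler):

import random
from typing import List

def leak_of_crawled_uri(ids: List[str], leak_rate: float, maybe_updated: List[str]) -> List[str]:
    # Ids returned here are treated as "already parsed" and skipped by the
    # crawler; an id is kept only if it has no pending update AND it
    # survives the random draw, so roughly a leak_rate fraction of the
    # rest leaks back into the crawl queue.
    result = []
    for id in ids:
        if id not in maybe_updated and random.random() > leak_rate:
            result.append(id)
    return result

random.seed(7)  # illustrative only; the crawler draws unseeded
print(leak_of_crawled_uri(["1", "2", "3", "4"], leak_rate=0.25, maybe_updated=["2"]))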
@@ -91,7 +89,7 @@ def parse_metadata(
     """
     logger.info("Starting metadata crawler")

-    logger.info(f"Connecting to blockchain {blockchain_type.value}")
+    logger.info(f"Processing blockchain {blockchain_type.value}")

     db_session = PrePing_SessionLocal()
@@ -109,7 +107,15 @@ def parse_metadata(
                 tokens_uri_by_address[token_uri_data.address] = []
             tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
+    except Exception as err:
+        logger.error(f"Error while requesting tokens with uri from database: {err}")
+        db_session.rollback()
+        db_session.close()
+        return

     for address in tokens_uri_by_address:
+        with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only:
+            try:
                 logger.info(f"Starting to crawl metadata for address: {address}")

                 already_parsed = get_current_metadata_for_address(
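yield_session_maker and RO_pre_ping_engine come from the surrounding moonstream codebase and are not defined in this diff. A hedged sketch of how such a context-managed, pre-pinging read-only session factory is commonly built with SQLAlchemy (a stand-in, not the project's actual helper):

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Hypothetical stand-in for the project's RO_pre_ping_engine:
# pool_pre_ping revalidates pooled connections before each use,
# which matters for a long-running crawler.
RO_pre_ping_engine = create_engine(
    "postgresql://user:pass@replica-host/db",  # placeholder DSN
    pool_pre_ping=True,
)

@contextmanager
def yield_session_maker(engine):
    # Open a session bound to the given engine and guarantee it is
    # closed when the `with` block exits, even on error.
    session = sessionmaker(bind=engine)()
    try:
        yield session
    finally:
        session.close()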
@@ -126,10 +132,14 @@ def parse_metadata(
                 leak_rate = 0.0

                 if len(maybe_updated) > 0:
-                    leak_rate = max_recrawl / len(maybe_updated)
-                    if leak_rate > 1:
-                        leak_rate = 1
+                    free_spots = len(maybe_updated) / max_recrawl
+                    if free_spots > 1:
+                        leak_rate = 0
+                    else:
+                        leak_rate = 1 - (
+                            len(already_parsed) - max_recrawl + len(maybe_updated)
+                        ) / len(already_parsed)

                 parsed_with_leak = leak_of_crawled_uri(
                     already_parsed, leak_rate, maybe_updated
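The rewritten branch ties the leak to the recrawl budget: when the tokens flagged as maybe updated already exceed max_recrawl (free_spots > 1) the leak is switched off, and otherwise leak_rate is chosen so that the updated tokens plus the leaked ones roughly fill the budget. A worked example with illustrative counts (not from the commit):

max_recrawl = 300      # recrawl budget per address (illustrative)
n_parsed = 1000        # len(already_parsed)
n_updated = 100        # len(maybe_updated)

free_spots = n_updated / max_recrawl            # 0.33 -> budget not yet exhausted
leak_rate = 1 - (n_parsed - max_recrawl + n_updated) / n_parsed
print(leak_rate)                                # 1 - 800/1000 = 0.2

# All 100 updated tokens are re-crawled, plus on average
# leak_rate * (1000 - 100) = 180 leaked ones: ~280 of the 300-token
# budget (assuming maybe_updated is a subset of already_parsed).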
@@ -142,19 +152,27 @@ def parse_metadata(
                 logger.info(f"Already parsed: {len(already_parsed)} for {address}")
                 logger.info(
-                    f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
+                    f"Amount of state in database: {len(tokens_uri_by_address[address])} for {address}"
+                )
+                logger.info(
+                    f"Amount of tokens parsed with leak: {len(parsed_with_leak)} for {address}"
                 )

                 # Remove already parsed tokens
-                tokens_uri_by_address[address] = [
+                new_tokens_uri_by_address = [
                     token_uri_data
                     for token_uri_data in tokens_uri_by_address[address]
                     if token_uri_data.token_id not in parsed_with_leak
                 ]
+                logger.info(
+                    f"Amount of tokens to parse: {len(new_tokens_uri_by_address)} for {address}"
+                )

                 for requests_chunk in [
-                    tokens_uri_by_address[address][i : i + batch_size]
-                    for i in range(0, len(tokens_uri_by_address[address]), batch_size)
+                    new_tokens_uri_by_address[i : i + batch_size]
+                    for i in range(0, len(new_tokens_uri_by_address), batch_size)
                 ]:
                     writed_labels = 0
                     db_session.commit()
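The renamed new_tokens_uri_by_address list is then batched with the usual slice-stride idiom; a self-contained illustration of that pattern:

from typing import List, Sequence, TypeVar

T = TypeVar("T")

def chunked(items: Sequence[T], batch_size: int) -> List[Sequence[T]]:
    # Same idiom as the crawler's comprehension: step through the
    # sequence in strides of batch_size, slicing one batch per step;
    # the final batch may be shorter.
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

assert chunked(list(range(7)), 3) == [[0, 1, 2], [3, 4, 5], [6]]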

View file

@@ -73,12 +73,10 @@ def get_uris_of_tokens(
     metadata_for_parsing = db_session.execute(
         text(
             """ SELECT
-                DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id,
+                DISTINCT ON(label_data -> 'inputs'-> 0, address ) label_data -> 'inputs'-> 0 as token_id, address as address,
                 label_data -> 'result' as token_uri,
                 block_number as block_number,
-                block_timestamp as block_timestamp,
-                address as address
+                block_timestamp as block_timestamp
             FROM
                 {}
             WHERE
@@ -86,6 +84,7 @@ def get_uris_of_tokens(
                 AND label_data ->> 'name' = :name
             ORDER BY
                 label_data -> 'inputs'-> 0 ASC,
+                address ASC,
                 block_number :: INT DESC;
         """.format(
             table
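In PostgreSQL the DISTINCT ON expressions must match the leftmost ORDER BY expressions, so adding address to the DISTINCT ON key forces the new address ASC line here; the trailing block_number :: INT DESC then makes each (token_id, address) group keep its most recent row. A toy query showing the same pattern (hypothetical table, columns, and DSN):

from sqlalchemy import create_engine, text

# Latest row per (token_id, address): the DISTINCT ON keys lead the
# ORDER BY, and the remaining key decides which row each group keeps.
QUERY = text(
    """
    SELECT DISTINCT ON (token_id, address)
        token_id, address, token_uri, block_number
    FROM token_uri_events
    ORDER BY token_id ASC, address ASC, block_number DESC
    """
)

engine = create_engine("postgresql://user:pass@localhost/db")  # placeholder DSN
with engine.connect() as conn:
    for row in conn.execute(QUERY):
        print(row.token_id, row.address, row.token_uri)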
@@ -97,10 +96,10 @@ def get_uris_of_tokens(
     results = [
         TokenURIs(
             token_id=data[0],
-            token_uri=data[1][0],
-            block_number=data[2],
-            block_timestamp=data[3],
-            address=data[4],
+            address=data[1],
+            token_uri=data[2][0],
+            block_number=data[3],
+            block_timestamp=data[4],
         )
         for data in metadata_for_parsing
         if data[1] is not None and len(data[1]) > 0
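Because the rows are unpacked positionally, reordering the SELECT list forces the matching shuffle in the TokenURIs constructor (data[1] is now the address and data[2][0] the first element of the token URI array; note that the unchanged guard on data[1] therefore now appears to test the address column rather than the token URI). A hedged sketch of a name-based alternative using SQLAlchemy's .mappings() accessor (assumes SQLAlchemy 1.4+; the TokenURIs definition here is a stand-in, not the project's model):

from dataclasses import dataclass
from typing import Any, List

@dataclass
class TokenURIs:  # field names as in the diff; this definition is assumed
    token_id: Any
    address: str
    token_uri: str
    block_number: Any
    block_timestamp: Any

def rows_to_token_uris(rows) -> List[TokenURIs]:
    # rows = db_session.execute(QUERY).mappings() in SQLAlchemy 1.4+:
    # each row behaves like a dict keyed by the SELECT aliases, so a
    # reordered SELECT list cannot silently shift fields.
    return [
        TokenURIs(
            token_id=row["token_id"],
            address=row["address"],
            token_uri=row["token_uri"][0],  # 'result' is a JSON array; take the first element
            block_number=row["block_number"],
            block_timestamp=row["block_timestamp"],
        )
        for row in rows
        if row["token_uri"] is not None and len(row["token_uri"]) > 0
    ]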