pull/752/head
Andrey 2023-02-10 00:17:34 +02:00
parent 4c7a7fdaf6
commit a3038b1f88
2 changed files with 68 additions and 61 deletions

View file

@@ -11,6 +11,7 @@ from moonstreamdb.db import (
     MOONSTREAM_DB_URI,
     MOONSTREAM_POOL_SIZE,
     create_moonstream_engine,
+    yield_db_read_only_session_ctx,
 )
 from sqlalchemy.orm import sessionmaker
 from .db import (
@@ -91,86 +92,92 @@ def parse_metadata(
     db_session = process_session()
 
     # run crawling of levels
-    try:
-        # get all tokens with uri
-        logger.info("Requesting all tokens with uri from database")
-        uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type)
-
-        tokens_uri_by_address: Dict[str, Any] = {}
+    with yield_db_read_only_session_ctx() as db_session_read_only:
+        try:
+            tokens_uri_by_address: Dict[str, Any] = {}
+            # get all tokens with uri
+            logger.info("Requesting all tokens with uri from database")
+            uris_of_tokens = get_uris_of_tokens(db_session_read_only, blockchain_type)
 
-        for token_uri_data in uris_of_tokens:
-            if token_uri_data.address not in tokens_uri_by_address:
-                tokens_uri_by_address[token_uri_data.address] = []
-            tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
+            for token_uri_data in uris_of_tokens:
+                if token_uri_data.address not in tokens_uri_by_address:
+                    tokens_uri_by_address[token_uri_data.address] = []
+                tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
 
-        for address in tokens_uri_by_address:
-            logger.info(f"Starting to crawl metadata for address: {address}")
+            for address in tokens_uri_by_address:
+                logger.info(f"Starting to crawl metadata for address: {address}")
 
-            already_parsed = get_current_metadata_for_address(
-                db_session=db_session, blockchain_type=blockchain_type, address=address
-            )
+                already_parsed = get_current_metadata_for_address(
+                    db_session=db_session_read_only,
+                    blockchain_type=blockchain_type,
+                    address=address,
+                )
 
-            maybe_updated = get_tokens_wich_maybe_updated(
-                db_session=db_session, blockchain_type=blockchain_type, address=address
-            )
+                maybe_updated = get_tokens_wich_maybe_updated(
+                    db_session=db_session_read_only,
+                    blockchain_type=blockchain_type,
+                    address=address,
+                )
 
-            leak_rate = 0.0
+                leak_rate = 0.0
 
-            if len(maybe_updated) > 0:
-                leak_rate = max_recrawl / len(maybe_updated)
+                if len(maybe_updated) > 0:
+                    leak_rate = max_recrawl / len(maybe_updated)
 
-            if leak_rate > 1:
-                leak_rate = 1
+                if leak_rate > 1:
+                    leak_rate = 1
 
-            parsed_with_leak = leak_of_crawled_uri(
-                already_parsed, leak_rate, maybe_updated
-            )
+                parsed_with_leak = leak_of_crawled_uri(
+                    already_parsed, leak_rate, maybe_updated
+                )
 
-            logger.info(
-                f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}"
-            )
+                logger.info(
+                    f"Leak rate: {leak_rate} for {address} with maybe updated {len(maybe_updated)}"
+                )
 
-            logger.info(f"Already parsed: {len(already_parsed)} for {address}")
+                logger.info(f"Already parsed: {len(already_parsed)} for {address}")
 
-            logger.info(
-                f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
-            )
+                logger.info(
+                    f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
+                )
 
-            for requests_chunk in [
-                tokens_uri_by_address[address][i : i + batch_size]
-                for i in range(0, len(tokens_uri_by_address[address]), batch_size)
-            ]:
-                writed_labels = 0
-                db_session.commit()
+                for requests_chunk in [
+                    tokens_uri_by_address[address][i : i + batch_size]
+                    for i in range(0, len(tokens_uri_by_address[address]), batch_size)
+                ]:
+                    writed_labels = 0
+                    db_session.commit()
 
-                with db_session.begin():
-                    for token_uri_data in requests_chunk:
-                        if token_uri_data.token_id not in parsed_with_leak:
-                            metadata = crawl_uri(token_uri_data.token_uri)
-                            db_session.add(
-                                metadata_to_label(
-                                    blockchain_type=blockchain_type,
-                                    metadata=metadata,
-                                    token_uri_data=token_uri_data,
-                                )
-                            )
-                            writed_labels += 1
+                    with db_session.begin():
+                        for token_uri_data in requests_chunk:
+                            if token_uri_data.token_id not in parsed_with_leak:
+                                metadata = crawl_uri(token_uri_data.token_uri)
+                                db_session.add(
+                                    metadata_to_label(
+                                        blockchain_type=blockchain_type,
+                                        metadata=metadata,
+                                        token_uri_data=token_uri_data,
+                                    )
+                                )
+                                writed_labels += 1
 
-                    if writed_labels > 0:
-                        clean_labels_from_db(
-                            db_session=db_session,
-                            blockchain_type=blockchain_type,
-                            address=address,
-                        )
-                        logger.info(f"Write {writed_labels} labels for {address}")
-                # trasaction is commited here
-    finally:
-        db_session.close()
+                        if writed_labels > 0:
+                            clean_labels_from_db(
+                                db_session=db_session,
+                                blockchain_type=blockchain_type,
+                                address=address,
+                            )
+                            logger.info(f"Write {writed_labels} labels for {address}")
+                    # trasaction is commited here
+        finally:
+            db_session.close()
 
 
 def handle_crawl(args: argparse.Namespace) -> None:
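
Note: the hunk above moves every read query (get_uris_of_tokens, get_current_metadata_for_address, get_tokens_wich_maybe_updated) onto a dedicated read-only session, while writes keep going through the original db_session. A minimal sketch of how such a context manager is typically built with SQLAlchemy, assuming the helper hands out a plain session bound to a separate engine; the engine URI below is a stand-in, not the actual create_moonstream_engine / MOONSTREAM_DB_URI configuration:

    from contextlib import contextmanager

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session, sessionmaker

    # Stand-in engine; in production this would point at a read replica
    # (the real code builds its engines via create_moonstream_engine).
    read_only_engine = create_engine("sqlite://")

    ReadOnlySessionLocal = sessionmaker(bind=read_only_engine)

    @contextmanager
    def yield_db_read_only_session_ctx():
        # Hand out a session intended for SELECTs only and always close it,
        # even if the crawl loop inside the `with` block raises.
        session: Session = ReadOnlySessionLocal()
        try:
            yield session
        finally:
            session.close()

Splitting sessions this way keeps the long-running SELECTs out of the write transaction that db_session.begin() opens for each batch, and lets reads be pointed at a replica.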

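The recrawl throttling in the loop is also worth spelling out: leak_rate = max_recrawl / len(maybe_updated), clamped to 1, is used as the probability with which an already-parsed token "leaks" back into the crawl queue, so on average at most max_recrawl tokens per address are re-fetched each run. The helper below is a hypothetical stand-in for leak_of_crawled_uri, whose real implementation is not shown in this diff; it only illustrates the arithmetic:

    import random
    from typing import List, Set

    def leak_of_crawled_uri_sketch(
        already_parsed: Set[str], leak_rate: float, maybe_updated: List[str]
    ) -> Set[str]:
        # Tokens stay in the "skip" set unless they leak: an already-parsed
        # token whose URI may have changed is dropped from the set (and is
        # therefore re-crawled) with probability leak_rate.
        maybe_updated_set = set(maybe_updated)
        return {
            token_id
            for token_id in already_parsed
            if token_id not in maybe_updated_set or random.random() >= leak_rate
        }

    # Example: 10_000 candidate tokens and max_recrawl = 500 give
    # leak_rate = 0.05, so roughly 500 tokens leave the skip set per run.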
View file

@@ -244,9 +244,9 @@ def clean_labels_from_db(
                 WHERE
                     label=:label
                     AND address=:address
-                    AND polygon_labels.id not in (select id from lates_token_metadata) RETURNING polygon_labels.block_number;
+                    AND {}.id not in (select id from lates_token_metadata) RETURNING {}.block_number;
             """.format(
-                table, table
+                table, table, table, table
             ),
             {"address": address, "label": METADATA_CRAWLER_LABEL},
         )