kopia lustrzana https://github.com/bugout-dev/moonstream
Add changes.
rodzic
b1f3d24145
commit
ff3899d8d5
|
@ -48,9 +48,7 @@ def leak_of_crawled_uri(
|
|||
result = []
|
||||
|
||||
for id in ids:
|
||||
if id not in maybe_updated:
|
||||
result.append(id)
|
||||
elif random.random() > leak_rate:
|
||||
if id not in maybe_updated and random.random() > leak_rate:
|
||||
result.append(id)
|
||||
|
||||
return result
|
||||
|
@ -91,7 +89,7 @@ def parse_metadata(
|
|||
"""
|
||||
|
||||
logger.info("Starting metadata crawler")
|
||||
logger.info(f"Connecting to blockchain {blockchain_type.value}")
|
||||
logger.info(f"Processing blockchain {blockchain_type.value}")
|
||||
|
||||
db_session = PrePing_SessionLocal()
|
||||
|
||||
|
@ -109,7 +107,17 @@ def parse_metadata(
|
|||
tokens_uri_by_address[token_uri_data.address] = []
|
||||
tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
|
||||
|
||||
for address in tokens_uri_by_address:
|
||||
except Exception as err:
|
||||
logger.error(f"Error while requesting tokens with uri from database: {err}")
|
||||
db_session.rollback()
|
||||
db_session.close()
|
||||
return
|
||||
|
||||
for address in tokens_uri_by_address:
|
||||
if address == "0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f":
|
||||
continue
|
||||
with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only:
|
||||
try:
|
||||
logger.info(f"Starting to crawl metadata for address: {address}")
|
||||
|
||||
already_parsed = get_current_metadata_for_address(
|
||||
|
@ -126,10 +134,14 @@ def parse_metadata(
|
|||
leak_rate = 0.0
|
||||
|
||||
if len(maybe_updated) > 0:
|
||||
leak_rate = max_recrawl / len(maybe_updated)
|
||||
free_spots = len(maybe_updated) / max_recrawl
|
||||
|
||||
if leak_rate > 1:
|
||||
leak_rate = 1
|
||||
if free_spots > 1:
|
||||
leak_rate = 0
|
||||
else:
|
||||
leak_rate = 1 - (
|
||||
len(already_parsed) - max_recrawl + len(maybe_updated)
|
||||
) / len(already_parsed)
|
||||
|
||||
parsed_with_leak = leak_of_crawled_uri(
|
||||
already_parsed, leak_rate, maybe_updated
|
||||
|
@ -142,19 +154,27 @@ def parse_metadata(
|
|||
logger.info(f"Already parsed: {len(already_parsed)} for {address}")
|
||||
|
||||
logger.info(
|
||||
f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
|
||||
f"Amount of state in database: {len(tokens_uri_by_address[address])} for {address}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Amount of tokens parsed with leak: {len(parsed_with_leak)} for {address}"
|
||||
)
|
||||
|
||||
# Remove already parsed tokens
|
||||
tokens_uri_by_address[address] = [
|
||||
new_tokens_uri_by_address = [
|
||||
token_uri_data
|
||||
for token_uri_data in tokens_uri_by_address[address]
|
||||
if token_uri_data.token_id not in parsed_with_leak
|
||||
]
|
||||
|
||||
logger.info(
|
||||
f"Amount of tokens to parse: {len(new_tokens_uri_by_address)} for {address}"
|
||||
)
|
||||
|
||||
for requests_chunk in [
|
||||
tokens_uri_by_address[address][i : i + batch_size]
|
||||
for i in range(0, len(tokens_uri_by_address[address]), batch_size)
|
||||
new_tokens_uri_by_address[i : i + batch_size]
|
||||
for i in range(0, len(new_tokens_uri_by_address), batch_size)
|
||||
]:
|
||||
writed_labels = 0
|
||||
db_session.commit()
|
||||
|
@ -196,8 +216,8 @@ def parse_metadata(
|
|||
address=address,
|
||||
)
|
||||
|
||||
finally:
|
||||
db_session.close()
|
||||
finally:
|
||||
db_session.close()
|
||||
|
||||
|
||||
def handle_crawl(args: argparse.Namespace) -> None:
|
||||
|
|
Ładowanie…
Reference in New Issue