Fix metadata crawler for more optimized writing to the database.

pull/756/head
Andrey 2023-02-14 19:26:49 +02:00
parent 410471b193
commit d7e108a1c5
1 changed file with 56 additions and 14 deletions


@@ -1,4 +1,5 @@
import argparse
from contextlib import contextmanager
import json
from urllib.error import HTTPError
import urllib.request
@@ -11,7 +12,8 @@ from moonstreamdb.db import (
MOONSTREAM_DB_URI,
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
yield_db_read_only_session_ctx,
MOONSTREAM_DB_STATEMENT_TIMEOUT_MILLIS,
MOONSTREAM_DB_URI_READ_ONLY,
)
from sqlalchemy.orm import sessionmaker
from .db import (
@@ -32,6 +34,16 @@ logger = logging.getLogger(__name__)
batch_size = 50
@contextmanager
def yield_session_maker(engine):
    SessionLocal = sessionmaker(bind=engine)
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
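# Illustrative usage sketch, not part of this commit: how yield_session_maker is
# meant to pair with an engine. The in-memory SQLite URL and the helper name below
# are assumptions for the example only.
def _example_session_usage() -> None:
    from sqlalchemy import create_engine, text

    example_engine = create_engine("sqlite:///:memory:")
    with yield_session_maker(engine=example_engine) as session:
        # The contextmanager's finally block guarantees session.close(),
        # even if this body raises.
        print(session.execute(text("SELECT 1")).scalar())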
def leak_of_crawled_uri(
ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]]
) -> List[Optional[str]]:
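# Hypothetical sketch, since the body of leak_of_crawled_uri is outside this diff:
# one way a function with this signature could behave is to treat already-crawled
# ids as done but randomly "leak" a leak_rate fraction of them, plus any ids in
# maybe_updated, so their metadata gets re-crawled. The logic below is an
# assumption for illustration, not the repository's implementation.
def _example_leak(
    ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]]
) -> List[Optional[str]]:
    import random

    updated = set(maybe_updated)
    return [
        token_id
        for token_id in ids
        if token_id not in updated and random.random() > leak_rate
    ]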
@@ -99,7 +111,17 @@ def parse_metadata(
# run crawling of levels
with yield_db_read_only_session_ctx() as db_session_read_only:
# create read only engine
# Read only
read_only_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
with yield_session_maker(engine=read_only_engine) as db_session_read_only:
try:
# get all tokens with uri
logger.info("Requesting all tokens with uri from database")
@@ -148,6 +170,13 @@ def parse_metadata(
f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
)
# Remove already parsed tokens
tokens_uri_by_address[address] = [
token_uri_data
for token_uri_data in tokens_uri_by_address[address]
if token_uri_data.token_id not in parsed_with_leak
]
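# Illustrative sketch, not part of this commit: the filter above drops tokens whose
# ids are already in parsed_with_leak, and the loop below splits what remains into
# batch_size chunks so each chunk can be written as one transaction. The helper
# below restates that pattern with hypothetical names.
def _example_filter_and_chunk(
    token_ids: List[str], already_parsed: List[str], size: int
) -> List[List[str]]:
    parsed = set(already_parsed)
    remaining = [token_id for token_id in token_ids if token_id not in parsed]
    return [remaining[i : i + size] for i in range(0, len(remaining), size)]

# _example_filter_and_chunk(["1", "2", "3", "4", "5"], ["2"], 2) -> [["1", "3"], ["4", "5"]]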
for requests_chunk in [
tokens_uri_by_address[address][i : i + batch_size]
for i in range(0, len(tokens_uri_by_address[address]), batch_size)
@@ -155,9 +184,9 @@ def parse_metadata(
writed_labels = 0
db_session.commit()
with db_session.begin():
for token_uri_data in requests_chunk:
if token_uri_data.token_id not in parsed_with_leak:
try:
with db_session.begin():
for token_uri_data in requests_chunk:
metadata = crawl_uri(token_uri_data.token_uri)
db_session.add(
@@ -169,15 +198,28 @@
)
writed_labels += 1
if writed_labels > 0:
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
logger.info(f"Write {writed_labels} labels for {address}")
if writed_labels > 0:
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
logger.info(
f"Write {writed_labels} labels for {address}"
)
# transaction is committed here
except Exception as err:
logger.error(err)
logger.error(
f"Error while writing labels for address: {address}"
)
db_session.rollback()
# transaction is committed here
clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
finally:
db_session.close()
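# Illustrative sketch, not part of this commit: the per-batch transaction pattern
# used above. Each chunk is written inside db_session.begin() so it commits as a
# unit, and a failed chunk rolls back on its own without undoing earlier chunks.
# The session argument and write_label callable below are assumptions for the example.
def _example_write_in_batches(session, chunks, write_label) -> int:
    written = 0
    for chunk in chunks:
        try:
            with session.begin():
                # begin() commits on clean exit and rolls back if the block raises
                for item in chunk:
                    write_label(session, item)
            written += len(chunk)  # counted only once the chunk has committed
        except Exception as err:
            logger.error(f"Batch failed and was rolled back: {err}")
            session.rollback()  # harmless if begin() already rolled back
    return written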
@@ -221,7 +263,7 @@ def main() -> None:
"--max-recrawl",
"-m",
type=int,
default=200,
default=300,
help="Maximum amount of recrawling of already crawled tokens",
)
metadata_crawler_parser.set_defaults(func=handle_crawl)
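# Illustrative sketch, not part of this commit: a standalone argparse setup that
# mirrors the --max-recrawl flag above with its new default of 300. The parser and
# function names here are hypothetical.
def _example_cli(argv=None) -> int:
    parser = argparse.ArgumentParser(description="metadata crawler example")
    parser.add_argument(
        "--max-recrawl",
        "-m",
        type=int,
        default=300,
        help="Maximum number of already crawled tokens to recrawl",
    )
    args = parser.parse_args(argv)
    return args.max_recrawl

# _example_cli([]) -> 300; _example_cli(["-m", "500"]) -> 500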