diff --git a/crawlers/mooncrawl/mooncrawl/db.py b/crawlers/mooncrawl/mooncrawl/db.py
index 607a96c8..8eba88f6 100644
--- a/crawlers/mooncrawl/mooncrawl/db.py
+++ b/crawlers/mooncrawl/mooncrawl/db.py
@@ -41,6 +41,18 @@ pre_ping_engine = create_moonstream_engine(
 )
 PrePing_SessionLocal = sessionmaker(bind=pre_ping_engine)
 
+
+def yield_db_preping_session() -> Generator[Session, None, None]:
+    session = PrePing_SessionLocal()
+    try:
+        yield session
+    finally:
+        session.close()
+
+
+yield_db_preping_session_ctx = contextmanager(yield_db_preping_session)
+
+
 # Read only
 RO_engine = create_moonstream_engine(
     url=MOONSTREAM_DB_URI_READ_ONLY,
@@ -68,6 +80,23 @@ RO_pre_ping_engine = create_moonstream_engine(
     pool_pre_ping=True,
 )
 
+
+RO_SessionLocal_preping = sessionmaker(bind=RO_pre_ping_engine)
+
+
+def yield_db_read_only_preping_session() -> Generator[Session, None, None]:
+    session = RO_SessionLocal_preping()
+    try:
+        yield session
+    finally:
+        session.close()
+
+
+yield_db_read_only_preping_session_ctx = contextmanager(
+    yield_db_read_only_preping_session
+)
+
+
 # Read only pre-ping query timeout
 RO_pre_ping_query_engine = create_moonstream_engine(
     url=MOONSTREAM_DB_URI_READ_ONLY,
diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py
index 1bff0eef..9b6fc0ac 100644
--- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py
@@ -11,7 +11,10 @@ from urllib.error import HTTPError
 from moonstreamdb.blockchain import AvailableBlockchainType
 from sqlalchemy.orm import sessionmaker
 
-from ..db import pre_ping_engine, RO_pre_ping_engine
+from ..db import (
+    yield_db_preping_session_ctx,
+    yield_db_read_only_preping_session_ctx,
+)
 from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS
 from .db import (
     clean_labels_from_db,
@@ -28,16 +31,6 @@ logger = logging.getLogger(__name__)
 batch_size = 50
 
 
-@contextmanager
-def yield_session_maker(engine):
-    SessionLocal = sessionmaker(bind=engine)
-    session = SessionLocal()
-    try:
-        yield session
-    finally:
-        session.close()
-
-
 def leak_of_crawled_uri(
     ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]]
 ) -> List[Optional[str]]:
@@ -93,7 +86,7 @@ def parse_metadata(
 
     logger.info(f"Processing blockchain {blockchain_type.value}")
     # run crawling of levels
-    with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only:
+    with yield_db_read_only_preping_session_ctx() as db_session_read_only:
         try:
             # get all tokens with uri
             logger.info("Requesting all tokens with uri from database")
@@ -111,14 +104,8 @@ def parse_metadata(
         return
 
     for address in tokens_uri_by_address:
-        with yield_session_maker(
-            engine=RO_pre_ping_engine
-        ) as db_session_read_only, yield_session_maker(
-            engine=pre_ping_engine
-        ) as db_session:
+        with yield_db_read_only_preping_session_ctx() as db_session_read_only:
             try:
-                logger.info(f"Starting to crawl metadata for address: {address}")
-
                 already_parsed = get_current_metadata_for_address(
                     db_session=db_session_read_only,
                     blockchain_type=blockchain_type,
@@ -130,6 +117,17 @@ def parse_metadata(
                     blockchain_type=blockchain_type,
                     address=address,
                 )
+            except Exception as err:
+                logger.warning(err)
+                logger.warning(
+                    f"Error while requesting metadata for address: {address}"
+                )
+                continue
+
+        with yield_db_preping_session_ctx() as db_session:
+            try:
+                logger.info(f"Starting to crawl metadata for address: {address}")
+
                 leak_rate = 0.0
 
                 if len(maybe_updated) > 0:
@@ -206,8 +204,8 @@ def parse_metadata(
                         )
                         # trasaction is commited here
                     except Exception as err:
-                        logger.error(err)
-                        logger.error(
+                        logger.warning(err)
+                        logger.warning(
                             f"Error while writing labels for address: {address}"
                         )
                         db_session.rollback()
@@ -218,8 +216,10 @@ def parse_metadata(
                     address=address,
                 )
             except Exception as err:
-                logger.error(err)
-                logger.error(f"Error while crawling metadata for address: {address}")
+                logger.warning(err)
+                logger.warning(f"Error while crawling metadata for address: {address}")
+                db_session.rollback()
+                continue
 
 
 def handle_crawl(args: argparse.Namespace) -> None:
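The new pre-ping helpers in db.py replace the removed yield_session_maker: each is bound to a fixed engine and exposed as a context manager via contextmanager. A minimal usage sketch follows (illustrative only, assuming the mooncrawl package is importable; the placeholder bodies are not part of this change):

    # Usage sketch, mirroring how parse_metadata now splits read-only queries
    # from label writes into two separate sessions.
    from mooncrawl.db import (
        yield_db_preping_session_ctx,
        yield_db_read_only_preping_session_ctx,
    )

    # Read-only pre-ping session: queries only (e.g. fetching tokens with URIs);
    # the underlying session is closed when the block exits.
    with yield_db_read_only_preping_session_ctx() as db_session_read_only:
        ...  # SELECT-style work against the read-only engine

    # Writable pre-ping session: label writes happen here and are rolled back
    # on failure, matching the except branches added in the diff above.
    with yield_db_preping_session_ctx() as db_session:
        try:
            ...  # write labels, then commit
        except Exception:
            db_session.rollback()
            raise

Splitting the previous combined with-block in two means a failure while requesting metadata for an address now logs a warning and moves on to the next address without ever opening a writable session.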