Merge pull request #859 from moonstream-to/metadata-crawler-improvments

Refactor connection managers.
pull/860/head
Andrey Dolgolev 2023-07-17 16:49:45 +03:00 committed by GitHub
commit cd7704ecd2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 23 deletions

View file

@@ -41,6 +41,18 @@ pre_ping_engine = create_moonstream_engine(
 )
 PrePing_SessionLocal = sessionmaker(bind=pre_ping_engine)
 
+
+def yield_db_preping_session() -> Generator[Session, None, None]:
+    session = PrePing_SessionLocal()
+    try:
+        yield session
+    finally:
+        session.close()
+
+
+yield_db_preping_session_ctx = contextmanager(yield_db_preping_session)
+
 # Read only
 RO_engine = create_moonstream_engine(
     url=MOONSTREAM_DB_URI_READ_ONLY,
@@ -68,6 +80,23 @@ RO_pre_ping_engine = create_moonstream_engine(
     pool_pre_ping=True,
 )
 RO_SessionLocal_preping = sessionmaker(bind=RO_pre_ping_engine)
 
+
+def yield_db_read_only_preping_session() -> Generator[Session, None, None]:
+    session = RO_SessionLocal_preping()
+    try:
+        yield session
+    finally:
+        session.close()
+
+
+yield_db_read_only_preping_session_ctx = contextmanager(
+    yield_db_read_only_preping_session
+)
+
 # Read only pre-ping query timeout
 RO_pre_ping_query_engine = create_moonstream_engine(
     url=MOONSTREAM_DB_URI_READ_ONLY,
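
Both additions follow the same pattern: a generator that yields a session and always closes it, plus a `contextmanager` wrapper so call sites can use a plain `with` block. A minimal sketch of that pattern (in-memory SQLite and generic `create_engine` stand in for the project's Postgres `create_moonstream_engine` configuration):

```python
from contextlib import contextmanager
from typing import Generator

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

# pool_pre_ping=True makes the pool issue a lightweight liveness check before
# handing out a connection, transparently replacing stale ones. (In-memory
# SQLite here; the real module builds Postgres engines.)
pre_ping_engine = create_engine("sqlite://", pool_pre_ping=True)
PrePing_SessionLocal = sessionmaker(bind=pre_ping_engine)


def yield_db_preping_session() -> Generator[Session, None, None]:
    # Generator form: usable as a FastAPI-style dependency.
    session = PrePing_SessionLocal()
    try:
        yield session
    finally:
        session.close()  # always return the connection to the pool


# The same generator, wrapped for plain `with`-statement use:
yield_db_preping_session_ctx = contextmanager(yield_db_preping_session)

with yield_db_preping_session_ctx() as db_session:
    db_session.execute(text("SELECT 1"))
```

Exporting both shapes presumably lets the same factory serve dependency injection and ordinary scripts alike.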

View file

@@ -11,7 +11,10 @@ from urllib.error import HTTPError
 from moonstreamdb.blockchain import AvailableBlockchainType
 from sqlalchemy.orm import sessionmaker
 
-from ..db import pre_ping_engine, RO_pre_ping_engine
+from ..db import (
+    yield_db_preping_session_ctx,
+    yield_db_read_only_preping_session_ctx,
+)
 from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS
 from .db import (
     clean_labels_from_db,
@@ -28,16 +31,6 @@ logger = logging.getLogger(__name__)
 batch_size = 50
 
 
-@contextmanager
-def yield_session_maker(engine):
-    SessionLocal = sessionmaker(bind=engine)
-    session = SessionLocal()
-    try:
-        yield session
-    finally:
-        session.close()
-
-
 def leak_of_crawled_uri(
     ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]]
 ) -> List[Optional[str]]:
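
The deleted `yield_session_maker` rebuilt a `sessionmaker` on every call; after this change the crawler imports context managers whose factories are constructed once in `..db` at import time. A small illustrative contrast (toy in-memory engine; `yield_session` is a hypothetical name, the real factories live in `..db`):

```python
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")  # toy engine for illustration


# Before: the session factory was rebuilt on every call.
@contextmanager
def yield_session_maker(engine):
    SessionLocal = sessionmaker(bind=engine)  # constructed per call
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()


# After: one factory per engine, built once at import time and shared
# by every call site (hypothetical name standing in for the ..db exports).
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def yield_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
```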
@@ -93,7 +86,7 @@ def parse_metadata(
     logger.info(f"Processing blockchain {blockchain_type.value}")
 
     # run crawling of levels
-    with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only:
+    with yield_db_read_only_preping_session_ctx() as db_session_read_only:
         try:
             # get all tokens with uri
             logger.info("Requesting all tokens with uri from database")
@@ -111,14 +104,8 @@ def parse_metadata(
         return
 
     for address in tokens_uri_by_address:
-        with yield_session_maker(
-            engine=RO_pre_ping_engine
-        ) as db_session_read_only, yield_session_maker(
-            engine=pre_ping_engine
-        ) as db_session:
+        with yield_db_read_only_preping_session_ctx() as db_session_read_only:
             try:
-                logger.info(f"Starting to crawl metadata for address: {address}")
-
                 already_parsed = get_current_metadata_for_address(
                     db_session=db_session_read_only,
                     blockchain_type=blockchain_type,
@@ -130,6 +117,17 @@ def parse_metadata(
                     blockchain_type=blockchain_type,
                     address=address,
                 )
+            except Exception as err:
+                logger.warning(err)
+                logger.warning(
+                    f"Error while requesting metadata for address: {address}"
+                )
+                continue
+
+        with yield_db_preping_session_ctx() as db_session:
+            try:
+                logger.info(f"Starting to crawl metadata for address: {address}")
+
                 leak_rate = 0.0
                 if len(maybe_updated) > 0:
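
Taken together, these two hunks split the old combined `with ... as db_session_read_only, ... as db_session:` into two sequential blocks: the read-only session is opened, used, and released before the writable session ever starts. A runnable toy sketch of that control flow (stand-in context managers and addresses, not the project's code):

```python
from contextlib import contextmanager


@contextmanager
def ro_session():
    # Stand-in for yield_db_read_only_preping_session_ctx.
    print("open RO")
    try:
        yield "ro-session"
    finally:
        print("close RO")


@contextmanager
def rw_session():
    # Stand-in for yield_db_preping_session_ctx.
    print("open RW")
    try:
        yield "rw-session"
    finally:
        print("close RW")


for address in ("0xabc", "0xdef"):
    with ro_session() as db_session_read_only:
        parsed = f"metadata for {address}"  # read-only queries happen here
    # The RO connection is already closed before the write session opens.
    with rw_session() as db_session:
        print(f"writing {parsed}")  # label writes and commit happen here
```

Per address this prints open RO, close RO, open RW, close RW: the read-only connection is returned to the pool before any write begins.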
@@ -206,8 +204,8 @@ def parse_metadata(
                     )
                     # transaction is committed here
                 except Exception as err:
-                    logger.error(err)
-                    logger.error(
+                    logger.warning(err)
+                    logger.warning(
                         f"Error while writing labels for address: {address}"
                     )
                     db_session.rollback()
@@ -218,8 +216,10 @@ def parse_metadata(
                         address=address,
                     )
             except Exception as err:
-                logger.error(err)
-                logger.error(f"Error while crawling metadata for address: {address}")
+                logger.warning(err)
+                logger.warning(f"Error while crawling metadata for address: {address}")
+                db_session.rollback()
                 continue
 
 
 def handle_crawl(args: argparse.Namespace) -> None:
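
The final two hunks also downgrade per-address failures from `error` to `warning` and roll the write session back before continuing, so one bad address cannot poison the transaction for the rest of the batch. A self-contained illustration of that recover-and-continue pattern (toy session class and simulated failure, not the crawler's real write path):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("metadata_crawler")


class ToySession:
    # Stand-in exposing the two Session methods the pattern relies on.
    def commit(self) -> None:
        print("commit")

    def rollback(self) -> None:
        print("rollback")


def write_labels(db_session: ToySession, address: str) -> None:
    # Hypothetical write step; one address is rigged to fail.
    if address == "0xbad":
        raise RuntimeError("simulated constraint violation")
    db_session.commit()


db_session = ToySession()
for address in ("0xgood", "0xbad", "0xother"):
    try:
        write_labels(db_session, address)
    except Exception as err:
        # Survivable per-address failure: log at warning level, undo the
        # partial transaction, and keep crawling the remaining addresses.
        logger.warning(err)
        logger.warning(f"Error while crawling metadata for address: {address}")
        db_session.rollback()
        continue
```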