Merge pull request #756 from bugout-dev/fix-metadata-crawler-writer

Add fix of metadata crawler for more optimized writing to database.
pull/758/head^2
Andrey Dolgolev 2023-02-14 19:51:10 +02:00 zatwierdzone przez GitHub
commit 6f168e37e8
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
2 zmienionych plików z 73 dodań i 18 usunięć

Wyświetl plik

@ -1,4 +1,5 @@
import argparse import argparse
from contextlib import contextmanager
import json import json
from urllib.error import HTTPError from urllib.error import HTTPError
import urllib.request import urllib.request
@ -11,7 +12,8 @@ from moonstreamdb.db import (
MOONSTREAM_DB_URI, MOONSTREAM_DB_URI,
MOONSTREAM_POOL_SIZE, MOONSTREAM_POOL_SIZE,
create_moonstream_engine, create_moonstream_engine,
yield_db_read_only_session_ctx, MOONSTREAM_DB_STATEMENT_TIMEOUT_MILLIS,
MOONSTREAM_DB_URI_READ_ONLY,
) )
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from .db import ( from .db import (
@ -21,9 +23,7 @@ from .db import (
metadata_to_label, metadata_to_label,
clean_labels_from_db, clean_labels_from_db,
) )
from ..settings import ( from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS
MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -32,6 +32,16 @@ logger = logging.getLogger(__name__)
batch_size = 50 batch_size = 50
@contextmanager
def yield_session_maker(engine):
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
try:
yield session
finally:
session.close()
def leak_of_crawled_uri( def leak_of_crawled_uri(
ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]] ids: List[Optional[str]], leak_rate: float, maybe_updated: List[Optional[str]]
) -> List[Optional[str]]: ) -> List[Optional[str]]:
@ -92,14 +102,24 @@ def parse_metadata(
MOONSTREAM_DB_URI, MOONSTREAM_DB_URI,
pool_pre_ping=True, pool_pre_ping=True,
pool_size=MOONSTREAM_POOL_SIZE, pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
) )
process_session = sessionmaker(bind=engine) process_session = sessionmaker(bind=engine)
db_session = process_session() db_session = process_session()
# run crawling of levels # run crawling of levels
with yield_db_read_only_session_ctx() as db_session_read_only: # create read only engine
# Read only
read_only_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
with yield_session_maker(engine=read_only_engine) as db_session_read_only:
try: try:
# get all tokens with uri # get all tokens with uri
logger.info("Requesting all tokens with uri from database") logger.info("Requesting all tokens with uri from database")
@ -148,6 +168,13 @@ def parse_metadata(
f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}" f"Amount of tokens for crawl: {len(tokens_uri_by_address[address])- len(parsed_with_leak)} for {address}"
) )
# Remove already parsed tokens
tokens_uri_by_address[address] = [
token_uri_data
for token_uri_data in tokens_uri_by_address[address]
if token_uri_data.token_id not in parsed_with_leak
]
for requests_chunk in [ for requests_chunk in [
tokens_uri_by_address[address][i : i + batch_size] tokens_uri_by_address[address][i : i + batch_size]
for i in range(0, len(tokens_uri_by_address[address]), batch_size) for i in range(0, len(tokens_uri_by_address[address]), batch_size)
@ -155,9 +182,9 @@ def parse_metadata(
writed_labels = 0 writed_labels = 0
db_session.commit() db_session.commit()
with db_session.begin(): try:
for token_uri_data in requests_chunk: with db_session.begin():
if token_uri_data.token_id not in parsed_with_leak: for token_uri_data in requests_chunk:
metadata = crawl_uri(token_uri_data.token_uri) metadata = crawl_uri(token_uri_data.token_uri)
db_session.add( db_session.add(
@ -169,15 +196,28 @@ def parse_metadata(
) )
writed_labels += 1 writed_labels += 1
if writed_labels > 0: if writed_labels > 0:
clean_labels_from_db( clean_labels_from_db(
db_session=db_session, db_session=db_session,
blockchain_type=blockchain_type, blockchain_type=blockchain_type,
address=address, address=address,
) )
logger.info(f"Write {writed_labels} labels for {address}") logger.info(
f"Write {writed_labels} labels for {address}"
)
# trasaction is commited here
except Exception as err:
logger.error(err)
logger.error(
f"Error while writing labels for address: {address}"
)
db_session.rollback()
# trasaction is commited here clean_labels_from_db(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
)
finally: finally:
db_session.close() db_session.close()
@ -221,7 +261,7 @@ def main() -> None:
"--max-recrawl", "--max-recrawl",
"-m", "-m",
type=int, type=int,
default=200, default=300,
help="Maximum amount of recrawling of already crawled tokens", help="Maximum amount of recrawling of already crawled tokens",
) )
metadata_crawler_parser.set_defaults(func=handle_crawl) metadata_crawler_parser.set_defaults(func=handle_crawl)

Wyświetl plik

@ -59,6 +59,21 @@ except:
f"Could not parse MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS as int: {MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS_RAW}" f"Could not parse MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS as int: {MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS_RAW}"
) )
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = 60000
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get(
"MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS"
)
try:
if MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW is not None:
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = int(
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW
)
except:
raise Exception(
f"Could not parse MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS as int: {MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW}"
)
# Geth connection address # Geth connection address
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get( MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get(
"MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", "" "MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", ""