From 9bb5fb4794d5b8feb49ec19e5679d3aeb9357d0b Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 12 Sep 2022 16:02:32 +0300 Subject: [PATCH] Add thread executor. --- .../mooncrawl/metadata_crawler/cli.py | 27 ++++++++++--------- crawlers/mooncrawl/mooncrawl/settings.py | 18 +++++++++++++ 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 6140fd78..5a83b694 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -4,6 +4,7 @@ from urllib.error import HTTPError import urllib.request import logging from typing import Dict, Any +from concurrent.futures import ThreadPoolExecutor from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.db import ( @@ -19,8 +20,10 @@ from .db import ( metadata_to_label, ) from ..settings import ( + MOONSTREAM_METADATA_CRAWLER_THREADS, MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, ) +from ..data import TokenURIs logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -29,7 +32,7 @@ logger = logging.getLogger(__name__) batch_size = 50 -def crawl_uri(metadata_uri: str) -> Any: +def crawl_uri(token_uri_data: TokenURIs) -> Any: """ Get metadata from URI @@ -39,7 +42,7 @@ def crawl_uri(metadata_uri: str) -> Any: while retry < 3: try: - response = urllib.request.urlopen(metadata_uri, timeout=5) + response = urllib.request.urlopen(token_uri_data.token_uri, timeout=5) if response.status == 200: result = json.loads(response.read()) @@ -54,7 +57,7 @@ def crawl_uri(metadata_uri: str) -> Any: logger.error(err) retry += 1 continue - return result + return result, token_uri_data def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): @@ -89,28 +92,28 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): for address in tokens_uri_by_address: - already_parsed = get_current_metadata_for_address( - db_session=db_session, blockchain_type=blockchain_type, address=address - ) - for requests_chunk in [ tokens_uri_by_address[address][i : i + batch_size] for i in range(0, len(tokens_uri_by_address[address]), batch_size) ]: writed_labels = 0 - for token_uri_data in requests_chunk: - if token_uri_data.token_id not in already_parsed: - metadata = crawl_uri(token_uri_data.token_uri) + with ThreadPoolExecutor( + max_workers=MOONSTREAM_METADATA_CRAWLER_THREADS + ) as executor: + for result in executor.map( + crawl_uri, [request for request in requests_chunk] + ): db_session.add( metadata_to_label( + metadata=result[0], blockchain_type=blockchain_type, - metadata=metadata, - token_uri_data=token_uri_data, + token_uri_data=result[1], ) ) writed_labels += 1 + commit_session(db_session) logger.info(f"Write {writed_labels} labels for {address}") diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 27d6faf4..7cddcdd5 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -176,3 +176,21 @@ multicall_contracts: Dict[AvailableBlockchainType, str] = { AvailableBlockchainType.MUMBAI: "0xe9939e7Ea7D7fb619Ac57f648Da7B1D425832631", AvailableBlockchainType.ETHEREUM: "0x5BA1e12693Dc8F9c48aAD8770482f4739bEeD696", } + + +# Metadata crawler + +MOONSTREAM_METADATA_CRAWLER_THREADS = 10 + +MOONSTREAM_METADATA_CRAWLER_THREADS_RAW = os.environ.get( + "MOONSTREAM_METADATA_CRAWLER_THREADS" +) +try: + if MOONSTREAM_METADATA_CRAWLER_THREADS_RAW is not None: + MOONSTREAM_METADATA_CRAWLER_THREADS = int( + MOONSTREAM_METADATA_CRAWLER_THREADS_RAW + ) +except: + raise Exception( + f"Could not parse MOONSTREAM_METADATA_CRAWLER_THREADS as int: {MOONSTREAM_METADATA_CRAWLER_THREADS_RAW}" + )