Add thread executor.

state-crawler-improvments
Andrey 2022-09-12 16:02:32 +03:00
rodzic 85d192e12e
commit 9bb5fb4794
2 zmienionych plików z 33 dodań i 12 usunięć

Wyświetl plik

@ -4,6 +4,7 @@ from urllib.error import HTTPError
import urllib.request
import logging
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import (
@ -19,8 +20,10 @@ from .db import (
metadata_to_label,
)
from ..settings import (
MOONSTREAM_METADATA_CRAWLER_THREADS,
MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
)
from ..data import TokenURIs
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -29,7 +32,7 @@ logger = logging.getLogger(__name__)
batch_size = 50
def crawl_uri(metadata_uri: str) -> Any:
def crawl_uri(token_uri_data: TokenURIs) -> Any:
"""
Get metadata from URI
@ -39,7 +42,7 @@ def crawl_uri(metadata_uri: str) -> Any:
while retry < 3:
try:
response = urllib.request.urlopen(metadata_uri, timeout=5)
response = urllib.request.urlopen(token_uri_data.token_uri, timeout=5)
if response.status == 200:
result = json.loads(response.read())
@ -54,7 +57,7 @@ def crawl_uri(metadata_uri: str) -> Any:
logger.error(err)
retry += 1
continue
return result
return result, token_uri_data
def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
@ -89,28 +92,28 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
for address in tokens_uri_by_address:
already_parsed = get_current_metadata_for_address(
db_session=db_session, blockchain_type=blockchain_type, address=address
)
for requests_chunk in [
tokens_uri_by_address[address][i : i + batch_size]
for i in range(0, len(tokens_uri_by_address[address]), batch_size)
]:
writed_labels = 0
for token_uri_data in requests_chunk:
if token_uri_data.token_id not in already_parsed:
metadata = crawl_uri(token_uri_data.token_uri)
with ThreadPoolExecutor(
max_workers=MOONSTREAM_METADATA_CRAWLER_THREADS
) as executor:
for result in executor.map(
crawl_uri, [request for request in requests_chunk]
):
db_session.add(
metadata_to_label(
metadata=result[0],
blockchain_type=blockchain_type,
metadata=metadata,
token_uri_data=token_uri_data,
token_uri_data=result[1],
)
)
writed_labels += 1
commit_session(db_session)
logger.info(f"Write {writed_labels} labels for {address}")

Wyświetl plik

@ -176,3 +176,21 @@ multicall_contracts: Dict[AvailableBlockchainType, str] = {
AvailableBlockchainType.MUMBAI: "0xe9939e7Ea7D7fb619Ac57f648Da7B1D425832631",
AvailableBlockchainType.ETHEREUM: "0x5BA1e12693Dc8F9c48aAD8770482f4739bEeD696",
}
# Metadata crawler
MOONSTREAM_METADATA_CRAWLER_THREADS = 10
MOONSTREAM_METADATA_CRAWLER_THREADS_RAW = os.environ.get(
"MOONSTREAM_METADATA_CRAWLER_THREADS"
)
try:
if MOONSTREAM_METADATA_CRAWLER_THREADS_RAW is not None:
MOONSTREAM_METADATA_CRAWLER_THREADS = int(
MOONSTREAM_METADATA_CRAWLER_THREADS_RAW
)
except:
raise Exception(
f"Could not parse MOONSTREAM_METADATA_CRAWLER_THREADS as int: {MOONSTREAM_METADATA_CRAWLER_THREADS_RAW}"
)