From 99a608c6dc6c7039bc1e6a13e022f90152249c2e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 9 Mar 2023 07:28:34 +0000 Subject: [PATCH] Custom engine for all crawlers with specific timeout --- crawlers/mooncrawl/mooncrawl/blockchain.py | 11 +-- crawlers/mooncrawl/mooncrawl/contract/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/db.py | 77 +++++++++++++++++++ crawlers/mooncrawl/mooncrawl/esd.py | 3 +- crawlers/mooncrawl/mooncrawl/etherscan.py | 2 +- .../mooncrawl/generic_crawler/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/identity.py | 3 +- .../mooncrawl/metadata_crawler/cli.py | 45 +++-------- .../mooncrawl/moonworm_crawler/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/settings.py | 5 +- .../mooncrawl/mooncrawl/state_crawler/cli.py | 49 ++++-------- .../mooncrawl/stats_worker/dashboard.py | 9 +-- .../mooncrawl/stats_worker/queries.py | 24 ++---- 13 files changed, 125 insertions(+), 109 deletions(-) create mode 100644 crawlers/mooncrawl/mooncrawl/db.py diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index 2928cd96..d4f7000c 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -8,11 +8,7 @@ from moonstreamdb.blockchain import ( get_block_model, get_transaction_model, ) -from moonstreamdb.db import yield_db_session, yield_db_session_ctx -from moonstreamdb.models import ( - EthereumBlock, - EthereumTransaction, -) +from moonstreamdb.models import EthereumBlock, EthereumTransaction from psycopg2.errors import UniqueViolation # type: ignore from sqlalchemy import Column, desc, func from sqlalchemy.exc import IntegrityError @@ -23,13 +19,14 @@ from web3.middleware import geth_poa_middleware from web3.types import BlockData from .data import DateRange +from .db import yield_db_session, yield_db_session_ctx from .settings import ( MOONSTREAM_CRAWL_WORKERS, MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI, - MOONSTREAM_POLYGON_WEB3_PROVIDER_URI, MOONSTREAM_MUMBAI_WEB3_PROVIDER_URI, - MOONSTREAM_XDAI_WEB3_PROVIDER_URI, + MOONSTREAM_POLYGON_WEB3_PROVIDER_URI, MOONSTREAM_WYRM_WEB3_PROVIDER_URI, + MOONSTREAM_XDAI_WEB3_PROVIDER_URI, NB_ACCESS_ID_HEADER, NB_DATA_SOURCE_HEADER, WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS, diff --git a/crawlers/mooncrawl/mooncrawl/contract/cli.py b/crawlers/mooncrawl/mooncrawl/contract/cli.py index d22fb4d0..d7fccfbd 100644 --- a/crawlers/mooncrawl/mooncrawl/contract/cli.py +++ b/crawlers/mooncrawl/mooncrawl/contract/cli.py @@ -5,11 +5,11 @@ from typing import Optional from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType -from moonstreamdb.db import yield_db_session_ctx from sqlalchemy.orm.session import Session from web3 import Web3 from ..blockchain import connect +from ..db import yield_db_session_ctx from ..settings import NB_CONTROLLER_ACCESS_ID from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore diff --git a/crawlers/mooncrawl/mooncrawl/db.py b/crawlers/mooncrawl/mooncrawl/db.py new file mode 100644 index 00000000..607a96c8 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/db.py @@ -0,0 +1,77 @@ +from contextlib import contextmanager +from typing import Generator + +from moonstreamdb.db import ( + MOONSTREAM_DB_URI, + MOONSTREAM_DB_URI_READ_ONLY, + MOONSTREAM_POOL_SIZE, + create_moonstream_engine, +) +from sqlalchemy.orm import Session, sessionmaker + +from .settings import ( + MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, + MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, +) + +engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, +) +SessionLocal = sessionmaker(bind=engine) + + +def yield_db_session() -> Generator[Session, None, None]: + session = SessionLocal() + try: + yield session + finally: + session.close() + + +yield_db_session_ctx = contextmanager(yield_db_session) + +# pre-ping +pre_ping_engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, + pool_pre_ping=True, +) +PrePing_SessionLocal = sessionmaker(bind=pre_ping_engine) + +# Read only +RO_engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI_READ_ONLY, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, +) +RO_SessionLocal = sessionmaker(bind=RO_engine) + + +def yield_db_read_only_session() -> Generator[Session, None, None]: + session = RO_SessionLocal() + try: + yield session + finally: + session.close() + + +yield_db_read_only_session_ctx = contextmanager(yield_db_read_only_session) + +# Read only pre-ping +RO_pre_ping_engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI_READ_ONLY, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, + pool_pre_ping=True, +) + +# Read only pre-ping query timeout +RO_pre_ping_query_engine = create_moonstream_engine( + url=MOONSTREAM_DB_URI_READ_ONLY, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + pool_pre_ping=True, +) diff --git a/crawlers/mooncrawl/mooncrawl/esd.py b/crawlers/mooncrawl/mooncrawl/esd.py index ace2f6f2..01cd5c88 100644 --- a/crawlers/mooncrawl/mooncrawl/esd.py +++ b/crawlers/mooncrawl/mooncrawl/esd.py @@ -4,10 +4,11 @@ import time from typing import Optional, Union import requests -from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature from sqlalchemy.orm import Session +from .db import yield_db_session_ctx + CRAWL_URLS = { "functions": "https://www.4byte.directory/api/v1/signatures/", "events": "https://www.4byte.directory/api/v1/event-signatures/", diff --git a/crawlers/mooncrawl/mooncrawl/etherscan.py b/crawlers/mooncrawl/mooncrawl/etherscan.py index 0ee24441..0e523bd0 100644 --- a/crawlers/mooncrawl/mooncrawl/etherscan.py +++ b/crawlers/mooncrawl/mooncrawl/etherscan.py @@ -12,10 +12,10 @@ from typing import Any, Dict, List, Optional import boto3 # type: ignore import requests -from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.models import EthereumLabel from sqlalchemy.orm import Session +from .db import yield_db_session_ctx from .settings import MOONSTREAM_ETHERSCAN_TOKEN from .version import MOONCRAWL_VERSION diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index 082f4138..a11aed9d 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -5,11 +5,11 @@ from typing import Optional from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType -from moonstreamdb.db import yield_db_session_ctx from web3 import Web3 from web3.middleware import geth_poa_middleware from ..blockchain import connect +from ..db import yield_db_session_ctx from ..settings import NB_CONTROLLER_ACCESS_ID from .base import crawl, get_checkpoint, populate_with_events diff --git a/crawlers/mooncrawl/mooncrawl/identity.py b/crawlers/mooncrawl/mooncrawl/identity.py index 2497d9fb..c2e36fb2 100644 --- a/crawlers/mooncrawl/mooncrawl/identity.py +++ b/crawlers/mooncrawl/mooncrawl/identity.py @@ -5,10 +5,11 @@ import time from typing import Any import requests -from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.models import EthereumLabel from sqlalchemy import text +from .db import yield_db_session_ctx + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index a4df50c3..121a0640 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -1,29 +1,24 @@ import argparse -from contextlib import contextmanager import json -from urllib.error import HTTPError -import urllib.request import logging import random -from typing import Dict, Any, List, Optional +import urllib.request +from contextlib import contextmanager +from typing import Any, Dict, List, Optional +from urllib.error import HTTPError from moonstreamdb.blockchain import AvailableBlockchainType -from moonstreamdb.db import ( - MOONSTREAM_DB_URI, - MOONSTREAM_POOL_SIZE, - create_moonstream_engine, - MOONSTREAM_DB_STATEMENT_TIMEOUT_MILLIS, - MOONSTREAM_DB_URI_READ_ONLY, -) from sqlalchemy.orm import sessionmaker + +from ..db import PrePing_SessionLocal, RO_pre_ping_engine +from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS from .db import ( - get_uris_of_tokens, + clean_labels_from_db, get_current_metadata_for_address, get_tokens_id_wich_may_updated, + get_uris_of_tokens, metadata_to_label, - clean_labels_from_db, ) -from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -98,28 +93,10 @@ def parse_metadata( logger.info("Starting metadata crawler") logger.info(f"Connecting to blockchain {blockchain_type.value}") - engine = create_moonstream_engine( - MOONSTREAM_DB_URI, - pool_pre_ping=True, - pool_size=MOONSTREAM_POOL_SIZE, - statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, - ) - process_session = sessionmaker(bind=engine) - db_session = process_session() + db_session = PrePing_SessionLocal() # run crawling of levels - - # create read only engine - - # Read only - read_only_engine = create_moonstream_engine( - url=MOONSTREAM_DB_URI_READ_ONLY, - pool_size=MOONSTREAM_POOL_SIZE, - statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS, - pool_pre_ping=True, - ) - - with yield_session_maker(engine=read_only_engine) as db_session_read_only: + with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only: try: # get all tokens with uri logger.info("Requesting all tokens with uri from database") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index ef55c746..43d435d9 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -4,10 +4,10 @@ from typing import Optional from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType -from moonstreamdb.db import yield_db_session_ctx from web3 import Web3 from web3.middleware import geth_poa_middleware +from ..db import yield_db_session_ctx from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID from .continuous_crawler import _retry_connect_web3, continuous_crawler from .crawler import ( diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index aa0812a2..dbc027d4 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -1,11 +1,10 @@ import os -from typing import Optional, Dict +from typing import Dict, Optional from uuid import UUID from bugout.app import Bugout from moonstreamdb.blockchain import AvailableBlockchainType - BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" @@ -60,7 +59,7 @@ except: ) -MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = 60000 +MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = 100000 MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get( "MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS" ) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index eb68651f..903632bc 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -1,37 +1,30 @@ import argparse -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures._base import TimeoutError -import json import hashlib import itertools +import json import logging -from typing import Dict, List, Any, Optional -from uuid import UUID import time +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures._base import TimeoutError from pprint import pprint +from typing import Any, Dict, List, Optional +from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType -from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 -from moonstreamdb.db import ( - MOONSTREAM_DB_URI, - MOONSTREAM_POOL_SIZE, - create_moonstream_engine, -) -import requests -from sqlalchemy.orm import sessionmaker from web3._utils.request import cache_session -from web3 import Web3, HTTPProvider from web3.middleware import geth_poa_middleware -from .db import view_call_to_label, commit_session, clean_labels -from .Multicall2_interface import Contract as Multicall2 +from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 + +from ..db import PrePing_SessionLocal from ..settings import ( - NB_CONTROLLER_ACCESS_ID, - MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, INFURA_PROJECT_ID, - multicall_contracts, + NB_CONTROLLER_ACCESS_ID, infura_networks, + multicall_contracts, ) +from .db import clean_labels, commit_session, view_call_to_label +from .Multicall2_interface import Contract as Multicall2 from .web3_util import FunctionSignature, connect logging.basicConfig(level=logging.INFO) @@ -380,14 +373,7 @@ def parse_jobs( # reverse call_tree call_tree_levels = sorted(calls.keys(), reverse=True)[:-1] - engine = create_moonstream_engine( - MOONSTREAM_DB_URI, - pool_pre_ping=True, - pool_size=MOONSTREAM_POOL_SIZE, - statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, - ) - process_session = sessionmaker(bind=engine) - db_session = process_session() + db_session = PrePing_SessionLocal() # run crawling of levels try: @@ -494,14 +480,7 @@ def clean_labels_handler(args: argparse.Namespace) -> None: block_number = web3_client.eth.get_block("latest").number # type: ignore - engine = create_moonstream_engine( - MOONSTREAM_DB_URI, - pool_pre_ping=True, - pool_size=MOONSTREAM_POOL_SIZE, - statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, - ) - process_session = sessionmaker(bind=engine) - db_session = process_session() + db_session = PrePing_SessionLocal() try: clean_labels(db_session, blockchain_type, args.blocks_cutoff, block_number) diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py index 4e4e216c..b73cd865 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -6,13 +6,12 @@ import hashlib import json import logging import time +import traceback from datetime import datetime, timedelta from enum import Enum -from typing import Any, Callable, Dict, List, Union, Optional +from typing import Any, Callable, Dict, List, Optional, Union from uuid import UUID -import traceback - import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources from moonstreamdb.blockchain import ( @@ -20,13 +19,13 @@ from moonstreamdb.blockchain import ( get_label_model, get_transaction_model, ) -from moonstreamdb.db import yield_db_read_only_session_ctx -from sqlalchemy import and_, distinct, func, text, extract, cast +from sqlalchemy import and_, cast, distinct, extract, func, text from sqlalchemy.orm import Session from sqlalchemy.sql.operators import in_op from web3 import Web3 from ..blockchain import connect +from ..db import yield_db_read_only_session_ctx from ..reporter import reporter from ..settings import ( CRAWLER_LABEL, diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index 855bdb40..7f5bfd9c 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -6,19 +6,12 @@ from io import StringIO from typing import Any, Dict, Optional import boto3 # type: ignore -from moonstreamdb.db import ( - create_moonstream_engine, - MOONSTREAM_DB_URI_READ_ONLY, - MOONSTREAM_POOL_SIZE, -) from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text -from ..reporter import reporter -from ..settings import ( - MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, - MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, -) +from ..db import RO_pre_ping_query_engine +from ..reporter import reporter +from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -84,14 +77,7 @@ def data_generate( """ s3 = boto3.client("s3") - # Create session - engine = create_moonstream_engine( - MOONSTREAM_DB_URI_READ_ONLY, - pool_pre_ping=True, - pool_size=MOONSTREAM_POOL_SIZE, - statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, - ) - process_session = sessionmaker(bind=engine) + process_session = sessionmaker(bind=RO_pre_ping_query_engine) db_session = process_session() try: @@ -100,7 +86,7 @@ def data_generate( csv_writer = csv.writer(csv_buffer, delimiter=";") # engine.execution_options(stream_results=True) - result = db_session.execute(query, params).keys() + result = db_session.execute(query, params).keys() # type: ignore csv_writer.writerow(result.keys()) csv_writer.writerows(result.fetchAll())