Custom engine for all crawlers with specific timeout

pull/766/head
kompotkot 2023-03-09 07:28:34 +00:00
rodzic b85939595c
commit 99a608c6dc
13 zmienionych plików z 125 dodań i 109 usunięć

Wyświetl plik

@ -8,11 +8,7 @@ from moonstreamdb.blockchain import (
get_block_model,
get_transaction_model,
)
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
EthereumBlock,
EthereumTransaction,
)
from moonstreamdb.models import EthereumBlock, EthereumTransaction
from psycopg2.errors import UniqueViolation # type: ignore
from sqlalchemy import Column, desc, func
from sqlalchemy.exc import IntegrityError
@ -23,13 +19,14 @@ from web3.middleware import geth_poa_middleware
from web3.types import BlockData
from .data import DateRange
from .db import yield_db_session, yield_db_session_ctx
from .settings import (
MOONSTREAM_CRAWL_WORKERS,
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI,
MOONSTREAM_POLYGON_WEB3_PROVIDER_URI,
MOONSTREAM_MUMBAI_WEB3_PROVIDER_URI,
MOONSTREAM_XDAI_WEB3_PROVIDER_URI,
MOONSTREAM_POLYGON_WEB3_PROVIDER_URI,
MOONSTREAM_WYRM_WEB3_PROVIDER_URI,
MOONSTREAM_XDAI_WEB3_PROVIDER_URI,
NB_ACCESS_ID_HEADER,
NB_DATA_SOURCE_HEADER,
WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS,

Wyświetl plik

@ -5,11 +5,11 @@ from typing import Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy.orm.session import Session
from web3 import Web3
from ..blockchain import connect
from ..db import yield_db_session_ctx
from ..settings import NB_CONTROLLER_ACCESS_ID
from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore

Wyświetl plik

@ -0,0 +1,77 @@
from contextlib import contextmanager
from typing import Generator
from moonstreamdb.db import (
MOONSTREAM_DB_URI,
MOONSTREAM_DB_URI_READ_ONLY,
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
from sqlalchemy.orm import Session, sessionmaker
from .settings import (
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
)
engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
)
SessionLocal = sessionmaker(bind=engine)
def yield_db_session() -> Generator[Session, None, None]:
session = SessionLocal()
try:
yield session
finally:
session.close()
yield_db_session_ctx = contextmanager(yield_db_session)
# pre-ping
pre_ping_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
PrePing_SessionLocal = sessionmaker(bind=pre_ping_engine)
# Read only
RO_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
)
RO_SessionLocal = sessionmaker(bind=RO_engine)
def yield_db_read_only_session() -> Generator[Session, None, None]:
session = RO_SessionLocal()
try:
yield session
finally:
session.close()
yield_db_read_only_session_ctx = contextmanager(yield_db_read_only_session)
# Read only pre-ping
RO_pre_ping_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
# Read only pre-ping query timeout
RO_pre_ping_query_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)

Wyświetl plik

@ -4,10 +4,11 @@ import time
from typing import Optional, Union
import requests
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature
from sqlalchemy.orm import Session
from .db import yield_db_session_ctx
CRAWL_URLS = {
"functions": "https://www.4byte.directory/api/v1/signatures/",
"events": "https://www.4byte.directory/api/v1/event-signatures/",

Wyświetl plik

@ -12,10 +12,10 @@ from typing import Any, Dict, List, Optional
import boto3 # type: ignore
import requests
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel
from sqlalchemy.orm import Session
from .db import yield_db_session_ctx
from .settings import MOONSTREAM_ETHERSCAN_TOKEN
from .version import MOONCRAWL_VERSION

Wyświetl plik

@ -5,11 +5,11 @@ from typing import Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3
from web3.middleware import geth_poa_middleware
from ..blockchain import connect
from ..db import yield_db_session_ctx
from ..settings import NB_CONTROLLER_ACCESS_ID
from .base import crawl, get_checkpoint, populate_with_events

Wyświetl plik

@ -5,10 +5,11 @@ import time
from typing import Any
import requests
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel
from sqlalchemy import text
from .db import yield_db_session_ctx
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Wyświetl plik

@ -1,29 +1,24 @@
import argparse
from contextlib import contextmanager
import json
from urllib.error import HTTPError
import urllib.request
import logging
import random
from typing import Dict, Any, List, Optional
import urllib.request
from contextlib import contextmanager
from typing import Any, Dict, List, Optional
from urllib.error import HTTPError
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import (
MOONSTREAM_DB_URI,
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
MOONSTREAM_DB_STATEMENT_TIMEOUT_MILLIS,
MOONSTREAM_DB_URI_READ_ONLY,
)
from sqlalchemy.orm import sessionmaker
from ..db import PrePing_SessionLocal, RO_pre_ping_engine
from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS
from .db import (
get_uris_of_tokens,
clean_labels_from_db,
get_current_metadata_for_address,
get_tokens_id_wich_may_updated,
get_uris_of_tokens,
metadata_to_label,
clean_labels_from_db,
)
from ..settings import MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -98,28 +93,10 @@ def parse_metadata(
logger.info("Starting metadata crawler")
logger.info(f"Connecting to blockchain {blockchain_type.value}")
engine = create_moonstream_engine(
MOONSTREAM_DB_URI,
pool_pre_ping=True,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
)
process_session = sessionmaker(bind=engine)
db_session = process_session()
db_session = PrePing_SessionLocal()
# run crawling of levels
# create read only engine
# Read only
read_only_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS,
pool_pre_ping=True,
)
with yield_session_maker(engine=read_only_engine) as db_session_read_only:
with yield_session_maker(engine=RO_pre_ping_engine) as db_session_read_only:
try:
# get all tokens with uri
logger.info("Requesting all tokens with uri from database")

Wyświetl plik

@ -4,10 +4,10 @@ from typing import Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3
from web3.middleware import geth_poa_middleware
from ..db import yield_db_session_ctx
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (

Wyświetl plik

@ -1,11 +1,10 @@
import os
from typing import Optional, Dict
from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from moonstreamdb.blockchain import AvailableBlockchainType
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
@ -60,7 +59,7 @@ except:
)
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = 60000
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS = 100000
MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get(
"MOONSTREAM_CRAWLERS_DB_STATEMENT_TIMEOUT_MILLIS"
)

Wyświetl plik

@ -1,37 +1,30 @@
import argparse
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import TimeoutError
import json
import hashlib
import itertools
import json
import logging
from typing import Dict, List, Any, Optional
from uuid import UUID
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import TimeoutError
from pprint import pprint
from typing import Any, Dict, List, Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
from moonstreamdb.db import (
MOONSTREAM_DB_URI,
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
import requests
from sqlalchemy.orm import sessionmaker
from web3._utils.request import cache_session
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from .db import view_call_to_label, commit_session, clean_labels
from .Multicall2_interface import Contract as Multicall2
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
from ..db import PrePing_SessionLocal
from ..settings import (
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
INFURA_PROJECT_ID,
multicall_contracts,
NB_CONTROLLER_ACCESS_ID,
infura_networks,
multicall_contracts,
)
from .db import clean_labels, commit_session, view_call_to_label
from .Multicall2_interface import Contract as Multicall2
from .web3_util import FunctionSignature, connect
logging.basicConfig(level=logging.INFO)
@ -380,14 +373,7 @@ def parse_jobs(
# reverse call_tree
call_tree_levels = sorted(calls.keys(), reverse=True)[:-1]
engine = create_moonstream_engine(
MOONSTREAM_DB_URI,
pool_pre_ping=True,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
)
process_session = sessionmaker(bind=engine)
db_session = process_session()
db_session = PrePing_SessionLocal()
# run crawling of levels
try:
@ -494,14 +480,7 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
block_number = web3_client.eth.get_block("latest").number # type: ignore
engine = create_moonstream_engine(
MOONSTREAM_DB_URI,
pool_pre_ping=True,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS,
)
process_session = sessionmaker(bind=engine)
db_session = process_session()
db_session = PrePing_SessionLocal()
try:
clean_labels(db_session, blockchain_type, args.blocks_cutoff, block_number)

Wyświetl plik

@ -6,13 +6,12 @@ import hashlib
import json
import logging
import time
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Union, Optional
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID
import traceback
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from moonstreamdb.blockchain import (
@ -20,13 +19,13 @@ from moonstreamdb.blockchain import (
get_label_model,
get_transaction_model,
)
from moonstreamdb.db import yield_db_read_only_session_ctx
from sqlalchemy import and_, distinct, func, text, extract, cast
from sqlalchemy import and_, cast, distinct, extract, func, text
from sqlalchemy.orm import Session
from sqlalchemy.sql.operators import in_op
from web3 import Web3
from ..blockchain import connect
from ..db import yield_db_read_only_session_ctx
from ..reporter import reporter
from ..settings import (
CRAWLER_LABEL,

Wyświetl plik

@ -6,19 +6,12 @@ from io import StringIO
from typing import Any, Dict, Optional
import boto3 # type: ignore
from moonstreamdb.db import (
create_moonstream_engine,
MOONSTREAM_DB_URI_READ_ONLY,
MOONSTREAM_POOL_SIZE,
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from ..reporter import reporter
from ..settings import (
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
)
from ..db import RO_pre_ping_query_engine
from ..reporter import reporter
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -84,14 +77,7 @@ def data_generate(
"""
s3 = boto3.client("s3")
# Create session
engine = create_moonstream_engine(
MOONSTREAM_DB_URI_READ_ONLY,
pool_pre_ping=True,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
)
process_session = sessionmaker(bind=engine)
process_session = sessionmaker(bind=RO_pre_ping_query_engine)
db_session = process_session()
try:
@ -100,7 +86,7 @@ def data_generate(
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
result = db_session.execute(query, params).keys() # type: ignore
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())