Add autoscale to historical crawl.

pull/675/head
Andrey 2022-09-21 18:38:05 +03:00
rodzic 01d0b8dd03
commit 3f35b47a91
4 zmienionych plików z 114 dodań i 15 usunięć

Wyświetl plik

@ -1,9 +1,9 @@
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import and_
from web3 import Web3
@ -119,6 +119,7 @@ def _crawl_events(
), # TODO report via humbug
)
for raw_event in raw_events:
logger.info(f"block_number type:{type(raw_event['blockNumber'])}")
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
web3,
@ -139,3 +140,86 @@ def _crawl_events(
all_events.append(event)
return all_events
def _autoscale_crawl_events(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
jobs: List[EventCrawlJob],
from_block: int,
to_block: int,
blocks_cache: Dict[int, int] = {},
batch_size: int = 1000,
db_block_query_batch=10,
) -> Tuple[List[Event], int]:
"""
def _crawl_events(
web3: Web3,
event_abi: Any,
from_block: int,
to_block: int,
batch_size: int,
contract_address: ChecksumAddress,
batch_size_update_threshold: int = 1000,
max_blocks_batch: int = 10000,
min_blocks_batch: int = 100,
) -> Tuple[List[Dict[str, Any]], int]:
events = []
current_from_block = from_block
while current_from_block <= to_block:
current_to_block = min(current_from_block + batch_size, to_block)
try:
events_chunk = _fetch_events_chunk(
web3,
event_abi,
current_from_block,
current_to_block,
[contract_address],
)
events.extend(events_chunk)
current_from_block = current_to_block + 1
if len(events) <= batch_size_update_threshold:
batch_size = min(batch_size * 2, max_blocks_batch)
except Exception as e:
if batch_size <= min_blocks_batch:
raise e
time.sleep(0.1)
batch_size = max(batch_size // 2, min_blocks_batch)
return events, batch_size
"""
all_events = []
for job in jobs:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3,
job.event_abi,
from_block,
to_block,
batch_size,
job.contracts[0],
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
web3,
blockchain_type,
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
)
event = Event(
event_name=raw_event["event"],
args=raw_event["args"],
address=raw_event["address"],
block_number=raw_event["blockNumber"],
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
)
all_events.append(event)
return all_events, batch_size

Wyświetl plik

@ -10,7 +10,7 @@ from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignor
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.networks import Network # type: ignore
from moonworm.cu_watch import MockState # type: ignore
from moonworm.watch import MockState # type: ignore
from sqlalchemy.orm import Session
from web3 import Web3

Wyświetl plik

@ -13,7 +13,7 @@ from web3 import Web3
from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
from .event_crawler import _crawl_events, _autoscale_crawl_events
from .function_call_crawler import _crawl_functions
logging.basicConfig(level=logging.INFO)
@ -71,16 +71,31 @@ def historical_crawler(
)
logger.info(f"Crawling events from {start_block} to {batch_end_block}")
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
from_block=batch_end_block,
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
)
if function_call_crawl_jobs:
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
from_block=batch_end_block,
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
)
else:
all_events, max_blocks_batch = _autoscale_crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
from_block=batch_end_block,
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
)

Wyświetl plik

@ -38,7 +38,7 @@ setup(
"chardet",
"fastapi",
"moonstreamdb>=0.3.2",
"moonworm==0.2.4",
"moonworm==0.5.1",
"humbug",
"pydantic",
"python-dateutil",