kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #675 from bugout-dev/historical-crawl-improvements
Add autoscale to historical crawl.pull/665/head^2
commit
9d93a00f05
|
@ -1,9 +1,9 @@
|
|||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
|
||||
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
|
||||
from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy.sql.expression import and_
|
||||
from web3 import Web3
|
||||
|
@ -68,11 +68,11 @@ def get_block_timestamp(
|
|||
block_model = get_block_model(blockchain_type)
|
||||
|
||||
blocks = (
|
||||
db_session.query(block_model)
|
||||
db_session.query(block_model.block_number, block_model.timestamp)
|
||||
.filter(
|
||||
and_(
|
||||
block_model.block_number >= block_number,
|
||||
block_model.block_number <= block_number + max_blocks_batch - 1,
|
||||
block_model.block_number >= block_number - max_blocks_batch - 1,
|
||||
block_model.block_number <= block_number + max_blocks_batch + 1,
|
||||
)
|
||||
)
|
||||
.order_by(block_model.block_number.asc())
|
||||
|
@ -86,7 +86,7 @@ def get_block_timestamp(
|
|||
if target_block_timestamp is None:
|
||||
target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
|
||||
|
||||
if len(blocks_cache) > max_blocks_batch * 2:
|
||||
if len(blocks_cache) > (max_blocks_batch * 3 + 2):
|
||||
blocks_cache.clear()
|
||||
|
||||
blocks_cache[block_number] = target_block_timestamp
|
||||
|
@ -139,3 +139,52 @@ def _crawl_events(
|
|||
all_events.append(event)
|
||||
|
||||
return all_events
|
||||
|
||||
|
||||
def _autoscale_crawl_events(
|
||||
db_session: Session,
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
web3: Web3,
|
||||
jobs: List[EventCrawlJob],
|
||||
from_block: int,
|
||||
to_block: int,
|
||||
blocks_cache: Dict[int, int] = {},
|
||||
batch_size: int = 1000,
|
||||
db_block_query_batch=10,
|
||||
) -> Tuple[List[Event], int]:
|
||||
|
||||
"""
|
||||
Crawl events with auto regulated batch_size.
|
||||
"""
|
||||
all_events = []
|
||||
for job in jobs:
|
||||
|
||||
raw_events, batch_size = moonworm_autoscale_crawl_events(
|
||||
web3,
|
||||
job.event_abi,
|
||||
from_block,
|
||||
to_block,
|
||||
batch_size,
|
||||
job.contracts[0],
|
||||
)
|
||||
for raw_event in raw_events:
|
||||
raw_event["blockTimestamp"] = get_block_timestamp(
|
||||
db_session,
|
||||
web3,
|
||||
blockchain_type,
|
||||
raw_event["blockNumber"],
|
||||
blocks_cache,
|
||||
db_block_query_batch,
|
||||
)
|
||||
event = Event(
|
||||
event_name=raw_event["event"],
|
||||
args=raw_event["args"],
|
||||
address=raw_event["address"],
|
||||
block_number=raw_event["blockNumber"],
|
||||
block_timestamp=raw_event["blockTimestamp"],
|
||||
transaction_hash=raw_event["transactionHash"],
|
||||
log_index=raw_event["logIndex"],
|
||||
)
|
||||
all_events.append(event)
|
||||
|
||||
return all_events, batch_size
|
||||
|
|
|
@ -10,7 +10,7 @@ from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignor
|
|||
MoonstreamEthereumStateProvider,
|
||||
)
|
||||
from moonworm.crawler.networks import Network # type: ignore
|
||||
from moonworm.cu_watch import MockState # type: ignore
|
||||
from moonworm.watch import MockState # type: ignore
|
||||
from sqlalchemy.orm import Session
|
||||
from web3 import Web3
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ from web3 import Web3
|
|||
|
||||
from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3
|
||||
from .db import add_events_to_session, add_function_calls_to_session, commit_session
|
||||
from .event_crawler import _crawl_events
|
||||
from .event_crawler import _crawl_events, _autoscale_crawl_events
|
||||
from .function_call_crawler import _crawl_functions
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
@ -71,26 +71,41 @@ def historical_crawler(
|
|||
)
|
||||
|
||||
logger.info(f"Crawling events from {start_block} to {batch_end_block}")
|
||||
all_events = _crawl_events(
|
||||
db_session=db_session,
|
||||
blockchain_type=blockchain_type,
|
||||
web3=web3,
|
||||
jobs=event_crawl_jobs,
|
||||
from_block=batch_end_block,
|
||||
to_block=start_block,
|
||||
blocks_cache=blocks_cache,
|
||||
db_block_query_batch=max_blocks_batch,
|
||||
)
|
||||
|
||||
if function_call_crawl_jobs:
|
||||
all_events = _crawl_events(
|
||||
db_session=db_session,
|
||||
blockchain_type=blockchain_type,
|
||||
web3=web3,
|
||||
jobs=event_crawl_jobs,
|
||||
from_block=batch_end_block,
|
||||
to_block=start_block,
|
||||
blocks_cache=blocks_cache,
|
||||
db_block_query_batch=max_blocks_batch,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
all_events, max_blocks_batch = _autoscale_crawl_events(
|
||||
db_session=db_session,
|
||||
blockchain_type=blockchain_type,
|
||||
web3=web3,
|
||||
jobs=event_crawl_jobs,
|
||||
from_block=batch_end_block,
|
||||
to_block=start_block,
|
||||
blocks_cache=blocks_cache,
|
||||
db_block_query_batch=max_blocks_batch,
|
||||
)
|
||||
logger.info(
|
||||
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
|
||||
)
|
||||
|
||||
add_events_to_session(db_session, all_events, blockchain_type)
|
||||
|
||||
logger.info(
|
||||
f"Crawling function calls from {start_block} to {batch_end_block}"
|
||||
)
|
||||
if function_call_crawl_jobs:
|
||||
logger.info(
|
||||
f"Crawling function calls from {start_block} to {batch_end_block}"
|
||||
)
|
||||
all_function_calls = _crawl_functions(
|
||||
blockchain_type,
|
||||
ethereum_state_provider,
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream crawlers version.
|
||||
"""
|
||||
|
||||
MOONCRAWL_VERSION = "0.2.3"
|
||||
MOONCRAWL_VERSION = "0.2.4"
|
||||
|
|
|
@ -38,7 +38,7 @@ setup(
|
|||
"chardet",
|
||||
"fastapi",
|
||||
"moonstreamdb>=0.3.2",
|
||||
"moonworm==0.2.4",
|
||||
"moonworm[moonstream]==0.5.1",
|
||||
"humbug",
|
||||
"pydantic",
|
||||
"python-dateutil",
|
||||
|
|
Ładowanie…
Reference in New Issue