Move _crawl_events from watch_contract.

pull/98/head
Andrey 2022-09-20 19:43:58 +03:00
rodzic 18cb7e55e9
commit 3293ee4121
2 zmienionych plików z 52 dodań i 35 usunięć

Wyświetl plik

@ -3,7 +3,7 @@ import datetime
import json
import logging
import time
from typing import Any, Callable, Iterable, List, Optional, Tuple
from typing import Any, Callable, Iterable, List, Optional, Tuple, Dict
from eth_abi.codec import ABICodec
from eth_typing.evm import ChecksumAddress
@ -127,6 +127,47 @@ def _fetch_events_chunk(
return all_events
def _crawl_events(
web3: Web3,
event_abi: Any,
from_block: int,
to_block: int,
batch_size: int,
contract_address: ChecksumAddress,
batch_size_update_threshold: int = 1000,
max_blocks_batch: int = 10000,
min_blocks_batch: int = 100,
) -> Tuple[List[Dict[str, Any]], int]:
"""
Crawls events from the given block range.
reduces the batch_size if response is failing.
increases the batch_size if response is successful.
"""
events = []
current_from_block = from_block
while current_from_block <= to_block:
current_to_block = min(current_from_block + batch_size, to_block)
try:
events_chunk = _fetch_events_chunk(
web3,
event_abi,
current_from_block,
current_to_block,
[contract_address],
)
events.extend(events_chunk)
current_from_block = current_to_block + 1
if len(events) <= batch_size_update_threshold:
batch_size = min(batch_size * 2, max_blocks_batch)
except Exception as e:
if batch_size <= min_blocks_batch:
raise e
time.sleep(0.1)
batch_size = max(batch_size // 2, min_blocks_batch)
return events, batch_size
class EventScanner:
def __init__(
self,

Wyświetl plik

@ -24,7 +24,7 @@ from .crawler.function_call_crawler import (
FunctionCallCrawlerState,
Web3StateProvider,
)
from .crawler.log_scanner import _fetch_events_chunk
from .crawler.log_scanner import _fetch_events_chunk, _crawl_events
class MockState(FunctionCallCrawlerState):
@ -113,38 +113,6 @@ def watch_contract(
None. Results are printed to stdout and, if an outfile has been provided, also to the file.
"""
def _crawl_events(
event_abi, from_block: int, to_block: int, batch_size: int
) -> Tuple[List[Dict[str, Any]], int]:
"""
Crawls events from the given block range.
reduces the batch_size if response is failing.
increases the batch_size if response is successful.
"""
events = []
current_from_block = from_block
while current_from_block <= to_block:
current_to_block = min(current_from_block + batch_size, to_block)
try:
events_chunk = _fetch_events_chunk(
web3,
event_abi,
current_from_block,
current_to_block,
[contract_address],
)
events.extend(events_chunk)
current_from_block = current_to_block + 1
if len(events) <= batch_size_update_threshold:
batch_size = min(batch_size * 2, max_blocks_batch)
except Exception as e:
if batch_size <= min_blocks_batch:
raise e
time.sleep(0.1)
batch_size = max(batch_size // 2, min_blocks_batch)
return events, batch_size
current_batch_size = min_blocks_batch
state = MockState()
crawler = FunctionCallCrawler(
@ -194,7 +162,15 @@ def watch_contract(
for event_abi in event_abis:
all_events, new_batch_size = _crawl_events(
event_abi, current_block, until_block, current_batch_size
web3,
event_abi,
current_block,
until_block,
current_batch_size,
contract_address,
batch_size_update_threshold,
max_blocks_batch,
min_blocks_batch,
)
if only_events: