moonworm/moonworm/crawler/example.py

128 wiersze
4.6 KiB
Python

import logging
import sys
import time
from web3 import Web3
from web3.providers.rpc import HTTPProvider
from .log_scanner import EventScanner
from .state import JSONifiedState
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
if __name__ == "__main__":
    # Simple demo that scans ERC-20 Transfer events of a single token contract.
    # The demo supports persistent state by using a JSON file.
    # You will need an Ethereum node for this.
    #
    # Figures quoted by the original author for a full scan of the RCC token
    # (~11k Transfer events): around 20k JSON-RPC calls, about 10 minutes
    # against a locally running Geth, and a 2.9 MB JSON state file.
    # NOTE(review): run() below only scans a fixed window of recent blocks,
    # so these figures probably no longer apply as-is.

    # We use tqdm library to render a nice progress bar in the console
    # https://pypi.org/project/tqdm/
    from tqdm import tqdm

    # RCC has around 11k Transfer events
    # https://etherscan.io/token/0x9b6443b0fb9c241a7fdac375595cea13e6b7807a
    # NOTE(review): the address below differs from the one in the Etherscan
    # link above -- confirm which token contract is actually intended.
    RCC_ADDRESS = "0x7ceb23fd6bc0add59e62ac25578270cff1b9f619"

    # Reduced ERC-20 ABI, only Transfer event
    ABI = [
        {
            "anonymous": False,
            "inputs": [
                {"indexed": True, "name": "from", "type": "address"},
                {"indexed": True, "name": "to", "type": "address"},
                {"indexed": False, "name": "value", "type": "uint256"},
            ],
            "name": "Transfer",
            "type": "event",
        }
    ]

    def run():
        """Scan a window of recent blocks for Transfer events and save the state.

        Reads the JSON-RPC node URL from ``sys.argv[1]``; prints a usage
        message and exits with status 1 when it is missing. Progress is
        rendered with a tqdm progress bar, and a summary line is printed once
        the scan has finished and the state has been saved.
        """
        if len(sys.argv) < 2:
            print("Usage: eventscanner.py http://your-node-url")
            sys.exit(1)

        api_url = sys.argv[1]

        # Request INFO-level logging (DEBUG is a very verbose level).
        # This call has no effect if the root logger is already configured.
        logging.basicConfig(level=logging.INFO)

        provider = HTTPProvider(api_url)

        # Remove the default JSON-RPC retry middleware
        # as it cannot correctly handle eth_getLogs block range
        # throttle down.
        provider.middlewares.clear()

        web3 = Web3(provider)
        print(web3.eth.block_number)

        # Restore/create our persistent state
        state = JSONifiedState()
        state.restore()

        scanner = EventScanner(
            web3=web3,
            scanner_state=state,
            events=ABI,
            addresses=[RCC_ADDRESS],
            # How many maximum blocks at the time we request from JSON-RPC
            # and we are unlikely to exceed the response size limit of the JSON-RPC server
            max_chunk_scan_size=100000,
            skip_block_timestamp=True,
        )

        # Assume we might have scanned the blocks all the way to the last Ethereum block
        # that mined a few seconds before the previous scan run ended.
        # Because there might have been a minor Ethereum chain reorganisations
        # since the last scan ended, we need to discard
        # the last few blocks from the previous scan results.
        chain_reorg_safety_blocks = 10
        scanner.delete_potentially_forked_block_data(
            state.get_last_scanned_block() - chain_reorg_safety_blocks
        )

        # Only the most recent window of blocks is scanned. Resuming from the
        # persisted state, i.e. starting at
        # max(last scanned block - chain_reorg_safety_blocks, 0), is currently
        # disabled.
        scan_window_blocks = 1000
        end_block = scanner.get_suggested_scan_end_block()
        # Clamp at 0: on a chain shorter than the window the start block
        # must not go negative.
        start_block = max(end_block - scan_window_blocks, 0)
        blocks_to_scan = end_block - start_block

        print(f"Scanning events from blocks {start_block} - {end_block}")

        # Render a progress bar in the console
        start = time.time()
        with tqdm(total=blocks_to_scan) as progress_bar:

            def _update_progress(
                start, end, current, current_block_timestamp, chunk_size, events_count
            ):
                """Progress callback passed to the scanner: refresh the bar."""
                # The timestamp is falsy when the scanner does not supply one;
                # show an empty string in that case.
                if current_block_timestamp:
                    formatted_time = current_block_timestamp.strftime("%d-%m-%Y")
                else:
                    formatted_time = ""
                progress_bar.set_description(
                    f"Current block: {current} ({formatted_time}), blocks in a scan batch: {chunk_size}, events processed in a batch {events_count}"
                )
                progress_bar.update(chunk_size)

            # Run the scan
            result, total_chunks_scanned = scanner.scan(
                start_block, end_block, progress_callback=_update_progress
            )

        # Persist the scan results once the progress bar has been closed.
        state.save()
        duration = time.time() - start
        print(
            f"Scanned total {len(result)} Transfer events, in {duration} seconds, total {total_chunks_scanned} chunk scans performed"
        )

    run()