working historical crawler

pull/634/head
Yhtyyar Sahatov 2022-06-16 15:53:19 +03:00
parent 8892de0d7c
commit 14ef9372a2
5 changed files with 365 additions and 42 deletions

View file

@@ -10,6 +10,7 @@ from web3.middleware import geth_poa_middleware
from ..blockchain import AvailableBlockchainType
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .historical_crawler import historical_crawler
from .crawler import (
SubscriptionTypes,
blockchain_type_to_subscription_type,
@@ -17,7 +18,7 @@ from .crawler import (
make_event_crawl_jobs,
make_function_call_crawl_jobs,
)
from .db import get_last_labeled_block_number
from .db import get_first_labeled_block_number, get_last_labeled_block_number
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -114,6 +115,117 @@ def handle_crawl(args: argparse.Namespace) -> None:
)
def handle_historical_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
addresses_filter = [args.address]
all_event_jobs = make_event_crawl_jobs(
get_crawl_job_entries(
subscription_type,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
)
filtered_event_jobs = []
for job in all_event_jobs:
intersection = [
address for address in job.contracts if address in addresses_filter
]
if intersection:
job.contracts = intersection
filtered_event_jobs.append(job)
logger.info(f"Filtered event crawl jobs count: {len(filtered_event_jobs)}")
all_function_call_jobs = make_function_call_crawl_jobs(
get_crawl_job_entries(
subscription_type,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
)
filtered_function_call_jobs = [
job
for job in all_function_call_jobs
if job.contract_address in addresses_filter
]
    logger.info(
        f"Filtered function call crawl jobs count: {len(filtered_function_call_jobs)}"
    )
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
if args.web3 is None:
            logger.info(
                "No web3 provider URL provided, using default (blockchain.py: connect())"
            )
web3 = _retry_connect_web3(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(
args.web3,
)
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
        first_labeled_block = get_first_labeled_block_number(
            db_session, blockchain_type, args.address
        )
        logger.info(f"First labeled block: {first_labeled_block}")
        start_block = args.start
        if start_block is None:
            logger.info("No start block provided")
            if first_labeled_block is not None:
                start_block = first_labeled_block
                logger.info(f"Using first labeled block as start: {start_block}")
            else:
                raise ValueError(
                    "No start block provided and no labeled blocks found for this address"
                )
        elif first_labeled_block is not None:
            if start_block > first_labeled_block and not args.force:
                logger.info(
                    f"Start block is greater than the first labeled block, using first labeled block: {first_labeled_block}"
                )
                logger.info(
                    f"Use --force to override this and start from the start block: {start_block}"
                )
                start_block = first_labeled_block
            else:
                logger.info(f"Using start block: {start_block}")
        else:
            logger.info(f"Using start block: {start_block}")
if start_block < args.end:
raise ValueError(
f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction."
)
historical_crawler(
db_session,
blockchain_type,
web3,
filtered_event_jobs,
filtered_function_call_jobs,
start_block,
args.end,
args.max_blocks_batch,
args.min_sleep_time,
access_id=args.access_id,
)
def main() -> None:
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())
@@ -211,6 +323,71 @@ def main() -> None:
crawl_parser.set_defaults(func=handle_crawl)
historical_crawl_parser = subparsers.add_parser(
"historical-crawl", help="Crawl historical data"
)
historical_crawl_parser.add_argument(
"--address",
"-a",
required=True,
type=str,
)
historical_crawl_parser.add_argument(
"--start",
"-s",
type=int,
default=None,
)
historical_crawl_parser.add_argument(
"--end",
"-e",
type=int,
required=True,
)
historical_crawl_parser.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
historical_crawl_parser.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
historical_crawl_parser.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
historical_crawl_parser.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
historical_crawl_parser.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
historical_crawl_parser.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
args = parser.parse_args()
args.func(args)
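For reference, a hypothetical invocation of the new subcommand (the module path, address, and block numbers below are illustrative, not taken from this commit):

python -m mooncrawl.moonworm_crawler.cli historical-crawl \
    --address 0x0000000000000000000000000000000000000000 \
    --end 14000000 \
    --blockchain-type ethereum \
    --max-blocks-batch 80 \
    --min-sleep-time 0.1

When --start is omitted, the handler falls back to the first labeled block for the address and raises a ValueError if none exists.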

View file

@@ -24,6 +24,7 @@ from .crawler import (
make_function_call_crawl_jobs,
merge_event_crawl_jobs,
merge_function_call_crawl_jobs,
_retry_connect_web3,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
@@ -82,34 +83,6 @@ def _refetch_new_jobs(
return event_crawl_jobs, function_call_crawl_jobs
def _retry_connect_web3(
blockchain_type: AvailableBlockchainType,
retry_count: int = 10,
sleep_time: float = 5,
access_id: Optional[UUID] = None,
) -> Web3:
"""
Retry connecting to the blockchain.
"""
while retry_count > 0:
retry_count -= 1
try:
web3 = connect(blockchain_type, access_id=access_id)
web3.eth.block_number
logger.info(f"Connected to {blockchain_type}")
return web3
except Exception as e:
if retry_count == 0:
error = e
break
logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
logger.info(f"Retrying in {sleep_time} seconds")
time.sleep(sleep_time)
raise Exception(
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
)
def continuous_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,

View file

@@ -6,6 +6,7 @@ from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, cast
from uuid import UUID
from bugout.data import BugoutSearchResult
from eth_typing.evm import ChecksumAddress
@@ -15,6 +16,7 @@ from web3.main import Web3
from mooncrawl.data import AvailableBlockchainType
from ..blockchain import connect
from ..reporter import reporter
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
@@ -93,6 +95,34 @@ def _generate_reporter_callback(
return reporter_callback
def _retry_connect_web3(
    blockchain_type: AvailableBlockchainType,
    retry_count: int = 10,
    sleep_time: float = 5,
    access_id: Optional[UUID] = None,
) -> Web3:
    """
    Retry connecting to the blockchain.
    """
    retries = retry_count
    error: Optional[Exception] = None
    while retry_count > 0:
        retry_count -= 1
        try:
            web3 = connect(blockchain_type, access_id=access_id)
            # Cheap RPC call to verify that the connection is actually live
            web3.eth.block_number
            logger.info(f"Connected to {blockchain_type}")
            return web3
        except Exception as e:
            if retry_count == 0:
                error = e
                break
            logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
            logger.info(f"Retrying in {sleep_time} seconds")
            time.sleep(sleep_time)
    raise Exception(
        f"Failed to connect to {blockchain_type} blockchain after {retries} retries: {error}"
    )
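# A minimal usage sketch (retry budget and blockchain type are illustrative;
# the call raises once all attempts are exhausted):
#
#     web3 = _retry_connect_web3(
#         AvailableBlockchainType.ETHEREUM,
#         retry_count=5,
#         sleep_time=2.0,
#     )
#     web3.eth.block_number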
def blockchain_type_to_subscription_type(
blockchain_type: AvailableBlockchainType,
) -> SubscriptionTypes:

View file

@@ -1,24 +1,14 @@
import logging
from typing import Any, Dict, List, Optional, Union
from eth_typing.evm import ChecksumAddress
from hexbytes.main import HexBytes
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import (
Base,
EthereumLabel,
EthereumTransaction,
PolygonLabel,
PolygonTransaction,
)
from moonstreamdb.models import Base
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import label
from ..blockchain import connect, get_block_model, get_label_model
from ..blockchain import get_label_model
from ..data import AvailableBlockchainType
from ..settings import CRAWLER_LABEL
from .crawler import FunctionCallCrawlJob, _generate_reporter_callback
from .event_crawler import Event
logging.basicConfig(level=logging.INFO)
@@ -93,6 +83,25 @@
return block_number[0] if block_number else None
def get_first_labeled_block_number(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    label_name: str = CRAWLER_LABEL,
) -> Optional[int]:
    label_model = get_label_model(blockchain_type)
    block_numbers = (
        db_session.query(label_model.block_number)
        .filter(label_model.label == label_name)
        .filter(label_model.address == address)
        .order_by(label_model.block_number.asc())
        .limit(1)
        .all()
    )
    return block_numbers[0][0] if block_numbers else None
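# A minimal usage sketch (illustrative address; assumes a session obtained
# from moonstreamdb's yield_db_session_ctx):
#
#     with yield_db_session_ctx() as db_session:
#         first_block = get_first_labeled_block_number(
#             db_session,
#             AvailableBlockchainType.ETHEREUM,
#             "0x0000000000000000000000000000000000000000",
#         )
#         # None means the crawler has not labeled this address yet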
def commit_session(db_session: Session) -> None:
"""
Save labels in the database.

View file

@@ -0,0 +1,134 @@
import logging
import time
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.networks import Network # type: ignore
from sqlalchemy.orm.session import Session
from web3 import Web3
from ..data import AvailableBlockchainType
from .crawler import (
EventCrawlJob,
FunctionCallCrawlJob,
_retry_connect_web3,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def historical_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
start_block: int,
end_block: int,
max_blocks_batch: int = 100,
min_sleep_time: float = 0.1,
access_id: Optional[UUID] = None,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
    assert start_block >= end_block, "start_block must be greater than or equal to end_block"
assert end_block > 0, "end_block must be greater than 0"
if web3 is None:
web3 = _retry_connect_web3(blockchain_type, access_id=access_id)
    assert (
        web3.eth.block_number >= start_block
    ), "start_block must be less than or equal to the current block"
network = (
Network.ethereum
if blockchain_type == AvailableBlockchainType.ETHEREUM
else Network.polygon
)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
logger.info(f"Starting historical event crawler start_block={start_block}")
blocks_cache: Dict[int, int] = {}
failed_count = 0
while start_block >= end_block:
try:
            # Throttle between batches to avoid hammering the node
            time.sleep(min_sleep_time)
batch_end_block = max(
start_block - max_blocks_batch,
end_block,
)
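            # Worked example with illustrative numbers: start_block=1000,
            # max_blocks_batch=80, end_block=900 gives
            # batch_end_block = max(1000 - 80, 900) = 920, so this pass covers
            # blocks 920..1000; the next pass starts at 919 and clamps at 900.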
logger.info(f"Crawling events from {start_block} to {batch_end_block}")
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
from_block=batch_end_block,
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch * 2,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
)
add_events_to_session(db_session, all_events, blockchain_type)
logger.info(
f"Crawling function calls from {start_block} to {batch_end_block}"
)
all_function_calls = _crawl_functions(
blockchain_type,
ethereum_state_provider,
function_call_crawl_jobs,
batch_end_block,
start_block,
)
logger.info(
f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}."
)
logger.info(f"{ethereum_state_provider.metrics}")
add_function_calls_to_session(
db_session, all_function_calls, blockchain_type
)
            # Committing to db
commit_session(db_session)
start_block = batch_end_block - 1
failed_count = 0
except Exception as e:
logger.error(f"Internal error: {e}")
logger.exception(e)
failed_count += 1
if failed_count > 10:
logger.error("Too many failures, exiting")
raise e
try:
web3 = _retry_connect_web3(blockchain_type, access_id=access_id)
except Exception as err:
logger.error(f"Failed to reconnect: {err}")
logger.exception(err)
raise err
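As a rough end-to-end sketch, the new crawler can be driven directly; the import paths and block numbers here are assumptions for illustration, and the CLI's historical-crawl handler remains the intended entry point:

from moonstreamdb.db import yield_db_session_ctx

from mooncrawl.data import AvailableBlockchainType
from mooncrawl.moonworm_crawler.historical_crawler import historical_crawler

with yield_db_session_ctx() as db_session:
    historical_crawler(
        db_session,
        AvailableBlockchainType.ETHEREUM,
        web3=None,  # None lets _retry_connect_web3 establish the connection
        event_crawl_jobs=[],  # normally built with make_event_crawl_jobs
        function_call_crawl_jobs=[],  # normally make_function_call_crawl_jobs
        start_block=15_000_000,
        end_block=14_999_000,  # reverse crawl: start_block >= end_block
        max_blocks_batch=80,
    )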