Add v3 crawler.

pull/1105/head
Andrey 2024-06-25 17:09:39 +03:00
parent 15ff4a1c77
commit 249224df1d
3 changed files with 92 additions and 24 deletions

View file

@@ -197,7 +197,14 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
     index_engine = MoonstreamDBIndexesEngine()
-    with index_engine.yield_db_session_ctx() as index_db_session:
+    logger.info(f"Blockchain type: {blockchain_type.value}")
+
+    customer_connection = get_db_connection(args.customer_uuid)
+    customer_engine = MoonstreamCustomDBEngine(customer_connection)
+    index_engine = MoonstreamDBIndexesEngine()
+
+    with customer_engine.yield_db_session_ctx() as db_session, index_engine.yield_db_session_ctx() as index_db_session:
         initial_event_jobs = get_event_crawl_job_records(
             index_db_session,
@@ -219,8 +226,6 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
             f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
         )
-        logger.info(f"Blockchain type: {blockchain_type.value}")
-    with yield_db_session_ctx() as db_session:
         web3: Optional[Web3] = None
         if args.web3 is None:
             logger.info(
@@ -292,6 +297,8 @@ def handle_crawl_v3(args: argparse.Namespace) -> None:
             args.heartbeat_interval,
             args.new_jobs_refetch_interval,
             web3_uri=args.web3_uri,
+            version=3,
+            index_db_session=index_db_session,
         )
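For orientation, this file replaces the single indexes-DB session in handle_crawl_v3 with a pair of sessions: a customer-specific database resolved from args.customer_uuid, and the shared indexes database that the crawl jobs are read from, with version=3 and the index session forwarded into continuous_crawler. A minimal sketch of that wiring (not runnable on its own; get_db_connection, MoonstreamCustomDBEngine, MoonstreamDBIndexesEngine and continuous_crawler are the project's own names from the diff, and the unchanged argument handling is elided):

# Sketch only: the session wiring handle_crawl_v3 ends up with after this commit.
def run_v3_crawler(args):
    customer_connection = get_db_connection(args.customer_uuid)   # customer-specific DB
    customer_engine = MoonstreamCustomDBEngine(customer_connection)
    index_engine = MoonstreamDBIndexesEngine()                    # shared indexes DB

    with customer_engine.yield_db_session_ctx() as db_session, \
            index_engine.yield_db_session_ctx() as index_db_session:
        # Crawl jobs are read from index_db_session; labels are written
        # through db_session inside continuous_crawler.
        continuous_crawler(
            db_session,
            # ... blockchain, web3, job and batching arguments as before ...
            web3_uri=args.web3_uri,
            version=3,
            index_db_session=index_db_session,
        )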

View file

@@ -3,7 +3,6 @@ import time
 import traceback
 from datetime import datetime, timedelta
 from typing import Dict, List, Optional, Tuple
-from uuid import UUID

 from moonstreamtypes.blockchain import AvailableBlockchainType
@@ -16,10 +15,6 @@ from moonworm.crawler.ethereum_state_provider import Web3StateProvider
 from sqlalchemy.orm.session import Session
 from web3 import Web3
-from ..settings import (
-    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
-    HISTORICAL_CRAWLER_STATUSES,
-)
 from .crawler import (
     EventCrawlJob,
     FunctionCallCrawlJob,
@@ -31,10 +26,13 @@ from .crawler import (
     merge_event_crawl_jobs,
     merge_function_call_crawl_jobs,
     moonworm_crawler_update_job_as_pickedup,
+    get_event_crawl_job_records,
+    get_function_call_crawl_job_records,
 )
 from .db import add_events_to_session, add_function_calls_to_session, commit_session
 from .event_crawler import _crawl_events
 from .function_call_crawler import _crawl_functions
+from ..settings import CRAWLER_LABEL, SEER_CRAWLER_LABEL

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -108,8 +106,16 @@ def continuous_crawler(
     web3_uri: Optional[str] = None,
     max_insert_batch: int = 10000,
     version: int = 2,
+    index_db_session: Optional[Session] = None,
+    customer_id: Optional[str] = None,
 ):
     crawler_type = "continuous"
+    if version == 3:
+        crawler_type = "continuous_v3"
+
+    label_name = CRAWLER_LABEL
+    if version == 3:
+        label_name = SEER_CRAWLER_LABEL
+
     assert (
         min_blocks_batch < max_blocks_batch
     ), "min_blocks_batch must be less than max_blocks_batch"
@@ -191,6 +197,7 @@ def continuous_crawler(
                     to_block=end_block,
                     blocks_cache=blocks_cache,
                     db_block_query_batch=min_blocks_batch * 2,
+                    version=version,
                 )
                 logger.info(
                     f"Crawled {len(all_events)} events from {start_block} to {end_block}."
@@ -203,9 +210,13 @@ def continuous_crawler(
                         db_session,
                         all_events[i : i + max_insert_batch],
                         blockchain_type,
+                        version,
+                        label_name,
                     )
             else:
-                add_events_to_session(db_session, all_events, blockchain_type)
+                add_events_to_session(
+                    db_session, all_events, blockchain_type, version, label_name
+                )

             logger.info(
                 f"Crawling function calls from {start_block} to {end_block}"
@@ -228,10 +239,16 @@ def continuous_crawler(
                         db_session,
                         all_function_calls[i : i + max_insert_batch],
                         blockchain_type,
+                        db_version=version,
+                        label_name=label_name,
                     )
             else:
                 add_function_calls_to_session(
-                    db_session, all_function_calls, blockchain_type
+                    db_session,
+                    all_function_calls,
+                    blockchain_type,
+                    db_version=version,
+                    label_name=label_name,
                 )

             current_time = datetime.utcnow()
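Both insert paths keep the existing chunking behaviour: results are split into slices of at most max_insert_batch before being handed to the session helpers, now with the version/label_name arguments threaded through. A tiny self-contained illustration of the slicing pattern these loops use; nothing project-specific here:

def chunked(items, batch_size):
    # Same range(0, len(items), batch_size) stepping as the loops above.
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]

batches = list(chunked(list(range(25)), 10))
assert batches == [list(range(10)), list(range(10, 20)), list(range(20, 25))]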
@@ -239,20 +256,56 @@ def continuous_crawler(
             if current_time - jobs_refetchet_time > timedelta(
                 seconds=new_jobs_refetch_interval
             ):
-                logger.info(
-                    f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
-                )
-                event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
-                    event_crawl_jobs, function_call_crawl_jobs, blockchain_type
-                )
-                (
-                    event_crawl_jobs,
-                    function_call_crawl_jobs,
-                ) = moonworm_crawler_update_job_as_pickedup(
-                    event_crawl_jobs=event_crawl_jobs,
-                    function_call_crawl_jobs=function_call_crawl_jobs,
-                )
+                if version == 2:
+                    ## Refetch new jobs from bugout journal
+                    logger.info(
+                        f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
+                    )
+                    event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
+                        event_crawl_jobs, function_call_crawl_jobs, blockchain_type
+                    )
+
+                    (
+                        event_crawl_jobs,
+                        function_call_crawl_jobs,
+                    ) = moonworm_crawler_update_job_as_pickedup(
+                        event_crawl_jobs=event_crawl_jobs,
+                        function_call_crawl_jobs=function_call_crawl_jobs,
+                    )
+                elif version == 3 and index_db_session is not None:
+                    ## Refetch new jobs from index db
+                    updated_event_crawl_jobs = get_event_crawl_job_records(
+                        index_db_session,
+                        blockchain_type,
+                        [],
+                        {event.event_abi_hash: event for event in event_crawl_jobs},
+                        customer_id=customer_id,
+                    )
+                    event_crawl_jobs = [
+                        event for event in updated_event_crawl_jobs.values()
+                    ]
+
+                    updated_function_call_crawl_jobs = (
+                        get_function_call_crawl_job_records(
+                            index_db_session,
+                            blockchain_type,
+                            [],
+                            {
+                                function_call.contract_address: function_call
+                                for function_call in function_call_crawl_jobs
+                            },
+                            customer_id=customer_id,
+                        )
+                    )
+                    function_call_crawl_jobs = [
+                        function_call
+                        for function_call in updated_function_call_crawl_jobs.values()
+                    ]
+                else:
+                    raise ValueError("Invalid version")
+
                 jobs_refetchet_time = current_time
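The refetch block now dispatches on version: version 2 keeps the Bugout-journal path, while version 3 reloads jobs from the indexes database through the two *_crawl_job_records helpers, passing the existing jobs keyed by event ABI hash and by contract address so known jobs are merged rather than replaced. The version-3 branch could equally be read as a helper like the hypothetical sketch below; the function names and signatures are taken from the diff, but the helper itself is not part of the commit:

from typing import List, Optional

# Hypothetical refactoring sketch of the version-3 branch above; not part of
# the commit. EventCrawlJob / FunctionCallCrawlJob and the two record helpers
# are the module's own.
def refetch_v3_jobs(
    index_db_session,
    blockchain_type,
    event_crawl_jobs: List["EventCrawlJob"],
    function_call_crawl_jobs: List["FunctionCallCrawlJob"],
    customer_id: Optional[str] = None,
):
    updated_events = get_event_crawl_job_records(
        index_db_session,
        blockchain_type,
        [],  # no address filter: refetch every job for this chain
        {job.event_abi_hash: job for job in event_crawl_jobs},
        customer_id=customer_id,
    )
    updated_calls = get_function_call_crawl_job_records(
        index_db_session,
        blockchain_type,
        [],
        {job.contract_address: job for job in function_call_crawl_jobs},
        customer_id=customer_id,
    )
    return list(updated_events.values()), list(updated_calls.values())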

View file

@@ -704,6 +704,7 @@ def get_event_crawl_job_records(
     blockchain_type: AvailableBlockchainType,
     addresses: List[str],
     existing_crawl_job_records: Dict[str, EventCrawlJob],
+    customer_id: Optional[str] = None,
 ):
     """
     Retrieve and update the event crawl job records from the database.
@@ -712,9 +713,12 @@ def get_event_crawl_job_records(
     query = (
         db_session.query(AbiJobs)
         .filter(AbiJobs.chain == blockchain_type.value)
-        .filter(func.length(AbiJobs.abi_selector) > 10)
+        .filter(func.cast(AbiJobs.abi_selector, JSON).op("->>")("type") == "event")
     )

+    if customer_id is not None:
+        query = query.filter(AbiJobs.customer_id == customer_id)
+
     if len(addresses) != 0:
         query = query.filter(
             AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
@@ -770,6 +774,7 @@ def get_function_call_crawl_job_records(
     blockchain_type: AvailableBlockchainType,
     addresses: List[str],
     existing_crawl_job_records: Dict[str, FunctionCallCrawlJob],
+    customer_id: Optional[str] = None,
 ):
     """
     Retrieve and update the function call crawl job records from the database.
@@ -786,6 +791,9 @@ def get_function_call_crawl_job_records(
         )
     )

+    if customer_id is not None:
+        query = query.filter(AbiJobs.customer_id == customer_id)
+
     if len(addresses) != 0:
         query = query.filter(
             AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
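In both record functions the new customer_id parameter follows the usual optional-filter pattern: the base query is built unconditionally and narrowed only when a customer_id is supplied. A self-contained toy version of that pattern, with AbiJobs replaced by a stand-in model (assumes SQLAlchemy 1.4+):

from typing import Optional

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Job(Base):  # stand-in for AbiJobs
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)
    chain = Column(String)
    customer_id = Column(String, nullable=True)

def jobs_for(session: Session, chain: str, customer_id: Optional[str] = None):
    query = session.query(Job).filter(Job.chain == chain)
    if customer_id is not None:
        # Same shape as the new AbiJobs.customer_id filter above.
        query = query.filter(Job.customer_id == customer_id)
    return query.all()

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([Job(chain="ethereum", customer_id="c1"),
                     Job(chain="ethereum", customer_id="c2")])
    session.commit()
    assert len(jobs_for(session, "ethereum")) == 2
    assert len(jobs_for(session, "ethereum", customer_id="c1")) == 1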