From fa8756038594bf2250ef85b984cffc787ea8e57e Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 25 Mar 2024 13:27:56 +0200 Subject: [PATCH 1/3] Add fixes. --- .../moonworm_crawler/continuous_crawler.py | 31 +++++-- .../mooncrawl/moonworm_crawler/db.py | 91 ++++++++++++------- .../moonworm_crawler/historical_crawler.py | 30 +++++- 3 files changed, 108 insertions(+), 44 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 6c3a811f..33761970 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -101,6 +101,7 @@ def continuous_crawler( heartbeat_interval: float = 60, new_jobs_refetch_interval: float = 120, access_id: Optional[UUID] = None, + max_insert_batch: int = 10000, ): crawler_type = "continuous" assert ( @@ -204,7 +205,16 @@ def continuous_crawler( f"Crawled {len(all_events)} events from {start_block} to {end_block}." ) - add_events_to_session(db_session, all_events, blockchain_type) + if len(all_events) > max_insert_batch: + + for i in range(0, len(all_events), max_insert_batch): + add_events_to_session( + db_session, + all_events[i : i + max_insert_batch], + blockchain_type, + ) + else: + add_events_to_session(db_session, all_events, blockchain_type) logger.info( f"Crawling function calls from {start_block} to {end_block}" @@ -220,9 +230,18 @@ def continuous_crawler( f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}." ) - add_function_calls_to_session( - db_session, all_function_calls, blockchain_type - ) + if len(all_function_calls) > max_insert_batch: + + for i in range(0, len(all_function_calls), max_insert_batch): + add_function_calls_to_session( + db_session, + all_function_calls[i : i + max_insert_batch], + blockchain_type, + ) + else: + add_function_calls_to_session( + db_session, all_function_calls, blockchain_type + ) current_time = datetime.utcnow() @@ -246,11 +265,11 @@ def continuous_crawler( jobs_refetchet_time = current_time + commit_session(db_session) + if current_time - last_heartbeat_time > timedelta( seconds=heartbeat_interval ): - # Commiting to db - commit_session(db_session) # Update heartbeat heartbeat_template["last_block"] = end_block heartbeat_template["current_time"] = _date_to_str(current_time) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 26fd64ef..7db669c2 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -6,6 +6,7 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy.orm import Session +from sqlalchemy import func, text, select, values, column, Integer, String, exists from ..settings import CRAWLER_LABEL from .event_crawler import Event @@ -152,36 +153,47 @@ def add_events_to_session( blockchain_type: AvailableBlockchainType, label_name=CRAWLER_LABEL, ) -> None: + + if len(events) == 0: + return + label_model = get_label_model(blockchain_type) - events_hashes_to_save = [event.transaction_hash for event in events] + events_hashes_to_save = set([event.transaction_hash for event in events]) - existing_labels = ( - db_session.query(label_model.transaction_hash, label_model.log_index) + # Define a CTE VALUES expression to escape big IN clause + hashes_cte = select( + values(column("transaction_hash", String), name="hashes").data( + [(hash,) for hash in events_hashes_to_save] + ) + ).cte() + + # Retrieve existing transaction hashes and registered log indexes + query = ( + db_session.query( + label_model.transaction_hash.label("transaction_hash"), + func.array_agg(label_model.log_index).label("log_indexes"), + ) .filter( label_model.label == label_name, label_model.log_index != None, - label_model.transaction_hash.in_(events_hashes_to_save), + exists().where( + label_model.transaction_hash == hashes_cte.c.transaction_hash + ), ) - .all() + .group_by(label_model.transaction_hash) ) - existing_labels_transactions = [] - existing_log_index_by_tx_hash: Dict[str, List[int]] = {} - for label in existing_labels: - if label[0] not in existing_labels_transactions: - existing_labels_transactions.append(label[0]) - existing_log_index_by_tx_hash[label[0]] = [] - existing_log_index_by_tx_hash[label[0]].append(label[1]) + existing_log_index_by_tx_hash = { + row.transaction_hash: row.log_indexes for row in query + } - labels_to_save = [] - for event in events: - if event.transaction_hash not in existing_labels_transactions: - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) - elif ( - event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] - ): - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) + labels_to_save = [ + _event_to_label(blockchain_type, event, label_name) + for event in events + if event.transaction_hash not in existing_log_index_by_tx_hash + or event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] + ] logger.info(f"Saving {len(labels_to_save)} event labels to session") db_session.add_all(labels_to_save) @@ -193,27 +205,38 @@ def add_function_calls_to_session( blockchain_type: AvailableBlockchainType, label_name=CRAWLER_LABEL, ) -> None: - label_model = get_label_model(blockchain_type) - transactions_hashes_to_save = [ - function_call.transaction_hash for function_call in function_calls - ] - existing_labels = ( - db_session.query(label_model.transaction_hash) - .filter( - label_model.label == label_name, - label_model.log_index == None, - label_model.transaction_hash.in_(transactions_hashes_to_save), - ) - .all() + if len(function_calls) == 0: + return + + label_model = get_label_model(blockchain_type) + + transactions_hashes_to_save = list( + set([function_call.transaction_hash for function_call in function_calls]) ) - existing_labels_transactions = [label[0] for label in existing_labels] + # Define a CTE VALUES expression to escape big IN clause + hashes_cte = select( + values(column("transaction_hash", String), name="hashes").data( + [(hash,) for hash in transactions_hashes_to_save] + ) + ).cte() + + # Retrieve existing transaction hashes + query = db_session.query( + label_model.transaction_hash.label("transaction_hash") + ).filter( + label_model.label == label_name, + label_model.log_index.is_(None), + exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash), + ) + + existing_tx_hashes = [row.transaction_hash for row in query] labels_to_save = [ _function_call_to_label(blockchain_type, function_call) for function_call in function_calls - if function_call.transaction_hash not in existing_labels_transactions + if function_call.transaction_hash not in existing_tx_hashes ] logger.info(f"Saving {len(labels_to_save)} labels to session") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 4091ae90..4f66a8bd 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -38,6 +38,7 @@ def historical_crawler( min_sleep_time: float = 0.1, access_id: Optional[UUID] = None, addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None, + max_insert_batch: int = 10000, ): assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" assert min_sleep_time > 0, "min_sleep_time must be greater than 0" @@ -127,7 +128,18 @@ def historical_crawler( f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}." ) - add_events_to_session(db_session, all_events, blockchain_type) + if len(all_events) > max_insert_batch: + + for i in range(0, len(all_events), max_insert_batch): + add_events_to_session( + db_session, + all_events[i : i + max_insert_batch], + blockchain_type, + ) + + else: + + add_events_to_session(db_session, all_events, blockchain_type) if function_call_crawl_jobs: logger.info( @@ -144,9 +156,19 @@ def historical_crawler( f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}." ) - add_function_calls_to_session( - db_session, all_function_calls, blockchain_type - ) + if len(all_function_calls) > max_insert_batch: + + for i in range(0, len(all_function_calls), max_insert_batch): + add_function_calls_to_session( + db_session, + all_function_calls[i : i + max_insert_batch], + blockchain_type, + ) + else: + + add_function_calls_to_session( + db_session, all_function_calls, blockchain_type + ) if addresses_deployment_blocks: for address, deployment_block in addresses_deployment_blocks.items(): From 4dead75827d3dc185fa4c61321a8adb233100f50 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 25 Mar 2024 13:29:11 +0200 Subject: [PATCH 2/3] Bump version. --- crawlers/mooncrawl/mooncrawl/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 8eb58150..e999d730 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.3.8" +MOONCRAWL_VERSION = "0.3.9" From e96bb59965368654090ffb017a6e9aed12318f8f Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 25 Mar 2024 14:06:57 +0200 Subject: [PATCH 3/3] Replace != None -> isnot(None) more readable. --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 7db669c2..ad964a32 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -176,7 +176,7 @@ def add_events_to_session( ) .filter( label_model.label == label_name, - label_model.log_index != None, + label_model.log_index.isnot(None), exists().where( label_model.transaction_hash == hashes_cte.c.transaction_hash ),