pull/1035/head
Andrey 2024-03-25 13:27:56 +02:00
parent e243442f5c
commit fa87560385
3 changed files with 108 additions and 44 deletions

View file

@@ -101,6 +101,7 @@ def continuous_crawler(
     heartbeat_interval: float = 60,
     new_jobs_refetch_interval: float = 120,
     access_id: Optional[UUID] = None,
+    max_insert_batch: int = 10000,
 ):
     crawler_type = "continuous"
     assert (
@@ -204,7 +205,16 @@ def continuous_crawler(
                 f"Crawled {len(all_events)} events from {start_block} to {end_block}."
             )
-            add_events_to_session(db_session, all_events, blockchain_type)
+            if len(all_events) > max_insert_batch:
+                for i in range(0, len(all_events), max_insert_batch):
+                    add_events_to_session(
+                        db_session,
+                        all_events[i : i + max_insert_batch],
+                        blockchain_type,
+                    )
+            else:
+                add_events_to_session(db_session, all_events, blockchain_type)

             logger.info(
                 f"Crawling function calls from {start_block} to {end_block}"
@@ -220,9 +230,18 @@
                 f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
             )
-            add_function_calls_to_session(
-                db_session, all_function_calls, blockchain_type
-            )
+            if len(all_function_calls) > max_insert_batch:
+                for i in range(0, len(all_function_calls), max_insert_batch):
+                    add_function_calls_to_session(
+                        db_session,
+                        all_function_calls[i : i + max_insert_batch],
+                        blockchain_type,
+                    )
+            else:
+                add_function_calls_to_session(
+                    db_session, all_function_calls, blockchain_type
+                )

             current_time = datetime.utcnow()
@@ -246,11 +265,11 @@ def continuous_crawler(
                 jobs_refetchet_time = current_time

+            commit_session(db_session)
+
             if current_time - last_heartbeat_time > timedelta(
                 seconds=heartbeat_interval
             ):
-                # Commiting to db
-                commit_session(db_session)
                 # Update heartbeat
                 heartbeat_template["last_block"] = end_block
                 heartbeat_template["current_time"] = _date_to_str(current_time)
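The batching introduced here is plain range stepping: slice the list every max_insert_batch items so no single session flush carries tens of thousands of rows. A minimal sketch of the same pattern as a reusable helper (the batched helper is illustrative, not part of this commit):

from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    # Yield consecutive slices of at most batch_size items; the final
    # slice may be shorter when batch_size does not divide len(items).
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]


# Equivalent to the inlined loops above:
# for chunk in batched(all_events, max_insert_batch):
#     add_events_to_session(db_session, chunk, blockchain_type)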

View file

@@ -6,6 +6,7 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
 from moonstreamdb.models import Base
 from moonworm.crawler.function_call_crawler import ContractFunctionCall  # type: ignore
 from sqlalchemy.orm import Session
+from sqlalchemy import func, text, select, values, column, Integer, String, exists

 from ..settings import CRAWLER_LABEL
 from .event_crawler import Event
@@ -152,36 +153,47 @@ def add_events_to_session(
     blockchain_type: AvailableBlockchainType,
     label_name=CRAWLER_LABEL,
 ) -> None:
+    if len(events) == 0:
+        return
+
     label_model = get_label_model(blockchain_type)

-    events_hashes_to_save = [event.transaction_hash for event in events]
+    events_hashes_to_save = set([event.transaction_hash for event in events])

-    existing_labels = (
-        db_session.query(label_model.transaction_hash, label_model.log_index)
+    # Define a CTE VALUES expression to escape big IN clause
+    hashes_cte = select(
+        values(column("transaction_hash", String), name="hashes").data(
+            [(hash,) for hash in events_hashes_to_save]
+        )
+    ).cte()
+
+    # Retrieve existing transaction hashes and registered log indexes
+    query = (
+        db_session.query(
+            label_model.transaction_hash.label("transaction_hash"),
+            func.array_agg(label_model.log_index).label("log_indexes"),
+        )
         .filter(
             label_model.label == label_name,
             label_model.log_index != None,
-            label_model.transaction_hash.in_(events_hashes_to_save),
+            exists().where(
+                label_model.transaction_hash == hashes_cte.c.transaction_hash
+            ),
         )
-        .all()
+        .group_by(label_model.transaction_hash)
     )

-    existing_labels_transactions = []
-    existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
-    for label in existing_labels:
-        if label[0] not in existing_labels_transactions:
-            existing_labels_transactions.append(label[0])
-            existing_log_index_by_tx_hash[label[0]] = []
-        existing_log_index_by_tx_hash[label[0]].append(label[1])
+    existing_log_index_by_tx_hash = {
+        row.transaction_hash: row.log_indexes for row in query
+    }

-    labels_to_save = []
-    for event in events:
-        if event.transaction_hash not in existing_labels_transactions:
-            labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
-        elif (
-            event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
-        ):
-            labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
+    labels_to_save = [
+        _event_to_label(blockchain_type, event, label_name)
+        for event in events
+        if event.transaction_hash not in existing_log_index_by_tx_hash
+        or event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
+    ]

     logger.info(f"Saving {len(labels_to_save)} event labels to session")
     db_session.add_all(labels_to_save)
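The hashes_cte construct replaces transaction_hash.in_(...) with an inline VALUES list correlated via EXISTS, which keeps the statement compact when tens of thousands of hashes are involved. A self-contained sketch of the same SQLAlchemy 1.4+ technique, with made-up hashes and a bare column standing in for label_model.transaction_hash:

from sqlalchemy import String, column, exists, select, values

# Made-up transaction hashes standing in for events_hashes_to_save.
hashes = {"0xabc", "0xdef"}

# Inline VALUES list compiled into a CTE, so the hashes travel in the
# statement body instead of an IN (...) clause with one bind parameter
# per hash.
hashes_cte = select(
    values(column("transaction_hash", String), name="hashes").data(
        [(h,) for h in hashes]
    )
).cte()

# EXISTS correlates each labels row against the CTE, effectively a
# semi-join on transaction_hash, as used in both functions above.
predicate = exists().where(
    column("transaction_hash") == hashes_cte.c.transaction_hash
)
print(predicate)  # renders the EXISTS (SELECT ... FROM hashes ...) SQL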
@@ -193,27 +205,38 @@ def add_function_calls_to_session(
     blockchain_type: AvailableBlockchainType,
     label_name=CRAWLER_LABEL,
 ) -> None:
-    label_model = get_label_model(blockchain_type)
-
-    transactions_hashes_to_save = [
-        function_call.transaction_hash for function_call in function_calls
-    ]
-
-    existing_labels = (
-        db_session.query(label_model.transaction_hash)
-        .filter(
-            label_model.label == label_name,
-            label_model.log_index == None,
-            label_model.transaction_hash.in_(transactions_hashes_to_save),
-        )
-        .all()
-    )
-
-    existing_labels_transactions = [label[0] for label in existing_labels]
+    if len(function_calls) == 0:
+        return
+
+    label_model = get_label_model(blockchain_type)
+
+    transactions_hashes_to_save = list(
+        set([function_call.transaction_hash for function_call in function_calls])
+    )
+
+    # Define a CTE VALUES expression to escape big IN clause
+    hashes_cte = select(
+        values(column("transaction_hash", String), name="hashes").data(
+            [(hash,) for hash in transactions_hashes_to_save]
+        )
+    ).cte()
+
+    # Retrieve existing transaction hashes
+    query = db_session.query(
+        label_model.transaction_hash.label("transaction_hash")
+    ).filter(
+        label_model.label == label_name,
+        label_model.log_index.is_(None),
+        exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash),
+    )
+
+    existing_tx_hashes = [row.transaction_hash for row in query]

     labels_to_save = [
         _function_call_to_label(blockchain_type, function_call)
         for function_call in function_calls
-        if function_call.transaction_hash not in existing_labels_transactions
+        if function_call.transaction_hash not in existing_tx_hashes
     ]

     logger.info(f"Saving {len(labels_to_save)} labels to session")
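On the event side, the query groups stored log indexes per transaction with func.array_agg, so deduplication becomes a dict lookup per event instead of a scan over (hash, log_index) rows. A toy version of that filter with hypothetical data:

from dataclasses import dataclass


@dataclass
class Event:
    transaction_hash: str
    log_index: int


# Hypothetical rows mimicking the array_agg() query result: one entry
# per transaction hash, listing the log indexes already stored.
existing_log_index_by_tx_hash = {"0xaaa": [0, 2]}

events = [Event("0xaaa", 0), Event("0xaaa", 1), Event("0xbbb", 3)]

# Keep an event unless its (transaction_hash, log_index) pair is already
# present; mirrors the comprehension in add_events_to_session.
new_events = [
    e
    for e in events
    if e.transaction_hash not in existing_log_index_by_tx_hash
    or e.log_index not in existing_log_index_by_tx_hash[e.transaction_hash]
]
assert [(e.transaction_hash, e.log_index) for e in new_events] == [
    ("0xaaa", 1),
    ("0xbbb", 3),
]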

View file

@@ -38,6 +38,7 @@ def historical_crawler(
     min_sleep_time: float = 0.1,
     access_id: Optional[UUID] = None,
     addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
+    max_insert_batch: int = 10000,
 ):
     assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
     assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@@ -127,7 +128,18 @@ def historical_crawler(
             f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
         )
-        add_events_to_session(db_session, all_events, blockchain_type)
+        if len(all_events) > max_insert_batch:
+            for i in range(0, len(all_events), max_insert_batch):
+                add_events_to_session(
+                    db_session,
+                    all_events[i : i + max_insert_batch],
+                    blockchain_type,
+                )
+        else:
+            add_events_to_session(db_session, all_events, blockchain_type)

         if function_call_crawl_jobs:
             logger.info(
@@ -144,9 +156,19 @@
                 f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}."
             )
-            add_function_calls_to_session(
-                db_session, all_function_calls, blockchain_type
-            )
+            if len(all_function_calls) > max_insert_batch:
+                for i in range(0, len(all_function_calls), max_insert_batch):
+                    add_function_calls_to_session(
+                        db_session,
+                        all_function_calls[i : i + max_insert_batch],
+                        blockchain_type,
+                    )
+            else:
+                add_function_calls_to_session(
+                    db_session, all_function_calls, blockchain_type
+                )

         if addresses_deployment_blocks:
             for address, deployment_block in addresses_deployment_blocks.items():
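One property of the slicing bounds shared by both crawlers: the batches partition the input exactly, whatever the remainder. A throwaway check with illustrative numbers:

# Throwaway check: the range-stepped slices reassemble the original
# list exactly, including a short final batch.
max_insert_batch = 10000
all_events = list(range(25000))  # illustrative stand-in for crawled events

batches = [
    all_events[i : i + max_insert_batch]
    for i in range(0, len(all_events), max_insert_batch)
]
assert [len(b) for b in batches] == [10000, 10000, 5000]
assert sum(batches, []) == all_events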