Mirror of https://github.com/bugout-dev/moonstream
Merge pull request #1035 from moonstream-to/moonworm-crawler-fixes
Moonworm crawler fixes (pull/1036/head)
commit
a90f73ad39
|
@ -101,6 +101,7 @@ def continuous_crawler(
|
||||||
heartbeat_interval: float = 60,
|
heartbeat_interval: float = 60,
|
||||||
new_jobs_refetch_interval: float = 120,
|
new_jobs_refetch_interval: float = 120,
|
||||||
access_id: Optional[UUID] = None,
|
access_id: Optional[UUID] = None,
|
||||||
|
max_insert_batch: int = 10000,
|
||||||
):
|
):
|
||||||
crawler_type = "continuous"
|
crawler_type = "continuous"
|
||||||
assert (
|
assert (
|
||||||
|
@ -204,7 +205,16 @@ def continuous_crawler(
|
||||||
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
|
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
|
||||||
)
|
)
|
||||||
|
|
||||||
add_events_to_session(db_session, all_events, blockchain_type)
|
if len(all_events) > max_insert_batch:
|
||||||
|
|
||||||
|
for i in range(0, len(all_events), max_insert_batch):
|
||||||
|
add_events_to_session(
|
||||||
|
db_session,
|
||||||
|
all_events[i : i + max_insert_batch],
|
||||||
|
blockchain_type,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
add_events_to_session(db_session, all_events, blockchain_type)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Crawling function calls from {start_block} to {end_block}"
|
f"Crawling function calls from {start_block} to {end_block}"
|
||||||
|
@ -220,9 +230,18 @@ def continuous_crawler(
|
||||||
f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
|
f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
|
||||||
)
|
)
|
||||||
|
|
||||||
add_function_calls_to_session(
|
if len(all_function_calls) > max_insert_batch:
|
||||||
db_session, all_function_calls, blockchain_type
|
|
||||||
)
|
for i in range(0, len(all_function_calls), max_insert_batch):
|
||||||
|
add_function_calls_to_session(
|
||||||
|
db_session,
|
||||||
|
all_function_calls[i : i + max_insert_batch],
|
||||||
|
blockchain_type,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
add_function_calls_to_session(
|
||||||
|
db_session, all_function_calls, blockchain_type
|
||||||
|
)
|
||||||
|
|
||||||
current_time = datetime.utcnow()
|
current_time = datetime.utcnow()
|
||||||
|
|
||||||
|
@ -246,11 +265,11 @@ def continuous_crawler(
|
||||||
|
|
||||||
jobs_refetchet_time = current_time
|
jobs_refetchet_time = current_time
|
||||||
|
|
||||||
|
commit_session(db_session)
|
||||||
|
|
||||||
if current_time - last_heartbeat_time > timedelta(
|
if current_time - last_heartbeat_time > timedelta(
|
||||||
seconds=heartbeat_interval
|
seconds=heartbeat_interval
|
||||||
):
|
):
|
||||||
# Commiting to db
|
|
||||||
commit_session(db_session)
|
|
||||||
# Update heartbeat
|
# Update heartbeat
|
||||||
heartbeat_template["last_block"] = end_block
|
heartbeat_template["last_block"] = end_block
|
||||||
heartbeat_template["current_time"] = _date_to_str(current_time)
|
heartbeat_template["current_time"] = _date_to_str(current_time)
|
||||||
|
|
|
@ -6,6 +6,7 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
|
||||||
from moonstreamdb.models import Base
|
from moonstreamdb.models import Base
|
||||||
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
|
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
from sqlalchemy import func, text, select, values, column, Integer, String, exists
|
||||||
|
|
||||||
from ..settings import CRAWLER_LABEL
|
from ..settings import CRAWLER_LABEL
|
||||||
from .event_crawler import Event
|
from .event_crawler import Event
|
||||||
|
@ -152,36 +153,47 @@ def add_events_to_session(
|
||||||
blockchain_type: AvailableBlockchainType,
|
blockchain_type: AvailableBlockchainType,
|
||||||
label_name=CRAWLER_LABEL,
|
label_name=CRAWLER_LABEL,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
if len(events) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
label_model = get_label_model(blockchain_type)
|
label_model = get_label_model(blockchain_type)
|
||||||
|
|
||||||
events_hashes_to_save = [event.transaction_hash for event in events]
|
events_hashes_to_save = set([event.transaction_hash for event in events])
|
||||||
|
|
||||||
existing_labels = (
|
# Define a CTE VALUES expression to escape big IN clause
|
||||||
db_session.query(label_model.transaction_hash, label_model.log_index)
|
hashes_cte = select(
|
||||||
|
values(column("transaction_hash", String), name="hashes").data(
|
||||||
|
[(hash,) for hash in events_hashes_to_save]
|
||||||
|
)
|
||||||
|
).cte()
|
||||||
|
|
||||||
|
# Retrieve existing transaction hashes and registered log indexes
|
||||||
|
query = (
|
||||||
|
db_session.query(
|
||||||
|
label_model.transaction_hash.label("transaction_hash"),
|
||||||
|
func.array_agg(label_model.log_index).label("log_indexes"),
|
||||||
|
)
|
||||||
.filter(
|
.filter(
|
||||||
label_model.label == label_name,
|
label_model.label == label_name,
|
||||||
label_model.log_index != None,
|
label_model.log_index.isnot(None),
|
||||||
label_model.transaction_hash.in_(events_hashes_to_save),
|
exists().where(
|
||||||
|
label_model.transaction_hash == hashes_cte.c.transaction_hash
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.all()
|
.group_by(label_model.transaction_hash)
|
||||||
)
|
)
|
||||||
|
|
||||||
existing_labels_transactions = []
|
existing_log_index_by_tx_hash = {
|
||||||
existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
|
row.transaction_hash: row.log_indexes for row in query
|
||||||
for label in existing_labels:
|
}
|
||||||
if label[0] not in existing_labels_transactions:
|
|
||||||
existing_labels_transactions.append(label[0])
|
|
||||||
existing_log_index_by_tx_hash[label[0]] = []
|
|
||||||
existing_log_index_by_tx_hash[label[0]].append(label[1])
|
|
||||||
|
|
||||||
labels_to_save = []
|
labels_to_save = [
|
||||||
for event in events:
|
_event_to_label(blockchain_type, event, label_name)
|
||||||
if event.transaction_hash not in existing_labels_transactions:
|
for event in events
|
||||||
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
|
if event.transaction_hash not in existing_log_index_by_tx_hash
|
||||||
elif (
|
or event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
|
||||||
event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
|
]
|
||||||
):
|
|
||||||
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
|
|
||||||
|
|
||||||
logger.info(f"Saving {len(labels_to_save)} event labels to session")
|
logger.info(f"Saving {len(labels_to_save)} event labels to session")
|
||||||
db_session.add_all(labels_to_save)
|
db_session.add_all(labels_to_save)
|
||||||
|
@ -193,27 +205,38 @@ def add_function_calls_to_session(
|
||||||
blockchain_type: AvailableBlockchainType,
|
blockchain_type: AvailableBlockchainType,
|
||||||
label_name=CRAWLER_LABEL,
|
label_name=CRAWLER_LABEL,
|
||||||
) -> None:
|
) -> None:
|
||||||
label_model = get_label_model(blockchain_type)
|
|
||||||
transactions_hashes_to_save = [
|
|
||||||
function_call.transaction_hash for function_call in function_calls
|
|
||||||
]
|
|
||||||
|
|
||||||
existing_labels = (
|
if len(function_calls) == 0:
|
||||||
db_session.query(label_model.transaction_hash)
|
return
|
||||||
.filter(
|
|
||||||
label_model.label == label_name,
|
label_model = get_label_model(blockchain_type)
|
||||||
label_model.log_index == None,
|
|
||||||
label_model.transaction_hash.in_(transactions_hashes_to_save),
|
transactions_hashes_to_save = list(
|
||||||
)
|
set([function_call.transaction_hash for function_call in function_calls])
|
||||||
.all()
|
|
||||||
)
|
)
|
||||||
|
|
||||||
existing_labels_transactions = [label[0] for label in existing_labels]
|
# Define a CTE VALUES expression to escape big IN clause
|
||||||
|
hashes_cte = select(
|
||||||
|
values(column("transaction_hash", String), name="hashes").data(
|
||||||
|
[(hash,) for hash in transactions_hashes_to_save]
|
||||||
|
)
|
||||||
|
).cte()
|
||||||
|
|
||||||
|
# Retrieve existing transaction hashes
|
||||||
|
query = db_session.query(
|
||||||
|
label_model.transaction_hash.label("transaction_hash")
|
||||||
|
).filter(
|
||||||
|
label_model.label == label_name,
|
||||||
|
label_model.log_index.is_(None),
|
||||||
|
exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash),
|
||||||
|
)
|
||||||
|
|
||||||
|
existing_tx_hashes = [row.transaction_hash for row in query]
|
||||||
|
|
||||||
labels_to_save = [
|
labels_to_save = [
|
||||||
_function_call_to_label(blockchain_type, function_call)
|
_function_call_to_label(blockchain_type, function_call)
|
||||||
for function_call in function_calls
|
for function_call in function_calls
|
||||||
if function_call.transaction_hash not in existing_labels_transactions
|
if function_call.transaction_hash not in existing_tx_hashes
|
||||||
]
|
]
|
||||||
|
|
||||||
logger.info(f"Saving {len(labels_to_save)} labels to session")
|
logger.info(f"Saving {len(labels_to_save)} labels to session")
|
||||||
|
|
|
@ -38,6 +38,7 @@ def historical_crawler(
|
||||||
min_sleep_time: float = 0.1,
|
min_sleep_time: float = 0.1,
|
||||||
access_id: Optional[UUID] = None,
|
access_id: Optional[UUID] = None,
|
||||||
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
|
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
|
||||||
|
max_insert_batch: int = 10000,
|
||||||
):
|
):
|
||||||
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
|
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
|
||||||
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
|
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
|
||||||
|
@ -127,7 +128,18 @@ def historical_crawler(
|
||||||
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
|
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
|
||||||
)
|
)
|
||||||
|
|
||||||
add_events_to_session(db_session, all_events, blockchain_type)
|
if len(all_events) > max_insert_batch:
|
||||||
|
|
||||||
|
for i in range(0, len(all_events), max_insert_batch):
|
||||||
|
add_events_to_session(
|
||||||
|
db_session,
|
||||||
|
all_events[i : i + max_insert_batch],
|
||||||
|
blockchain_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
add_events_to_session(db_session, all_events, blockchain_type)
|
||||||
|
|
||||||
if function_call_crawl_jobs:
|
if function_call_crawl_jobs:
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -144,9 +156,19 @@ def historical_crawler(
|
||||||
f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}."
|
f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}."
|
||||||
)
|
)
|
||||||
|
|
||||||
add_function_calls_to_session(
|
if len(all_function_calls) > max_insert_batch:
|
||||||
db_session, all_function_calls, blockchain_type
|
|
||||||
)
|
for i in range(0, len(all_function_calls), max_insert_batch):
|
||||||
|
add_function_calls_to_session(
|
||||||
|
db_session,
|
||||||
|
all_function_calls[i : i + max_insert_batch],
|
||||||
|
blockchain_type,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
|
||||||
|
add_function_calls_to_session(
|
||||||
|
db_session, all_function_calls, blockchain_type
|
||||||
|
)
|
||||||
|
|
||||||
if addresses_deployment_blocks:
|
if addresses_deployment_blocks:
|
||||||
for address, deployment_block in addresses_deployment_blocks.items():
|
for address, deployment_block in addresses_deployment_blocks.items():
|
||||||
|
|
|
@ -2,4 +2,4 @@
|
||||||
Moonstream crawlers version.
|
Moonstream crawlers version.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MOONCRAWL_VERSION = "0.3.8"
|
MOONCRAWL_VERSION = "0.3.9"
|
||||||
|
|
Loading…
Reference in New Issue