kopia lustrzana https://github.com/bugout-dev/moonstream
direct write.
rodzic
3a19d014a0
commit
c3dd0bac9b
|
@ -5,8 +5,10 @@ from typing import Dict, List, Optional
|
|||
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
|
||||
from moonstreamdb.models import Base
|
||||
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
|
||||
from sqlalchemy.dialects.postgresql import array
|
||||
|
||||
# from sqlalchemy.dialects.postgresql.dml import insert
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import insert
|
||||
|
||||
|
||||
from ..settings import CRAWLER_LABEL
|
||||
|
@ -156,40 +158,26 @@ def add_events_to_session(
|
|||
) -> None:
|
||||
label_model = get_label_model(blockchain_type)
|
||||
|
||||
events_hashes_to_save = set([event.transaction_hash for event in events])
|
||||
label_name = f"{label_name}-unverified"
|
||||
|
||||
existing_labels = db_session.query(
|
||||
label_model.transaction_hash, label_model.log_index
|
||||
).filter(
|
||||
label_model.label == label_name,
|
||||
label_model.log_index != None,
|
||||
label_model.transaction_hash.op("ANY")(array(events_hashes_to_save)),
|
||||
)
|
||||
events_insert = []
|
||||
for raw_event in events:
|
||||
db_event = _event_to_label(blockchain_type, raw_event, label_name)
|
||||
events_insert.append(
|
||||
{
|
||||
"label": db_event.label,
|
||||
"label_data": db_event.label_data,
|
||||
"address": db_event.address,
|
||||
"block_number": db_event.block_number,
|
||||
"block_timestamp": db_event.block_timestamp,
|
||||
"transaction_hash": db_event.transaction_hash,
|
||||
"log_index": db_event.log_index,
|
||||
}
|
||||
)
|
||||
|
||||
print(existing_labels)
|
||||
breakpoint()
|
||||
insert_statement = insert(label_model).values(events_insert)
|
||||
|
||||
existing_labels = existing_labels.all()
|
||||
|
||||
existing_labels_transactions = []
|
||||
existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
|
||||
for label in existing_labels:
|
||||
if label[0] not in existing_labels_transactions:
|
||||
existing_labels_transactions.append(label[0])
|
||||
existing_log_index_by_tx_hash[label[0]] = []
|
||||
existing_log_index_by_tx_hash[label[0]].append(label[1])
|
||||
|
||||
labels_to_save = []
|
||||
for event in events:
|
||||
if event.transaction_hash not in existing_labels_transactions:
|
||||
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
|
||||
elif (
|
||||
event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
|
||||
):
|
||||
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
|
||||
|
||||
logger.info(f"Saving {len(labels_to_save)} event labels to session")
|
||||
db_session.add_all(labels_to_save)
|
||||
db_session.execute(insert_statement)
|
||||
|
||||
|
||||
def add_function_calls_to_session(
|
||||
|
@ -199,27 +187,27 @@ def add_function_calls_to_session(
|
|||
label_name=CRAWLER_LABEL,
|
||||
) -> None:
|
||||
label_model = get_label_model(blockchain_type)
|
||||
transactions_hashes_to_save = [
|
||||
function_call.transaction_hash for function_call in function_calls
|
||||
]
|
||||
|
||||
existing_labels = (
|
||||
db_session.query(label_model.transaction_hash)
|
||||
.filter(
|
||||
label_model.label == label_name,
|
||||
label_model.log_index == None,
|
||||
label_model.transaction_hash.in_(transactions_hashes_to_save),
|
||||
label_name = f"{label_name}-unverified"
|
||||
|
||||
function_calls_insert = []
|
||||
|
||||
for raw_function_call in function_calls:
|
||||
db_function_call = _function_call_to_label(
|
||||
blockchain_type, raw_function_call, label_name
|
||||
)
|
||||
function_calls_insert.append(
|
||||
{
|
||||
"label": db_function_call.label,
|
||||
"label_data": db_function_call.label_data,
|
||||
"address": db_function_call.address,
|
||||
"block_number": db_function_call.block_number,
|
||||
"block_timestamp": db_function_call.block_timestamp,
|
||||
"transaction_hash": db_function_call.transaction_hash,
|
||||
}
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
existing_labels_transactions = [label[0] for label in existing_labels]
|
||||
logger.info(f"Saving {len(function_calls_insert)} labels to session")
|
||||
insert_statement = insert(label_model).values(function_calls_insert)
|
||||
|
||||
labels_to_save = [
|
||||
_function_call_to_label(blockchain_type, function_call)
|
||||
for function_call in function_calls
|
||||
if function_call.transaction_hash not in existing_labels_transactions
|
||||
]
|
||||
|
||||
logger.info(f"Saving {len(labels_to_save)} labels to session")
|
||||
db_session.add_all(labels_to_save)
|
||||
db_session.execute(insert_statement)
|
||||
|
|
Ładowanie…
Reference in New Issue