Add insert for v3 and fix job definitions.

pull/1085/head
Andrey 2024-06-14 19:11:29 +03:00
rodzic 05bd67a350
commit c6f40085f3
5 zmienionych plików z 141 dodań i 54 usunięć

Wyświetl plik

@ -6,7 +6,11 @@ from urllib.parse import urlparse, urlunparse
import requests
from moonstreamdbv3.db import MoonstreamDBEngine, MoonstreamDBIndexesEngine
from moonstreamdbv3.db import (
MoonstreamDBEngine,
MoonstreamDBIndexesEngine,
MoonstreamCustomDBEngine,
)
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from web3 import Web3
@ -160,8 +164,7 @@ def get_db_connection(uuid):
except Exception as e:
raise Exception(f"Unhandled exception, error: {str(e)}")
connection_string = response.text.strip('"')
connection_string = response.text
try:
connection_string = ensure_port_in_connection_string(connection_string)
except ValueError as e:
@ -173,11 +176,18 @@ def get_db_connection(uuid):
def ensure_port_in_connection_string(connection_string):
    """Return *connection_string* with an explicit port, defaulting to 5432.

    The connection string is fetched from an external service and may arrive
    wrapped in stray double quotes; those are removed first so the URL parses
    cleanly. If the URL already specifies a port it is returned unchanged.

    :param connection_string: a PostgreSQL connection URI, e.g.
        ``postgresql://user:pass@host/dbname``.
    :return: the connection string with ``:5432`` appended to the host part
        when no port was present.
    :raises ValueError: if the port component of the URL is malformed
        (propagated from ``urlparse``'s ``port`` accessor; the caller
        handles this).
    """
    # Drop any stray quotes left over from the raw HTTP response body.
    connection_string = connection_string.replace('"', "")
    parsed_url = urlparse(connection_string)
    # Accessing .port raises ValueError on a malformed port, which the
    # caller catches and reports.
    if parsed_url.port is None:
        netloc = parsed_url.netloc
        if "@" in netloc:
            # Preserve the user:password credentials block; only the host
            # part (after the last '@') gets the default port appended.
            credentials, host = netloc.rsplit("@", 1)
            netloc = f"{credentials}@{host}:5432"
        else:
            netloc = f"{netloc}:5432"
        # NOTE(review): bare IPv6 hosts ([::1]) would need bracket-aware
        # handling — assumed not used here; confirm against deployments.
        connection_string = urlunparse(parsed_url._replace(netloc=netloc))
    return connection_string
@ -529,7 +539,9 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
customer_connection = get_db_connection(args.customer_uuid)
filtered_function_call_jobs = [] # v1
if customer_connection == "":
raise ValueError("No connection string found for the customer")
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
@ -555,7 +567,7 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
logger.info(f"Blockchain type: {blockchain_type.value}")
customer_engine = MoonstreamDBEngine()
customer_engine = MoonstreamCustomDBEngine(customer_connection)
with customer_engine.yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None

Wyświetl plik

@ -135,7 +135,6 @@ def continuous_crawler(
evm_state_provider = Web3StateProvider(web3)
if version == 2:
evm_state_provider = MoonstreamEthereumStateProvider(
web3,
network, # type: ignore

Wyświetl plik

@ -2,7 +2,7 @@ import json
import logging
import re
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
import binascii
from enum import Enum
@ -16,7 +16,7 @@ from moonstreamdb.subscriptions import SubscriptionTypes
from moonstreamtypes.subscriptions import SubscriptionTypes
from moonstreamdbv3.models_indexes import AbiJobs
from moonworm.deployment import find_deployment_block # type: ignore
from sqlalchemy import func
from sqlalchemy import func, cast, JSON
from sqlalchemy.orm import Session
from web3.main import Web3
@ -139,6 +139,7 @@ class FunctionCallCrawlJob:
contract_address: ChecksumAddress
entries_tags: Dict[UUID, List[str]]
created_at: int
existing_selectors: List[str] = field(default_factory=list)
def get_crawl_job_entries(
@ -778,7 +779,11 @@ def get_function_call_crawl_job_records(
query = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == blockchain_type.value)
.filter(func.length(AbiJobs.abi_selector) == 8)
.filter(func.length(AbiJobs.abi_selector) == 10)
.filter(
cast(AbiJobs.abi, JSON).op("->>")("type") == "function",
cast(AbiJobs.abi, JSON).op("->>")("stateMutability") != "view",
)
)
if len(addresses) != 0:
@ -790,30 +795,31 @@ def get_function_call_crawl_job_records(
# Iterate over each record fetched from the database
for crawl_job_record in crawl_job_records:
str_address = crawl_job_record.address.hex() # Convert address to hex string
str_address = "0x" + crawl_job_record.address.hex()
if crawl_job_record.abi_selector in existing_crawl_job_records:
# If abi_selector already exists in the records, append new address if not already listed
current_job = existing_crawl_job_records[crawl_job_record.abi_selector]
if str_address not in current_job.contract_address:
# Since it should handle a single address, we ensure not to append but update if necessary.
current_job.contract_address = str_address
else:
# Create a new FunctionCallCrawlJob record if the abi_selector is not found in existing records.
new_crawl_job = FunctionCallCrawlJob(
if str_address not in existing_crawl_job_records:
existing_crawl_job_records[str_address] = FunctionCallCrawlJob(
contract_abi=[json.loads(str(crawl_job_record.abi))],
contract_address=str_address,
contract_address=Web3.toChecksumAddress(str_address),
entries_tags={
UUID(str(crawl_job_record.id)): [
str(crawl_job_record.status),
str(crawl_job_record.progress),
],
]
},
created_at=int(crawl_job_record.created_at.timestamp()),
existing_selectors=[str(crawl_job_record.abi_selector)],
)
existing_crawl_job_records[str(crawl_job_record.abi_selector)] = (
new_crawl_job
)
else:
if (
crawl_job_record.abi_selector
not in existing_crawl_job_records[str_address].existing_selectors
):
existing_crawl_job_records[str_address].contract_abi.append(
json.loads(str(crawl_job_record.abi))
)
existing_crawl_job_records[str_address].existing_selectors.append(
str(crawl_job_record.abi_selector)
)
return existing_crawl_job_records

Wyświetl plik

@ -91,7 +91,7 @@ def _function_call_to_label(
).replace(r"\u0000", "")
)
if isinstance(blockchain_type, AvailableBlockchainType):
if db_version == 2:
label = label_model(
label=label_name,
@ -102,6 +102,26 @@ def _function_call_to_label(
block_timestamp=function_call.block_timestamp,
)
else:
del sanityzed_label_data["type"]
del sanityzed_label_data["name"]
label = label_model(
label=label_name,
label_name=function_call.function_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
block_hash=function_call.block_hash.hex(), # type: ignore
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
caller_address=function_call.caller_address,
origin_address=function_call.caller_address,
)
print(label)
return label
@ -278,35 +298,84 @@ def add_function_calls_to_session(
if len(function_calls) == 0:
return
label_model = get_label_model(blockchain_type, version=db_version)
if db_version == 2:
transactions_hashes_to_save = list(
set([function_call.transaction_hash for function_call in function_calls])
)
label_model = get_label_model(blockchain_type, version=db_version)
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in transactions_hashes_to_save]
transactions_hashes_to_save = list(
set([function_call.transaction_hash for function_call in function_calls])
)
).cte()
# Retrieve existing transaction hashes
query = db_session.query(
label_model.transaction_hash.label("transaction_hash")
).filter(
label_model.label == label_name,
label_model.log_index.is_(None),
exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash),
)
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in transactions_hashes_to_save]
)
).cte()
existing_tx_hashes = [row.transaction_hash for row in query]
# Retrieve existing transaction hashes
query = db_session.query(
label_model.transaction_hash.label("transaction_hash")
).filter(
label_model.label == label_name,
label_model.log_index.is_(None),
exists().where(
label_model.transaction_hash == hashes_cte.c.transaction_hash
),
)
labels_to_save = [
_function_call_to_label(blockchain_type, function_call)
for function_call in function_calls
if function_call.transaction_hash not in existing_tx_hashes
]
existing_tx_hashes = [row.transaction_hash for row in query]
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)
labels_to_save = [
_function_call_to_label(blockchain_type, function_call)
for function_call in function_calls
if function_call.transaction_hash not in existing_tx_hashes
]
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)
else:
label_model = get_label_model(blockchain_type, version=db_version)
# Define the table name and columns based on the blockchain type
table = label_model.__table__
# Create a list of dictionaries representing new records
records = []
for function_call in function_calls:
label_function_call = _function_call_to_label(
blockchain_type, function_call, db_version
)
record = {
"label": label_function_call.label,
"transaction_hash": label_function_call.transaction_hash,
"log_index": None,
"block_number": label_function_call.block_number,
"block_hash": None,
"block_timestamp": label_function_call.block_timestamp,
"caller_address": label_function_call.label_data["caller"],
"origin_address": None,
"address": label_function_call.address,
"label_name": label_function_call.label_name,
"label_type": "tx_call",
"label_data": label_function_call.label_data,
}
records.append(record)
# Insert records using a single batched query with an ON CONFLICT clause
statement = insert(table).values(records)
do_nothing_statement = statement.on_conflict_do_nothing(
index_elements=["transaction_hash", "log_index"],
index_where=(table.c.label == "seer") & (table.c.label_type == "tx_call"),
)
db_session.execute(do_nothing_statement)
logger.info(
f"Batch inserted {len(records)} function call labels into {table.name}"
)

Wyświetl plik

@ -43,6 +43,7 @@ def _crawl_functions(
"function_call", blockchain_type
),
)
print(f"Processing job {function_call_crawler.whitelisted_methods}")
function_call_crawler.crawl(
from_block,
to_block,