pull/1085/head
Andrey 2024-06-12 15:46:08 +03:00
rodzic 9f9374bcfa
commit 5088c8e93d
10 zmienionych plików z 127 dodań i 237 usunięć

Wyświetl plik

@ -3,15 +3,12 @@ from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from uuid import UUID
from moonstreamdb.blockchain import (
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_block_model,
get_transaction_model,
)
from moonstreamdb.models import EthereumBlock, EthereumTransaction
from moonstreamdbv3.blockchain import (
AvailableBlockchainType as AvailableBlockchainTypeV3,
)
from psycopg2.errors import UniqueViolation # type: ignore
from sqlalchemy import Column, desc, func
from sqlalchemy.exc import IntegrityError
@ -42,15 +39,11 @@ from .settings import (
MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
<<<<<<< Updated upstream
MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
=======
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
>>>>>>> Stashed changes
WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS,
)
@ -64,67 +57,36 @@ class BlockCrawlError(Exception):
"""
def get_web3_uri(blockchain_type):
default_uri_mapping = {
AvailableBlockchainType.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI,
AvailableBlockchainType.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI,
AvailableBlockchainType.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI,
AvailableBlockchainType.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI,
AvailableBlockchainType.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI,
AvailableBlockchainType.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI,
}
v3_uri_mapping = {
AvailableBlockchainTypeV3.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.STARKNET: MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.STARKNET_SEPOLIA: MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.MANTLE: MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.MANTLE_SEPOLIA: MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainTypeV3.GAME7_ORBIT_ARBITRUM_SEPOLIA: MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
}
if isinstance(blockchain_type, AvailableBlockchainTypeV3):
return v3_uri_mapping.get(blockchain_type)
return default_uri_mapping.get(blockchain_type)
default_uri_mapping = {
AvailableBlockchainType.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI,
AvailableBlockchainType.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI,
AvailableBlockchainType.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI,
AvailableBlockchainType.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI,
AvailableBlockchainType.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI,
AvailableBlockchainType.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI,
AvailableBlockchainType.STARKNET: MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
AvailableBlockchainType.STARKNET_SEPOLIA: MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.MANTLE: MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
AvailableBlockchainType.MANTLE_SEPOLIA: MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.GAME7_ORBIT_ARBITRUM_SEPOLIA: MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
}
def connect(
<<<<<<< Updated upstream
blockchain_type: Optional[Union[AvailableBlockchainType, AvailableBlockchainTypeV3]] = None,
=======
blockchain_type: Optional[
Union[AvailableBlockchainType, AvailableBlockchainTypeV3]
] = None,
>>>>>>> Stashed changes
blockchain_type: Optional[AvailableBlockchainType] = None,
web3_uri: Optional[str] = None,
version: int = 2,
) -> Web3:
if blockchain_type is None and web3_uri is None:
raise Exception("Both blockchain_type and web3_uri could not be None")
@ -133,89 +95,9 @@ def connect(
request_kwargs: Dict[str, Any] = {"headers": {"Content-Type": "application/json"}}
if web3_uri is None:
<<<<<<< Updated upstream
if isinstance(blockchain_type, AvailableBlockchainType):
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
elif isinstance(blockchain_type, AvailableBlockchainTypeV3):
if blockchain_type == AvailableBlockchainTypeV3.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
elif (
blockchain_type
== AvailableBlockchainTypeV3.GAME7_ORBIT_ARBITRUM_SEPOLIA
):
web3_uri = MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
=======
web3_uri = get_web3_uri(blockchain_type)
>>>>>>> Stashed changes
web3_uri = default_uri_mapping.get(blockchain_type) # type: ignore
if web3_uri is None:
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
request_kwargs["timeout"] = WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS

Wyświetl plik

@ -576,7 +576,11 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_labeled_block = get_first_labeled_block_number(
db_session, blockchain_type, args.address, only_events=args.only_events
db_session,
blockchain_type,
args.address,
only_events=args.only_events,
db_version=3,
)
logger.info(f"Last labeled block: {last_labeled_block}")
@ -651,6 +655,7 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
args.min_sleep_time,
web3_uri=args.web3_uri,
addresses_deployment_blocks=addresses_deployment_blocks,
version=3,
)

Wyświetl plik

@ -5,9 +5,10 @@ from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
@ -131,7 +132,7 @@ def continuous_crawler(
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
network, # type: ignore
db_session,
)

Wyświetl plik

@ -10,12 +10,10 @@ from uuid import UUID
from bugout.data import BugoutJournalEntries, BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import SubscriptionTypes
from moonstreamtypes.subscriptions import SubscriptionTypes
from moonstreamdbv3.models_indexes import AbiJobs
from moonstreamdbv3.blockchain import (
AvailableBlockchainType as AvailableBlockchainTypeV3,
)
from moonworm.deployment import find_deployment_block # type: ignore
from sqlalchemy import func
from sqlalchemy.orm import Session
@ -98,7 +96,7 @@ def _generate_reporter_callback(
def _retry_connect_web3(
blockchain_type: Union[AvailableBlockchainType, AvailableBlockchainTypeV3],
blockchain_type: AvailableBlockchainType,
retry_count: int = 10,
sleep_time: float = 5,
web3_uri: Optional[str] = None,
@ -712,7 +710,7 @@ def get_event_crawl_job_records(
query = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == blockchain_type.value)
.filter(func.length(AbiJobs.abi_selector) > 8)
.filter(func.length(AbiJobs.abi_selector) > 10)
)
if len(addresses) != 0:
@ -725,25 +723,27 @@ def get_event_crawl_job_records(
for crawl_job_record in crawl_job_records:
str_address = crawl_job_record.address.hex()
str_address = "0x" + crawl_job_record.address.hex()
checksummed_address = Web3.toChecksumAddress(str_address)
if crawl_job_record.abi_selector in existing_crawl_job_records:
if (
str_address
checksummed_address
not in existing_crawl_job_records[
crawl_job_record.abi_selector
].contracts
):
existing_crawl_job_records[
crawl_job_record.abi_selector
].contracts.append(crawl_job_record.address.hex())
].contracts.append(checksummed_address)
else:
new_crawl_job = EventCrawlJob(
event_abi_hash=str(crawl_job_record.abi_selector),
event_abi=json.loads(str(crawl_job_record.abi)),
contracts=[str_address],
contracts=[checksummed_address],
address_entries={
crawl_job_record.address.hex(): {
UUID(str(crawl_job_record.id)): [

Wyświetl plik

@ -37,7 +37,7 @@ def _event_to_label(
).replace(r"\u0000", "")
)
if isinstance(blockchain_type, AvailableBlockchainType):
if db_version == 2:
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
@ -54,13 +54,14 @@ def _event_to_label(
label = label_model(
label=label_name,
label_name=event.event_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
block_hash=event.block_hash,
block_hash=event.block_hash.hex(), # type: ignore
)
return label
@ -69,9 +70,8 @@ def _event_to_label(
def _function_call_to_label(
blockchain_type: AvailableBlockchainType,
function_call: ContractFunctionCall,
label_name=CRAWLER_LABEL,
blocks_cache: Dict[int, Any] = {},
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> Base:
"""
Creates a label model.
@ -125,41 +125,14 @@ def get_last_labeled_block_number(
def get_first_labeled_block_number(
db_session: Session,
blockchain_type: Union[AvailableBlockchainType, AvailableBlockchainTypeV3],
blockchain_type: AvailableBlockchainType,
address: str,
label_name: str = CRAWLER_LABEL,
only_events: bool = False,
db_version: int = 2,
) -> Optional[int]:
<<<<<<< Updated upstream
label_model = get_label_model(blockchain_type, version=db_version)
block_number_query = (
db_session.query(label_model.block_number)
.filter(label_model.label == label_name)
.filter(label_model.address == address)
)
function_call_block_numbers = (
block_number_query.filter(label_model.log_index == None)
.order_by(label_model.block_number)
.limit(50)
.all()
)
event_block_numbers = (
block_number_query.filter(label_model.log_index != None)
.order_by(label_model.block_number)
.limit(50)
.all()
)
if only_events:
return event_block_numbers[0][0] if event_block_numbers else None
=======
if isinstance(blockchain_type, AvailableBlockchainType):
label_model = get_label_model(blockchain_type)
>>>>>>> Stashed changes
else:
label_model = get_label_model_v3(blockchain_type)
base_query = (
db_session.query(label_model.block_number)
@ -201,17 +174,16 @@ def add_events_to_session(
db_session: Session,
events: List[Event],
blockchain_type: AvailableBlockchainType,
v3_schema: bool = False,
label_name=CRAWLER_LABEL,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> None:
if len(events) == 0:
return
if not v3_schema:
label_model = get_label_model(blockchain_type, version=db_version)
label_model = get_label_model(blockchain_type, version=db_version)
if db_version == 2:
events_hashes_to_save = set([event.transaction_hash for event in events])
@ -254,17 +226,17 @@ def add_events_to_session(
db_session.add_all(labels_to_save)
else:
from sqlalchemy.dialects.postgresql import insert
# Define the table name and columns based on the blockchain type
label_model = get_label_model(blockchain_type, version=db_version)
table = label_model.__table__
# Create a list of dictionaries representing new records
records = []
for event in events:
label_event = _event_to_label(blockchain_type, event, label_name)
label_event = _event_to_label(
blockchain_type, event, label_name, db_version
)
record = {
"label": label_event.label,
@ -299,8 +271,8 @@ def add_function_calls_to_session(
db_session: Session,
function_calls: List[ContractFunctionCall],
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> None:
if len(function_calls) == 0:

Wyświetl plik

@ -2,7 +2,16 @@ import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
)
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
)
from moonworm.crawler.log_scanner import (
_crawl_events as moonworm_autoscale_crawl_events, # type: ignore
)
@ -49,6 +58,7 @@ def get_block_timestamp(
block_number: int,
blocks_cache: Dict[int, int],
max_blocks_batch: int = 30,
version: int = 2,
) -> int:
"""
Get the timestamp of a block.
@ -64,6 +74,14 @@ def get_block_timestamp(
:param blocks_cache: The cache of blocks.
:return: The timestamp of the block.
"""
if version != 2:
if block_number in blocks_cache:
return blocks_cache[block_number]
target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
blocks_cache[block_number] = target_block_timestamp
return target_block_timestamp
assert max_blocks_batch > 0
if block_number in blocks_cache:
@ -72,7 +90,9 @@ def get_block_timestamp(
block_model = get_block_model(blockchain_type)
blocks = (
db_session.query(block_model.block_number, block_model.timestamp)
db_session.query(
block_model.block_number, block_model.timestamp, block_model.hash
)
.filter(
and_(
block_model.block_number >= block_number - max_blocks_batch - 1,
@ -109,6 +129,7 @@ def _crawl_events(
to_block: int,
blocks_cache: Dict[int, int] = {},
db_block_query_batch=10,
version: int = 2,
) -> List[Event]:
all_events = []
for job in jobs:
@ -130,6 +151,7 @@ def _crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -139,6 +161,7 @@ def _crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)
@ -155,21 +178,25 @@ def _autoscale_crawl_events(
blocks_cache: Dict[int, int] = {},
batch_size: int = 1000,
db_block_query_batch=10,
version: int = 2,
) -> Tuple[List[Event], int]:
"""
Crawl events with auto regulated batch_size.
"""
all_events = []
for job in jobs:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
try:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
except Exception as e:
breakpoint()
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
@ -178,6 +205,7 @@ def _autoscale_crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -187,6 +215,7 @@ def _autoscale_crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)

Wyświetl plik

@ -2,6 +2,7 @@ import logging
from typing import List
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonworm.crawler.function_call_crawler import ( # type: ignore
ContractFunctionCall,
@ -57,9 +58,12 @@ def function_call_crawler(
start_block: int,
end_block: int,
batch_size: int,
version: int = 2,
):
if version != 2:
raise ValueError("Only version 2 is supported")
try:
network = blockchain_type_to_network_type(blockchain_type=blockchain_type)
network = blockchain_type_to_network_type(blockchain_type=blockchain_type) # type: ignore
except Exception as e:
raise Exception(e)

Wyświetl plik

@ -1,13 +1,15 @@
import logging
import time
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union, Any
from uuid import UUID
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
Network,
)
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -28,7 +30,7 @@ logger = logging.getLogger(__name__)
def historical_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
blockchain_type: AvailableBlockchainType, # AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
@ -39,7 +41,7 @@ def historical_crawler(
web3_uri: Optional[str] = None,
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
max_insert_batch: int = 10000,
v3_schema: bool = False,
version: int = 2,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@ -58,9 +60,13 @@ def historical_crawler(
except Exception as e:
raise Exception(e)
if version != 2:
## Function call crawler is not supported in version 3
network = Network("ethereum")
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
network, # type: ignore
db_session,
)
@ -94,6 +100,7 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
else:
@ -106,6 +113,7 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
@ -118,14 +126,12 @@ def historical_crawler(
db_session,
all_events[i : i + max_insert_batch],
blockchain_type,
v3_schema,
version,
)
else:
add_events_to_session(
db_session, all_events, blockchain_type, v3_schema
)
add_events_to_session(db_session, all_events, blockchain_type, version)
if function_call_crawl_jobs:
logger.info(

Wyświetl plik

@ -44,7 +44,7 @@ DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "moonworm-alpha"
CRAWLER_LABEL = "seer"
VIEW_STATE_CRAWLER_LABEL = "view-state-alpha"
METADATA_CRAWLER_LABEL = "metadata-crawler"
@ -196,14 +196,6 @@ if MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI == "":
"MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI env variable is not set"
)
<<<<<<< Updated upstream
MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI env variable is not set"
=======
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI", ""
)
@ -238,7 +230,6 @@ MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
if MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI env variable is not set"
>>>>>>> Stashed changes
)

Wyświetl plik

@ -39,7 +39,7 @@ setup(
"fastapi",
"moonstreamdb>=0.4.4",
"moonstreamdb-v3>=0.0.9",
"moonstream-types>=0.0.2",
"moonstream-types>=0.0.3",
"moonstream>=0.1.1",
"moonworm[moonstream]>=0.6.2",
"humbug",