Merge pull request #1095 from moonstream-to/revert-1085-add-moonworm-v3-db-crawler

Revert "Add moonworm v3 db crawler"
pull/1098/head
Sergei Sumarokov 2024-06-17 17:30:30 +03:00 zatwierdzone przez GitHub
commit 23842afba3
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
12 zmienionych plików z 188 dodań i 1093 usunięć

Wyświetl plik

@ -3,7 +3,7 @@ from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from uuid import UUID
from moonstreamtypes.blockchain import (
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_block_model,
get_transaction_model,
@ -39,11 +39,6 @@ from .settings import (
MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS,
)
@ -57,36 +52,9 @@ class BlockCrawlError(Exception):
"""
# Default mapping from blockchain type to the external URI of its Moonstream node.
# connect() falls back to this table when the caller does not pass an explicit
# web3_uri. Chains absent from this mapping cannot be connected to by default.
default_uri_mapping = {
AvailableBlockchainType.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI,
AvailableBlockchainType.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI,
AvailableBlockchainType.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI,
AvailableBlockchainType.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI,
AvailableBlockchainType.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI,
AvailableBlockchainType.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI,
AvailableBlockchainType.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI,
AvailableBlockchainType.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI,
AvailableBlockchainType.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI,
AvailableBlockchainType.STARKNET: MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
AvailableBlockchainType.STARKNET_SEPOLIA: MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.MANTLE: MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
AvailableBlockchainType.MANTLE_SEPOLIA: MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
AvailableBlockchainType.GAME7_ORBIT_ARBITRUM_SEPOLIA: MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
}
def connect(
blockchain_type: Optional[AvailableBlockchainType] = None,
web3_uri: Optional[str] = None,
version: int = 2,
) -> Web3:
if blockchain_type is None and web3_uri is None:
raise Exception("Both blockchain_type and web3_uri could not be None")
@ -95,8 +63,41 @@ def connect(
request_kwargs: Dict[str, Any] = {"headers": {"Content-Type": "application/json"}}
if web3_uri is None:
web3_uri = default_uri_mapping.get(blockchain_type) # type: ignore
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):

Wyświetl plik

@ -2,17 +2,9 @@ import argparse
import logging
from typing import Optional
from uuid import UUID
from urllib.parse import urlparse, urlunparse
import requests
from moonstreamdbv3.db import (
MoonstreamDBEngine,
MoonstreamDBIndexesEngine,
MoonstreamCustomDBEngine,
)
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
from web3 import Web3
from web3.middleware import geth_poa_middleware
@ -21,8 +13,6 @@ from ..settings import (
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
HISTORICAL_CRAWLER_STATUSES,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_DB_V3_CONTROLLER_API,
MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN,
)
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (
@ -32,8 +22,6 @@ from .crawler import (
make_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
update_job_state_with_filters,
get_event_crawl_job_records,
get_function_call_crawl_job_records,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@ -147,154 +135,6 @@ def handle_crawl(args: argparse.Namespace) -> None:
)
def get_db_connection(uuid):
    """
    Fetch the customer's database connection string from the DB v3 controller API.

    :param uuid: customer UUID used to address the controller endpoint.
    :return: connection string with an explicit port (see
        ensure_port_in_connection_string).
    :raises ValueError: on network/HTTP failure or an invalid connection string.
    """
    url = (
        f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{uuid}/instances/1/creds/seer/url"
    )
    headers = {
        "Authorization": f"Bearer {MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN}"
    }
    try:
        # Bound the request so a stuck controller cannot hang the crawler forever.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raises HTTPError for bad requests (4xx or 5xx)
    except requests.RequestException as e:
        error_msg = f"Network-related error for UUID {uuid}: {str(e)}"
        logger.error(error_msg)
        # Chain the original exception so the traceback is preserved.
        raise ValueError(error_msg) from e

    connection_string = response.text

    try:
        connection_string = ensure_port_in_connection_string(connection_string)
    except ValueError as e:
        error_msg = f"Invalid connection string for UUID {uuid}: {str(e)}"
        logger.error(error_msg)
        raise ValueError(error_msg) from e

    return connection_string
def ensure_port_in_connection_string(connection_string):
    """
    Return *connection_string* with an explicit port, defaulting to 5432.

    The previous implementation spliced the port in with str.replace on the
    last path segment, which corrupts the URL whenever the database name
    substring also occurs elsewhere (e.g. inside the password). Rebuilding
    the netloc via urlunparse is exact.

    :raises ValueError: propagated from urlparse if the port is malformed.
    """
    # Strip stray quotes the controller may wrap the string in.
    connection_string = connection_string.replace('"', "")
    parsed_url = urlparse(connection_string)
    if parsed_url.port is None:
        # Append default port 5432 for PostgreSQL if no port is specified.
        parsed_url = parsed_url._replace(netloc=f"{parsed_url.netloc}:5432")
        connection_string = urlunparse(parsed_url)
    return connection_string
# Entry point for the `crawl-v3` CLI subcommand: loads event and function-call
# crawl jobs from the v3 indexes database, connects to the chain, picks a start
# block, and hands off to continuous_crawler.
# NOTE(review): indentation was lost in this capture; nesting below follows the
# statement order as scraped — verify against the repository before reuse.
def handle_crawl_v3(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
index_engine = MoonstreamDBIndexesEngine()
# Load initial jobs from the indexes DB; empty address filter means all jobs.
with index_engine.yield_db_session_ctx() as index_db_session:
initial_event_jobs = get_event_crawl_job_records(
index_db_session,
blockchain_type,
[],
{},
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
initial_function_call_jobs = get_function_call_crawl_job_records(
index_db_session,
blockchain_type,
[],
{},
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
# Use the explicit --web3 provider URL when given, otherwise retry-connect
# through the default node URI mapping.
if args.web3 is None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(
args.web3,
)
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_labeled_block = get_last_labeled_block_number(db_session, blockchain_type)
logger.info(f"Last labeled block: {last_labeled_block}")
# Start-block resolution: CLI value > last labeled block > chain head - 10000.
start_block = args.start
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
start_block = last_labeled_block - 1
logger.info(f"Using last labeled block as start: {start_block}")
else:
logger.info(
"No last labeled block found, using start block (web3.eth.blockNumber - 300)"
)
start_block = web3.eth.blockNumber - 10000
logger.info(f"Starting from block: {start_block}")
elif last_labeled_block is not None:
# Refuse to re-crawl already-labeled blocks unless --force is set.
if start_block < last_labeled_block and not args.force:
logger.info(
f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}"
)
logger.info(
f"Use --force to override this and start from the start block: {start_block}"
)
start_block = last_labeled_block
else:
logger.info(f"Using start block: {start_block}")
else:
logger.info(f"Using start block: {start_block}")
confirmations = args.confirmations
# --no-confirmations explicitly disables waiting by forcing zero.
if not args.no_confirmations:
assert confirmations > 0, "confirmations must be greater than 0"
else:
confirmations = 0
craw_event_jobs = list(initial_event_jobs.values())
initial_function_call_jobs = list(initial_function_call_jobs.values())
continuous_crawler(
db_session,
blockchain_type,
web3,
craw_event_jobs,
initial_function_call_jobs,
start_block,
args.max_blocks_batch,
args.min_blocks_batch,
confirmations,
args.min_sleep_time,
args.heartbeat_interval,
args.new_jobs_refetch_interval,
web3_uri=args.web3_uri,
)
def handle_historical_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
@ -499,178 +339,6 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
# Entry point for the `historical-crawl-v3` CLI subcommand. Crawls backwards
# (start_block down to end_block) into a customer-specific v3 database whose
# connection string is fetched from the DB controller by customer UUID.
# NOTE(review): indentation was lost in this capture; nesting below follows the
# statement order as scraped — verify against the repository before reuse.
def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
"""
Historical crawl for MoonstreamDB v3
"""
blockchain_type = AvailableBlockchainType(args.blockchain_type)
##subscription_type = blockchain_type_to_subscription_type(blockchain_type)
addresses_filter = []
if args.address is not None:
## 40 hexadecimal characters format
addresses_filter.append(args.address[2:])
index_engine = MoonstreamDBIndexesEngine()
# Load the crawl jobs for the (optionally filtered) addresses from the indexes DB.
with index_engine.yield_db_session_ctx() as index_db_session:
initial_event_jobs = get_event_crawl_job_records(
index_db_session,
blockchain_type,
addresses_filter,
{},
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
initial_function_call_jobs = get_function_call_crawl_job_records(
index_db_session,
blockchain_type,
addresses_filter,
{},
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
# Resolve the per-customer database connection string via the controller API.
customer_connection = get_db_connection(args.customer_uuid)
if customer_connection == "":
raise ValueError("No connection string found for the customer")
# --only-events / --only-functions narrow the job set; both at once is invalid.
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
else:
filtered_function_call_jobs = initial_function_call_jobs.values()
if args.only_functions:
filtered_event_jobs = []
logger.info(
f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
)
else:
filtered_event_jobs = initial_event_jobs.values()
if args.only_events and args.only_functions:
raise ValueError(
"--only-events and --only-functions cannot be set at the same time"
)
logger.info(
f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
)
logger.info(f"Blockchain type: {blockchain_type.value}")
customer_engine = MoonstreamCustomDBEngine(customer_connection)
with customer_engine.yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
if args.web3 is None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(
args.web3,
)
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_labeled_block = get_first_labeled_block_number(
db_session,
blockchain_type,
args.address,
only_events=args.only_events,
db_version=3,
)
logger.info(f"Last labeled block: {last_labeled_block}")
addresses_deployment_blocks = None
end_block = args.end
start_block = args.start
# get set of addresses from event jobs and function call jobs
if args.find_deployed_blocks:
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
if args.start is None:
start_block = web3.eth.blockNumber - 1
# Earliest deployment block among the tracked contracts becomes the
# end (lowest) block of the reverse crawl.
addresses_deployment_blocks = find_all_deployed_blocks(
web3, list(addresses_set)
)
if len(addresses_deployment_blocks) == 0:
logger.error(
"No addresses found in the blockchain. Please check your addresses and try again"
)
return
end_block = min(addresses_deployment_blocks.values())
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
start_block = last_labeled_block
logger.info(f"Using last labeled block as start: {start_block}")
else:
logger.info(
"No last labeled block found, using start block (web3.eth.blockNumber - 300)"
)
raise ValueError(
"No start block provided and no last labeled block found"
)
elif last_labeled_block is not None:
# Reverse crawl: clamp to the last labeled block unless --force is set.
if start_block > last_labeled_block and not args.force:
logger.info(
f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}"
)
logger.info(
f"Use --force to override this and start from the start block: {start_block}"
)
start_block = last_labeled_block
else:
logger.info(f"Using start block: {start_block}")
else:
logger.info(f"Using start block: {start_block}")
# Sanity check: this crawler walks from high to low block numbers.
if start_block < end_block:
raise ValueError(
f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction."
)
historical_crawler(
db_session,
blockchain_type,
web3,
filtered_event_jobs, # type: ignore
filtered_function_call_jobs, # type: ignore
start_block,
end_block,
args.max_blocks_batch,
args.min_sleep_time,
web3_uri=args.web3_uri,
addresses_deployment_blocks=addresses_deployment_blocks,
version=3,
)
def main() -> None:
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())
@ -776,103 +444,6 @@ def main() -> None:
crawl_parser.set_defaults(func=handle_crawl)
crawl_parser_v3 = subparsers.add_parser(
"crawl-v3",
help="continuous crawling the event/function call jobs from bugout journal",
)
crawl_parser_v3.add_argument(
"--start",
"-s",
type=int,
default=None,
)
crawl_parser_v3.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
crawl_parser_v3.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
crawl_parser_v3.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
crawl_parser_v3.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
crawl_parser_v3.add_argument(
"--min-blocks-batch",
"-n",
type=int,
default=20,
help="Minimum number of blocks to crawl in a single batch",
)
crawl_parser_v3.add_argument(
"--confirmations",
"-c",
type=int,
default=175,
help="Number of confirmations to wait for",
)
crawl_parser_v3.add_argument(
"--no-confirmations",
action="store_true",
default=False,
help="Do not wait for confirmations explicitly set confirmations to 0",
)
crawl_parser_v3.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
crawl_parser_v3.add_argument(
"--heartbeat-interval",
"-i",
type=float,
default=60,
help="Heartbeat interval in seconds",
)
crawl_parser_v3.add_argument(
"--new-jobs-refetch-interval",
"-r",
type=float,
default=180,
help="Time to wait before refetching new jobs",
)
crawl_parser_v3.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
crawl_parser_v3.set_defaults(func=handle_crawl_v3)
historical_crawl_parser = subparsers.add_parser(
"historical-crawl", help="Crawl historical data"
)
@ -961,112 +532,6 @@ def main() -> None:
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
historical_crawl_parser_v3 = subparsers.add_parser(
"historical-crawl-v3", help="Crawl historical data"
)
historical_crawl_parser_v3.add_argument(
"--address",
"-a",
required=False,
type=str,
)
historical_crawl_parser_v3.add_argument(
"--start",
"-s",
type=int,
default=None,
)
historical_crawl_parser_v3.add_argument(
"--end",
"-e",
type=int,
required=False,
)
historical_crawl_parser_v3.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
historical_crawl_parser_v3.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
historical_crawl_parser_v3.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
historical_crawl_parser_v3.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
historical_crawl_parser_v3.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
historical_crawl_parser_v3.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
historical_crawl_parser_v3.add_argument(
"--only-events",
action="store_true",
default=False,
help="Only crawl events",
)
historical_crawl_parser_v3.add_argument(
"--only-functions",
action="store_true",
default=False,
help="Only crawl function calls",
)
historical_crawl_parser_v3.add_argument(
"--find-deployed-blocks",
action="store_true",
default=False,
help="Find all deployed blocks",
)
historical_crawl_parser_v3.add_argument(
"--customer-uuid",
type=UUID,
required=True,
help="Customer UUID",
)
historical_crawl_parser_v3.add_argument(
"--user-uuid",
type=UUID,
required=False,
help="User UUID",
)
historical_crawl_parser_v3.set_defaults(func=handle_historical_crawl_v3)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -5,14 +5,12 @@ from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -107,7 +105,6 @@ def continuous_crawler(
new_jobs_refetch_interval: float = 120,
web3_uri: Optional[str] = None,
max_insert_batch: int = 10000,
version: int = 2,
):
crawler_type = "continuous"
assert (
@ -132,14 +129,11 @@ def continuous_crawler(
except Exception as e:
raise Exception(e)
evm_state_provider = Web3StateProvider(web3)
if version == 2:
evm_state_provider = MoonstreamEthereumStateProvider(
web3,
network, # type: ignore
db_session,
)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
heartbeat_template = {
"status": "crawling",
@ -212,7 +206,7 @@ def continuous_crawler(
)
all_function_calls = _crawl_functions(
blockchain_type,
evm_state_provider,
ethereum_state_provider,
function_call_crawl_jobs,
start_block,
end_block,
@ -274,7 +268,7 @@ def continuous_crawler(
function_call_crawl_jobs
)
heartbeat_template["function_call metrics"] = (
evm_state_provider.metrics
ethereum_state_provider.metrics
)
heartbeat(
crawler_type=crawler_type,

Wyświetl plik

@ -2,22 +2,17 @@ import json
import logging
import re
import time
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
import binascii
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from uuid import UUID
from bugout.data import BugoutJournalEntries, BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import SubscriptionTypes
from moonstreamtypes.subscriptions import SubscriptionTypes
from moonstreamdbv3.models_indexes import AbiJobs
from moonworm.deployment import find_deployment_block # type: ignore
from sqlalchemy import func, cast as sqlcast, JSON
from sqlalchemy.orm import Session
from web3.main import Web3
from ..blockchain import connect
@ -139,7 +134,6 @@ class FunctionCallCrawlJob:
contract_address: ChecksumAddress
entries_tags: Dict[UUID, List[str]]
created_at: int
existing_selectors: List[str] = field(default_factory=list)
def get_crawl_job_entries(
@ -697,129 +691,3 @@ def add_progress_to_tags(
)
return entries_tags_delete, entries_tags_add
# NOTE(review): indentation was lost in this capture; nesting below follows the
# statement order as scraped — verify against the repository before reuse.
def get_event_crawl_job_records(
db_session: Session,
blockchain_type: AvailableBlockchainType,
addresses: List[str],
existing_crawl_job_records: Dict[str, EventCrawlJob],
):
"""
Retrieve and update the event crawl job records from the database.

Merges AbiJobs rows for the given chain into *existing_crawl_job_records*,
keyed by abi_selector: known selectors gain any new contract addresses,
unknown selectors become new EventCrawlJob entries. The (mutated) mapping
is returned.

:param addresses: hex addresses without the 0x prefix; empty means no filter.
"""
# Event selectors are full topic hashes, so longer than 10 chars ("0x" + 8);
# this filter excludes 4-byte function selectors.
query = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == blockchain_type.value)
.filter(func.length(AbiJobs.abi_selector) > 10)
)
if len(addresses) != 0:
# Addresses are stored as raw bytes in the DB.
query = query.filter(
AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
)
crawl_job_records = query.all()
if len(crawl_job_records) == 0:
return existing_crawl_job_records
for crawl_job_record in crawl_job_records:
str_address = "0x" + crawl_job_record.address.hex()
checksummed_address = Web3.toChecksumAddress(str_address)
# Known selector: just track the additional contract address.
if crawl_job_record.abi_selector in existing_crawl_job_records:
if (
checksummed_address
not in existing_crawl_job_records[
crawl_job_record.abi_selector
].contracts
):
existing_crawl_job_records[
crawl_job_record.abi_selector
].contracts.append(checksummed_address)
else:
# New selector: build a fresh crawl job carrying status/progress tags.
new_crawl_job = EventCrawlJob(
event_abi_hash=str(crawl_job_record.abi_selector),
event_abi=json.loads(str(crawl_job_record.abi)),
contracts=[checksummed_address],
address_entries={
crawl_job_record.address.hex(): {
UUID(str(crawl_job_record.id)): [
str(crawl_job_record.status),
str(crawl_job_record.progress),
]
}
},
created_at=int(crawl_job_record.created_at.timestamp()),
)
existing_crawl_job_records[str(crawl_job_record.abi_selector)] = (
new_crawl_job
)
return existing_crawl_job_records
# NOTE(review): indentation was lost in this capture; nesting below follows the
# statement order as scraped — verify against the repository before reuse.
def get_function_call_crawl_job_records(
db_session: Session,
blockchain_type: AvailableBlockchainType,
addresses: List[str],
existing_crawl_job_records: Dict[str, FunctionCallCrawlJob],
):
"""
Retrieve and update the function call crawl job records from the database.

Merges AbiJobs rows for the given chain into *existing_crawl_job_records*,
keyed by "0x"-prefixed contract address: new addresses get a fresh
FunctionCallCrawlJob; known addresses gain the ABI entry for any selector
not already tracked. The (mutated) mapping is returned.

:param addresses: hex addresses without the 0x prefix; empty means no filter.
"""
# Query AbiJobs where the abi_selector is exactly 10 characters long
# ("0x" + 8 hex digits, i.e. a 4-byte function selector), and the ABI entry
# is a state-changing function (non-view).
query = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == blockchain_type.value)
.filter(func.length(AbiJobs.abi_selector) == 10)
.filter(
sqlcast(AbiJobs.abi, JSON).op("->>")("type") == "function",
sqlcast(AbiJobs.abi, JSON).op("->>")("stateMutability") != "view",
)
)
if len(addresses) != 0:
# Addresses are stored as raw bytes in the DB.
query = query.filter(
AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
)
crawl_job_records = query.all()
# Iterate over each record fetched from the database
for crawl_job_record in crawl_job_records:
str_address = "0x" + crawl_job_record.address.hex()
if str_address not in existing_crawl_job_records:
# First selector seen for this contract: create its crawl job.
existing_crawl_job_records[str_address] = FunctionCallCrawlJob(
contract_abi=[json.loads(str(crawl_job_record.abi))],
contract_address=Web3.toChecksumAddress(str_address),
entries_tags={
UUID(str(crawl_job_record.id)): [
str(crawl_job_record.status),
str(crawl_job_record.progress),
]
},
created_at=int(crawl_job_record.created_at.timestamp()),
existing_selectors=[str(crawl_job_record.abi_selector)],
)
else:
# Known contract: append the ABI only for selectors not yet tracked.
if (
crawl_job_record.abi_selector
not in existing_crawl_job_records[str_address].existing_selectors
):
existing_crawl_job_records[str_address].contract_abi.append(
json.loads(str(crawl_job_record.abi))
)
existing_crawl_job_records[str_address].existing_selectors.append(
str(crawl_job_record.abi_selector)
)
return existing_crawl_job_records

Wyświetl plik

@ -1,14 +1,12 @@
import json
import logging
from typing import Dict, List, Optional, Union, Any
from typing import Dict, List, Optional
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.models import Base
from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy import Integer, String, column, exists, func, select, text, values
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from ..settings import CRAWLER_LABEL
from .event_crawler import Event
@ -18,15 +16,12 @@ logger = logging.getLogger(__name__)
def _event_to_label(
blockchain_type: AvailableBlockchainType,
event: Event,
label_name=CRAWLER_LABEL,
db_version: int = 2,
blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL
) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type, version=db_version)
label_model = get_label_model(blockchain_type)
sanityzed_label_data = json.loads(
json.dumps(
{
@ -37,46 +32,27 @@ def _event_to_label(
).replace(r"\u0000", "")
)
if db_version == 2:
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
)
else:
del sanityzed_label_data["type"]
del sanityzed_label_data["name"]
label = label_model(
label=label_name,
label_name=event.event_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
block_hash=event.block_hash.hex(), # type: ignore
)
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
)
return label
def _function_call_to_label(
blockchain_type: AvailableBlockchainType,
function_call: ContractFunctionCall,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type, version=db_version)
label_model = get_label_model(blockchain_type)
sanityzed_label_data = json.loads(
json.dumps(
@ -91,34 +67,14 @@ def _function_call_to_label(
).replace(r"\u0000", "")
)
if db_version == 2:
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
)
else:
del sanityzed_label_data["type"]
del sanityzed_label_data["name"]
label = label_model(
label=label_name,
label_name=function_call.function_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
block_hash=function_call.block_hash.hex(), # type: ignore
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
caller_address=function_call.caller_address,
origin_address=function_call.caller_address,
)
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
)
return label
@ -127,9 +83,8 @@ def get_last_labeled_block_number(
db_session: Session,
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
db_version: int = 2,
) -> Optional[int]:
label_model = get_label_model(blockchain_type, version=db_version)
label_model = get_label_model(blockchain_type)
block_number = (
db_session.query(label_model.block_number)
.filter(label_model.label == label_name)
@ -145,34 +100,38 @@ def get_first_labeled_block_number(
db_session: Session,
blockchain_type: AvailableBlockchainType,
address: str,
label_name: str = CRAWLER_LABEL,
label_name=CRAWLER_LABEL,
only_events: bool = False,
db_version: int = 2,
) -> Optional[int]:
label_model = get_label_model(blockchain_type, version=db_version)
base_query = (
label_model = get_label_model(blockchain_type)
block_number_query = (
db_session.query(label_model.block_number)
.filter(label_model.label == label_name, label_model.address == address)
.filter(label_model.label == label_name)
.filter(label_model.address == address)
)
function_call_block_numbers = (
block_number_query.filter(label_model.log_index == None)
.order_by(label_model.block_number)
.limit(50)
.all()
)
event_block_numbers = (
block_number_query.filter(label_model.log_index != None)
.order_by(label_model.block_number)
.limit(50)
.all()
)
event_blocks = base_query.filter(label_model.log_index != None).first()
function_blocks = (
None
if only_events
else base_query.filter(label_model.log_index == None).first()
)
if event_blocks and function_blocks:
result = max(event_blocks, function_blocks)
elif event_blocks or function_blocks:
result = event_blocks if event_blocks else function_blocks
if only_events:
return event_block_numbers[0][0] if event_block_numbers else None
else:
result = None
return result[0] if result else None
event_block_number = event_block_numbers[0][0] if event_block_numbers else -1
function_call_block_number = (
function_call_block_numbers[0][0] if function_call_block_numbers else -1
)
max_block_number = max(event_block_number, function_call_block_number)
return max_block_number if max_block_number != -1 else None
def commit_session(db_session: Session) -> None:
@ -192,188 +151,93 @@ def add_events_to_session(
db_session: Session,
events: List[Event],
blockchain_type: AvailableBlockchainType,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> None:
if len(events) == 0:
return
label_model = get_label_model(blockchain_type, version=db_version)
label_model = get_label_model(blockchain_type)
if db_version == 2:
events_hashes_to_save = set([event.transaction_hash for event in events])
events_hashes_to_save = set([event.transaction_hash for event in events])
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in events_hashes_to_save]
)
).cte()
# Retrieve existing transaction hashes and registered log indexes
query = (
db_session.query(
label_model.transaction_hash.label("transaction_hash"),
func.array_agg(label_model.log_index).label("log_indexes"),
)
.filter(
label_model.label == label_name,
label_model.log_index.isnot(None),
exists().where(
label_model.transaction_hash == hashes_cte.c.transaction_hash
),
)
.group_by(label_model.transaction_hash)
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in events_hashes_to_save]
)
).cte()
existing_log_index_by_tx_hash = {
row.transaction_hash: row.log_indexes for row in query
}
labels_to_save = [
_event_to_label(blockchain_type, event, label_name)
for event in events
if event.transaction_hash not in existing_log_index_by_tx_hash
or event.log_index
not in existing_log_index_by_tx_hash[event.transaction_hash]
]
logger.info(f"Saving {len(labels_to_save)} event labels to session")
db_session.add_all(labels_to_save)
else:
# Define the table name and columns based on the blockchain type
table = label_model.__table__
# Create a list of dictionaries representing new records
records = []
for event in events:
label_event = _event_to_label(
blockchain_type, event, label_name, db_version
)
record = {
"label": label_event.label,
"transaction_hash": label_event.transaction_hash,
"log_index": label_event.log_index,
"block_number": label_event.block_number,
"block_hash": label_event.block_hash,
"block_timestamp": label_event.block_timestamp,
"caller_address": None,
"origin_address": None,
"address": label_event.address,
"label_name": label_event.label_name,
"label_type": "event",
"label_data": label_event.label_data,
}
records.append(record)
# Insert records using a single batched query with an ON CONFLICT clause
statement = insert(table).values(records)
do_nothing_statement = statement.on_conflict_do_nothing(
index_elements=["transaction_hash", "log_index"],
index_where=(table.c.label == "seer") & (table.c.label_type == "event"),
# Retrieve existing transaction hashes and registered log indexes
query = (
db_session.query(
label_model.transaction_hash.label("transaction_hash"),
func.array_agg(label_model.log_index).label("log_indexes"),
)
.filter(
label_model.label == label_name,
label_model.log_index.isnot(None),
exists().where(
label_model.transaction_hash == hashes_cte.c.transaction_hash
),
)
.group_by(label_model.transaction_hash)
)
db_session.execute(do_nothing_statement)
existing_log_index_by_tx_hash = {
row.transaction_hash: row.log_indexes for row in query
}
logger.info(f"Batch inserted {len(records)} event labels into {table.name}")
labels_to_save = [
_event_to_label(blockchain_type, event, label_name)
for event in events
if event.transaction_hash not in existing_log_index_by_tx_hash
or event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
]
logger.info(f"Saving {len(labels_to_save)} event labels to session")
db_session.add_all(labels_to_save)
def add_function_calls_to_session(
db_session: Session,
function_calls: List[ContractFunctionCall],
blockchain_type: AvailableBlockchainType,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> None:
if len(function_calls) == 0:
return
if db_version == 2:
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=db_version)
transactions_hashes_to_save = list(
set([function_call.transaction_hash for function_call in function_calls])
)
transactions_hashes_to_save = list(
set([function_call.transaction_hash for function_call in function_calls])
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in transactions_hashes_to_save]
)
).cte()
# Define a CTE VALUES expression to escape big IN clause
hashes_cte = select(
values(column("transaction_hash", String), name="hashes").data(
[(hash,) for hash in transactions_hashes_to_save]
)
).cte()
# Retrieve existing transaction hashes
query = db_session.query(
label_model.transaction_hash.label("transaction_hash")
).filter(
label_model.label == label_name,
label_model.log_index.is_(None),
exists().where(label_model.transaction_hash == hashes_cte.c.transaction_hash),
)
# Retrieve existing transaction hashes
query = db_session.query(
label_model.transaction_hash.label("transaction_hash")
).filter(
label_model.label == label_name,
label_model.log_index.is_(None),
exists().where(
label_model.transaction_hash == hashes_cte.c.transaction_hash
),
)
existing_tx_hashes = [row.transaction_hash for row in query]
existing_tx_hashes = [row.transaction_hash for row in query]
labels_to_save = [
_function_call_to_label(blockchain_type, function_call)
for function_call in function_calls
if function_call.transaction_hash not in existing_tx_hashes
]
labels_to_save = [
_function_call_to_label(blockchain_type, function_call)
for function_call in function_calls
if function_call.transaction_hash not in existing_tx_hashes
]
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)
else:
label_model = get_label_model(blockchain_type, version=db_version)
# Define the table name and columns based on the blockchain type
table = label_model.__table__
# Create a list of dictionaries representing new records
records = []
for function_call in function_calls:
label_function_call = _function_call_to_label(
blockchain_type, function_call, db_version
)
record = {
"label": label_function_call.label,
"transaction_hash": label_function_call.transaction_hash,
"log_index": None,
"block_number": label_function_call.block_number,
"block_hash": label_function_call.block_hash,
"block_timestamp": label_function_call.block_timestamp,
"caller_address": label_function_call.caller_address,
"origin_address": label_function_call.caller_address,
"address": label_function_call.address,
"label_name": label_function_call.label_name,
"label_type": "tx_call",
"label_data": label_function_call.label_data,
}
records.append(record)
# Insert records using a single batched query with an ON CONFLICT clause
statement = insert(table).values(records)
do_nothing_statement = statement.on_conflict_do_nothing(
index_elements=["transaction_hash"],
index_where=(table.c.label == "seer") & (table.c.label_type == "tx_call"),
)
db_session.execute(do_nothing_statement)
logger.info(
f"Batch inserted {len(records)} function call labels into {table.name}"
)
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)

Wyświetl plik

@ -2,12 +2,7 @@ import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
)
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
from moonworm.crawler.log_scanner import (
_crawl_events as moonworm_autoscale_crawl_events, # type: ignore
)
@ -31,7 +26,6 @@ class Event:
block_timestamp: int
transaction_hash: str
log_index: int
block_hash: Optional[str] = None
def _get_block_timestamp_from_web3(
@ -54,7 +48,6 @@ def get_block_timestamp(
block_number: int,
blocks_cache: Dict[int, int],
max_blocks_batch: int = 30,
version: int = 2,
) -> int:
"""
Get the timestamp of a block.
@ -70,14 +63,6 @@ def get_block_timestamp(
:param blocks_cache: The cache of blocks.
:return: The timestamp of the block.
"""
if version != 2:
if block_number in blocks_cache:
return blocks_cache[block_number]
target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
blocks_cache[block_number] = target_block_timestamp
return target_block_timestamp
assert max_blocks_batch > 0
if block_number in blocks_cache:
@ -86,9 +71,7 @@ def get_block_timestamp(
block_model = get_block_model(blockchain_type)
blocks = (
db_session.query(
block_model.block_number, block_model.timestamp, block_model.hash
)
db_session.query(block_model.block_number, block_model.timestamp)
.filter(
and_(
block_model.block_number >= block_number - max_blocks_batch - 1,
@ -125,7 +108,6 @@ def _crawl_events(
to_block: int,
blocks_cache: Dict[int, int] = {},
db_block_query_batch=10,
version: int = 2,
) -> List[Event]:
all_events = []
for job in jobs:
@ -147,7 +129,6 @@ def _crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -157,7 +138,6 @@ def _crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)
@ -174,25 +154,21 @@ def _autoscale_crawl_events(
blocks_cache: Dict[int, int] = {},
batch_size: int = 1000,
db_block_query_batch=10,
version: int = 2,
) -> Tuple[List[Event], int]:
"""
Crawl events with auto regulated batch_size.
"""
all_events = []
for job in jobs:
try:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
except Exception as e:
breakpoint()
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
@ -201,7 +177,6 @@ def _autoscale_crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -211,7 +186,6 @@ def _autoscale_crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)

Wyświetl plik

@ -1,8 +1,7 @@
import logging
from typing import List, Union
from typing import List
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonworm.crawler.function_call_crawler import ( # type: ignore
ContractFunctionCall,
@ -11,7 +10,6 @@ from moonworm.crawler.function_call_crawler import ( # type: ignore
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider
from moonworm.watch import MockState # type: ignore
from sqlalchemy.orm import Session
from web3 import Web3
@ -24,7 +22,7 @@ logger = logging.getLogger(__name__)
def _crawl_functions(
blockchain_type: AvailableBlockchainType,
ethereum_state_provider: Union[MoonstreamEthereumStateProvider, Web3StateProvider],
ethereum_state_provider: MoonstreamEthereumStateProvider,
jobs: List[FunctionCallCrawlJob],
from_block: int,
to_block: int,
@ -59,12 +57,9 @@ def function_call_crawler(
start_block: int,
end_block: int,
batch_size: int,
version: int = 2,
):
if version != 2:
raise ValueError("Only version 2 is supported")
try:
network = blockchain_type_to_network_type(blockchain_type=blockchain_type) # type: ignore
network = blockchain_type_to_network_type(blockchain_type=blockchain_type)
except Exception as e:
raise Exception(e)

Wyświetl plik

@ -1,17 +1,14 @@
import logging
import time
from typing import Dict, List, Optional, Union, Any
from typing import Dict, List, Optional
from uuid import UUID
from eth_typing.evm import ChecksumAddress
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
Network,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -31,7 +28,7 @@ logger = logging.getLogger(__name__)
def historical_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType, # AvailableBlockchainType,
blockchain_type: AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
@ -42,7 +39,6 @@ def historical_crawler(
web3_uri: Optional[str] = None,
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
max_insert_batch: int = 10000,
version: int = 2,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@ -61,15 +57,11 @@ def historical_crawler(
except Exception as e:
raise Exception(e)
evm_state_provider = Web3StateProvider(web3)
if version == 2:
### Moonstream state provider use the V2 db to get the block
evm_state_provider = MoonstreamEthereumStateProvider(
web3,
network, # type: ignore
db_session,
)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
logger.info(f"Starting historical event crawler start_block={start_block}")
@ -101,7 +93,6 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
else:
@ -114,7 +105,6 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
@ -127,12 +117,11 @@ def historical_crawler(
db_session,
all_events[i : i + max_insert_batch],
blockchain_type,
version,
)
else:
add_events_to_session(db_session, all_events, blockchain_type, version)
add_events_to_session(db_session, all_events, blockchain_type)
if function_call_crawl_jobs:
logger.info(
@ -140,7 +129,7 @@ def historical_crawler(
)
all_function_calls = _crawl_functions(
blockchain_type,
evm_state_provider,
ethereum_state_provider,
function_call_crawl_jobs,
batch_end_block,
start_block,
@ -156,12 +145,11 @@ def historical_crawler(
db_session,
all_function_calls[i : i + max_insert_batch],
blockchain_type,
version,
)
else:
add_function_calls_to_session(
db_session, all_function_calls, blockchain_type, version
db_session, all_function_calls, blockchain_type
)
if addresses_deployment_blocks:

Wyświetl plik

@ -44,7 +44,7 @@ DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "seer"
CRAWLER_LABEL = "moonworm-alpha"
VIEW_STATE_CRAWLER_LABEL = "view-state-alpha"
METADATA_CRAWLER_LABEL = "metadata-crawler"
@ -196,42 +196,6 @@ if MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI == "":
"MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI == "":
raise Exception("MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI env variable is not set")
MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI == "":
raise Exception("MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI env variable is not set")
MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_CRAWL_WORKERS = 4
MOONSTREAM_CRAWL_WORKERS_RAW = os.environ.get("MOONSTREAM_CRAWL_WORKERS")
@ -429,9 +393,3 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)
MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN", ""
)

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.4.6"
MOONCRAWL_VERSION = "0.4.5"

Wyświetl plik

@ -3,6 +3,7 @@ export BUGOUT_BROOD_URL="https://auth.bugout.dev"
export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Engine environment variables
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"
@ -18,8 +19,6 @@ export MOONSTREAM_DATA_JOURNAL_ID="<Bugout_journal_id_for_moonstream>"
export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Bugout_access_token_for_moonstream>"
export NFT_HUMBUG_TOKEN="<Token_for_nft_crawler>"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<moonstream_admin_access_token>"
# Blockchain nodes environment variables
export MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
@ -39,12 +38,6 @@ export MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI="https://<connection_path_uri_to
export MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
# AWS environment variables
export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_for_smart_contracts>"
@ -57,6 +50,7 @@ export MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET="<AWS_S3_bucket_to_store_dashboar
export MOONSTREAM_ETHERSCAN_TOKEN="<Token_for_etherscan>"
export COINMARKETCAP_API_KEY="<API_key_to_parse_conmarketcap>"
# Custom crawler
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="<public_bucket>"
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev"
@ -64,8 +58,4 @@ export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="<access token for run querie
export INFURA_PROJECT_ID="<infura_project_id>"
# Leaderboard worker
export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID="<Bugout_journal_id_for_leaderboards>"
# DB v3 controller
export MOONSTREAM_DB_V3_CONTROLLER_API="https://mdb-v3-api.moonstream.to"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID=<Bugout_journal_id_for_leaderboards>

Wyświetl plik

@ -38,10 +38,8 @@ setup(
"chardet",
"fastapi",
"moonstreamdb>=0.4.4",
"moonstreamdb-v3>=0.0.10",
"moonstream-types>=0.0.3",
"moonstream>=0.1.1",
"moonworm[moonstream]>=0.9.1",
"moonworm[moonstream]==0.9.0",
"humbug",
"pydantic==1.9.2",
"python-dateutil",