Merge branch 'main' into add-xai-state-scripts

pull/1098/head
Andrey Dolgolev 2024-06-24 23:44:30 +03:00 zatwierdzone przez GitHub
commit 7753978605
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
18 zmienionych plików z 1352 dodań i 216 usunięć

Wyświetl plik

@ -5,11 +5,11 @@ import time
import uuid
from collections import OrderedDict
from datetime import datetime
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, List
import boto3 # type: ignore
import requests # type: ignore
from bugout.data import BugoutResources
from bugout.data import BugoutResources, BugoutSearchResult
from bugout.exceptions import BugoutResponseException
from moonstream.client import ( # type: ignore
ENDPOINT_QUERIES,
@ -170,3 +170,37 @@ def recive_S3_data_from_query(
logger.info("Too many retries")
break
return data_response.json()
def get_all_entries_from_search(
    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
) -> List[BugoutSearchResult]:
    """
    Collect every journal entry matching ``search_query``, paging through the
    Bugout search API ``limit`` entries at a time.
    """

    def _fetch_page(page_offset: int):
        # Single paginated call against the Bugout search endpoint.
        return bc.search(
            token=token,
            journal_id=journal_id,
            query=search_query,
            content=content,
            timeout=10.0,
            limit=limit,
            offset=page_offset,
        )

    first_page = _fetch_page(0)
    collected: List[BugoutSearchResult] = list(first_page.results)  # type: ignore
    if len(collected) != first_page.total_results:
        # Walk the remaining pages; the total is taken from the first response.
        for page_offset in range(limit, first_page.total_results, limit):
            next_page = _fetch_page(page_offset)
            collected.extend(next_page.results)  # type: ignore
    return collected

Wyświetl plik

@ -3,7 +3,7 @@ from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from uuid import UUID
from moonstreamdb.blockchain import (
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_block_model,
get_transaction_model,
@ -15,7 +15,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Query, Session
from tqdm import tqdm
from web3 import HTTPProvider, IPCProvider, Web3
from web3.middleware import geth_poa_middleware
from web3.middleware import geth_poa_middleware # type: ignore
from web3.types import BlockData
from .data import DateRange
@ -39,6 +39,11 @@ from .settings import (
MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS,
)
@ -52,9 +57,36 @@ class BlockCrawlError(Exception):
"""
# Fallback lookup from blockchain type to its Moonstream node URI.
# connect() consults this mapping when no explicit web3_uri is provided.
default_uri_mapping = {
    AvailableBlockchainType.ETHEREUM: MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI,
    AvailableBlockchainType.POLYGON: MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI,
    AvailableBlockchainType.MUMBAI: MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI,
    AvailableBlockchainType.AMOY: MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI,
    AvailableBlockchainType.XDAI: MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
    AvailableBlockchainType.ZKSYNC_ERA: MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
    AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA: MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.ARBITRUM_ONE: MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI,
    AvailableBlockchainType.ARBITRUM_NOVA: MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI,
    AvailableBlockchainType.ARBITRUM_SEPOLIA: MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.XAI: MOONSTREAM_NODE_XAI_A_EXTERNAL_URI,
    AvailableBlockchainType.XAI_SEPOLIA: MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.AVALANCHE: MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI,
    AvailableBlockchainType.AVALANCHE_FUJI: MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI,
    AvailableBlockchainType.BLAST: MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI,
    AvailableBlockchainType.BLAST_SEPOLIA: MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.PROOFOFPLAY_APEX: MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI,
    AvailableBlockchainType.STARKNET: MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI,
    AvailableBlockchainType.STARKNET_SEPOLIA: MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.MANTLE: MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI,
    AvailableBlockchainType.MANTLE_SEPOLIA: MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI,
    AvailableBlockchainType.GAME7_ORBIT_ARBITRUM_SEPOLIA: MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI,
}
def connect(
blockchain_type: Optional[AvailableBlockchainType] = None,
web3_uri: Optional[str] = None,
version: int = 2,
) -> Web3:
if blockchain_type is None and web3_uri is None:
raise Exception("Both blockchain_type and web3_uri could not be None")
@ -63,41 +95,8 @@ def connect(
request_kwargs: Dict[str, Any] = {"headers": {"Content-Type": "application/json"}}
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
else:
web3_uri = default_uri_mapping.get(blockchain_type) # type: ignore
if web3_uri is None:
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):

Wyświetl plik

@ -14,7 +14,8 @@ from typing import Iterator, List
from uuid import UUID
import dateutil.parser # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from .blockchain import (
DateRange,

Wyświetl plik

@ -58,3 +58,12 @@ class TokenURIs(BaseModel):
block_number: str
block_timestamp: str
address: str
class ViewTasks(BaseModel):
    """
    A contract method to be called by the state crawler, paired with the
    contract address it lives at.

    Field names mirror a JSON ABI entry (hence the camelCase
    ``stateMutability``) — presumably these are view/pure functions; confirm
    with the callers that populate this model.
    """

    type: str  # ABI entry type (e.g. "function")
    stateMutability: str  # ABI state mutability string
    inputs: Any  # ABI inputs declaration; shape not constrained here
    name: str  # function name from the ABI
    outputs: List[Dict[str, Any]]  # ABI outputs declarations
    address: str  # contract address the call targets

Wyświetl plik

@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Set, Union
from eth_typing import ChecksumAddress
from hexbytes.main import HexBytes
from moonstreamdb.blockchain import (
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_label_model,
get_transaction_model,
@ -314,7 +314,12 @@ def populate_with_events(
events.append(event)
logger.info(f"Found {len(events)} events for populate")
add_events_to_session(db_session, events, blockchain_type, label_name)
add_events_to_session(
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
label_name=label_name,
)
commit_session(db_session)
pbar.update(batch_end - current_block + 1)
current_block = batch_end + 1
@ -403,10 +408,10 @@ def crawl(
label_name,
)
add_events_to_session(
db_session,
events,
blockchain_type,
label_name,
db_session=db_session,
events=events,
blockchain_type=blockchain_type,
label_name=label_name,
)
commit_session(db_session)
pbar.update(batch_end - current_block + 1)

Wyświetl plik

@ -4,7 +4,7 @@ import logging
from typing import Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from web3 import Web3
from web3.middleware import geth_poa_middleware

Wyświetl plik

@ -2,17 +2,27 @@ import argparse
import logging
from typing import Optional
from uuid import UUID
from urllib.parse import urlparse, urlunparse
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
import requests
from moonstreamdbv3.db import (
MoonstreamDBEngine,
MoonstreamDBIndexesEngine,
MoonstreamCustomDBEngine,
)
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from web3 import Web3
from web3.middleware import geth_poa_middleware
from web3.middleware import geth_poa_middleware # type: ignore
from ..db import yield_db_session_ctx
from ..settings import (
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
HISTORICAL_CRAWLER_STATUSES,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_DB_V3_CONTROLLER_API,
MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN,
)
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (
@ -22,6 +32,8 @@ from .crawler import (
make_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
update_job_state_with_filters,
get_event_crawl_job_records,
get_function_call_crawl_job_records,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@ -135,6 +147,154 @@ def handle_crawl(args: argparse.Namespace) -> None:
)
def get_db_connection(uuid):
    """
    Fetch a customer's database connection string (seer credentials) from the
    Moonstream DB v3 controller API.

    :param uuid: Customer UUID whose instance credentials are requested.
    :return: Connection string with an explicit port (default 5432 appended
        by ensure_port_in_connection_string when the controller omits it).
    :raises ValueError: On network failures or an invalid connection string.
    """
    url = (
        f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{uuid}/instances/1/creds/seer/url"
    )
    headers = {
        "Authorization": f"Bearer {MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN}"
    }

    try:
        # Explicit timeout so a hung controller cannot stall the crawler forever.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raises HTTPError for bad requests (4xx or 5xx)
    except requests.RequestException as e:
        logging.error(f"Network-related error for UUID {uuid}: {str(e)}")
        raise ValueError(f"Network-related error for UUID {uuid}: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Unhandled exception, error: {str(e)}") from e

    connection_string = response.text
    try:
        # Controller may omit the port; normalize so downstream engines work.
        connection_string = ensure_port_in_connection_string(connection_string)
    except ValueError as e:
        error_msg = f"Invalid connection string for UUID {uuid}: {str(e)}"
        logging.error(error_msg)
        raise ValueError(error_msg) from e

    return connection_string
def ensure_port_in_connection_string(connection_string):
    """
    Normalize a PostgreSQL connection string so it always carries an explicit
    port, defaulting to 5432 when none is present.

    Stray double quotes (as returned by some APIs) are stripped before parsing
    so the URL components resolve correctly.

    :param connection_string: e.g. ``postgresql://user:pass@host/dbname``.
    :return: Connection string with an explicit port.
    :raises ValueError: If the URL carries a malformed (non-numeric) port.
    """
    # Strip quoting up front; a leading quote would break scheme detection.
    connection_string = connection_string.replace('"', "")
    parsed_url = urlparse(connection_string)
    # parsed_url.port raises ValueError on a malformed port, which the caller
    # translates into a descriptive error.
    if parsed_url.port is None:
        # Rebuild the netloc with PostgreSQL's default port instead of the
        # previous fragile substring replacement on the raw string.
        parsed_url = parsed_url._replace(netloc=f"{parsed_url.netloc}:5432")
        connection_string = urlunparse(parsed_url)
    return connection_string
def handle_crawl_v3(args: argparse.Namespace) -> None:
    """
    CLI handler for ``crawl-v3``: continuously crawl event and function-call
    jobs whose definitions live in the MoonstreamDB v3 indexes database.
    """
    blockchain_type = AvailableBlockchainType(args.blockchain_type)
    # NOTE(review): subscription_type is computed but never used below —
    # confirm whether it is required for its side effects or can be dropped.
    subscription_type = blockchain_type_to_subscription_type(blockchain_type)

    index_engine = MoonstreamDBIndexesEngine()

    # Load crawl job definitions from the indexes DB: no address filter ([])
    # and no pre-existing records ({}) — start from a clean mapping.
    with index_engine.yield_db_session_ctx() as index_db_session:
        initial_event_jobs = get_event_crawl_job_records(
            index_db_session,
            blockchain_type,
            [],
            {},
        )
        logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")

        initial_function_call_jobs = get_function_call_crawl_job_records(
            index_db_session,
            blockchain_type,
            [],
            {},
        )
        logger.info(
            f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
        )

    logger.info(f"Blockchain type: {blockchain_type.value}")
    with yield_db_session_ctx() as db_session:
        # Resolve a web3 client: explicit provider URL wins, otherwise retry
        # connecting through the default node URI for this chain.
        web3: Optional[Web3] = None
        if args.web3 is None:
            logger.info(
                "No web3 provider URL provided, using default (blockchan.py: connect())"
            )
            web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri)
        else:
            logger.info(f"Using web3 provider URL: {args.web3}")
            web3 = Web3(
                Web3.HTTPProvider(
                    args.web3,
                )
            )

        if args.poa:
            # Proof-of-authority chains need the extraData-tolerant middleware.
            logger.info("Using PoA middleware")
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)

        last_labeled_block = get_last_labeled_block_number(db_session, blockchain_type)
        logger.info(f"Last labeled block: {last_labeled_block}")

        # Decide the starting block: explicit --start, else resume just before
        # the last labeled block, else fall back near the chain head.
        start_block = args.start
        if start_block is None:
            logger.info("No start block provided")
            if last_labeled_block is not None:
                start_block = last_labeled_block - 1
                logger.info(f"Using last labeled block as start: {start_block}")
            else:
                logger.info(
                    "No last labeled block found, using start block (web3.eth.blockNumber - 300)"
                )
                # NOTE(review): log message says -300 but the code uses -10000.
                start_block = web3.eth.blockNumber - 10000
                logger.info(f"Starting from block: {start_block}")
        elif last_labeled_block is not None:
            # Avoid re-crawling already-labeled ranges unless --force is set.
            if start_block < last_labeled_block and not args.force:
                logger.info(
                    f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}"
                )
                logger.info(
                    f"Use --force to override this and start from the start block: {start_block}"
                )
                start_block = last_labeled_block
            else:
                logger.info(f"Using start block: {start_block}")
        else:
            logger.info(f"Using start block: {start_block}")

        # --no-confirmations forces 0; otherwise a positive count is required.
        confirmations = args.confirmations
        if not args.no_confirmations:
            assert confirmations > 0, "confirmations must be greater than 0"
        else:
            confirmations = 0

        craw_event_jobs = list(initial_event_jobs.values())
        initial_function_call_jobs = list(initial_function_call_jobs.values())

        continuous_crawler(
            db_session,
            blockchain_type,
            web3,
            craw_event_jobs,
            initial_function_call_jobs,
            start_block,
            args.max_blocks_batch,
            args.min_blocks_batch,
            confirmations,
            args.min_sleep_time,
            args.heartbeat_interval,
            args.new_jobs_refetch_interval,
            web3_uri=args.web3_uri,
        )
def handle_historical_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
@ -271,6 +431,177 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
start_block = args.start
# get set of addresses from event jobs and function call jobs
if args.find_deployed_blocks:
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts) # type: ignore
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address) # type: ignore
if args.start is None:
start_block = web3.eth.blockNumber - 1
addresses_deployment_blocks = find_all_deployed_blocks(
web3, list(addresses_set)
)
if len(addresses_deployment_blocks) == 0:
logger.error(
"No addresses found in the blockchain. Please check your addresses and try again"
)
return
end_block = min(addresses_deployment_blocks.values())
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
start_block = last_labeled_block
logger.info(f"Using last labeled block as start: {start_block}")
else:
logger.info(
"No last labeled block found, using start block (web3.eth.blockNumber - 300)"
)
raise ValueError(
"No start block provided and no last labeled block found"
)
elif last_labeled_block is not None:
if start_block > last_labeled_block and not args.force:
logger.info(
f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}"
)
logger.info(
f"Use --force to override this and start from the start block: {start_block}"
)
start_block = last_labeled_block
else:
logger.info(f"Using start block: {start_block}")
else:
logger.info(f"Using start block: {start_block}")
if start_block < end_block:
raise ValueError(
f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction."
)
historical_crawler(
db_session,
blockchain_type,
web3,
filtered_event_jobs, # type: ignore
filtered_function_call_jobs, # type: ignore
start_block,
end_block,
args.max_blocks_batch,
args.min_sleep_time,
web3_uri=args.web3_uri,
addresses_deployment_blocks=addresses_deployment_blocks,
)
def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
"""
Historical crawl for MoonstreamDB v3
"""
blockchain_type = AvailableBlockchainType(args.blockchain_type)
##subscription_type = blockchain_type_to_subscription_type(blockchain_type)
addresses_filter = []
if args.address is not None:
## 40 hexadecimal characters format
addresses_filter.append(args.address[2:])
index_engine = MoonstreamDBIndexesEngine()
with index_engine.yield_db_session_ctx() as index_db_session:
initial_event_jobs = get_event_crawl_job_records(
index_db_session,
blockchain_type,
addresses_filter,
{},
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
initial_function_call_jobs = get_function_call_crawl_job_records(
index_db_session,
blockchain_type,
addresses_filter,
{},
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
customer_connection = get_db_connection(args.customer_uuid)
if customer_connection == "":
raise ValueError("No connection string found for the customer")
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
else:
filtered_function_call_jobs = initial_function_call_jobs.values()
if args.only_functions:
filtered_event_jobs = []
logger.info(
f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
)
else:
filtered_event_jobs = initial_event_jobs.values()
if args.only_events and args.only_functions:
raise ValueError(
"--only-events and --only-functions cannot be set at the same time"
)
logger.info(
f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
)
logger.info(f"Blockchain type: {blockchain_type.value}")
customer_engine = MoonstreamCustomDBEngine(customer_connection)
with customer_engine.yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
if args.web3 is None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(blockchain_type, web3_uri=args.web3_uri)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(
args.web3,
)
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_labeled_block = get_first_labeled_block_number(
db_session,
blockchain_type,
args.address,
only_events=args.only_events,
db_version=3,
)
logger.info(f"Last labeled block: {last_labeled_block}")
addresses_deployment_blocks = None
end_block = args.end
start_block = args.start
# get set of addresses from event jobs and function call jobs
if args.find_deployed_blocks:
addresses_set = set()
@ -328,14 +659,15 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
db_session,
blockchain_type,
web3,
filtered_event_jobs,
filtered_function_call_jobs,
filtered_event_jobs, # type: ignore
filtered_function_call_jobs, # type: ignore
start_block,
end_block,
args.max_blocks_batch,
args.min_sleep_time,
web3_uri=args.web3_uri,
addresses_deployment_blocks=addresses_deployment_blocks,
version=3,
)
@ -444,6 +776,103 @@ def main() -> None:
crawl_parser.set_defaults(func=handle_crawl)
crawl_parser_v3 = subparsers.add_parser(
"crawl-v3",
help="continuous crawling the event/function call jobs from bugout journal",
)
crawl_parser_v3.add_argument(
"--start",
"-s",
type=int,
default=None,
)
crawl_parser_v3.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
crawl_parser_v3.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
crawl_parser_v3.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
crawl_parser_v3.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
crawl_parser_v3.add_argument(
"--min-blocks-batch",
"-n",
type=int,
default=20,
help="Minimum number of blocks to crawl in a single batch",
)
crawl_parser_v3.add_argument(
"--confirmations",
"-c",
type=int,
default=175,
help="Number of confirmations to wait for",
)
crawl_parser_v3.add_argument(
"--no-confirmations",
action="store_true",
default=False,
help="Do not wait for confirmations explicitly set confirmations to 0",
)
crawl_parser_v3.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
crawl_parser_v3.add_argument(
"--heartbeat-interval",
"-i",
type=float,
default=60,
help="Heartbeat interval in seconds",
)
crawl_parser_v3.add_argument(
"--new-jobs-refetch-interval",
"-r",
type=float,
default=180,
help="Time to wait before refetching new jobs",
)
crawl_parser_v3.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
crawl_parser_v3.set_defaults(func=handle_crawl_v3)
historical_crawl_parser = subparsers.add_parser(
"historical-crawl", help="Crawl historical data"
)
@ -532,6 +961,112 @@ def main() -> None:
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
historical_crawl_parser_v3 = subparsers.add_parser(
"historical-crawl-v3", help="Crawl historical data"
)
historical_crawl_parser_v3.add_argument(
"--address",
"-a",
required=False,
type=str,
)
historical_crawl_parser_v3.add_argument(
"--start",
"-s",
type=int,
default=None,
)
historical_crawl_parser_v3.add_argument(
"--end",
"-e",
type=int,
required=False,
)
historical_crawl_parser_v3.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
historical_crawl_parser_v3.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
historical_crawl_parser_v3.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
historical_crawl_parser_v3.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
historical_crawl_parser_v3.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
historical_crawl_parser_v3.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
historical_crawl_parser_v3.add_argument(
"--only-events",
action="store_true",
default=False,
help="Only crawl events",
)
historical_crawl_parser_v3.add_argument(
"--only-functions",
action="store_true",
default=False,
help="Only crawl function calls",
)
historical_crawl_parser_v3.add_argument(
"--find-deployed-blocks",
action="store_true",
default=False,
help="Find all deployed blocks",
)
historical_crawl_parser_v3.add_argument(
"--customer-uuid",
type=UUID,
required=True,
help="Customer UUID",
)
historical_crawl_parser_v3.add_argument(
"--user-uuid",
type=UUID,
required=False,
help="User UUID",
)
historical_crawl_parser_v3.set_defaults(func=handle_historical_crawl_v3)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -5,12 +5,14 @@ from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.subscriptions import blockchain_type_to_subscription_type
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -105,6 +107,7 @@ def continuous_crawler(
new_jobs_refetch_interval: float = 120,
web3_uri: Optional[str] = None,
max_insert_batch: int = 10000,
version: int = 2,
):
crawler_type = "continuous"
assert (
@ -129,11 +132,14 @@ def continuous_crawler(
except Exception as e:
raise Exception(e)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
evm_state_provider = Web3StateProvider(web3)
if version == 2:
evm_state_provider = MoonstreamEthereumStateProvider(
web3,
network, # type: ignore
db_session,
)
heartbeat_template = {
"status": "crawling",
@ -206,7 +212,7 @@ def continuous_crawler(
)
all_function_calls = _crawl_functions(
blockchain_type,
ethereum_state_provider,
evm_state_provider,
function_call_crawl_jobs,
start_block,
end_block,
@ -268,7 +274,7 @@ def continuous_crawler(
function_call_crawl_jobs
)
heartbeat_template["function_call metrics"] = (
ethereum_state_provider.metrics
evm_state_provider.metrics
)
heartbeat(
crawler_type=crawler_type,

Wyświetl plik

@ -2,17 +2,22 @@ import json
import logging
import re
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
import binascii
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from uuid import UUID
from bugout.data import BugoutJournalEntries, BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import SubscriptionTypes
from moonstreamtypes.subscriptions import SubscriptionTypes
from moonstreamdbv3.models_indexes import AbiJobs
from moonworm.deployment import find_deployment_block # type: ignore
from sqlalchemy import func, cast as sqlcast, JSON
from sqlalchemy.orm import Session
from web3.main import Web3
from ..blockchain import connect
@ -134,6 +139,7 @@ class FunctionCallCrawlJob:
contract_address: ChecksumAddress
entries_tags: Dict[UUID, List[str]]
created_at: int
existing_selectors: List[str] = field(default_factory=list)
def get_crawl_job_entries(
@ -691,3 +697,129 @@ def add_progress_to_tags(
)
return entries_tags_delete, entries_tags_add
def get_event_crawl_job_records(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    addresses: List[str],
    existing_crawl_job_records: Dict[str, EventCrawlJob],
):
    """
    Merge event-ABI jobs stored in the indexes database into the provided
    mapping of crawl job records (keyed by ABI selector) and return it.
    """
    # Event jobs carry selectors strictly longer than the 10-character
    # ("0x" + 4-byte) function selector — presumably full event topic hashes.
    records_query = (
        db_session.query(AbiJobs)
        .filter(AbiJobs.chain == blockchain_type.value)
        .filter(func.length(AbiJobs.abi_selector) > 10)
    )

    if len(addresses) != 0:
        # Incoming addresses are bare hex; stored addresses are raw bytes.
        raw_addresses = [binascii.unhexlify(address) for address in addresses]
        records_query = records_query.filter(AbiJobs.address.in_(raw_addresses))

    fetched_records = records_query.all()
    if len(fetched_records) == 0:
        return existing_crawl_job_records

    for record in fetched_records:
        hex_address = "0x" + record.address.hex()
        checksum_address = Web3.toChecksumAddress(hex_address)

        if record.abi_selector in existing_crawl_job_records:
            # Known selector: just track the additional contract address.
            known_contracts = existing_crawl_job_records[record.abi_selector].contracts
            if checksum_address not in known_contracts:
                known_contracts.append(checksum_address)
        else:
            # First time this selector is seen: create a fresh crawl job.
            existing_crawl_job_records[str(record.abi_selector)] = EventCrawlJob(
                event_abi_hash=str(record.abi_selector),
                event_abi=json.loads(str(record.abi)),
                contracts=[checksum_address],
                address_entries={
                    record.address.hex(): {
                        UUID(str(record.id)): [
                            str(record.status),
                            str(record.progress),
                        ]
                    }
                },
                created_at=int(record.created_at.timestamp()),
            )

    return existing_crawl_job_records
def get_function_call_crawl_job_records(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    addresses: List[str],
    existing_crawl_job_records: Dict[str, FunctionCallCrawlJob],
):
    """
    Retrieve and update the function call crawl job records from the database.

    Merges AbiJobs rows for non-view functions on the given chain into
    ``existing_crawl_job_records`` (keyed by "0x"-prefixed contract address)
    and returns the updated mapping.
    """
    # Query AbiJobs where the abi_selector is exactly 10 characters long
    # ("0x" + 4-byte function selector), keeping only state-changing
    # functions (type == "function" and stateMutability != "view").
    query = (
        db_session.query(AbiJobs)
        .filter(AbiJobs.chain == blockchain_type.value)
        .filter(func.length(AbiJobs.abi_selector) == 10)
        .filter(
            sqlcast(AbiJobs.abi, JSON).op("->>")("type") == "function",
            sqlcast(AbiJobs.abi, JSON).op("->>")("stateMutability") != "view",
        )
    )

    if len(addresses) != 0:
        # Incoming addresses are bare hex strings; stored addresses are bytes.
        query = query.filter(
            AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
        )

    crawl_job_records = query.all()

    # Iterate over each record fetched from the database
    for crawl_job_record in crawl_job_records:
        str_address = "0x" + crawl_job_record.address.hex()

        if str_address not in existing_crawl_job_records:
            # First ABI entry seen for this contract: create a new job.
            existing_crawl_job_records[str_address] = FunctionCallCrawlJob(
                contract_abi=[json.loads(str(crawl_job_record.abi))],
                contract_address=Web3.toChecksumAddress(str_address),
                entries_tags={
                    UUID(str(crawl_job_record.id)): [
                        str(crawl_job_record.status),
                        str(crawl_job_record.progress),
                    ]
                },
                created_at=int(crawl_job_record.created_at.timestamp()),
                existing_selectors=[str(crawl_job_record.abi_selector)],
            )
        else:
            # Known contract: append the ABI only for selectors not seen yet.
            if (
                crawl_job_record.abi_selector
                not in existing_crawl_job_records[str_address].existing_selectors
            ):
                existing_crawl_job_records[str_address].contract_abi.append(
                    json.loads(str(crawl_job_record.abi))
                )
                existing_crawl_job_records[str_address].existing_selectors.append(
                    str(crawl_job_record.abi_selector)
                )

    return existing_crawl_job_records

Wyświetl plik

@ -1,12 +1,14 @@
import json
import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union, Any
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.models import Base
from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy import Integer, String, column, exists, func, select, text, values
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from ..settings import CRAWLER_LABEL
from .event_crawler import Event
@ -16,12 +18,15 @@ logger = logging.getLogger(__name__)
def _event_to_label(
blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL
) -> Base:
blockchain_type: AvailableBlockchainType,
event: Event,
label_name=CRAWLER_LABEL,
db_version: int = 2,
) -> Base: # type: ignore
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=db_version)
sanityzed_label_data = json.loads(
json.dumps(
{
@ -32,27 +37,46 @@ def _event_to_label(
).replace(r"\u0000", "")
)
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
)
if db_version == 2:
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
)
else:
del sanityzed_label_data["type"]
del sanityzed_label_data["name"]
label = label_model(
label=label_name,
label_name=event.event_name,
label_data=sanityzed_label_data,
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
block_hash=event.block_hash.hex(), # type: ignore
)
return label
def _function_call_to_label(
blockchain_type: AvailableBlockchainType,
function_call: ContractFunctionCall,
db_version: int = 2,
label_name=CRAWLER_LABEL,
) -> Base:
) -> Base: # type: ignore
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=db_version)
sanityzed_label_data = json.loads(
json.dumps(
@ -67,14 +91,34 @@ def _function_call_to_label(
).replace(r"\u0000", "")
)
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
)
if db_version == 2:
label = label_model(
label=label_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
)
else:
del sanityzed_label_data["type"]
del sanityzed_label_data["name"]
label = label_model(
label=label_name,
label_name=function_call.function_name,
label_data=sanityzed_label_data,
address=function_call.contract_address,
block_number=function_call.block_number,
block_hash=function_call.block_hash.hex(), # type: ignore
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
caller_address=function_call.caller_address,
origin_address=function_call.caller_address,
)
return label
@ -83,8 +127,9 @@ def get_last_labeled_block_number(
db_session: Session,
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
db_version: int = 2,
) -> Optional[int]:
label_model = get_label_model(blockchain_type)
label_model = get_label_model(blockchain_type, version=db_version)
block_number = (
db_session.query(label_model.block_number)
.filter(label_model.label == label_name)
def get_first_labeled_block_number(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    address: str,
    label_name: str = CRAWLER_LABEL,
    only_events: bool = False,
    db_version: int = 2,
) -> Optional[int]:
    """
    Return the earliest block number that already carries labels for ``address``.

    Events and function calls are distinguished by ``log_index``: event labels
    have a non-NULL ``log_index``, function-call labels have ``log_index`` NULL.

    :param db_session: Active SQLAlchemy session.
    :param blockchain_type: Blockchain whose label table should be queried.
    :param address: Contract address the labels belong to.
    :param label_name: Crawler label value to filter by.
    :param only_events: If True, ignore function-call labels entirely.
    :param db_version: Moonstream DB version used to pick the label model.
    :return: First labeled block number, or None if nothing is labeled yet.
    """
    label_model = get_label_model(blockchain_type, version=db_version)

    base_query = (
        db_session.query(label_model.block_number)
        .filter(label_model.label == label_name, label_model.address == address)
        .order_by(label_model.block_number)
    )

    # Earliest event label (events carry a log_index).
    event_blocks = base_query.filter(label_model.log_index != None).first()

    # Earliest function-call label (log_index is NULL), unless the caller
    # asked for events only.
    function_blocks = (
        None
        if only_events
        else base_query.filter(label_model.log_index == None).first()
    )

    # NOTE(review): when both kinds exist this takes the *larger* of the two
    # first-labeled block rows (rows compare as tuples) — preserved as-is,
    # confirm this is the intended resume point.
    if event_blocks and function_blocks:
        result = max(event_blocks, function_blocks)
    elif event_blocks or function_blocks:
        result = event_blocks if event_blocks else function_blocks
    else:
        result = None

    return result[0] if result else None
def commit_session(db_session: Session) -> None:
def add_events_to_session(
    db_session: Session,
    events: List[Event],
    blockchain_type: AvailableBlockchainType,
    db_version: int = 2,
    label_name=CRAWLER_LABEL,
) -> None:
    """
    Stage crawled event labels in the database session, skipping duplicates.

    For ``db_version`` 2 the already-registered (transaction_hash, log_index)
    pairs are read first and duplicate events are filtered out before
    ``add_all``.  For any other version, rows are batch-inserted directly with
    an ``ON CONFLICT DO NOTHING`` clause so the database enforces dedup.

    :param db_session: Active SQLAlchemy session (caller commits).
    :param events: Crawled events to persist.
    :param blockchain_type: Blockchain whose label table should be used.
    :param db_version: Moonstream DB version (2 uses ORM dedup, else batch insert).
    :param label_name: Label value stored on each row.
    """
    if len(events) == 0:
        return

    label_model = get_label_model(blockchain_type, version=db_version)

    if db_version == 2:
        events_hashes_to_save = set([event.transaction_hash for event in events])

        # Define a CTE VALUES expression to escape a big IN clause.
        hashes_cte = select(
            values(column("transaction_hash", String), name="hashes").data(
                [(hash,) for hash in events_hashes_to_save]
            )
        ).cte()

        # Retrieve existing transaction hashes and registered log indexes.
        query = (
            db_session.query(
                label_model.transaction_hash.label("transaction_hash"),
                func.array_agg(label_model.log_index).label("log_indexes"),
            )
            .filter(
                label_model.label == label_name,
                label_model.log_index.isnot(None),
                exists().where(
                    label_model.transaction_hash == hashes_cte.c.transaction_hash
                ),
            )
            .group_by(label_model.transaction_hash)
        )

        existing_log_index_by_tx_hash = {
            row.transaction_hash: row.log_indexes for row in query
        }

        # Keep only events whose (tx_hash, log_index) is not registered yet.
        labels_to_save = [
            _event_to_label(blockchain_type, event, label_name)
            for event in events
            if event.transaction_hash not in existing_log_index_by_tx_hash
            or event.log_index
            not in existing_log_index_by_tx_hash[event.transaction_hash]
        ]

        logger.info(f"Saving {len(labels_to_save)} event labels to session")
        db_session.add_all(labels_to_save)
    else:
        # Insert straight into the underlying table for the v3 schema.
        table = label_model.__table__

        # Build plain-dict records so a single batched INSERT can be issued.
        records = []
        for event in events:
            label_event = _event_to_label(
                blockchain_type, event, label_name, db_version
            )
            record = {
                "label": label_event.label,
                "transaction_hash": label_event.transaction_hash,
                "log_index": label_event.log_index,
                "block_number": label_event.block_number,
                "block_hash": label_event.block_hash,
                "block_timestamp": label_event.block_timestamp,
                "caller_address": None,
                "origin_address": None,
                "address": label_event.address,
                "label_name": label_event.label_name,
                "label_type": "event",
                "label_data": label_event.label_data,
            }
            records.append(record)

        # Single batched INSERT ... ON CONFLICT DO NOTHING.
        # NOTE(review): the partial-index predicate hard-codes label == "seer";
        # if label_name differs, the ON CONFLICT target will not match the
        # unique index — confirm against the index definition in the v3 schema.
        statement = insert(table).values(records)
        do_nothing_statement = statement.on_conflict_do_nothing(
            index_elements=["transaction_hash", "log_index"],
            index_where=(table.c.label == "seer") & (table.c.label_type == "event"),
        )

        db_session.execute(do_nothing_statement)

        logger.info(f"Batch inserted {len(records)} event labels into {table.name}")
def add_function_calls_to_session(
    db_session: Session,
    function_calls: List[ContractFunctionCall],
    blockchain_type: AvailableBlockchainType,
    db_version: int = 2,
    label_name=CRAWLER_LABEL,
) -> None:
    """
    Stage crawled function-call labels in the session, skipping duplicates.

    For ``db_version`` 2 the already-registered transaction hashes are read
    first and duplicates are filtered out before ``add_all``.  For any other
    version, rows are batch-inserted with ``ON CONFLICT DO NOTHING`` so the
    database enforces deduplication.

    :param db_session: Active SQLAlchemy session (caller commits).
    :param function_calls: Crawled contract function calls to persist.
    :param blockchain_type: Blockchain whose label table should be used.
    :param db_version: Moonstream DB version (2 uses ORM dedup, else batch insert).
    :param label_name: Label value stored on each row.
    """
    if len(function_calls) == 0:
        return

    label_model = get_label_model(blockchain_type, version=db_version)

    if db_version == 2:
        transactions_hashes_to_save = list(
            set([function_call.transaction_hash for function_call in function_calls])
        )

        # Define a CTE VALUES expression to escape a big IN clause.
        hashes_cte = select(
            values(column("transaction_hash", String), name="hashes").data(
                [(hash,) for hash in transactions_hashes_to_save]
            )
        ).cte()

        # Retrieve existing transaction hashes (function-call labels have a
        # NULL log_index, which distinguishes them from event labels).
        query = db_session.query(
            label_model.transaction_hash.label("transaction_hash")
        ).filter(
            label_model.label == label_name,
            label_model.log_index.is_(None),
            exists().where(
                label_model.transaction_hash == hashes_cte.c.transaction_hash
            ),
        )

        # Use a set for O(1) membership tests in the filter below
        # (a list would make the dedup pass O(n * m)).
        existing_tx_hashes = {row.transaction_hash for row in query}

        labels_to_save = [
            _function_call_to_label(blockchain_type, function_call)
            for function_call in function_calls
            if function_call.transaction_hash not in existing_tx_hashes
        ]

        logger.info(f"Saving {len(labels_to_save)} labels to session")
        db_session.add_all(labels_to_save)
    else:
        # Insert straight into the underlying table for the v3 schema.
        table = label_model.__table__

        # Build plain-dict records so a single batched INSERT can be issued.
        records = []
        for function_call in function_calls:
            label_function_call = _function_call_to_label(
                blockchain_type, function_call, db_version
            )
            record = {
                "label": label_function_call.label,
                "transaction_hash": label_function_call.transaction_hash,
                "log_index": None,
                "block_number": label_function_call.block_number,
                "block_hash": label_function_call.block_hash,
                "block_timestamp": label_function_call.block_timestamp,
                "caller_address": label_function_call.caller_address,
                # NOTE(review): origin_address mirrors caller_address here —
                # confirm whether tx.origin should be stored instead.
                "origin_address": label_function_call.caller_address,
                "address": label_function_call.address,
                "label_name": label_function_call.label_name,
                "label_type": "tx_call",
                "label_data": label_function_call.label_data,
            }
            records.append(record)

        # Single batched INSERT ... ON CONFLICT DO NOTHING.
        # NOTE(review): the partial-index predicate hard-codes label == "seer";
        # confirm it matches the unique index when label_name differs.
        statement = insert(table).values(records)
        do_nothing_statement = statement.on_conflict_do_nothing(
            index_elements=["transaction_hash"],
            index_where=(table.c.label == "seer") & (table.c.label_type == "tx_call"),
        )
        db_session.execute(do_nothing_statement)

        logger.info(
            f"Batch inserted {len(records)} function call labels into {table.name}"
        )

Wyświetl plik

@ -2,11 +2,16 @@ import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
from moonworm.crawler.log_scanner import (
_crawl_events as moonworm_autoscale_crawl_events, # type: ignore
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk
from moonworm.crawler.log_scanner import ( # type: ignore
_crawl_events as moonworm_autoscale_crawl_events,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import and_
from web3 import Web3
@ -26,6 +31,7 @@ class Event:
block_timestamp: int
transaction_hash: str
log_index: int
block_hash: Optional[str] = None
def _get_block_timestamp_from_web3(
@ -48,6 +54,7 @@ def get_block_timestamp(
block_number: int,
blocks_cache: Dict[int, int],
max_blocks_batch: int = 30,
version: int = 2,
) -> int:
"""
Get the timestamp of a block.
@ -63,6 +70,14 @@ def get_block_timestamp(
:param blocks_cache: The cache of blocks.
:return: The timestamp of the block.
"""
if version != 2:
if block_number in blocks_cache:
return blocks_cache[block_number]
target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
blocks_cache[block_number] = target_block_timestamp
return target_block_timestamp
assert max_blocks_batch > 0
if block_number in blocks_cache:
@ -71,7 +86,9 @@ def get_block_timestamp(
block_model = get_block_model(blockchain_type)
blocks = (
db_session.query(block_model.block_number, block_model.timestamp)
db_session.query(
block_model.block_number, block_model.timestamp, block_model.hash
)
.filter(
and_(
block_model.block_number >= block_number - max_blocks_batch - 1,
@ -108,6 +125,7 @@ def _crawl_events(
to_block: int,
blocks_cache: Dict[int, int] = {},
db_block_query_batch=10,
version: int = 2,
) -> List[Event]:
all_events = []
for job in jobs:
@ -129,6 +147,7 @@ def _crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -138,6 +157,7 @@ def _crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)
@ -154,21 +174,25 @@ def _autoscale_crawl_events(
blocks_cache: Dict[int, int] = {},
batch_size: int = 1000,
db_block_query_batch=10,
version: int = 2,
) -> Tuple[List[Event], int]:
"""
Crawl events with auto regulated batch_size.
"""
all_events = []
for job in jobs:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
try:
    raw_events, batch_size = moonworm_autoscale_crawl_events(
        web3=web3,
        event_abi=job.event_abi,
        from_block=from_block,
        to_block=to_block,
        batch_size=batch_size,
        contract_address=job.contracts[0],
        max_blocks_batch=3000,
    )
except Exception as e:
    # A bare `breakpoint()` here would halt the crawler in an interactive
    # debugger in production and leave `raw_events` unbound for the loop
    # that follows. Log with traceback and re-raise so the failure is
    # surfaced to the caller instead of being swallowed.
    logger.exception(
        f"Failed to autoscale-crawl events for contract {job.contracts[0]} "
        f"in range {from_block}-{to_block}: {e}"
    )
    raise
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
@ -177,6 +201,7 @@ def _autoscale_crawl_events(
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
version,
)
event = Event(
event_name=raw_event["event"],
@ -186,6 +211,7 @@ def _autoscale_crawl_events(
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
block_hash=raw_event.get("blockHash"),
)
all_events.append(event)

Wyświetl plik

@ -1,7 +1,7 @@
import logging
from typing import List
from typing import List, Union
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonworm.crawler.function_call_crawler import ( # type: ignore
ContractFunctionCall,
@ -10,6 +10,7 @@ from moonworm.crawler.function_call_crawler import ( # type: ignore
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider # type: ignore
from moonworm.watch import MockState # type: ignore
from sqlalchemy.orm import Session
from web3 import Web3
@ -22,7 +23,7 @@ logger = logging.getLogger(__name__)
def _crawl_functions(
blockchain_type: AvailableBlockchainType,
ethereum_state_provider: MoonstreamEthereumStateProvider,
ethereum_state_provider: Union[MoonstreamEthereumStateProvider, Web3StateProvider],
jobs: List[FunctionCallCrawlJob],
from_block: int,
to_block: int,
@ -57,9 +58,12 @@ def function_call_crawler(
start_block: int,
end_block: int,
batch_size: int,
version: int = 2,
):
if version != 2:
raise ValueError("Only version 2 is supported")
try:
network = blockchain_type_to_network_type(blockchain_type=blockchain_type)
network = blockchain_type_to_network_type(blockchain_type=blockchain_type) # type: ignore
except Exception as e:
raise Exception(e)

Wyświetl plik

@ -1,14 +1,17 @@
import logging
import time
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union, Any
from uuid import UUID
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.networks import blockchain_type_to_network_type # type: ignore
from moonstreamtypes.blockchain import AvailableBlockchainType
from moonstreamtypes.networks import blockchain_type_to_network_type
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
Network,
)
from moonworm.crawler.ethereum_state_provider import Web3StateProvider # type: ignore
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -21,6 +24,7 @@ from .crawler import (
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _autoscale_crawl_events, _crawl_events
from .function_call_crawler import _crawl_functions
from ..settings import CRAWLER_LABEL, SEER_CRAWLER_LABEL
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -28,7 +32,7 @@ logger = logging.getLogger(__name__)
def historical_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
blockchain_type: AvailableBlockchainType, # AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
@ -39,6 +43,7 @@ def historical_crawler(
web3_uri: Optional[str] = None,
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
max_insert_batch: int = 10000,
version: int = 2,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@ -57,11 +62,18 @@ def historical_crawler(
except Exception as e:
raise Exception(e)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
evm_state_provider = Web3StateProvider(web3)
label = SEER_CRAWLER_LABEL
if version == 2:
### Moonstream state provider use the V2 db to get the block
evm_state_provider = MoonstreamEthereumStateProvider(
web3,
network, # type: ignore
db_session,
)
label = CRAWLER_LABEL
logger.info(f"Starting historical event crawler start_block={start_block}")
@ -93,6 +105,7 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
else:
@ -105,6 +118,7 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
version=version,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
@ -117,11 +131,15 @@ def historical_crawler(
db_session,
all_events[i : i + max_insert_batch],
blockchain_type,
version,
label_name=label,
)
else:
add_events_to_session(db_session, all_events, blockchain_type)
add_events_to_session(
db_session, all_events, blockchain_type, version, label_name=label
)
if function_call_crawl_jobs:
logger.info(
@ -129,7 +147,7 @@ def historical_crawler(
)
all_function_calls = _crawl_functions(
blockchain_type,
ethereum_state_provider,
evm_state_provider,
function_call_crawl_jobs,
batch_end_block,
start_block,
@ -145,11 +163,17 @@ def historical_crawler(
db_session,
all_function_calls[i : i + max_insert_batch],
blockchain_type,
version,
label_name=label,
)
else:
add_function_calls_to_session(
db_session, all_function_calls, blockchain_type
db_session,
all_function_calls,
blockchain_type,
version,
label_name=label,
)
if addresses_deployment_blocks:

Wyświetl plik

@ -3,7 +3,7 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamtypes.blockchain import AvailableBlockchainType
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
@ -45,6 +45,7 @@ DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "moonworm-alpha"
SEER_CRAWLER_LABEL = "seer"
VIEW_STATE_CRAWLER_LABEL = "view-state-alpha"
METADATA_CRAWLER_LABEL = "metadata-crawler"
@ -196,6 +197,42 @@ if MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI == "":
"MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI == "":
raise Exception("MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI env variable is not set")
MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI == "":
raise Exception("MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI env variable is not set")
MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI = os.environ.get(
"MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI", ""
)
if MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI == "":
raise Exception(
"MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_CRAWL_WORKERS = 4
MOONSTREAM_CRAWL_WORKERS_RAW = os.environ.get("MOONSTREAM_CRAWL_WORKERS")
@ -393,3 +430,19 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)
MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN", ""
)
# state crawler
MOONSTREAM_STATE_CRAWLER_JOURNAL_ID = os.environ.get(
"MOONSTREAM_STATE_CRAWLER_JOURNAL_ID", ""
)
if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
raise ValueError(
"MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
)

Wyświetl plik

@ -11,15 +11,21 @@ from typing import Any, Dict, List, Optional
from uuid import UUID
from moonstream.client import Moonstream # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
from web3.middleware import geth_poa_middleware
from moonstreamtypes.blockchain import AvailableBlockchainType
from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
from ..actions import recive_S3_data_from_query
from ..actions import recive_S3_data_from_query, get_all_entries_from_search
from ..blockchain import connect
from ..data import ViewTasks
from ..db import PrePing_SessionLocal
from ..settings import INFURA_PROJECT_ID, infura_networks, multicall_contracts
from ..settings import (
bugout_client as bc,
INFURA_PROJECT_ID,
infura_networks,
multicall_contracts,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
)
from .db import clean_labels, commit_session, view_call_to_label
from .Multicall2_interface import Contract as Multicall2
from .web3_util import FunctionSignature
@ -509,11 +515,49 @@ def handle_crawl(args: argparse.Namespace) -> None:
Read all view methods of the contracts and crawl
"""
with open(args.jobs_file, "r") as f:
jobs = json.load(f)
blockchain_type = AvailableBlockchainType(args.blockchain)
if args.jobs_file is not None:
with open(args.jobs_file, "r") as f:
jobs = json.load(f)
else:
logger.info("Reading jobs from the journal")
jobs = []
# Bugout
query = f"#state_job #blockchain:{blockchain_type.value}"
existing_jobs = get_all_entries_from_search(
journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
search_query=query,
limit=1000,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
content=True,
)
if len(existing_jobs) == 0:
logger.info("No jobs found in the journal")
return
for job in existing_jobs:
try:
if job.content is None:
logger.error(f"Job content is None for entry {job.entry_url}")
continue
### parse json
job_content = json.loads(job.content)
### validate via ViewTasks
ViewTasks(**job_content)
jobs.append(job_content)
except Exception as e:
logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
continue
custom_web3_provider = args.web3_uri
if args.infura and INFURA_PROJECT_ID is not None:
@ -573,6 +617,100 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
db_session.close()
def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
    """
    Migrate view-state crawler jobs from a local JSON file into the Bugout
    state-crawler journal, skipping jobs that already exist there.

    Jobs are deduplicated by the "<blockchain>:<name>:<address>" triple that
    is also encoded in each journal entry's tags.

    :param args: Parsed CLI arguments; uses ``jobs_file`` (path to a JSON list
        of jobs, e.g. jobs/ethereum-jobs.json) and ``blockchain``.
    """
    ### Get all tasks from files
    with open(args.jobs_file, "r") as f:
        jobs = json.load(f)

    blockchain_type = AvailableBlockchainType(args.blockchain)
    migrated_blockchain = blockchain_type.value

    ### Get all tasks from the journal.
    # Entries are created below with the tag "blockchain:<value>", so search
    # with the same tag format (searching "#<value>" alone never matches the
    # created tags, which would duplicate every job on re-runs).
    query = f"#state_job #blockchain:{migrated_blockchain}"

    existing_jobs = get_all_entries_from_search(
        journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
        search_query=query,
        limit=1000,
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        content=True,
    )

    existing_state_tasks_list = []

    logger.info(f"Existing jobs: {len(existing_jobs)}")
    logger.info(f"New jobs: {jobs}")

    ### Validate existing journal entries and collect their dedup keys.
    for bugout_job in existing_jobs:
        try:
            if bugout_job.content is None:
                logger.error(f"Job content is None for entry {bugout_job.entry_url}")
                continue

            ### parse json
            job_content = json.loads(bugout_job.content)

            ### validate via ViewTasks
            ViewTasks(**job_content)
        except Exception as e:
            logger.error(f"Job validation of entry {bugout_job.entry_url} failed: {e}")
            continue

        ### From tags get blockchain, name and address.
        # Reset per entry so values from a previous entry cannot leak into
        # the dedup key when a tag is missing.
        blockchain = name = address = None
        for tag in bugout_job.tags:
            if tag.startswith("blockchain"):
                blockchain = tag.split(":")[1]
            if tag.startswith("name"):
                name = tag.split(":")[1]
            if tag.startswith("address"):
                address = tag.split(":")[1]

        if blockchain is None or name is None or address is None:
            logger.error(f"Entry {bugout_job.entry_url} is missing required tags")
            continue

        existing_state_tasks_list.append(f"{blockchain}:{name}:{address}")

    ### Push tasks from the file that are not in the journal yet.
    for job in jobs:
        name = job["name"]
        address = job["address"]

        ### Deduplicate tasks
        if f"{migrated_blockchain}:{name}:{address}" not in existing_state_tasks_list:
            ### create new task
            json_str = json.dumps(job, indent=4)

            ### add tabs to json string for better readability
            json_str_with_tabs = "\n".join(
                "\t" + line for line in json_str.splitlines()
            )

            try:
                bc.create_entry(
                    title=f"{name}:{address}",
                    token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                    journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
                    content=json_str_with_tabs,
                    tags=[
                        "state_job",
                        f"blockchain:{migrated_blockchain}",
                        f"name:{name}",
                        f"address:{address}",
                    ],
                )
            except Exception as e:
                logger.error(f"Error creating entry: {e}")
                continue
def main() -> None:
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())
@ -615,7 +753,7 @@ def main() -> None:
"-j",
type=str,
help="Path to json file with jobs",
required=True,
required=False,
)
view_state_crawler_parser.add_argument(
"--batch-size",
@ -626,6 +764,28 @@ def main() -> None:
)
view_state_crawler_parser.set_defaults(func=handle_crawl)
view_state_migration_parser = subparsers.add_parser(
"migrate-jobs",
help="Migrate jobs from a JSON file to the Bugout journal",
)
view_state_migration_parser.add_argument(
"--jobs-file",
"-j",
type=str,
help="Path to json file with jobs",
required=True,
)
view_state_migration_parser.add_argument(
"--blockchain",
"-b",
type=str,
help="Type of blockchain which is written to the database",
required=True,
)
view_state_migration_parser.set_defaults(func=migrate_state_tasks_handler)
view_state_cleaner = subparsers.add_parser(
"clean-state-labels",
help="Clean labels from database",

Wyświetl plik

@ -2,7 +2,7 @@ import json
import logging
from typing import Any, Dict
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy.orm import Session
from ..settings import VIEW_STATE_CRAWLER_LABEL

Wyświetl plik

@ -3,7 +3,6 @@ export BUGOUT_BROOD_URL="https://auth.bugout.dev"
export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Engine environment variables
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"
@ -19,6 +18,8 @@ export MOONSTREAM_DATA_JOURNAL_ID="<Bugout_journal_id_for_moonstream>"
export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Bugout_access_token_for_moonstream>"
export NFT_HUMBUG_TOKEN="<Token_for_nft_crawler>"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<moonstream_admin_access_token>"
# Blockchain nodes environment variables
export MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
@ -38,6 +39,12 @@ export MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI="https://<connection_path_uri_to
export MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_GAME7_ORBIT_ARBITRUM_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_MANTLE_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_MANTLE_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_STARKNET_SEPOLIA_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
export MOONSTREAM_NODE_STARKNET_A_EXTERNAL_URI="https://<connection_path_uri_to_node>"
# AWS environment variables
export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_for_smart_contracts>"
@ -50,7 +57,6 @@ export MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET="<AWS_S3_bucket_to_store_dashboar
export MOONSTREAM_ETHERSCAN_TOKEN="<Token_for_etherscan>"
export COINMARKETCAP_API_KEY="<API_key_to_parse_conmarketcap>"
# Custom crawler
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET="<public_bucket>"
export MOONSTREAM_S3_PUBLIC_DATA_BUCKET_PREFIX="dev"
@ -58,4 +64,8 @@ export MOONSTREAM_PUBLIC_QUERIES_DATA_ACCESS_TOKEN="<access token for run querie
export INFURA_PROJECT_ID="<infura_project_id>"
# Leaderboard worker
export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID=<Bugout_journal_id_for_leaderboards>
export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID="<Bugout_journal_id_for_leaderboards>"
# DB v3 controller
export MOONSTREAM_DB_V3_CONTROLLER_API="https://mdb-v3-api.moonstream.to"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"

Wyświetl plik

@ -38,8 +38,10 @@ setup(
"chardet",
"fastapi",
"moonstreamdb>=0.4.4",
"moonstreamdb-v3>=0.0.10",
"moonstream-types>=0.0.3",
"moonstream>=0.1.1",
"moonworm[moonstream]==0.9.0",
"moonworm[moonstream]>=0.9.2",
"humbug",
"pydantic==1.9.2",
"python-dateutil",