Add fixes for v3.

pull/1085/head
Andrey 2024-06-11 19:04:25 +03:00
rodzic 47d3b55ab0
commit e08df9f142
4 zmienionych plików z 204 dodań i 39 usunięć

Wyświetl plik

@ -9,6 +9,9 @@ from moonstreamdb.blockchain import (
get_transaction_model,
)
from moonstreamdb.models import EthereumBlock, EthereumTransaction
from moonstreamdbv3.blockchain import (
AvailableBlockchainType as AvailableBlockchainTypeV3,
)
from psycopg2.errors import UniqueViolation # type: ignore
from sqlalchemy import Column, desc, func
from sqlalchemy.exc import IntegrityError
@ -39,6 +42,7 @@ from .settings import (
MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI,
MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI,
MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI,
WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS,
)
@ -63,42 +67,85 @@ def connect(
request_kwargs: Dict[str, Any] = {"headers": {"Content-Type": "application/json"}}
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
if isinstance(blockchain_type, AvailableBlockchainType):
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainType.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
elif isinstance(blockchain_type, AvailableBlockchainTypeV3):
if blockchain_type == AvailableBlockchainTypeV3.ETHEREUM:
web3_uri = MOONSTREAM_NODE_ETHEREUM_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.POLYGON:
web3_uri = MOONSTREAM_NODE_POLYGON_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.MUMBAI:
web3_uri = MOONSTREAM_NODE_MUMBAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AMOY:
web3_uri = MOONSTREAM_NODE_AMOY_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XDAI:
web3_uri = MOONSTREAM_NODE_XDAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ZKSYNC_ERA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ZKSYNC_ERA_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ZKSYNC_ERA_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_ONE:
web3_uri = MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_NOVA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_NOVA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.ARBITRUM_SEPOLIA:
web3_uri = MOONSTREAM_NODE_ARBITRUM_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XAI:
web3_uri = MOONSTREAM_NODE_XAI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.XAI_SEPOLIA:
web3_uri = MOONSTREAM_NODE_XAI_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AVALANCHE:
web3_uri = MOONSTREAM_NODE_AVALANCHE_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.AVALANCHE_FUJI:
web3_uri = MOONSTREAM_NODE_AVALANCHE_FUJI_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.BLAST:
web3_uri = MOONSTREAM_NODE_BLAST_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.BLAST_SEPOLIA:
web3_uri = MOONSTREAM_NODE_BLAST_SEPOLIA_A_EXTERNAL_URI
elif blockchain_type == AvailableBlockchainTypeV3.PROOFOFPLAY_APEX:
web3_uri = MOONSTREAM_NODE_PROOFOFPLAY_APEX_A_EXTERNAL_URI
elif (
blockchain_type
== AvailableBlockchainTypeV3.GAME7_ORBIT_ARBITRUM_SEPOLIA
):
web3_uri = MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI
else:
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
request_kwargs["timeout"] = WEB3_CLIENT_REQUEST_TIMEOUT_SECONDS

Wyświetl plik

@ -9,6 +9,9 @@ import requests
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.subscriptions import blockchain_type_to_subscription_type
from moonstreamdbv3.db import MoonstreamDBEngine, MoonstreamDBIndexesEngine
from moonstreamdbv3.blockchain import (
AvailableBlockchainType as AvailableBlockchainTypeV3,
)
from web3 import Web3
from web3.middleware import geth_poa_middleware
@ -492,8 +495,8 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
Historical crawl for MoonstreamDB v3
"""
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
blockchain_type = AvailableBlockchainTypeV3(args.blockchain_type)
##subscription_type = blockchain_type_to_subscription_type(blockchain_type)
addresses_filter = []
if args.address is not None:
@ -527,6 +530,7 @@ def handle_historical_crawl_v3(args: argparse.Namespace) -> None:
customer_connection = get_db_connection(args.customer_uuid)
filtered_function_call_jobs = [] # v1
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
@ -941,6 +945,112 @@ def main() -> None:
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
historical_crawl_parser_v3 = subparsers.add_parser(
"historical-crawl-v3", help="Crawl historical data"
)
historical_crawl_parser_v3.add_argument(
"--address",
"-a",
required=False,
type=str,
)
historical_crawl_parser_v3.add_argument(
"--start",
"-s",
type=int,
default=None,
)
historical_crawl_parser_v3.add_argument(
"--end",
"-e",
type=int,
required=False,
)
historical_crawl_parser_v3.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
historical_crawl_parser_v3.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
historical_crawl_parser_v3.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
historical_crawl_parser_v3.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=80,
help="Maximum number of blocks to crawl in a single batch",
)
historical_crawl_parser_v3.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.1,
help="Minimum time to sleep between crawl step",
)
historical_crawl_parser_v3.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
historical_crawl_parser_v3.add_argument(
"--only-events",
action="store_true",
default=False,
help="Only crawl events",
)
historical_crawl_parser_v3.add_argument(
"--only-functions",
action="store_true",
default=False,
help="Only crawl function calls",
)
historical_crawl_parser_v3.add_argument(
"--find-deployed-blocks",
action="store_true",
default=False,
help="Find all deployed blocks",
)
historical_crawl_parser_v3.add_argument(
"--customer-uuid",
type=UUID,
required=True,
help="Customer UUID",
)
historical_crawl_parser_v3.add_argument(
"--user-uuid",
type=UUID,
required=False,
help="User UUID",
)
historical_crawl_parser_v3.set_defaults(func=handle_historical_crawl_v3)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -739,7 +739,7 @@ def get_event_crawl_job_records(
else:
new_crawl_job = EventCrawlJob(
event_abi_hash=str(crawl_job_record.abi_selector),
event_abi=json.loads(json.loads(str(crawl_job_record.abi))),
event_abi=json.loads(str(crawl_job_record.abi)),
contracts=[str_address],
address_entries={
crawl_job_record.address.hex(): {

Wyświetl plik

@ -196,6 +196,14 @@ if MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI == "":
"MOONSTREAM_NODE_ARBITRUM_ONE_A_EXTERNAL_URI env variable is not set"
)
MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI = os.environ.get(
"MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI", ""
)
if MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI == "":
raise Exception(
"MOONSTREAM_GAME7_ORBIT_ARBITRUM_SEPOLIA_WEB3_PROVIDER_URI env variable is not set"
)
MOONSTREAM_CRAWL_WORKERS = 4
MOONSTREAM_CRAWL_WORKERS_RAW = os.environ.get("MOONSTREAM_CRAWL_WORKERS")