Revert "Client id to work with node balancer"

pull/542/head
Sergei Sumarokov 2022-01-24 20:35:46 +03:00 zatwierdzone przez GitHub
rodzic 55ef13ebb7
commit 7e0c7b52ba
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
15 zmienionych plików z 31 dodań i 100 usunięć

Wyświetl plik

@@ -8,6 +8,4 @@ User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \
--client-id "ethereum-missing" \
blocks missing --blockchain ethereum -n
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks missing --blockchain ethereum -n

Wyświetl plik

@@ -11,9 +11,7 @@ User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \
--client-id "ethereum-synchronize" \
blocks synchronize --blockchain ethereum -c 6 -j 2
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 2
SyslogIdentifier=ethereum-synchronize
[Install]

Wyświetl plik

@@ -8,6 +8,4 @@ User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \
--client-id "polygon-missing" \
blocks missing --blockchain polygon -n
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks missing --blockchain polygon -n

Wyświetl plik

@@ -11,9 +11,7 @@ WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli \
--client-id "polygon-moonworm-crawler" \
crawl -b polygon
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli crawl -b polygon
SyslogIdentifier=polygon-moonworm-crawler
[Install]

Wyświetl plik

@@ -11,9 +11,7 @@ User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \
--client-id "polygon-synchronize" \
blocks synchronize --blockchain polygon -c 60 -j 2
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain polygon -c 60 -j 2
SyslogIdentifier=polygon-synchronize
[Install]

Wyświetl plik

@@ -25,7 +25,6 @@ from .settings import (
MOONSTREAM_CRAWL_WORKERS,
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI,
MOONSTREAM_POLYGON_WEB3_PROVIDER_URI,
MOONSTREAM_CLIENT_ID_HEADER,
)
logger = logging.getLogger(__name__)
@@ -38,17 +37,9 @@ class BlockCrawlError(Exception):
"""
def connect(
blockchain_type: AvailableBlockchainType,
web3_uri: Optional[str] = None,
client_id: Optional[str] = None,
):
def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
request_kwargs: Any = None
if client_id is not None:
request_kwargs = {"headers": {MOONSTREAM_CLIENT_ID_HEADER: client_id}}
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI
@@ -58,10 +49,7 @@ def connect(
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(
endpoint_uri=web3_uri,
request_kwargs=request_kwargs,
)
web3_provider = Web3.HTTPProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)
@@ -193,18 +181,15 @@ def add_block_transactions(
def get_latest_blocks(
blockchain_type: AvailableBlockchainType,
confirmations: int = 0,
client_id: Optional[str] = None,
blockchain_type: AvailableBlockchainType, confirmations: int = 0
) -> Tuple[Optional[int], int]:
"""
Retrieve the latest block from the connected node (connection is created by the
connect(AvailableBlockchainType, ClientTokenID) method).
Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method).
If confirmations > 0, and the latest block on the node has block number N, this returns the block
with block_number (N - confirmations)
"""
web3_client = connect(blockchain_type, client_id=client_id)
web3_client = connect(blockchain_type)
latest_block_number: int = web3_client.eth.block_number
if confirmations > 0:
latest_block_number -= confirmations
@@ -227,12 +212,11 @@ def crawl_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
with_transactions: bool = False,
client_id: Optional[str] = None,
) -> None:
"""
Open database and geth sessions and fetch block data from blockchain.
"""
web3_client = connect(blockchain_type, client_id=client_id)
web3_client = connect(blockchain_type)
with yield_db_session_ctx() as db_session:
pbar = tqdm(total=len(blocks_numbers))
for block_number in blocks_numbers:
@@ -272,7 +256,6 @@ def check_missing_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
notransactions=False,
client_id: Optional[str] = None,
) -> List[int]:
"""
Query block from postgres. If block does not presented in database,
@@ -311,7 +294,7 @@ def check_missing_blocks(
[block[0], block[1]] for block in blocks_exist_raw_query.all()
]
web3_client = connect(blockchain_type, client_id=client_id)
web3_client = connect(blockchain_type)
blocks_exist_len = len(blocks_exist)
pbar = tqdm(total=blocks_exist_len)
@@ -353,7 +336,6 @@ def crawl_blocks_executor(
block_numbers_list: List[int],
with_transactions: bool = False,
num_processes: int = MOONSTREAM_CRAWL_WORKERS,
client_id: Optional[str] = None,
) -> None:
"""
Execute crawler in processes.
@@ -362,7 +344,6 @@ def crawl_blocks_executor(
block_numbers_list - List of block numbers to add to database.
with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table.
num_processes - Number of processes to use to feed blocks into database.
client_id - Client identifier
Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlocksCrawlError.
The error message is a list of all the things that went wrong in the crawl.
@@ -382,20 +363,14 @@ def crawl_blocks_executor(
results: List[Future] = []
if num_processes == 1:
logger.warning("Executing block crawler in lazy mod")
return crawl_blocks(
blockchain_type, block_numbers_list, with_transactions, client_id
)
return crawl_blocks(blockchain_type, block_numbers_list, with_transactions)
else:
with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in worker_indices:
block_chunk = worker_job_lists[worker]
logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(
crawl_blocks,
blockchain_type,
block_chunk,
with_transactions,
client_id,
crawl_blocks, blockchain_type, block_chunk, with_transactions
)
result.add_done_callback(record_error)
results.append(result)

Wyświetl plik

@@ -117,7 +117,7 @@ def run_crawler_desc(
def handle_parser(args: argparse.Namespace):
with yield_db_session_ctx() as session:
w3 = connect(AvailableBlockchainType.ETHEREUM, client_id=args.client_id)
w3 = connect(AvailableBlockchainType.ETHEREUM)
if args.order == "asc":
run_crawler_asc(
w3=w3,
@@ -153,11 +153,6 @@ def generate_parser():
"""
parser = argparse.ArgumentParser(description="Moonstream Deployment Crawler")
parser.add_argument(
"--client-id",
type=str,
help="Client token ID",
)
parser.add_argument(
"--start", "-s", type=int, default=None, help="block to start crawling from"
)

Wyświetl plik

@@ -93,7 +93,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
"""
while True:
latest_stored_block_number, latest_block_number = get_latest_blocks(
AvailableBlockchainType(args.blockchain), args.confirmations, args.client_id
AvailableBlockchainType(args.blockchain), args.confirmations
)
if latest_stored_block_number is None:
latest_stored_block_number = 0
@@ -137,7 +137,6 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
block_numbers_list=blocks_numbers_list,
with_transactions=True,
num_processes=args.jobs,
client_id=args.client_id,
)
logger.info(
f"Synchronized blocks from {latest_stored_block_number} to {latest_block_number}"
@@ -156,8 +155,6 @@ def crawler_blocks_add_handler(args: argparse.Namespace) -> None:
blockchain_type=AvailableBlockchainType(args.blockchain),
block_numbers_list=blocks_numbers_list,
with_transactions=True,
num_processes=MOONSTREAM_CRAWL_WORKERS,
client_id=args.client_id,
)
logger.info(
@@ -180,7 +177,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
confirmations = 150
shift = 2000
_, latest_block_number = get_latest_blocks(
AvailableBlockchainType(args.blockchain), confirmations, args.client_id
AvailableBlockchainType(args.blockchain), confirmations
)
block_range = f"{latest_block_number-shift}-{latest_block_number}"
@@ -209,7 +206,6 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
block_numbers_list=missing_blocks_numbers_total,
with_transactions=True,
num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS,
client_id=args.client_id,
)
logger.info(
f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers "
@@ -250,12 +246,6 @@ def main() -> None:
time_now = datetime.now(timezone.utc)
parser.add_argument(
"--client-id",
type=str,
help="Client token ID",
)
# Blockchain blocks parser
parser_crawler_blocks = subcommands.add_parser(
"blocks", description="Blockchain blocks commands"

Wyświetl plik

@@ -54,9 +54,7 @@ def handle_crawl(args: argparse.Namespace) -> None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(
blockchain_type=blockchain_type, client_id=args.client_id
)
web3 = _retry_connect_web3(blockchain_type)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(

Wyświetl plik

@@ -85,7 +85,6 @@ def _retry_connect_web3(
blockchain_type: AvailableBlockchainType,
retry_count: int = 10,
sleep_time: float = 5,
client_id: Optional[str] = None,
) -> Web3:
"""
Retry connecting to the blockchain.
@@ -93,7 +92,7 @@ def _retry_connect_web3(
while retry_count > 0:
retry_count -= 1
try:
web3 = connect(blockchain_type, client_id=client_id)
web3 = connect(blockchain_type)
web3.eth.block_number
logger.info(f"Connected to {blockchain_type}")
return web3

Wyświetl plik

@@ -55,11 +55,7 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
raise ValueError(
"Could not find Web3 connection information in arguments or in MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable"
)
return connect(
AvailableBlockchainType.ETHEREUM,
web3_connection_string,
client_id=args.client_id,
)
return connect(AvailableBlockchainType.ETHEREUM, web3_connection_string)
def get_latest_block_from_node(web3_client: Web3):
@@ -262,12 +258,6 @@ def main() -> None:
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers(description="Subcommands")
parser.add_argument(
"--client-id",
type=str,
help="Client token ID",
)
parser_ethereum = subcommands.add_parser(
"ethereum",
description="Collect information about NFTs from Ethereum blockchains",

Wyświetl plik

@@ -26,13 +26,10 @@ ORIGINS = RAW_ORIGINS.split(",")
# OpenAPI
DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "moonworm-alpha"
MOONSTREAM_CLIENT_ID_HEADER = os.environ.get(
"MOONSTREAM_CLIENT_ID_HEADER", "x-moonstream-client-id"
)
# Geth connection address
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get(
"MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", ""

Wyświetl plik

@@ -8,7 +8,7 @@ import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List
from uuid import UUID
import boto3 # type: ignore
@@ -337,9 +337,7 @@ def generate_list_of_names(
def process_external(
abi_external_calls: List[Dict[str, Any]],
blockchain: AvailableBlockchainType,
client_id: Optional[str] = None,
abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType
):
"""
Request all required external data
@@ -385,7 +383,7 @@ def process_external(
logger.error(f"Error processing external call: {e}")
if external_calls:
web3_client = connect(blockchain, client_id=client_id)
web3_client = connect(blockchain)
for extcall in external_calls:
try:
@@ -436,7 +434,6 @@ def generate_web3_metrics(
address: str,
crawler_label: str,
abi_json: Any,
client_id: Optional[str] = None,
) -> List[Any]:
"""
Generate stats for cards components
@@ -449,7 +446,6 @@ def generate_web3_metrics(
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
client_id=client_id,
)
extention_data.append(
@@ -615,7 +611,6 @@ def stats_generate_handler(args: argparse.Namespace):
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
client_id=args.client_id,
)
# Generate blocks state information

Wyświetl plik

@@ -36,7 +36,7 @@ func logMiddleware(next http.Handler) http.Handler {
if err != nil {
log.Printf("Unable to parse client IP: %s\n", r.RemoteAddr)
} else {
log.Printf("%s %s %s %s\n", ip, r.Method, r.URL.Path, r.Header.Get("X-Origin-Path"))
log.Printf("%s %s %s\n", ip, r.Method, r.URL.Path)
}
})
}

Wyświetl plik

@@ -30,8 +30,10 @@ var ConfigList NodeConfigList
var MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_IPC_PORT = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_PORT")
var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
var MOONSTREAM_NODE_POLYGON_IPC_PORT = os.Getenv("MOONSTREAM_NODE_POLYGON_IPC_PORT")
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
var MOONSTREAM_CLIENT_ID_HEADER = os.Getenv("MOONSTREAM_CLIENT_ID_HEADER")
@@ -54,8 +56,8 @@ func checkEnvVarSet() {
MOONSTREAM_CLIENT_ID_HEADER = "x-moonstream-client-id"
}
if MOONSTREAM_NODES_SERVER_PORT == "" {
log.Fatal("Environment variable MOONSTREAM_NODES_SERVER_PORT should be set")
if MOONSTREAM_NODES_SERVER_PORT == "" || MOONSTREAM_NODE_ETHEREUM_IPC_PORT == "" || MOONSTREAM_NODE_POLYGON_IPC_PORT == "" {
log.Fatal("Some of environment variables not set")
}
}
@@ -68,12 +70,12 @@ func (nc *NodeConfigList) InitNodeConfigList() {
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
Blockchain: "ethereum",
IPs: []string{MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR},
Port: "8545",
Port: MOONSTREAM_NODE_ETHEREUM_IPC_PORT,
})
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
Blockchain: "polygon",
IPs: []string{MOONSTREAM_NODE_POLYGON_A_IPC_ADDR, MOONSTREAM_NODE_POLYGON_B_IPC_ADDR},
Port: "8545",
Port: MOONSTREAM_NODE_POLYGON_IPC_PORT,
})
// Parse node addr, ip and blockchain