Tested with ethereum crawlers

pull/373/head
kompotkot 2021-11-09 11:14:03 +00:00
rodzic 273044302a
commit baea584a9e
3 zmienionych plików z 37 dodań i 22 usunięć

Wyświetl plik

@ -7,7 +7,7 @@ from moonstreamdb.models import (
EthereumBlock, EthereumBlock,
EthereumTransaction, EthereumTransaction,
PolygonBlock, PolygonBlock,
PolygonTransactions, PolygonTransaction,
) )
from psycopg2.errors import UniqueViolation # type: ignore from psycopg2.errors import UniqueViolation # type: ignore
from sqlalchemy import Column, desc, func from sqlalchemy import Column, desc, func
@ -106,7 +106,9 @@ def add_block_transactions(db_session, block: Any) -> None:
db_session.add(tx_obj) db_session.add(tx_obj)
def get_latest_blocks(blockchain_type: AvailableBlockchainType,confirmations: int = 0) -> Tuple[Optional[int], int]: def get_latest_blocks(
blockchain_type: AvailableBlockchainType, confirmations: int = 0
) -> Tuple[Optional[int], int]:
""" """
Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method). Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method).
@ -131,7 +133,11 @@ def get_latest_blocks(blockchain_type: AvailableBlockchainType,confirmations: in
return latest_stored_block_number, latest_block_number return latest_stored_block_number, latest_block_number
def crawl_blocks(blockchain_type: AvailableBlockchainType, blocks_numbers: List[int], with_transactions: bool = False) -> None: def crawl_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
with_transactions: bool = False,
) -> None:
""" """
Open database and geth sessions and fetch block data from blockchain. Open database and geth sessions and fetch block data from blockchain.
""" """
@ -171,7 +177,11 @@ def crawl_blocks(blockchain_type: AvailableBlockchainType, blocks_numbers: List[
pbar.close() pbar.close()
def check_missing_blocks(blockchain_type: AvailableBlockchainType, blocks_numbers: List[int], notransactions=False) -> List[int]: def check_missing_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
notransactions=False,
) -> List[int]:
""" """
Query block from postgres. If block does not presented in database, Query block from postgres. If block does not presented in database,
add to missing blocks numbers list. add to missing blocks numbers list.
@ -282,7 +292,9 @@ def crawl_blocks_executor(
for worker in worker_indices: for worker in worker_indices:
block_chunk = worker_job_lists[worker] block_chunk = worker_job_lists[worker]
logger.info(f"Spawned process for {len(block_chunk)} blocks") logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(crawl_blocks, blockchain_type, block_chunk, with_transactions) result = executor.submit(
crawl_blocks, blockchain_type, block_chunk, with_transactions
)
result.add_done_callback(record_error) result.add_done_callback(record_error)
results.append(result) results.append(result)

Wyświetl plik

@ -8,7 +8,8 @@ from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
from web3 import Web3 from web3 import Web3
from ..ethereum import connect from ..blockchain import connect
from ..data import AvailableBlockchainType
from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@ -116,7 +117,7 @@ def run_crawler_desc(
def handle_parser(args: argparse.Namespace): def handle_parser(args: argparse.Namespace):
with yield_db_session_ctx() as session: with yield_db_session_ctx() as session:
w3 = connect() w3 = connect(AvailableBlockchainType(args.blockchain))
if args.order == "asc": if args.order == "asc":
run_crawler_asc( run_crawler_asc(
w3=w3, w3=w3,
@ -184,6 +185,11 @@ def generate_parser():
default=3 * 60, default=3 * 60,
help="time to sleep synzhronize mode waiting for new block crawled to db", help="time to sleep synzhronize mode waiting for new block crawled to db",
) )
parser.add_argument(
"--blockchain",
required=True,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
parser.set_defaults(func=handle_parser) parser.set_defaults(func=handle_parser)
return parser return parser

Wyświetl plik

@ -93,7 +93,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
""" """
while True: while True:
latest_stored_block_number, latest_block_number = get_latest_blocks( latest_stored_block_number, latest_block_number = get_latest_blocks(
args.blockchain_type, args.confirmations AvailableBlockchainType(args.blockchain), args.confirmations
) )
if latest_stored_block_number is None: if latest_stored_block_number is None:
latest_stored_block_number = 0 latest_stored_block_number = 0
@ -133,7 +133,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
) )
# TODO(kompotkot): Set num_processes argument based on number of blocks to synchronize. # TODO(kompotkot): Set num_processes argument based on number of blocks to synchronize.
crawl_blocks_executor( crawl_blocks_executor(
blockchain_type=args.blockchain_type, blockchain_type=AvailableBlockchainType(args.blockchain),
block_numbers_list=blocks_numbers_list, block_numbers_list=blocks_numbers_list,
with_transactions=True, with_transactions=True,
num_processes=args.jobs, num_processes=args.jobs,
@ -152,7 +152,7 @@ def crawler_blocks_add_handler(args: argparse.Namespace) -> None:
for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks): for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks):
logger.info(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}") logger.info(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
crawl_blocks_executor( crawl_blocks_executor(
blockchain_type=args.blockchain_type, blockchain_type=AvailableBlockchainType(args.blockchain),
block_numbers_list=blocks_numbers_list, block_numbers_list=blocks_numbers_list,
with_transactions=True, with_transactions=True,
) )
@ -175,7 +175,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
f"with comparing transactions: {not args.notransactions}" f"with comparing transactions: {not args.notransactions}"
) )
missing_blocks_numbers = check_missing_blocks( missing_blocks_numbers = check_missing_blocks(
blockchain_type=args.blockchain_type, blockchain_type=AvailableBlockchainType(args.blockchain),
blocks_numbers=blocks_numbers_list, blocks_numbers=blocks_numbers_list,
notransactions=args.notransactions, notransactions=args.notransactions,
) )
@ -190,7 +190,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
if (len(missing_blocks_numbers_total)) > 0: if (len(missing_blocks_numbers_total)) > 0:
time.sleep(5) time.sleep(5)
crawl_blocks_executor( crawl_blocks_executor(
blockchain_type=args.blockchain_type, blockchain_type=AvailableBlockchainType(args.blockchain),
block_numbers_list=missing_blocks_numbers_total, block_numbers_list=missing_blocks_numbers_total,
with_transactions=True, with_transactions=True,
num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS, num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS,
@ -290,10 +290,9 @@ def main() -> None:
), ),
) )
parser_crawler_blocks_sync.add_argument( parser_crawler_blocks_sync.add_argument(
"-t", "--blockchain",
"--blockchain-type",
required=True, required=True,
help=f"Available blockchain types: {[member for member in AvailableBlockchainType.__members__]}", help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
) )
parser_crawler_blocks_sync.set_defaults(func=crawler_blocks_sync_handler) parser_crawler_blocks_sync.set_defaults(func=crawler_blocks_sync_handler)
@ -307,10 +306,9 @@ def main() -> None:
help="List of blocks range in format {bottom_block}-{top_block}", help="List of blocks range in format {bottom_block}-{top_block}",
) )
parser_crawler_blocks_add.add_argument( parser_crawler_blocks_add.add_argument(
"-t", "--blockchain",
"--blockchain-type",
required=True, required=True,
help=f"Available blockchain types: {[member for member in AvailableBlockchainType.__members__]}", help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
) )
parser_crawler_blocks_add.set_defaults(func=crawler_blocks_add_handler) parser_crawler_blocks_add.set_defaults(func=crawler_blocks_add_handler)
@ -337,10 +335,9 @@ def main() -> None:
help="Lazy block adding one by one", help="Lazy block adding one by one",
) )
parser_crawler_blocks_missing.add_argument( parser_crawler_blocks_missing.add_argument(
"-t", "--blockchain",
"--blockchain-type",
required=True, required=True,
help=f"Available blockchain types: {[member for member in AvailableBlockchainType.__members__]}", help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
) )
parser_crawler_blocks_missing.set_defaults(func=crawler_blocks_missing_handler) parser_crawler_blocks_missing.set_defaults(func=crawler_blocks_missing_handler)