tqdm bar for block crawlers

pull/282/head
kompotkot 2021-09-24 14:31:34 +00:00
rodzic 7e4f0f6ce0
commit 7d6853d0c8
2 zmienionych plików z 31 dodań i 30 usunięć

Wyświetl plik

@ -48,7 +48,7 @@ def yield_blocks_numbers_lists(
input_start_block = int(blocks_start_end[0])
input_end_block = int(blocks_start_end[1])
except Exception:
print(
logger.error(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
)
raise Exception
@ -144,12 +144,14 @@ def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None:
startTime = time.time()
for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks):
print(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
logger.info(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
crawl_blocks_executor(
block_numbers_list=blocks_numbers_list, with_transactions=True
)
print(f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers")
logger.info(
f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers"
)
def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None:
@ -161,7 +163,8 @@ def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None:
missing_blocks_numbers_total = []
for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks):
logger.info(
f"Checking missing blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]} with transactions: {not args.notransactions}"
f"Checking missing blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]} "
f"with comparing transactions: {not args.notransactions}"
)
missing_blocks_numbers = check_missing_blocks(
blocks_numbers=blocks_numbers_list,
@ -175,13 +178,11 @@ def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None:
f"{missing_blocks_numbers_total if len(missing_blocks_numbers_total) <= 10 else '...'}"
)
time.sleep(5)
if (len(missing_blocks_numbers_total)) > 0:
time.sleep(5)
crawl_blocks_executor(
missing_blocks_numbers_total,
with_transactions=True,
verbose=args.verbose,
num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS,
)
logger.info(
@ -318,12 +319,6 @@ def main() -> None:
action="store_true",
help="Lazy block adding one by one",
)
parser_ethcrawler_blocks_missing.add_argument(
"-v",
"--verbose",
action="store_true",
help="Print additional information",
)
parser_ethcrawler_blocks_missing.set_defaults(
func=ethcrawler_blocks_missing_handler
)

Wyświetl plik

@ -9,6 +9,7 @@ from sqlalchemy import desc, Column
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, Query
from tqdm import tqdm
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
@ -124,15 +125,17 @@ def get_latest_blocks(confirmations: int = 0) -> Tuple[Optional[int], int]:
return latest_stored_block_number, latest_block_number
def crawl_blocks(
blocks_numbers: List[int], with_transactions: bool = False, verbose: bool = False
) -> None:
def crawl_blocks(blocks_numbers: List[int], with_transactions: bool = False) -> None:
"""
Open database and geth sessions and fetch block data from blockchain.
"""
web3_client = connect()
with yield_db_session_ctx() as db_session:
pbar = tqdm(total=len(blocks_numbers))
for block_number in blocks_numbers:
pbar.set_description(
f"Crawling block {block_number} with txs: {with_transactions}"
)
try:
block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions
@ -158,9 +161,8 @@ def crawl_blocks(
f"Interrupted while adding block (number={block_number}) to database."
)
raise
if verbose:
logger.info(f"Added block: {block_number}")
pbar.update()
pbar.close()
def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> List[int]:
@ -198,14 +200,20 @@ def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> Lis
blocks_exist = [
[block[0], block[1]] for block in blocks_exist_raw_query.all()
]
web3_client = connect()
blocks_exist_len = len(blocks_exist)
pbar = tqdm(total=blocks_exist_len)
pbar.set_description(f"Checking txs in {blocks_exist_len} blocks")
for i, block_in_db in enumerate(blocks_exist):
block: Any = web3_client.eth.get_block(
block = web3_client.eth.get_block(
block_in_db[0], full_transactions=True
) # BlockData
)
if len(block.transactions) != block_in_db[1]:
corrupted_blocks.append(block_in_db[0])
# Delete existing corrupted block
# Delete existing corrupted block and add to missing list
del_block = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number == block_in_db[0])
@ -213,6 +221,9 @@ def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> Lis
)
db_session.delete(del_block)
del blocks_exist[i]
pbar.update()
pbar.close()
db_session.commit()
corrupted_blocks_len = len(corrupted_blocks)
@ -230,7 +241,6 @@ def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> Lis
def crawl_blocks_executor(
block_numbers_list: List[int],
with_transactions: bool = False,
verbose: bool = False,
num_processes: int = MOONSTREAM_CRAWL_WORKERS,
) -> None:
"""
@ -239,7 +249,6 @@ def crawl_blocks_executor(
Args:
block_numbers_list - List of block numbers to add to database.
with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table.
verbose - Print each block complete to stdout
num_processes - Number of processes to use to feed blocks into database.
Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlocksCrawlError.
@ -260,16 +269,13 @@ def crawl_blocks_executor(
results: List[Future] = []
if num_processes == 1:
logger.warning("Executing block crawler in lazy mod")
return crawl_blocks(block_numbers_list, with_transactions, verbose)
return crawl_blocks(block_numbers_list, with_transactions)
else:
with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in worker_indices:
block_chunk = worker_job_lists[worker]
if verbose:
logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(
crawl_blocks, block_chunk, with_transactions, verbose
)
logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(crawl_blocks, block_chunk, with_transactions)
result.add_done_callback(record_error)
results.append(result)