Merge pull request #282 from bugout-dev/block-tx-checker

Threads instead of pools, missing block cmd check num of transactions
pull/295/head
Sergei Sumarokov 2021-09-24 19:15:28 +03:00 zatwierdzone przez GitHub
commit c35354be04
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
4 zmienionych plików z 119 dodań i 78 usunięć

Wyświetl plik

@ -15,7 +15,6 @@ import dateutil.parser
from .ethereum import (
crawl_blocks_executor,
crawl_blocks,
check_missing_blocks,
get_latest_blocks,
process_contract_deployments,
@ -26,9 +25,8 @@ from .publish import publish_json
from .settings import MOONSTREAM_CRAWL_WORKERS
from .version import MOONCRAWL_VERSION
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class ProcessingOrder(Enum):
@ -50,7 +48,7 @@ def yield_blocks_numbers_lists(
input_start_block = int(blocks_start_end[0])
input_end_block = int(blocks_start_end[1])
except Exception:
print(
logger.error(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
)
raise Exception
@ -131,7 +129,7 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
# TODO(kompotkot): Set num_processes argument based on number of blocks to synchronize.
crawl_blocks_executor(
block_numbers_list=blocks_numbers_list,
with_transactions=not args.notransactions,
with_transactions=True,
num_processes=args.jobs,
)
logger.info(
@ -146,47 +144,48 @@ def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None:
startTime = time.time()
for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks):
print(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
logger.info(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
crawl_blocks_executor(
block_numbers_list=blocks_numbers_list,
with_transactions=not args.notransactions,
block_numbers_list=blocks_numbers_list, with_transactions=True
)
print(f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers")
logger.info(
f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers"
)
def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None:
"""
Check missing blocks and missing transactions in each block.
"""
startTime = time.time()
missing_blocks_numbers_total = []
for blocks_numbers_list in yield_blocks_numbers_lists(args.blocks):
print(
f"Check missing blocks from {blocks_numbers_list[0]} to {blocks_numbers_list[-1]}"
logger.info(
f"Checking missing blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]} "
f"with comparing transactions: {not args.notransactions}"
)
missing_blocks_numbers = check_missing_blocks(
blocks_numbers=blocks_numbers_list,
notransactions=args.notransactions,
)
if len(missing_blocks_numbers) > 0:
print(f"Found {len(missing_blocks_numbers)} missing blocks")
logger.info(f"Found {len(missing_blocks_numbers)} missing blocks")
missing_blocks_numbers_total.extend(missing_blocks_numbers)
print(f"Found {len(missing_blocks_numbers_total)} missing blocks total")
time.sleep(5)
logger.info(
f"Found {len(missing_blocks_numbers_total)} missing blocks total: "
f"{missing_blocks_numbers_total if len(missing_blocks_numbers_total) <= 10 else '...'}"
)
if (len(missing_blocks_numbers_total)) > 0:
if args.lazy:
print("Executed lazy block crawler")
crawl_blocks(
missing_blocks_numbers_total,
with_transactions=not args.notransactions,
verbose=args.verbose,
)
else:
crawl_blocks_executor(
missing_blocks_numbers_total,
with_transactions=not args.notransactions,
verbose=args.verbose,
)
print(
time.sleep(5)
crawl_blocks_executor(
missing_blocks_numbers_total,
with_transactions=True,
num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS,
)
logger.info(
f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers "
f"for {len(missing_blocks_numbers_total)} missing blocks"
)
@ -257,12 +256,6 @@ def main() -> None:
parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser(
"synchronize", description="Synchronize to latest ethereum block commands"
)
parser_ethcrawler_blocks_sync.add_argument(
"-n",
"--notransactions",
action="store_true",
help="Skip crawling block transactions",
)
parser_ethcrawler_blocks_sync.add_argument(
"-s",
"--start",
@ -303,16 +296,10 @@ def main() -> None:
required=True,
help="List of blocks range in format {bottom_block}-{top_block}",
)
parser_ethcrawler_blocks_add.add_argument(
"-n",
"--notransactions",
action="store_true",
help="Skip crawling block transactions",
)
parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler)
parser_ethcrawler_blocks_missing = subcommands_ethcrawler_blocks.add_parser(
"missing", description="Add missing ethereum blocks commands"
"missing", description="Add missing ethereum blocks with transactions commands"
)
parser_ethcrawler_blocks_missing.add_argument(
"-b",
@ -332,12 +319,6 @@ def main() -> None:
action="store_true",
help="Lazy block adding one by one",
)
parser_ethcrawler_blocks_missing.add_argument(
"-v",
"--verbose",
action="store_true",
help="Print additional information",
)
parser_ethcrawler_blocks_missing.set_defaults(
func=ethcrawler_blocks_missing_handler
)

Wyświetl plik

@ -1,11 +1,15 @@
from concurrent.futures import Future, ProcessPoolExecutor, wait
from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor, wait
from dataclasses import dataclass
from datetime import datetime
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from psycopg2.errors import UniqueViolation # type: ignore
from sqlalchemy import desc, Column
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, Query
from tqdm import tqdm
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
@ -17,6 +21,9 @@ from moonstreamdb.models import (
EthereumTransaction,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EthereumBlockCrawlError(Exception):
"""
@ -118,15 +125,17 @@ def get_latest_blocks(confirmations: int = 0) -> Tuple[Optional[int], int]:
return latest_stored_block_number, latest_block_number
def crawl_blocks(
blocks_numbers: List[int], with_transactions: bool = False, verbose: bool = False
) -> None:
def crawl_blocks(blocks_numbers: List[int], with_transactions: bool = False) -> None:
"""
Open database and geth sessions and fetch block data from blockchain.
"""
web3_client = connect()
with yield_db_session_ctx() as db_session:
pbar = tqdm(total=len(blocks_numbers))
for block_number in blocks_numbers:
pbar.set_description(
f"Crawling block {block_number} with txs: {with_transactions}"
)
try:
block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions
@ -137,38 +146,94 @@ def crawl_blocks(
add_block_transactions(db_session, block)
db_session.commit()
except IntegrityError as err:
assert isinstance(err.orig, UniqueViolation)
logger.warning(
"UniqueViolation error occurred, it means block already exists"
)
except Exception as err:
db_session.rollback()
message = f"Error adding block (number={block_number}) to database:\n{repr(err)}"
raise EthereumBlockCrawlError(message)
except:
db_session.rollback()
print(
logger.error(
f"Interrupted while adding block (number={block_number}) to database."
)
raise
if verbose:
print(f"Added block: {block_number}")
pbar.update()
pbar.close()
def check_missing_blocks(blocks_numbers: List[int]) -> List[int]:
def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> List[int]:
"""
Query block from postgres. If block does not presented in database,
add to missing blocks numbers list.
If arg notransactions=False, it checks correct number of transactions in
database according to blockchain.
"""
bottom_block = min(blocks_numbers[-1], blocks_numbers[0])
top_block = max(blocks_numbers[-1], blocks_numbers[0])
with yield_db_session_ctx() as db_session:
blocks_exist_raw = (
db_session.query(EthereumBlock.block_number)
.filter(EthereumBlock.block_number >= bottom_block)
.filter(EthereumBlock.block_number <= top_block)
.all()
)
blocks_exist = [block[0] for block in blocks_exist_raw]
if notransactions:
blocks_exist_raw_query = (
db_session.query(EthereumBlock.block_number)
.filter(EthereumBlock.block_number >= bottom_block)
.filter(EthereumBlock.block_number <= top_block)
)
blocks_exist = [[block[0]] for block in blocks_exist_raw_query.all()]
else:
corrupted_blocks = []
blocks_exist_raw_query = (
db_session.query(
EthereumBlock.block_number, func.count(EthereumTransaction.hash)
)
.join(
EthereumTransaction,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumBlock.block_number >= bottom_block)
.filter(EthereumBlock.block_number <= top_block)
.group_by(EthereumBlock.block_number)
)
blocks_exist = [
[block[0], block[1]] for block in blocks_exist_raw_query.all()
]
web3_client = connect()
blocks_exist_len = len(blocks_exist)
pbar = tqdm(total=blocks_exist_len)
pbar.set_description(f"Checking txs in {blocks_exist_len} blocks")
for i, block_in_db in enumerate(blocks_exist):
block = web3_client.eth.get_block(
block_in_db[0], full_transactions=True
)
if len(block.transactions) != block_in_db[1]:
corrupted_blocks.append(block_in_db[0])
# Delete existing corrupted block and add to missing list
del_block = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number == block_in_db[0])
.one()
)
db_session.delete(del_block)
del blocks_exist[i]
pbar.update()
pbar.close()
db_session.commit()
corrupted_blocks_len = len(corrupted_blocks)
if corrupted_blocks_len > 0:
logger.warning(
f"Removed {corrupted_blocks_len} corrupted blocks: {corrupted_blocks if corrupted_blocks_len <= 10 else '...'}"
)
missing_blocks_numbers = [
block for block in blocks_numbers if block not in blocks_exist
block for block in blocks_numbers if block not in [i[0] for i in blocks_exist]
]
return missing_blocks_numbers
@ -176,7 +241,6 @@ def check_missing_blocks(blocks_numbers: List[int]) -> List[int]:
def crawl_blocks_executor(
block_numbers_list: List[int],
with_transactions: bool = False,
verbose: bool = False,
num_processes: int = MOONSTREAM_CRAWL_WORKERS,
) -> None:
"""
@ -185,7 +249,6 @@ def crawl_blocks_executor(
Args:
block_numbers_list - List of block numbers to add to database.
with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table.
verbose - Print logs to stdout?
num_processes - Number of processes to use to feed blocks into database.
Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlocksCrawlError.
@ -205,17 +268,14 @@ def crawl_blocks_executor(
results: List[Future] = []
if num_processes == 1:
return crawl_blocks(block_numbers_list, with_transactions, verbose)
logger.warning("Executing block crawler in lazy mod")
return crawl_blocks(block_numbers_list, with_transactions)
else:
with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in worker_indices:
if verbose:
print(f"Spawned process for {len(worker_job_lists[worker])} blocks")
result = executor.submit(
crawl_blocks,
worker_job_lists[worker],
with_transactions,
)
block_chunk = worker_job_lists[worker]
logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(crawl_blocks, block_chunk, with_transactions)
result.add_done_callback(record_error)
results.append(result)

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.0.4"
MOONCRAWL_VERSION = "0.0.5"

Wyświetl plik

@ -7,7 +7,7 @@ with open("README.md") as ifp:
setup(
name="mooncrawl",
version="0.0.3",
version="0.0.5",
author="Bugout.dev",
author_email="engineers@bugout.dev",
license="Apache License 2.0",