Merge pull request #24 from bugout-dev/missing-block-check

Missing block check
pull/28/head
Neeraj Kashyap 2021-07-28 10:31:34 -07:00 zatwierdzone przez GitHub
commit aa3ebd833c
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
6 zmienionych plików z 109 dodań i 61 usunięć

Wyświetl plik

@ -8,7 +8,7 @@ certifi==2021.5.30
charset-normalizer==2.0.3 charset-normalizer==2.0.3
click==8.0.1 click==8.0.1
fastapi==0.66.0 fastapi==0.66.0
-e git+https://git@github.com/bugout-dev/moonstream.git@60e90219a8e24077a1ab046463775f837df5f03e#egg=moonstreamdb&subdirectory=db -e git+https://git@github.com/bugout-dev/moonstream.git@876c23aac10f07da700798f47c44797a4ae157bb#egg=moonstreamdb&subdirectory=db
h11==0.12.0 h11==0.12.0
idna==3.2 idna==3.2
jmespath==0.10.0 jmespath==0.10.0

Wyświetl plik

@ -1,4 +1,4 @@
export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to" export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to"
export MOONSTREAM_OPENAPI_LIST="users,subscriptions" export MOONSTREAM_OPENAPI_LIST="users,subscriptions"
export MOONSTREAM_APPLICATION_ID="<issued_bugout_application_id>" export MOONSTREAM_APPLICATION_ID="<issued_bugout_application_id>"
export MOONSTREAM_DATA_JOURNAL_ID="<bugout_journal_id_to_store_blockchain_data>" export MOONSTREAM_DATA_JOURNAL_ID="<bugout_journal_id_to_store_blockchain_data>"

Wyświetl plik

@ -4,44 +4,57 @@ Moonstream crawlers CLI.
import argparse import argparse
from distutils.util import strtobool from distutils.util import strtobool
import time import time
from typing import List
from .ethereum import crawl from .ethereum import crawl, check_missing_blocks
from .settings import MOONSTREAM_CRAWL_WORKERS from .settings import MOONSTREAM_CRAWL_WORKERS
def get_blocks_numbers_lists(
bottom_block_number: int, top_block_number: int
) -> List[List[int]]:
"""
Generate list of blocks.
"""
block_step = 1000
blocks_numbers_list_raw = list(range(top_block_number, bottom_block_number - 1, -1))
blocks_numbers_list_raw_len = len(blocks_numbers_list_raw)
# Block steps used to prevent long executor tasks and data loss possibility
# Block step 2 convert [1,2,3] -> [[1,2],[3]]
if blocks_numbers_list_raw_len / block_step > 1:
blocks_numbers_lists = [
blocks_numbers_list_raw[i : i + block_step]
for i in range(0, blocks_numbers_list_raw_len, block_step)
]
else:
blocks_numbers_lists = [blocks_numbers_list_raw]
return blocks_numbers_lists, blocks_numbers_list_raw_len
def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None: def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None:
""" """
Add blocks to moonstream database. Add blocks to moonstream database.
""" """
try: try:
blocks_start_end = args.blocks.split("-") blocks_start_end = args.blocks.split("-")
top_block_number = int(blocks_start_end[1])
bottom_block_number = int(blocks_start_end[0]) bottom_block_number = int(blocks_start_end[0])
top_block_number = int(blocks_start_end[1])
except Exception: except Exception:
print( print(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
) )
return return
block_step = 1000 blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists(
blocks_numbers_list_raw = list(range(top_block_number, bottom_block_number - 1, -1)) bottom_block_number, top_block_number
blocks_numbers_list_raw_len = len(blocks_numbers_list_raw) )
# Block steps used to prevent long executor tasks and data loss possibility
# Block step 2 convert [1,2,3] -> [[1,2],[3]]
if len(blocks_numbers_list_raw) / block_step > 1:
blocks_numbers_lists = [
blocks_numbers_list_raw[i : i + block_step]
for i in range(0, blocks_numbers_list_raw_len, block_step)
]
else:
blocks_numbers_lists = [blocks_numbers_list_raw]
startTime = time.time() startTime = time.time()
for blocks_numbers_list in blocks_numbers_lists: for blocks_numbers_list in blocks_numbers_lists:
crawl( crawl(
block_numbers_list=blocks_numbers_list, block_numbers_list=blocks_numbers_list,
with_transactions=bool(strtobool(args.transactions)), with_transactions=bool(strtobool(args.transactions)),
check=bool(strtobool(args.check)),
) )
print( print(
f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} " f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} "
@ -49,6 +62,46 @@ def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None:
) )
def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None:
try:
blocks_start_end = args.blocks.split("-")
bottom_block_number = int(blocks_start_end[0])
top_block_number = int(blocks_start_end[1])
except Exception:
print(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
)
return
blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists(
bottom_block_number, top_block_number
)
startTime = time.time()
missing_blocks_numbers_total = []
for blocks_numbers_list in blocks_numbers_lists:
print(
f"Check missing blocks from {blocks_numbers_list[0]} to {blocks_numbers_list[-1]}"
)
missing_blocks_numbers = check_missing_blocks(
blocks_numbers=blocks_numbers_list,
)
missing_blocks_numbers_total.extend(missing_blocks_numbers)
print(f"Found {len(missing_blocks_numbers_total)} missing blocks")
time.sleep(5)
if (len(missing_blocks_numbers_total)) > 0:
crawl(
missing_blocks_numbers_total,
with_transactions=bool(strtobool(args.transactions)),
)
print(
f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} "
f"blocks with {MOONSTREAM_CRAWL_WORKERS} workers"
f" with {len(missing_blocks_numbers_total)} missing blocks"
)
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser(description="Moonstream crawlers CLI") parser = argparse.ArgumentParser(description="Moonstream crawlers CLI")
parser.set_defaults(func=lambda _: parser.print_help()) parser.set_defaults(func=lambda _: parser.print_help())
@ -89,15 +142,27 @@ def main() -> None:
default="False", default="False",
help="Add or not block transactions", help="Add or not block transactions",
) )
parser_ethcrawler_blocks_add.add_argument( parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler)
"-c",
"--check", parser_ethcrawler_blocks_missing = subcommands_ethcrawler_blocks.add_parser(
"missing", description="Add missing ethereum blocks commands"
)
parser_ethcrawler_blocks_missing.add_argument(
"-b",
"--blocks",
required=True,
help="List of blocks range in format {bottom_block}-{top_block}",
)
parser_ethcrawler_blocks_missing.add_argument(
"-t",
"--transactions",
choices=["True", "False"], choices=["True", "False"],
default="False", default="False",
help="If True, it will check existence of block and transaction before write to database", help="Add or not block transactions",
)
parser_ethcrawler_blocks_missing.set_defaults(
func=ethcrawler_blocks_missing_handler
) )
parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler)
args = parser.parse_args() args = parser.parse_args()
args.func(args) args.func(args)

Wyświetl plik

@ -14,25 +14,10 @@ def connect(ipc_path: Optional[str] = MOONSTREAM_IPC_PATH):
return web3_client return web3_client
def add_block( def add_block(db_session, block: BlockData, block_number: int) -> None:
db_session, block: BlockData, block_number: int, check: bool = False
) -> None:
""" """
Add block if doesn't presented in database. Add block if doesn't presented in database.
""" """
if check:
block_exist = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number == block_number)
.one_or_none()
)
if block_exist is not None and block_exist.hash == block.hash.hex():
print(f"Block: {block_number} exists")
return
if block_exist is not None and block_exist.hash != block.hash.hex():
print(f"Block: {block_number} exists, but incorrect")
db_session.delete(block_exist)
block_obj = EthereumBlock( block_obj = EthereumBlock(
block_number=block.number, block_number=block.number,
difficulty=block.difficulty, difficulty=block.difficulty,
@ -54,24 +39,12 @@ def add_block(
) )
db_session.add(block_obj) db_session.add(block_obj)
print(f"Added new block: {block_number}") print(f"Added new block: {block_number}")
return
def add_block_transaction( def add_block_transaction(db_session, block_number: int, tx) -> None:
db_session, block_number: int, tx, check: bool = False
) -> None:
""" """
Add block transaction if doesn't presented in database. Add block transaction if doesn't presented in database.
""" """
if check:
tx_exist = (
db_session.query(EthereumTransaction)
.filter(EthereumTransaction.hash == tx.hash.hex())
.one_or_none()
)
if tx_exist is not None:
return
tx_obj = EthereumTransaction( tx_obj = EthereumTransaction(
hash=tx.hash.hex(), hash=tx.hash.hex(),
block_number=block_number, block_number=block_number,
@ -87,9 +60,7 @@ def add_block_transaction(
db_session.add(tx_obj) db_session.add(tx_obj)
def process_blocks( def process_blocks(blocks_numbers: List[int], with_transactions: bool = False):
blocks_numbers: List[int], with_transactions: bool = False, check: bool = False
):
""" """
Open database and geth sessions and fetch block data from blockchain. Open database and geth sessions and fetch block data from blockchain.
""" """
@ -99,16 +70,28 @@ def process_blocks(
block: BlockData = web3_client.eth.get_block( block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions block_number, full_transactions=with_transactions
) )
add_block(db_session, block, block_number, check=check) add_block(db_session, block, block_number)
if with_transactions: if with_transactions:
for tx in block.transactions: for tx in block.transactions:
add_block_transaction(db_session, block.number, tx, check=check) add_block_transaction(db_session, block.number, tx)
db_session.commit() db_session.commit()
def crawl( def check_missing_blocks(blocks_numbers: List[int]):
block_numbers_list: List[int], with_transactions: bool = False, check: bool = False missing_blocks_numbers = []
): for block_number in blocks_numbers:
with yield_db_session_ctx() as db_session:
block_exist = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number == block_number)
.one_or_none()
)
if block_exist is None:
missing_blocks_numbers.append(block_number)
return missing_blocks_numbers
def crawl(block_numbers_list: List[int], with_transactions: bool = False) -> None:
""" """
Execute crawler. Execute crawler.
""" """
@ -121,5 +104,4 @@ def crawl(
process_blocks, process_blocks,
block_numbers_list[worker - 1 :: MOONSTREAM_CRAWL_WORKERS], block_numbers_list[worker - 1 :: MOONSTREAM_CRAWL_WORKERS],
with_transactions, with_transactions,
check,
) )

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -1 +1,2 @@
export MOONSTREAM_DB_URI="<database_uri>" export MOONSTREAM_DB_URI="<database_uri>"
export MOONSTREAM_POOL_SIZE=0