diff --git a/backend/requirements.txt b/backend/requirements.txt index 783e88cb..e9e07a61 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -8,7 +8,7 @@ certifi==2021.5.30 charset-normalizer==2.0.3 click==8.0.1 fastapi==0.66.0 --e git+https://git@github.com/bugout-dev/moonstream.git@60e90219a8e24077a1ab046463775f837df5f03e#egg=moonstreamdb&subdirectory=db +-e git+https://git@github.com/bugout-dev/moonstream.git@876c23aac10f07da700798f47c44797a4ae157bb#egg=moonstreamdb&subdirectory=db h11==0.12.0 idna==3.2 jmespath==0.10.0 diff --git a/backend/sample.env b/backend/sample.env index 8fa5f798..dc4a3948 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -1,4 +1,4 @@ -export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to" +export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to" export MOONSTREAM_OPENAPI_LIST="users,subscriptions" export MOONSTREAM_APPLICATION_ID="" export MOONSTREAM_DATA_JOURNAL_ID="" diff --git a/crawlers/moonstreamcrawlers/cli.py b/crawlers/moonstreamcrawlers/cli.py index 0cd2a6f8..1b14d75f 100644 --- a/crawlers/moonstreamcrawlers/cli.py +++ b/crawlers/moonstreamcrawlers/cli.py @@ -4,44 +4,57 @@ Moonstream crawlers CLI. import argparse from distutils.util import strtobool import time +from typing import List -from .ethereum import crawl +from .ethereum import crawl, check_missing_blocks from .settings import MOONSTREAM_CRAWL_WORKERS +def get_blocks_numbers_lists( + bottom_block_number: int, top_block_number: int +) -> List[List[int]]: + """ + Generate list of blocks. + """ + block_step = 1000 + blocks_numbers_list_raw = list(range(top_block_number, bottom_block_number - 1, -1)) + blocks_numbers_list_raw_len = len(blocks_numbers_list_raw) + # Block steps used to prevent long executor tasks and data loss possibility + # Block step 2 convert [1,2,3] -> [[1,2],[3]] + if blocks_numbers_list_raw_len / block_step > 1: + blocks_numbers_lists = [ + blocks_numbers_list_raw[i : i + block_step] + for i in range(0, blocks_numbers_list_raw_len, block_step) + ] + else: + blocks_numbers_lists = [blocks_numbers_list_raw] + + return blocks_numbers_lists, blocks_numbers_list_raw_len + + def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None: """ Add blocks to moonstream database. """ try: blocks_start_end = args.blocks.split("-") - top_block_number = int(blocks_start_end[1]) bottom_block_number = int(blocks_start_end[0]) + top_block_number = int(blocks_start_end[1]) except Exception: print( "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" ) return - block_step = 1000 - blocks_numbers_list_raw = list(range(top_block_number, bottom_block_number - 1, -1)) - blocks_numbers_list_raw_len = len(blocks_numbers_list_raw) - # Block steps used to prevent long executor tasks and data loss possibility - # Block step 2 convert [1,2,3] -> [[1,2],[3]] - if len(blocks_numbers_list_raw) / block_step > 1: - blocks_numbers_lists = [ - blocks_numbers_list_raw[i : i + block_step] - for i in range(0, blocks_numbers_list_raw_len, block_step) - ] - else: - blocks_numbers_lists = [blocks_numbers_list_raw] + blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists( + bottom_block_number, top_block_number + ) startTime = time.time() for blocks_numbers_list in blocks_numbers_lists: crawl( block_numbers_list=blocks_numbers_list, with_transactions=bool(strtobool(args.transactions)), - check=bool(strtobool(args.check)), ) print( f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} " @@ -49,6 +62,46 @@ def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None: ) +def ethcrawler_blocks_missing_handler(args: argparse.Namespace) -> None: + try: + blocks_start_end = args.blocks.split("-") + bottom_block_number = int(blocks_start_end[0]) + top_block_number = int(blocks_start_end[1]) + except Exception: + print( + "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" + ) + return + + blocks_numbers_lists, blocks_numbers_list_raw_len = get_blocks_numbers_lists( + bottom_block_number, top_block_number + ) + startTime = time.time() + missing_blocks_numbers_total = [] + for blocks_numbers_list in blocks_numbers_lists: + print( + f"Check missing blocks from {blocks_numbers_list[0]} to {blocks_numbers_list[-1]}" + ) + missing_blocks_numbers = check_missing_blocks( + blocks_numbers=blocks_numbers_list, + ) + missing_blocks_numbers_total.extend(missing_blocks_numbers) + print(f"Found {len(missing_blocks_numbers_total)} missing blocks") + + time.sleep(5) + + if (len(missing_blocks_numbers_total)) > 0: + crawl( + missing_blocks_numbers_total, + with_transactions=bool(strtobool(args.transactions)), + ) + print( + f"Required time: {time.time() - startTime} for: {blocks_numbers_list_raw_len} " + f"blocks with {MOONSTREAM_CRAWL_WORKERS} workers" + f" with {len(missing_blocks_numbers_total)} missing blocks" + ) + + def main() -> None: parser = argparse.ArgumentParser(description="Moonstream crawlers CLI") parser.set_defaults(func=lambda _: parser.print_help()) @@ -89,15 +142,27 @@ def main() -> None: default="False", help="Add or not block transactions", ) - parser_ethcrawler_blocks_add.add_argument( - "-c", - "--check", + parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler) + + parser_ethcrawler_blocks_missing = subcommands_ethcrawler_blocks.add_parser( + "missing", description="Add missing ethereum blocks commands" + ) + parser_ethcrawler_blocks_missing.add_argument( + "-b", + "--blocks", + required=True, + help="List of blocks range in format {bottom_block}-{top_block}", + ) + parser_ethcrawler_blocks_missing.add_argument( + "-t", + "--transactions", choices=["True", "False"], default="False", - help="If True, it will check existence of block and transaction before write to database", + help="Add or not block transactions", + ) + parser_ethcrawler_blocks_missing.set_defaults( + func=ethcrawler_blocks_missing_handler ) - - parser_ethcrawler_blocks_add.set_defaults(func=ethcrawler_blocks_add_handler) args = parser.parse_args() args.func(args) diff --git a/crawlers/moonstreamcrawlers/ethereum.py b/crawlers/moonstreamcrawlers/ethereum.py index 4250b246..fe17e876 100644 --- a/crawlers/moonstreamcrawlers/ethereum.py +++ b/crawlers/moonstreamcrawlers/ethereum.py @@ -14,25 +14,10 @@ def connect(ipc_path: Optional[str] = MOONSTREAM_IPC_PATH): return web3_client -def add_block( - db_session, block: BlockData, block_number: int, check: bool = False -) -> None: +def add_block(db_session, block: BlockData, block_number: int) -> None: """ Add block if doesn't presented in database. """ - if check: - block_exist = ( - db_session.query(EthereumBlock) - .filter(EthereumBlock.block_number == block_number) - .one_or_none() - ) - if block_exist is not None and block_exist.hash == block.hash.hex(): - print(f"Block: {block_number} exists") - return - if block_exist is not None and block_exist.hash != block.hash.hex(): - print(f"Block: {block_number} exists, but incorrect") - db_session.delete(block_exist) - block_obj = EthereumBlock( block_number=block.number, difficulty=block.difficulty, @@ -54,24 +39,12 @@ def add_block( ) db_session.add(block_obj) print(f"Added new block: {block_number}") - return -def add_block_transaction( - db_session, block_number: int, tx, check: bool = False -) -> None: +def add_block_transaction(db_session, block_number: int, tx) -> None: """ Add block transaction if doesn't presented in database. """ - if check: - tx_exist = ( - db_session.query(EthereumTransaction) - .filter(EthereumTransaction.hash == tx.hash.hex()) - .one_or_none() - ) - if tx_exist is not None: - return - tx_obj = EthereumTransaction( hash=tx.hash.hex(), block_number=block_number, @@ -87,9 +60,7 @@ def add_block_transaction( db_session.add(tx_obj) -def process_blocks( - blocks_numbers: List[int], with_transactions: bool = False, check: bool = False -): +def process_blocks(blocks_numbers: List[int], with_transactions: bool = False): """ Open database and geth sessions and fetch block data from blockchain. """ @@ -99,16 +70,28 @@ def process_blocks( block: BlockData = web3_client.eth.get_block( block_number, full_transactions=with_transactions ) - add_block(db_session, block, block_number, check=check) + add_block(db_session, block, block_number) if with_transactions: for tx in block.transactions: - add_block_transaction(db_session, block.number, tx, check=check) + add_block_transaction(db_session, block.number, tx) db_session.commit() -def crawl( - block_numbers_list: List[int], with_transactions: bool = False, check: bool = False -): +def check_missing_blocks(blocks_numbers: List[int]): + missing_blocks_numbers = [] + for block_number in blocks_numbers: + with yield_db_session_ctx() as db_session: + block_exist = ( + db_session.query(EthereumBlock) + .filter(EthereumBlock.block_number == block_number) + .one_or_none() + ) + if block_exist is None: + missing_blocks_numbers.append(block_number) + return missing_blocks_numbers + + +def crawl(block_numbers_list: List[int], with_transactions: bool = False) -> None: """ Execute crawler. """ @@ -121,5 +104,4 @@ def crawl( process_blocks, block_numbers_list[worker - 1 :: MOONSTREAM_CRAWL_WORKERS], with_transactions, - check, ) diff --git a/crawlers/requirements.txt b/crawlers/requirements.txt index 976fc0cc..748be495 100644 Binary files a/crawlers/requirements.txt and b/crawlers/requirements.txt differ diff --git a/db/sample.env b/db/sample.env index 26a947b2..cc7ab125 100644 --- a/db/sample.env +++ b/db/sample.env @@ -1 +1,2 @@ export MOONSTREAM_DB_URI="" +export MOONSTREAM_POOL_SIZE=0