From 66a9b9a80ff758f00d99434f082389f6eaa0ba2e Mon Sep 17 00:00:00 2001
From: Neeraj Kashyap
Date: Mon, 2 Aug 2021 11:46:50 -0700
Subject: [PATCH] Actually use --order argument when running sync jobs

It was not hooked up to `crawl_blocks_executor` before. Also removed the
branching over whether to run `crawl_blocks` or `crawl_blocks_executor` in
the CLI handler.

Added a `num_processes` argument to `crawl_blocks_executor` that callers can
use to specify whether or not to create subprocesses.
---
 crawlers/moonstreamcrawlers/cli.py      | 26 ++++++---------
 crawlers/moonstreamcrawlers/ethereum.py | 42 +++++++++++++------------
 2 files changed, 32 insertions(+), 36 deletions(-)

diff --git a/crawlers/moonstreamcrawlers/cli.py b/crawlers/moonstreamcrawlers/cli.py
index c9ac6461..9e7a2469 100644
--- a/crawlers/moonstreamcrawlers/cli.py
+++ b/crawlers/moonstreamcrawlers/cli.py
@@ -93,24 +93,18 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
             )
             time.sleep(20)
             continue
-        if top_block_number - bottom_block_number >= 10:
-            for blocks_numbers_list in yield_blocks_numbers_lists(
-                f"{bottom_block_number}-{top_block_number}"
-            ):
-                print(
-                    f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}"
-                )
-                crawl_blocks_executor(
-                    block_numbers_list=blocks_numbers_list,
-                    with_transactions=bool(strtobool(args.transactions)),
-                )
-        else:
-            blocks_numbers_list = range(bottom_block_number, top_block_number + 1)
-            print(f"Adding blocks {bottom_block_number}-{top_block_number - 1}")
-            crawl_blocks(
-                blocks_numbers=blocks_numbers_list,
+
+        for blocks_numbers_list in yield_blocks_numbers_lists(
+            f"{bottom_block_number}-{top_block_number}",
+            order=args.order,
+        ):
+            print(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
+            # TODO(kompotkot): Set num_processes argument based on number of blocks to synchronize.
+            crawl_blocks_executor(
+                block_numbers_list=blocks_numbers_list,
                 with_transactions=bool(strtobool(args.transactions)),
             )
+
         print(f"Synchronized blocks from {bottom_block_number} to {top_block_number}")
         time.sleep(10)
 
diff --git a/crawlers/moonstreamcrawlers/ethereum.py b/crawlers/moonstreamcrawlers/ethereum.py
index cf211616..58c4b30f 100644
--- a/crawlers/moonstreamcrawlers/ethereum.py
+++ b/crawlers/moonstreamcrawlers/ethereum.py
@@ -136,6 +136,7 @@ def crawl_blocks_executor(
     block_numbers_list: List[int],
     with_transactions: bool = False,
     verbose: bool = False,
+    num_processes: int = MOONSTREAM_CRAWL_WORKERS,
 ) -> None:
     """
     Execute crawler in processes.
@@ -153,27 +154,28 @@ def crawl_blocks_executor(
         worker_job_lists[i % MOONSTREAM_CRAWL_WORKERS].append(block_number)
 
     results: List[Future] = []
+    if num_processes == 1:
+        return crawl_blocks(block_numbers_list, with_transactions, verbose)
+    else:
+        with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
+            for worker in worker_indices:
+                if verbose:
+                    print(f"Spawned process for {len(worker_job_lists[worker])} blocks")
+                result = executor.submit(
+                    crawl_blocks,
+                    worker_job_lists[worker],
+                    with_transactions,
+                )
+                result.add_done_callback(record_error)
+                results.append(result)
-    with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
-        for worker in worker_indices:
-            if verbose:
-                print(f"Spawned process for {len(worker_job_lists[worker])} blocks")
-            result = executor.submit(
-                crawl_blocks,
-                worker_job_lists[worker],
-                with_transactions,
-            )
-            result.add_done_callback(record_error)
-            results.append(result)
-
-    wait(results)
-    # TODO(kompotkot): Return list of errors and colors responsible for
-    # handling errors
-    if len(errors) > 0:
-        print("Errors:")
-        for error in errors:
-            print(f"- {error}")
-
+        wait(results)
+        # TODO(kompotkot): Return list of errors and colors responsible for
+        # handling errors
+        if len(errors) > 0:
+            print("Errors:")
+            for error in errors:
+                print(f"- {error}")
 
 
 def process_contract_deployments() -> List[Tuple[str, str]]:
     """
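
Note: a minimal usage sketch of the new `num_processes` argument, assuming the
import path `moonstreamcrawlers.ethereum` from this repository; the block
number ranges below are illustrative only, not part of the patch.

    from moonstreamcrawlers.ethereum import crawl_blocks_executor

    # Small backfill: num_processes=1 runs crawl_blocks directly in the
    # current process, so no subprocess pool is created.
    crawl_blocks_executor(
        block_numbers_list=list(range(12_800_000, 12_800_010)),
        with_transactions=True,
        num_processes=1,
    )

    # Larger backfill: num_processes defaults to MOONSTREAM_CRAWL_WORKERS,
    # so the blocks are spread across a ProcessPoolExecutor as before.
    crawl_blocks_executor(
        block_numbers_list=list(range(12_800_000, 12_805_000)),
        with_transactions=True,
    )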