Merge pull request #35 from bugout-dev/clean-ethcrawler

Clean ethcrawler
pull/38/head
Sergei Sumarokov 2021-07-30 19:17:33 +03:00 zatwierdzone przez GitHub
commit d07d9d043f
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
1 zmienionych plików z 31 dodań i 9 usunięć

Wyświetl plik

@ -1,5 +1,5 @@
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import Future, ProcessPoolExecutor, wait
from typing import List, Tuple from typing import List, Optional, Tuple
from sqlalchemy import desc from sqlalchemy import desc
from web3 import Web3 from web3 import Web3
@ -14,6 +14,7 @@ from moonstreamdb.models import (
) )
# TODO(kompotkot): Write logic to chose between http and ipc
def connect(ipc_path: str = MOONSTREAM_IPC_PATH): def connect(ipc_path: str = MOONSTREAM_IPC_PATH):
web3_client = Web3(Web3.IPCProvider(ipc_path)) web3_client = Web3(Web3.IPCProvider(ipc_path))
return web3_client return web3_client
@ -129,18 +130,39 @@ def crawl_blocks_executor(
""" """
Execute crawler in processes. Execute crawler in processes.
""" """
errors: List[Exception] = []
def record_error(f: Future) -> None:
error = f.exception()
if error is not None:
errors.append(error)
worker_indices = range(MOONSTREAM_CRAWL_WORKERS)
worker_job_lists = [[] for _ in worker_indices]
for i, block_number in enumerate(block_numbers_list):
worker_job_lists[i % MOONSTREAM_CRAWL_WORKERS].append(block_number)
results: List[Future] = []
with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in range(1, MOONSTREAM_CRAWL_WORKERS + 1): for worker in worker_indices:
worker_block_numbers_list = block_numbers_list[
worker - 1 :: MOONSTREAM_CRAWL_WORKERS
]
if verbose: if verbose:
print(f"Spawned process for {len(worker_block_numbers_list)} blocks") print(f"Spawned process for {len(worker_job_lists[worker])} blocks")
executor.submit( result = executor.submit(
crawl_blocks, crawl_blocks,
worker_block_numbers_list, worker_job_lists[worker],
with_transactions, with_transactions,
) )
result.add_done_callback(record_error)
results.append(result)
wait(results)
# TODO(kompotkot): Return list of errors and colors responsible for
# handling errors
if len(errors) > 0:
print("Errors:")
for error in errors:
print(f"- {error}")
def process_contract_deployments() -> List[Tuple[str, str]]: def process_contract_deployments() -> List[Tuple[str, str]]: