| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor, wait | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | from dataclasses import dataclass | 
					
						
							|  |  |  | from datetime import datetime | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | import logging | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | from typing import Any, Callable, Dict, List, Optional, Tuple, Union | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | from psycopg2.errors import UniqueViolation  # type: ignore | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | from sqlalchemy import desc, Column | 
					
						
							|  |  |  | from sqlalchemy import func | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | from sqlalchemy.exc import IntegrityError | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | from sqlalchemy.orm import Session, Query | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  | from tqdm import tqdm | 
					
						
							| 
									
										
										
										
											2021-08-02 13:21:37 +00:00
										 |  |  | from web3 import Web3, IPCProvider, HTTPProvider | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  | from web3.types import BlockData | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from .settings import MOONSTREAM_IPC_PATH, MOONSTREAM_CRAWL_WORKERS | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | from moonstreamdb.db import yield_db_session, yield_db_session_ctx | 
					
						
							| 
									
										
										
										
											2021-07-29 04:35:49 +00:00
										 |  |  | from moonstreamdb.models import ( | 
					
						
							|  |  |  |     EthereumBlock, | 
					
						
							|  |  |  |     EthereumTransaction, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
# Module-level logger named after this module; its level is pinned to INFO here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  | class EthereumBlockCrawlError(Exception): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Raised when there is a problem crawling Ethereum blocks. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
@dataclass
class DateRange:
    """
    Time interval whose endpoints can each be inclusive or exclusive.
    """

    # Lower bound of the interval.
    start_time: datetime
    # Upper bound of the interval.
    end_time: datetime
    # True: the lower bound itself is part of the interval (>=); False: it is excluded (>).
    include_start: bool
    # True: the upper bound itself is part of the interval (<=); False: it is excluded (<).
    include_end: bool
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-02 13:21:37 +00:00
										 |  |  | def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH): | 
					
						
							|  |  |  |     web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() | 
					
						
							|  |  |  |     if web3_uri is not None: | 
					
						
							|  |  |  |         if web3_uri.startswith("http://") or web3_uri.startswith("https://"): | 
					
						
							|  |  |  |             web3_provider = Web3.HTTPProvider(web3_uri) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             web3_provider = Web3.IPCProvider(web3_uri) | 
					
						
							|  |  |  |     web3_client = Web3(web3_provider) | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     return web3_client | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  | def add_block(db_session, block: Any) -> None: | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     Add block if doesn't presented in database. | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     block: web3.types.BlockData | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     block_obj = EthereumBlock( | 
					
						
							|  |  |  |         block_number=block.number, | 
					
						
							|  |  |  |         difficulty=block.difficulty, | 
					
						
							|  |  |  |         extra_data=block.extraData.hex(), | 
					
						
							|  |  |  |         gas_limit=block.gasLimit, | 
					
						
							|  |  |  |         gas_used=block.gasUsed, | 
					
						
							|  |  |  |         hash=block.hash.hex(), | 
					
						
							|  |  |  |         logs_bloom=block.logsBloom.hex(), | 
					
						
							|  |  |  |         miner=block.miner, | 
					
						
							|  |  |  |         nonce=block.nonce.hex(), | 
					
						
							|  |  |  |         parent_hash=block.parentHash.hex(), | 
					
						
							|  |  |  |         receipt_root=block.get("receiptRoot", ""), | 
					
						
							|  |  |  |         uncles=block.sha3Uncles.hex(), | 
					
						
							|  |  |  |         size=block.size, | 
					
						
							|  |  |  |         state_root=block.stateRoot.hex(), | 
					
						
							|  |  |  |         timestamp=block.timestamp, | 
					
						
							|  |  |  |         total_difficulty=block.totalDifficulty, | 
					
						
							|  |  |  |         transactions_root=block.transactionsRoot.hex(), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     db_session.add(block_obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  | def add_block_transactions(db_session, block: Any) -> None: | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     Add block transactions. | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     block: web3.types.BlockData | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     for tx in block.transactions: | 
					
						
							| 
									
										
										
										
											2021-07-28 22:49:31 +00:00
										 |  |  |         tx_obj = EthereumTransaction( | 
					
						
							|  |  |  |             hash=tx.hash.hex(), | 
					
						
							|  |  |  |             block_number=block.number, | 
					
						
							|  |  |  |             from_address=tx["from"], | 
					
						
							|  |  |  |             to_address=tx.to, | 
					
						
							|  |  |  |             gas=tx.gas, | 
					
						
							|  |  |  |             gas_price=tx.gasPrice, | 
					
						
							|  |  |  |             input=tx.input, | 
					
						
							|  |  |  |             nonce=tx.nonce, | 
					
						
							|  |  |  |             transaction_index=tx.transactionIndex, | 
					
						
							|  |  |  |             value=tx.value, | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2021-07-28 22:49:31 +00:00
										 |  |  |         db_session.add(tx_obj) | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  | def get_latest_blocks(confirmations: int = 0) -> Tuple[Optional[int], int]: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Retrieve the latest block from the connected node (connection is created by the connect() method). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     If confirmations > 0, and the latest block on the node has block number N, this returns the block | 
					
						
							|  |  |  |     with block_number (N - confirmations) | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     web3_client = connect() | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  |     latest_block_number: int = web3_client.eth.block_number | 
					
						
							|  |  |  |     if confirmations > 0: | 
					
						
							|  |  |  |         latest_block_number -= confirmations | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     with yield_db_session_ctx() as db_session: | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  |         latest_stored_block_row = ( | 
					
						
							| 
									
										
										
										
											2021-07-29 18:09:53 +00:00
										 |  |  |             db_session.query(EthereumBlock.block_number) | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |             .order_by(EthereumBlock.block_number.desc()) | 
					
						
							|  |  |  |             .first() | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  |         latest_stored_block_number = ( | 
					
						
							|  |  |  |             None if latest_stored_block_row is None else latest_stored_block_row[0] | 
					
						
							| 
									
										
										
										
											2021-08-02 14:15:01 +00:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  |     return latest_stored_block_number, latest_block_number | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  | def crawl_blocks(blocks_numbers: List[int], with_transactions: bool = False) -> None: | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     Open database and geth sessions and fetch block data from blockchain. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     web3_client = connect() | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |     with yield_db_session_ctx() as db_session: | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |         pbar = tqdm(total=len(blocks_numbers)) | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |         for block_number in blocks_numbers: | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |             pbar.set_description( | 
					
						
							|  |  |  |                 f"Crawling block {block_number} with txs: {with_transactions}" | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |             try: | 
					
						
							|  |  |  |                 block: BlockData = web3_client.eth.get_block( | 
					
						
							|  |  |  |                     block_number, full_transactions=with_transactions | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 add_block(db_session, block) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if with_transactions: | 
					
						
							|  |  |  |                     add_block_transactions(db_session, block) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 db_session.commit() | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |             except IntegrityError as err: | 
					
						
							|  |  |  |                 assert isinstance(err.orig, UniqueViolation) | 
					
						
							|  |  |  |                 logger.warning( | 
					
						
							|  |  |  |                     "UniqueViolation error occurred, it means block already exists" | 
					
						
							|  |  |  |                 ) | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |             except Exception as err: | 
					
						
							|  |  |  |                 db_session.rollback() | 
					
						
							|  |  |  |                 message = f"Error adding block (number={block_number}) to database:\n{repr(err)}" | 
					
						
							|  |  |  |                 raise EthereumBlockCrawlError(message) | 
					
						
							|  |  |  |             except: | 
					
						
							|  |  |  |                 db_session.rollback() | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |                 logger.error( | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |                     f"Interrupted while adding block (number={block_number}) to database." | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 raise | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |             pbar.update() | 
					
						
							|  |  |  |         pbar.close() | 
					
						
							| 
									
										
										
										
											2021-07-29 14:01:39 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | def check_missing_blocks(blocks_numbers: List[int], notransactions=False) -> List[int]: | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     Query block from postgres. If block does not presented in database, | 
					
						
							|  |  |  |     add to missing blocks numbers list. | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |     If arg notransactions=False, it checks correct number of transactions in | 
					
						
							|  |  |  |     database according to blockchain. | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-08-03 08:41:51 +00:00
										 |  |  |     bottom_block = min(blocks_numbers[-1], blocks_numbers[0]) | 
					
						
							|  |  |  |     top_block = max(blocks_numbers[-1], blocks_numbers[0]) | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-03 08:41:51 +00:00
										 |  |  |     with yield_db_session_ctx() as db_session: | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |         if notransactions: | 
					
						
							|  |  |  |             blocks_exist_raw_query = ( | 
					
						
							|  |  |  |                 db_session.query(EthereumBlock.block_number) | 
					
						
							|  |  |  |                 .filter(EthereumBlock.block_number >= bottom_block) | 
					
						
							|  |  |  |                 .filter(EthereumBlock.block_number <= top_block) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             blocks_exist = [[block[0]] for block in blocks_exist_raw_query.all()] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             corrupted_blocks = [] | 
					
						
							|  |  |  |             blocks_exist_raw_query = ( | 
					
						
							|  |  |  |                 db_session.query( | 
					
						
							|  |  |  |                     EthereumBlock.block_number, func.count(EthereumTransaction.hash) | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 .join( | 
					
						
							|  |  |  |                     EthereumTransaction, | 
					
						
							|  |  |  |                     EthereumTransaction.block_number == EthereumBlock.block_number, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 .filter(EthereumBlock.block_number >= bottom_block) | 
					
						
							|  |  |  |                 .filter(EthereumBlock.block_number <= top_block) | 
					
						
							|  |  |  |                 .group_by(EthereumBlock.block_number) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             blocks_exist = [ | 
					
						
							|  |  |  |                 [block[0], block[1]] for block in blocks_exist_raw_query.all() | 
					
						
							|  |  |  |             ] | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |             web3_client = connect() | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |             blocks_exist_len = len(blocks_exist) | 
					
						
							|  |  |  |             pbar = tqdm(total=blocks_exist_len) | 
					
						
							|  |  |  |             pbar.set_description(f"Checking txs in {blocks_exist_len} blocks") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |             for i, block_in_db in enumerate(blocks_exist): | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |                 block = web3_client.eth.get_block( | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |                     block_in_db[0], full_transactions=True | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |                 ) | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |                 if len(block.transactions) != block_in_db[1]: | 
					
						
							|  |  |  |                     corrupted_blocks.append(block_in_db[0]) | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |                     # Delete existing corrupted block and add to missing list | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |                     del_block = ( | 
					
						
							|  |  |  |                         db_session.query(EthereumBlock) | 
					
						
							|  |  |  |                         .filter(EthereumBlock.block_number == block_in_db[0]) | 
					
						
							|  |  |  |                         .one() | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                     db_session.delete(del_block) | 
					
						
							|  |  |  |                     del blocks_exist[i] | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |                 pbar.update() | 
					
						
							|  |  |  |             pbar.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |             db_session.commit() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             corrupted_blocks_len = len(corrupted_blocks) | 
					
						
							|  |  |  |             if corrupted_blocks_len > 0: | 
					
						
							|  |  |  |                 logger.warning( | 
					
						
							|  |  |  |                     f"Removed {corrupted_blocks_len} corrupted blocks: {corrupted_blocks if corrupted_blocks_len <= 10 else '...'}" | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-03 08:41:51 +00:00
										 |  |  |     missing_blocks_numbers = [ | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |         block for block in blocks_numbers if block not in [i[0] for i in blocks_exist] | 
					
						
							| 
									
										
										
										
											2021-08-03 08:41:51 +00:00
										 |  |  |     ] | 
					
						
							| 
									
										
										
										
											2021-07-28 15:03:06 +00:00
										 |  |  |     return missing_blocks_numbers | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:57:30 +00:00
										 |  |  | def crawl_blocks_executor( | 
					
						
							| 
									
										
										
										
											2021-07-29 14:01:39 +00:00
										 |  |  |     block_numbers_list: List[int], | 
					
						
							|  |  |  |     with_transactions: bool = False, | 
					
						
							| 
									
										
										
										
											2021-08-02 18:46:50 +00:00
										 |  |  |     num_processes: int = MOONSTREAM_CRAWL_WORKERS, | 
					
						
							| 
									
										
										
										
											2021-07-28 21:57:30 +00:00
										 |  |  | ) -> None: | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-07-28 21:43:35 +00:00
										 |  |  |     Execute crawler in processes. | 
					
						
							| 
									
										
										
										
											2021-08-05 23:11:10 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |     block_numbers_list - List of block numbers to add to database. | 
					
						
							|  |  |  |     with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table. | 
					
						
							|  |  |  |     num_processes - Number of processes to use to feed blocks into database. | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlocksCrawlError. | 
					
						
							|  |  |  |     The error message is a list of all the things that went wrong in the crawl. | 
					
						
							| 
									
										
										
										
											2021-07-26 22:15:50 +00:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |     errors: List[BaseException] = [] | 
					
						
							| 
									
										
										
										
											2021-07-29 18:52:37 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def record_error(f: Future) -> None: | 
					
						
							|  |  |  |         error = f.exception() | 
					
						
							|  |  |  |         if error is not None: | 
					
						
							|  |  |  |             errors.append(error) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-29 18:36:19 +00:00
										 |  |  |     worker_indices = range(MOONSTREAM_CRAWL_WORKERS) | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |     worker_job_lists: List[List[Any]] = [[] for _ in worker_indices] | 
					
						
							| 
									
										
										
										
											2021-07-29 18:36:19 +00:00
										 |  |  |     for i, block_number in enumerate(block_numbers_list): | 
					
						
							|  |  |  |         worker_job_lists[i % MOONSTREAM_CRAWL_WORKERS].append(block_number) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-29 18:41:37 +00:00
										 |  |  |     results: List[Future] = [] | 
					
						
							| 
									
										
										
										
											2021-08-02 18:46:50 +00:00
										 |  |  |     if num_processes == 1: | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |         logger.warning("Executing block crawler in lazy mod") | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |         return crawl_blocks(block_numbers_list, with_transactions) | 
					
						
							| 
									
										
										
										
											2021-08-02 18:46:50 +00:00
										 |  |  |     else: | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |         with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: | 
					
						
							| 
									
										
										
										
											2021-08-02 18:46:50 +00:00
										 |  |  |             for worker in worker_indices: | 
					
						
							| 
									
										
										
										
											2021-09-24 13:28:41 +00:00
										 |  |  |                 block_chunk = worker_job_lists[worker] | 
					
						
							| 
									
										
										
										
											2021-09-24 14:31:34 +00:00
										 |  |  |                 logger.info(f"Spawned process for {len(block_chunk)} blocks") | 
					
						
							|  |  |  |                 result = executor.submit(crawl_blocks, block_chunk, with_transactions) | 
					
						
							| 
									
										
										
										
											2021-08-02 18:46:50 +00:00
										 |  |  |                 result.add_done_callback(record_error) | 
					
						
							|  |  |  |                 results.append(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         wait(results) | 
					
						
							|  |  |  |         if len(errors) > 0: | 
					
						
							| 
									
										
										
										
											2021-08-06 00:10:48 +00:00
										 |  |  |             error_messages = "\n".join([f"- {error}" for error in errors]) | 
					
						
							|  |  |  |             message = f"Error processing blocks in list:\n{error_messages}" | 
					
						
							|  |  |  |             raise EthereumBlockCrawlError(message) | 
					
						
							| 
									
										
										
										
											2021-07-29 04:35:49 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-03 08:41:51 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | def trending( | 
					
						
							|  |  |  |     date_range: DateRange, db_session: Optional[Session] = None | 
					
						
							|  |  |  | ) -> Dict[str, Any]: | 
					
						
							|  |  |  |     close_db_session = False | 
					
						
							|  |  |  |     if db_session is None: | 
					
						
							|  |  |  |         close_db_session = True | 
					
						
							|  |  |  |         db_session = next(yield_db_session()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     start_timestamp = int(date_range.start_time.timestamp()) | 
					
						
							|  |  |  |     end_timestamp = int(date_range.end_time.timestamp()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def make_query( | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |         db_session: Session, | 
					
						
							| 
									
										
										
										
											2021-08-09 16:30:08 +00:00
										 |  |  |         identifying_column: Column, | 
					
						
							|  |  |  |         statistic_column: Column, | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         aggregate_func: Callable, | 
					
						
							|  |  |  |         aggregate_label: str, | 
					
						
							|  |  |  |     ) -> Query: | 
					
						
							|  |  |  |         query = db_session.query( | 
					
						
							| 
									
										
										
										
											2021-08-09 16:30:08 +00:00
										 |  |  |             identifying_column, aggregate_func(statistic_column).label(aggregate_label) | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ).join( | 
					
						
							|  |  |  |             EthereumBlock, | 
					
						
							|  |  |  |             EthereumTransaction.block_number == EthereumBlock.block_number, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         if date_range.include_start: | 
					
						
							|  |  |  |             query = query.filter(EthereumBlock.timestamp >= start_timestamp) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             query = query.filter(EthereumBlock.timestamp > start_timestamp) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if date_range.include_end: | 
					
						
							|  |  |  |             query = query.filter(EthereumBlock.timestamp <= end_timestamp) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             query = query.filter(EthereumBlock.timestamp < end_timestamp) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         query = ( | 
					
						
							| 
									
										
										
										
											2021-08-09 16:30:08 +00:00
										 |  |  |             query.group_by(identifying_column).order_by(desc(aggregate_label)).limit(10) | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return query | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-10 02:45:20 +00:00
										 |  |  |     results: Dict[str, Any] = { | 
					
						
							|  |  |  |         "date_range": { | 
					
						
							|  |  |  |             "start_time": date_range.start_time.isoformat(), | 
					
						
							|  |  |  |             "end_time": date_range.end_time.isoformat(), | 
					
						
							|  |  |  |             "include_start": date_range.include_start, | 
					
						
							|  |  |  |             "include_end": date_range.include_end, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         transactions_out_query = make_query( | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |             db_session, | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |             EthereumTransaction.from_address, | 
					
						
							|  |  |  |             EthereumTransaction.hash, | 
					
						
							|  |  |  |             func.count, | 
					
						
							|  |  |  |             "transactions_out", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         transactions_out = transactions_out_query.all() | 
					
						
							|  |  |  |         results["transactions_out"] = [ | 
					
						
							| 
									
										
										
										
											2021-08-09 13:01:28 +00:00
										 |  |  |             {"address": row[0], "statistic": row[1]} for row in transactions_out | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         transactions_in_query = make_query( | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |             db_session, | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |             EthereumTransaction.to_address, | 
					
						
							|  |  |  |             EthereumTransaction.hash, | 
					
						
							|  |  |  |             func.count, | 
					
						
							|  |  |  |             "transactions_in", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         transactions_in = transactions_in_query.all() | 
					
						
							|  |  |  |         results["transactions_in"] = [ | 
					
						
							| 
									
										
										
										
											2021-08-09 13:01:28 +00:00
										 |  |  |             {"address": row[0], "statistic": row[1]} for row in transactions_in | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         value_out_query = make_query( | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |             db_session, | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |             EthereumTransaction.from_address, | 
					
						
							|  |  |  |             EthereumTransaction.value, | 
					
						
							|  |  |  |             func.sum, | 
					
						
							|  |  |  |             "value_out", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         value_out = value_out_query.all() | 
					
						
							|  |  |  |         results["value_out"] = [ | 
					
						
							| 
									
										
										
										
											2021-08-09 13:01:28 +00:00
										 |  |  |             {"address": row[0], "statistic": int(row[1])} for row in value_out | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         value_in_query = make_query( | 
					
						
							| 
									
										
										
										
											2021-08-24 14:13:20 +00:00
										 |  |  |             db_session, | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |             EthereumTransaction.to_address, | 
					
						
							|  |  |  |             EthereumTransaction.value, | 
					
						
							|  |  |  |             func.sum, | 
					
						
							|  |  |  |             "value_in", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         value_in = value_in_query.all() | 
					
						
							|  |  |  |         results["value_in"] = [ | 
					
						
							| 
									
										
										
										
											2021-08-09 13:01:28 +00:00
										 |  |  |             {"address": row[0], "statistic": int(row[1])} for row in value_in | 
					
						
							| 
									
										
										
										
											2021-08-09 12:55:10 +00:00
										 |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         if close_db_session: | 
					
						
							|  |  |  |             db_session.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return results |