Dynamic blockchain model selection for crawlers

pull/373/head
kompotkot 2021-11-09 13:46:33 +00:00
rodzic 526aa4635e
commit 381a4cef40
2 zmienionych plików z 84 dodań i 33 usunięć

Wyświetl plik

@ -1,6 +1,6 @@
import logging
from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
@ -15,6 +15,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Query, Session
from tqdm import tqdm
from web3 import HTTPProvider, IPCProvider, Web3
from web3.middleware import geth_poa_middleware
from web3.types import BlockData
from .data import AvailableBlockchainType, DateRange
@ -35,6 +36,8 @@ class BlockCrawlError(Exception):
def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_ETHEREUM_IPC_PATH
@ -42,26 +45,73 @@ def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] =
web3_uri = MOONSTREAM_POLYGON_IPC_PATH
else:
raise Exception("Wrong blockchain type provided for web3 URI")
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)
# Inject --dev middleware if it is not Ethereum mainnet
# Docs: https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority
if blockchain_type != AvailableBlockchainType.ETHEREUM:
web3_client.middleware_onion.inject(geth_poa_middleware, layer=0)
return web3_client
def add_block(db_session, block: Any) -> None:
def get_block_model_model(
    blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumBlock, PolygonBlock]]:
    """
    Return the block model class that corresponds to the blockchain type.

    EthereumBlock is returned for AvailableBlockchainType.ETHEREUM and
    PolygonBlock for AvailableBlockchainType.POLYGON. Any other value
    raises an Exception.
    """
    # Guard-clause style: each supported chain returns its model directly.
    if blockchain_type == AvailableBlockchainType.ETHEREUM:
        return EthereumBlock
    if blockchain_type == AvailableBlockchainType.POLYGON:
        return PolygonBlock
    raise Exception("Unsupported blockchain type provided")
def get_transaction_model_model(
    blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumTransaction, PolygonTransaction]]:
    """
    Return the transaction model class that corresponds to the blockchain type.

    EthereumTransaction is returned for AvailableBlockchainType.ETHEREUM and
    PolygonTransaction for AvailableBlockchainType.POLYGON. Any other value
    raises an Exception.
    """
    # Guard-clause style: each supported chain returns its model directly.
    if blockchain_type == AvailableBlockchainType.ETHEREUM:
        return EthereumTransaction
    if blockchain_type == AvailableBlockchainType.POLYGON:
        return PolygonTransaction
    raise Exception("Unsupported blockchain type provided")
def add_block(db_session, block: Any, blockchain_type: AvailableBlockchainType) -> None:
"""
Add block if doesn't presented in database.
block: web3.types.BlockData
Polygon notes:
- BlockData.extraData doesn't exist
"""
block_obj = EthereumBlock(
block_model = get_block_model_model(blockchain_type)
block_obj = block_model(
block_number=block.number,
difficulty=block.difficulty,
extra_data=block.extraData.hex(),
extra_data=block.get("extraData").hex()
if block.get("extraData", None) is not None
else None,
gas_limit=block.gasLimit,
gas_used=block.gasUsed,
base_fee_per_gas=block.get("baseFeePerGas", None),
@ -81,14 +131,17 @@ def add_block(db_session, block: Any) -> None:
db_session.add(block_obj)
def add_block_transactions(db_session, block: Any) -> None:
def add_block_transactions(
db_session, block: Any, blockchain_type: AvailableBlockchainType
) -> None:
"""
Add block transactions.
block: web3.types.BlockData
"""
transaction_model = get_transaction_model_model(blockchain_type)
for tx in block.transactions:
tx_obj = EthereumTransaction(
tx_obj = transaction_model(
hash=tx.hash.hex(),
block_number=block.number,
from_address=tx["from"],
@ -120,10 +173,11 @@ def get_latest_blocks(
if confirmations > 0:
latest_block_number -= confirmations
block_model = get_block_model_model(blockchain_type)
with yield_db_session_ctx() as db_session:
latest_stored_block_row = (
db_session.query(EthereumBlock.block_number)
.order_by(EthereumBlock.block_number.desc())
db_session.query(block_model.block_number)
.order_by(block_model.block_number.desc())
.first()
)
latest_stored_block_number = (
@ -152,10 +206,10 @@ def crawl_blocks(
block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions
)
add_block(db_session, block)
add_block(db_session, block, blockchain_type)
if with_transactions:
add_block_transactions(db_session, block)
add_block_transactions(db_session, block, blockchain_type)
db_session.commit()
except IntegrityError as err:
@ -191,27 +245,29 @@ def check_missing_blocks(
bottom_block = min(blocks_numbers[-1], blocks_numbers[0])
top_block = max(blocks_numbers[-1], blocks_numbers[0])
block_model = get_block_model_model(blockchain_type)
transaction_model = get_transaction_model_model(blockchain_type)
with yield_db_session_ctx() as db_session:
if notransactions:
blocks_exist_raw_query = (
db_session.query(EthereumBlock.block_number)
.filter(EthereumBlock.block_number >= bottom_block)
.filter(EthereumBlock.block_number <= top_block)
db_session.query(block_model.block_number)
.filter(block_model.block_number >= bottom_block)
.filter(block_model.block_number <= top_block)
)
blocks_exist = [[block[0]] for block in blocks_exist_raw_query.all()]
else:
corrupted_blocks = []
blocks_exist_raw_query = (
db_session.query(
EthereumBlock.block_number, func.count(EthereumTransaction.hash)
block_model.block_number, func.count(transaction_model.hash)
)
.join(
EthereumTransaction,
EthereumTransaction.block_number == EthereumBlock.block_number,
transaction_model,
transaction_model.block_number == block_model.block_number,
)
.filter(EthereumBlock.block_number >= bottom_block)
.filter(EthereumBlock.block_number <= top_block)
.group_by(EthereumBlock.block_number)
.filter(block_model.block_number >= bottom_block)
.filter(block_model.block_number <= top_block)
.group_by(block_model.block_number)
)
blocks_exist = [
[block[0], block[1]] for block in blocks_exist_raw_query.all()
@ -231,8 +287,8 @@ def check_missing_blocks(
corrupted_blocks.append(block_in_db[0])
# Delete existing corrupted block and add to missing list
del_block = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number == block_in_db[0])
db_session.query(block_model)
.filter(block_model.block_number == block_in_db[0])
.one()
)
db_session.delete(del_block)

Wyświetl plik

@ -117,7 +117,7 @@ def run_crawler_desc(
def handle_parser(args: argparse.Namespace):
with yield_db_session_ctx() as session:
w3 = connect(AvailableBlockchainType(args.blockchain))
w3 = connect(AvailableBlockchainType.ETHEREUM)
if args.order == "asc":
run_crawler_asc(
w3=w3,
@ -185,11 +185,6 @@ def generate_parser():
default=3 * 60,
help="time to sleep synzhronize mode waiting for new block crawled to db",
)
parser.add_argument(
"--blockchain",
required=True,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
parser.set_defaults(func=handle_parser)
return parser