kopia lustrzana https://github.com/bugout-dev/moonstream
working version of contract deploy crawler
rodzic
67fe019f10
commit
5ed9772852
|
@ -0,0 +1,177 @@
|
|||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from moonstreamdb.db import yield_db_session_ctx
|
||||
from sqlalchemy.orm.session import Session
|
||||
from web3 import Web3
|
||||
|
||||
from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore
|
||||
from ..ethereum import connect
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Pause, in seconds, between two passes of the synchronization loop (5 minutes).
SLEEP_TIME = 5 * 60
|
||||
|
||||
|
||||
def _crawl_ascending_windows(
    contract_deployment_crawler: "ContractDeploymentCrawler",
    from_block: int,
    to_block: int,
    batch_size: int,
) -> None:
    """
    Crawls the inclusive range [from_block, to_block] from the lowest block upwards,
    in windows of at most batch_size blocks.

    Every window is handed to the crawler as exactly one batch (batch_size equals the
    window length), so the final, shorter window is crawled as well and coverage does
    not depend on how the crawler splits a range into batches.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be greater than 0")

    window_start = from_block
    while window_start <= to_block:
        window_end = min(window_start + batch_size - 1, to_block)
        contract_deployment_crawler.crawl(
            from_block=window_start,
            to_block=window_end,
            batch_size=window_end - window_start + 1,
        )
        window_start = window_end + 1


def runc_crawler_asc(
    w3: Web3,
    session: Session,
    from_block: Optional[int],
    to_block: Optional[int],
    synchronize: bool,
    batch_size: int,
    respect_state: bool,
):
    """
    Runs crawler in ascending order

    Arguments:
    - w3: web3 client handed to the crawler
    - session: database session used to read blocks/transactions and write labels
    - from_block: first block to crawl; defaults to the first block in the database
    - to_block: last block to crawl (inclusive); defaults to the last block in the database
    - synchronize: if True, never returns - after the initial crawl it keeps crawling
      blocks that appear in the database, sleeping SLEEP_TIME seconds between passes
    - batch_size: maximum number of blocks processed per batch
    - respect_state: if True, from_block is overridden with last labeled block + 1

    Raises ValueError if from_block is greater than to_block or batch_size is less than 1.
    """
    moonstream_data_store = MoonstreamDataStore(session)
    contract_deployment_crawler = ContractDeploymentCrawler(w3, moonstream_data_store)

    if respect_state:
        from_block = moonstream_data_store.get_last_labeled_block_number() + 1
        logger.info(f"Respecting state, starting from block {from_block}")

    if from_block is None:
        from_block = moonstream_data_store.get_first_block_number()
        logger.info(f"Starting block set to : {from_block}")

    if to_block is None:
        to_block = moonstream_data_store.get_last_block_number()
        logger.info(f"Ending block set to : {to_block}")

    # Raised explicitly instead of asserted: assert statements are stripped under -O.
    if from_block > to_block:
        raise ValueError("from_block must be less than or equal to to_block")

    logger.info(f"Starting crawling from block {from_block} to block {to_block}")
    _crawl_ascending_windows(
        contract_deployment_crawler, from_block, to_block, batch_size
    )

    if synchronize:
        last_crawled_block = to_block
        while True:
            latest_block = moonstream_data_store.get_last_block_number()
            # Crawl only when the database has moved past the last crawled block;
            # an empty range would be rejected by the crawler and end the loop.
            if latest_block > last_crawled_block:
                _crawl_ascending_windows(
                    contract_deployment_crawler,
                    last_crawled_block + 1,
                    latest_block,
                    batch_size,
                )
                # Advance the cursor so the next pass does not repeat these blocks.
                last_crawled_block = latest_block
            time.sleep(SLEEP_TIME)
|
||||
|
||||
|
||||
def _crawl_descending_windows(
    contract_deployment_crawler: "ContractDeploymentCrawler",
    from_block: int,
    to_block: int,
    batch_size: int,
) -> None:
    """
    Crawls the inclusive range [to_block, from_block] from the highest block downwards,
    in windows of at most batch_size blocks.

    The crawler itself only accepts ascending ranges, so every window is passed to it
    as (low, high) and as exactly one batch (batch_size equals the window length).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be greater than 0")

    window_high = from_block
    while window_high >= to_block:
        window_low = max(window_high - batch_size + 1, to_block)
        contract_deployment_crawler.crawl(
            from_block=window_low,
            to_block=window_high,
            batch_size=window_high - window_low + 1,
        )
        window_high = window_low - 1


def runc_crawler_desc(
    w3: Web3,
    session: Session,
    from_block: Optional[int],
    to_block: Optional[int],
    synchronize: bool,
    batch_size: int,
    respect_state: bool,
):
    """
    Runs crawler in descending order

    Arguments:
    - w3: web3 client handed to the crawler
    - session: database session used to read blocks/transactions and write labels
    - from_block: highest block, where crawling starts; defaults to the last block
      in the database
    - to_block: lowest block, where crawling stops (inclusive); defaults to the first
      block in the database
    - synchronize: not supported in descending order, must be False
    - batch_size: maximum number of blocks processed per batch
    - respect_state: if True and labels already exist, from_block is overridden with
      first labeled block - 1

    Raises ValueError if synchronize is set, if from_block is less than to_block or if
    batch_size is less than 1.
    """
    if synchronize:
        # Synchronizing in descending order would leave holes if the crawler is not
        # shut down cleanly, which in turn would make --respect-state unreliable.
        # Rejected before any crawling starts instead of after the whole range is done.
        raise ValueError("Synchronize not implemented for descending order")

    moonstream_data_store = MoonstreamDataStore(session)
    contract_deployment_crawler = ContractDeploymentCrawler(w3, moonstream_data_store)

    if respect_state:
        first_labeled_block = moonstream_data_store.get_first_labeled_block_number()
        # The data store reports 0 when nothing is labeled yet; in that case there is
        # no state to respect and the regular defaults below apply.
        if first_labeled_block > 0:
            from_block = first_labeled_block - 1
            logger.info(f"Respecting state, starting from block {from_block}")

    if from_block is None:
        from_block = moonstream_data_store.get_last_block_number()
        logger.info(f"Starting block set to : {from_block}")

    if to_block is None:
        to_block = moonstream_data_store.get_first_block_number()
        logger.info(f"Ending block set to : {to_block}")

    # Raised explicitly instead of asserted: assert statements are stripped under -O.
    if from_block < to_block:
        raise ValueError("from_block must be greater than or equal to to_block")

    logger.info(f"Starting crawling from block {from_block} to block {to_block}")
    _crawl_descending_windows(
        contract_deployment_crawler, from_block, to_block, batch_size
    )
|
||||
|
||||
|
||||
def handle_parser(args: argparse.Namespace):
    """
    Runs the crawler selected by args.order ("asc" or "desc") with the parsed options.

    A database session and a web3 connection are opened first and shared with the
    selected runner. Raises ValueError for any other value of args.order.
    """
    with yield_db_session_ctx() as session:
        w3 = connect()

        runners = {
            "asc": runc_crawler_asc,
            "desc": runc_crawler_desc,
        }
        run_crawler = runners.get(args.order)
        if run_crawler is None:
            raise ValueError(f"Invalid order {args.order}")

        run_crawler(
            w3=w3,
            session=session,
            from_block=args.start,
            to_block=args.to,
            synchronize=args.synchronize,
            batch_size=args.batch,
            respect_state=args.respect_state,
        )
|
||||
|
||||
|
||||
def generate_parser():
    """
    Builds the command line parser of the deployment crawler.

    --start, -s: block to start crawling from, default: None (resolved by the runner)
    --to, -t: block to stop crawling at, default: None (resolved by the runner)
    --order: order to crawl, one of (desc, asc), default: asc
    --synchronize: continuous crawling, default: False
    --batch, -b: batch size, default: 10
    --respect-state: if set:
        if order is asc: start=last_labeled_block+1
        if order is desc: start=first_labeled_block-1

    The returned parser stores handle_parser as the "func" default, so callers can
    dispatch with args.func(args).
    """

    parser = argparse.ArgumentParser(description="Moonstream Deployment Crawler")
    parser.add_argument(
        "--start", "-s", type=int, default=None, help="block to start crawling from"
    )
    parser.add_argument(
        "--to", "-t", type=int, default=None, help="block to stop crawling at"
    )
    parser.add_argument(
        "--order",
        type=str,
        default="asc",
        # Reject anything else at parse time, before a database session or a node
        # connection is opened.
        choices=["asc", "desc"],
        help="order to crawl : (desc, asc)",
    )
    parser.add_argument(
        "--synchronize", action="store_true", default=False, help="Continious crawling"
    )
    parser.add_argument("--batch", "-b", type=int, default=10, help="batch size")
    parser.add_argument(
        "--respect-state",
        action="store_true",
        default=False,
        help="If set to True:\n If order is asc: start=last_labeled_block+1\n If order is desc: start=first_labeled_block-1",
    )
    parser.set_defaults(func=handle_parser)
    return parser
|
||||
|
||||
|
||||
def main():
    """
    Command line entry point: parses the arguments and calls the handler that the
    parser registered under "func".
    """
    arguments = generate_parser().parse_args()
    arguments.func(arguments)


if __name__ == "__main__":
    main()
|
|
@ -0,0 +1,270 @@
|
|||
from dataclasses import dataclass
|
||||
import logging
|
||||
from typing import List, Optional, Tuple, cast
|
||||
from hexbytes import HexBytes
|
||||
|
||||
from moonstreamdb.models import EthereumBlock, EthereumTransaction, EthereumLabel
|
||||
from sqlalchemy.orm import Session, Query
|
||||
from web3 import Web3
|
||||
from web3.types import TxReceipt
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class ContractDeployment:
    """
    One contract deployment, combining the transaction receipt with the block and
    transaction data stored in the database.
    """

    # Address of the deployed contract, taken from the receipt.
    address: str
    # Number of the block that contains the deployment transaction.
    block_number: int
    # Hash of the deployment transaction.
    transaction_hash: str
    # Address that sent the deployment transaction.
    deployer_address: str
    # Timestamp of the block that contains the transaction.
    block_timestamp: int
    # Gas used by the transaction, taken from the receipt.
    gas_used: int
    # Gas price of the transaction.
    gas_price: int
    # Total fee of the transaction (gas used multiplied by gas price).
    transaction_fee: int
|
||||
|
||||
|
||||
@dataclass
class RawDeploymentTx:
    """
    Database view of a deployment transaction, before its receipt is fetched.
    """

    # Hash of the transaction.
    transaction_hash: str
    # Gas price of the transaction.
    gas_price: int
    # Timestamp of the block that contains the transaction.
    timestamp: int
|
||||
|
||||
|
||||
class MoonstreamDataStore:
    """
    Database access layer of the contract deployment crawler.

    Reads blocks and transactions and reads/writes "contract_deployment" labels
    through the SQLAlchemy session it is given.
    """

    def __init__(self, db_session: "Session") -> None:
        self.db_session = db_session
        # Value of EthereumLabel.label for every label this store reads or writes.
        self.label = "contract_deployment"

    def get_last_labeled_block_number(
        self,
    ) -> int:
        """
        Returns the last block number that has been labeled.

        Returns 0 if there is no contract deployment label yet.
        """
        last_block = (
            self.db_session.query(EthereumLabel)
            .filter(EthereumLabel.label == self.label)
            .order_by(EthereumLabel.block_number.desc())
            .first()
        )
        if last_block is None:
            return 0
        return last_block.block_number

    def get_first_labeled_block_number(self) -> int:
        """
        Returns the first block number that has been labeled.

        Returns 0 if there is no contract deployment label yet.
        """
        first_block = (
            self.db_session.query(EthereumLabel)
            .filter(EthereumLabel.label == self.label)
            .order_by(EthereumLabel.block_number)
            .first()
        )
        if first_block is None:
            return 0
        return first_block.block_number

    def get_last_block_number(self) -> int:
        """
        Returns the highest block number stored in the database.

        Returns 0 if there are no blocks.
        """
        last_block = (
            self.db_session.query(EthereumBlock)
            .order_by(EthereumBlock.block_number.desc())
            .first()
        )
        if last_block is None:
            return 0
        return last_block.block_number

    def get_first_block_number(self) -> int:
        """
        Returns the lowest block number stored in the database.

        Returns 0 if there are no blocks.
        """
        first_block = (
            self.db_session.query(EthereumBlock)
            .order_by(EthereumBlock.block_number)
            .first()
        )
        if first_block is None:
            return 0
        return first_block.block_number

    def get_raw_contract_deployment_transactions(
        self, from_block: int, to_block: int
    ) -> List["RawDeploymentTx"]:
        """
        Returns a list of raw contract deployment transactions.

        A transaction is selected when it has no to_address and its block number is
        within [from_block, to_block], both ends included.
        """
        result = (
            self.db_session.query(
                EthereumTransaction.hash,
                EthereumTransaction.gas_price,
                EthereumBlock.timestamp,
            )
            .join(
                EthereumBlock,
                EthereumTransaction.block_number == EthereumBlock.block_number,
            )
            .filter(EthereumBlock.block_number >= from_block)
            .filter(EthereumBlock.block_number <= to_block)
            # Explicit IS NULL test; same SQL as "== None" without the lint noise.
            .filter(EthereumTransaction.to_address.is_(None))
            .all()
        )
        return [
            RawDeploymentTx(
                transaction_hash=tx_hash,
                gas_price=gas_price,
                timestamp=timestamp,
            )
            for tx_hash, gas_price, timestamp in result
        ]

    def save_contract_deployment_labels(
        self, contract_deployment_list: List["ContractDeployment"]
    ) -> None:
        """
        Saves a list of contract deployment labels.

        Deployments whose transaction hash already has a contract deployment label
        are skipped. If saving fails, the error is logged and the session is rolled
        back; the exception is not propagated.
        """
        # Nothing to save: skip the lookup query (it would have an empty IN clause).
        if not contract_deployment_list:
            return

        transaction_hashes = [
            contract_deployment.transaction_hash
            for contract_deployment in contract_deployment_list
        ]
        existing_labels = (
            self.db_session.query(EthereumLabel.transaction_hash)
            .filter(EthereumLabel.label == self.label)
            .filter(EthereumLabel.transaction_hash.in_(transaction_hashes))
            .all()
        )
        # A set keeps the membership test below constant-time per deployment.
        existing_labels_tx_hashes = {
            label_tx_hash[0] for label_tx_hash in existing_labels
        }
        new_labels = [
            EthereumLabel(
                transaction_hash=contract_deployment.transaction_hash,
                block_number=contract_deployment.block_number,
                block_timestamp=contract_deployment.block_timestamp,
                label=self.label,
                address=contract_deployment.address,
                label_data={
                    "deployer": contract_deployment.deployer_address,
                    "gasUsed": int(contract_deployment.gas_used),
                    "gasPrice": int(contract_deployment.gas_price),
                    "transactionFee": int(contract_deployment.transaction_fee),
                },
            )
            for contract_deployment in contract_deployment_list
            if contract_deployment.transaction_hash not in existing_labels_tx_hashes
        ]
        if not new_labels:
            return

        try:
            logger.info(f"Saving {len(new_labels)} new contract deployment labels.")
            self.db_session.add_all(new_labels)
            self.db_session.commit()
        except Exception as e:
            logger.error(f"Error saving contract deployment labels: {e}")
            self.db_session.rollback()
|
||||
|
||||
|
||||
def get_transaction_receipt(web3: Web3, tx_hash: str) -> Optional[TxReceipt]:
    """
    Returns the transaction receipt for the given transaction hash.

    Returns None if the node does not know the transaction: web3 signals that case
    with TransactionNotFound, which is translated here so that callers can skip the
    transaction instead of aborting the whole crawl.
    """
    # Imported here so the function does not depend on a new module-level import.
    from web3.exceptions import TransactionNotFound

    try:
        return web3.eth.get_transaction_receipt(cast(HexBytes, tx_hash))
    except TransactionNotFound:
        logger.warning(f"Transaction receipt not found for transaction {tx_hash}")
        return None
|
||||
|
||||
|
||||
def get_contract_deployment_transactions(
    web3: Web3,
    datastore: MoonstreamDataStore,
    from_block: int,
    to_block: int,
) -> List[ContractDeployment]:
    """
    Collects every contract deployment found in the blocks from_block..to_block.

    The candidate transactions come from the datastore; the receipt of each one is
    fetched through web3. Transactions without a receipt are left out.
    """
    logger.info(
        f"Getting contract deployment transactions from {from_block} to {to_block}"
    )

    raw_transactions = datastore.get_raw_contract_deployment_transactions(
        from_block, to_block
    )

    deployments = []
    for raw_tx in raw_transactions:
        receipt = get_transaction_receipt(web3, raw_tx.transaction_hash)
        if receipt is not None:
            deployment = ContractDeployment(
                address=cast(str, receipt["contractAddress"]),
                block_number=receipt["blockNumber"],
                transaction_hash=receipt["transactionHash"].hex(),
                deployer_address=receipt["from"],
                # Timestamp and gas price are taken from the database row.
                block_timestamp=raw_tx.timestamp,
                gas_used=receipt["gasUsed"],
                gas_price=raw_tx.gas_price,
                transaction_fee=receipt["gasUsed"] * raw_tx.gas_price,
            )
            deployments.append(deployment)
    return deployments
|
||||
|
||||
|
||||
class ContractDeploymentCrawler:
    """
    Crawls contract deployments from MoonstreamDB transactions with the usage of web3
    to get transaction receipts
    """

    def __init__(self, web3: "Web3", datastore: "MoonstreamDataStore"):
        self.web3 = web3
        self.datastore = datastore

    def crawl(
        self, from_block: Optional[int], to_block: Optional[int], batch_size: int = 200
    ) -> None:
        """
        Crawls contract deployments in batches with the given batch size

        If from_block is None then the first block from datastore is used as start
        If to_block is None then the latest block from datastore is used

        Both ends of the range are included. Raises ValueError if from_block is
        greater than to_block or batch_size is less than 1.
        """
        if from_block is None:
            from_block = self.datastore.get_first_block_number()
        if to_block is None:
            to_block = self.datastore.get_last_block_number()

        # Labels are saved after every batch, so an interrupted crawl keeps the
        # batches that were already finished.
        for batch_from_block, batch_to_block in self.get_batch_block_range(
            from_block, to_block, batch_size
        ):
            contract_deployment_transactions = get_contract_deployment_transactions(
                self.web3, self.datastore, batch_from_block, batch_to_block
            )
            self.datastore.save_contract_deployment_labels(
                contract_deployment_transactions
            )

    def get_batch_block_range(
        self, from_block: int, to_block: int, batch_size: int
    ) -> List[Tuple[int, int]]:
        """
        Returns a list of block ranges with the given batch size

        The ranges are inclusive (first, last) pairs in ascending order that together
        cover from_block..to_block exactly once. The last range is shorter than
        batch_size when the number of blocks is not a multiple of it.

        Raises ValueError if from_block is greater than to_block or batch_size is
        less than 1.
        """
        if from_block > to_block:
            raise ValueError("from_block must be less than or equal to to_block")
        if batch_size < 1:
            raise ValueError("batch_size must be greater than 0")

        # range() steps through the first block of every batch; min() clamps the
        # end of the final batch to to_block so the tail is not dropped.
        return [
            (batch_start, min(batch_start + batch_size - 1, to_block))
            for batch_start in range(from_block, to_block + 1, batch_size)
        ]
|
|
@ -12,7 +12,7 @@ chardet==4.0.0
|
|||
charset-normalizer==2.0.4
|
||||
click==8.0.1
|
||||
cytoolz==0.11.0
|
||||
-e git+https://git@github.com/bugout-dev/moonstream.git@0a771ddfbca1254be331149ccf2d162aa09b7bc0#egg=moonstreamdb&subdirectory=db
|
||||
-e git+https://git@github.com/bugout-dev/moonstream.git@67fe019f1086c435dd3b58f1ade2778acc2167c7#egg=moonstreamdb&subdirectory=db
|
||||
eth-abi==2.1.1
|
||||
eth-account==0.5.5
|
||||
eth-hash==0.3.2
|
||||
|
|
|
@ -51,6 +51,7 @@ setup(
|
|||
"identity=mooncrawl.identity:main",
|
||||
"etherscan=mooncrawl.etherscan:main",
|
||||
"nft=mooncrawl.nft.cli:main",
|
||||
"deploycrawler=mooncrawl.contract.cli:main",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
|
Ładowanie…
Reference in New Issue