pull/481/head
yhtiyar 2021-12-16 22:43:32 +03:00
parent 6d87050067
commit 83eaeebe1a
7 changed files with 431 additions and 189 deletions

View file

@@ -1,14 +1,26 @@
-import web3
+import logging
+import argparse
+from web3 import Web3
 from moonstreamdb.db import yield_db_session_ctx
 from web3.middleware import geth_poa_middleware
 from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
-from .crawler import *
-from .event_crawler import continuous_event_crawler
-from .function_call_crawler import function_call_crawler
+from .crawler import (
+    make_event_crawl_jobs,
+    make_function_call_crawl_jobs,
+    get_crawl_job_entries,
+    SubscriptionTypes,
+)
+from ..blockchain import AvailableBlockchainType
+from .continuous_crawler import continuous_crawler

-def crawl_events():
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def handle_crawl(args: argparse.Namespace) -> None:
     initial_event_jobs = make_event_crawl_jobs(
         get_crawl_job_entries(
@@ -32,28 +44,115 @@ def crawl_events():
     print(initial_function_call_jobs)

     with yield_db_session_ctx() as db_session:
-        web3 = Web3(
-            Web3.HTTPProvider(
-                "https://polygon-mainnet.infura.io/v3/0492b7dd00bb4ad8a3346b3a0d780140"
-            )
-        )
-        web3.middleware_onion.inject(geth_poa_middleware, layer=0)
-        function_call_crawler(
+        web3 = None
+        if args.web3 is not None:
+            web3 = Web3(
+                Web3.HTTPProvider(
+                    args.web3,
+                )
+            )
+        if args.poa:
+            web3.middleware_onion.inject(geth_poa_middleware, layer=0)
+
+        start_block = args.start_block
+
+        continuous_crawler(
             db_session,
-            AvailableBlockchainType.POLYGON,
+            args.blockchain_type,
             web3,
+            initial_event_jobs,
             initial_function_call_jobs,
-            start_block=21418707,
-            end_block=web3.eth.blockNumber,
-            batch_size=100,
+            start_block,
+            args.max_blocks_batch,
+            args.min_blocks_batch,
+            args.confirmations,
+            args.min_sleep_time,
+            args.heartbeat_interval,
+            args.new_jobs_refetch_interval,
         )

-    # continuous_event_crawler(
-    #     db_session=session,
-    #     blockchain_type=AvailableBlockchainType.POLYGON,
-    #     web3=web3,
-    #     crawl_jobs=initial_event_jobs,
-    #     start_block=web3.eth.blockNumber - 120000,
-    # )

-crawl_events()
+
+def generate_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser()
+    subparsers = parser.add_subparsers()
+
+    crawl_parser = subparsers.add_parser("crawl")
+    crawl_parser.add_argument(
+        "--start",
+        "-s",
+        type=int,
+        default=None,
+    )
+    crawl_parser.add_argument(
+        "--blockchain-type",
+        "-b",
+        type=str,
+        choices=[
+            AvailableBlockchainType.ETHEREUM.value,
+            AvailableBlockchainType.POLYGON.value,
+        ],
+        required=True,
+    )
+    crawl_parser.add_argument(
+        "--web3",
+        type=str,
+        default=None,
+        help="Web3 provider URL",
+    )
+    crawl_parser.add_argument(
+        "--poa",
+        action="store_true",
+        default=False,
+        help="Use PoA middleware",
+    )
+    crawl_parser.add_argument(
+        "--max-blocks-batch",
+        "-m",
+        type=int,
+        default=100,
+        help="Maximum number of blocks to crawl in a single batch",
+    )
+    crawl_parser.add_argument(
+        "--min-blocks-batch",
+        "-n",
+        type=int,
+        default=10,
+        help="Minimum number of blocks to crawl in a single batch",
+    )
+    crawl_parser.add_argument(
+        "--confirmations",
+        "-c",
+        type=int,
+        default=100,
+        help="Number of confirmations to wait for",
+    )
+    crawl_parser.add_argument(
+        "--min-sleep-time",
+        "-t",
+        type=float,
+        default=0.01,
+        help="Minimum time to sleep between crawl steps",
+    )
+    crawl_parser.add_argument(
+        "--heartbeat-interval",
+        "-i",
+        type=float,
+        default=60,
+        help="Heartbeat interval in seconds",
+    )
+    crawl_parser.add_argument(
+        "--new-jobs-refetch-interval",
+        "-r",
+        type=float,
+        default=120,
+        help="Time to wait before refetching new jobs",
+    )
+
+    crawl_parser.set_defaults(func=handle_crawl)
+    return parser
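
For orientation, a minimal driver sketch for the new subcommand. The flag values and RPC URL below are illustrative assumptions; only generate_parser, handle_crawl, and the flags themselves come from the diff above.

# Hypothetical driver for the parser added above; values are examples only.
parser = generate_parser()
args = parser.parse_args(
    [
        "crawl",
        "--blockchain-type",
        "polygon",  # assumed value of AvailableBlockchainType.POLYGON.value
        "--web3",
        "https://rpc.example.invalid",  # placeholder provider URL
        "--poa",
    ]
)
args.func(args)  # set_defaults(func=handle_crawl) dispatches here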

View file

@@ -2,15 +2,16 @@ import logging
 import time
 import traceback
 from datetime import datetime, timedelta
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple

 from moonworm.crawler.moonstream_ethereum_state_provider import (
     MoonstreamEthereumStateProvider,
 )
+from moonworm.crawler.networks import Network
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import and_
 from web3 import Web3

+from ..blockchain import connect
 from ..data import AvailableBlockchainType
 from .crawler import (
     EventCrawlJob,
@@ -19,14 +20,14 @@ from .crawler import (
     get_crawl_job_entries,
     heartbeat,
     make_event_crawl_jobs,
-    merge_event_crawl_jobs,
-    save_labels,
-    merge_function_call_crawl_jobs,
     make_function_call_crawl_jobs,
+    merge_event_crawl_jobs,
+    merge_function_call_crawl_jobs,
 )
-from .event_crawler import _event_to_label, _crawl_events
-from .function_call_crawler import _crawl_functions, _function_call_to_label
-from moonworm.crawler.networks import Network
+from .event_crawler import _crawl_events
+from .function_call_crawler import _crawl_functions
+from .db import save_events, save_function_calls

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -81,10 +82,37 @@ def _refetch_new_jobs(
     return event_crawl_jobs, function_call_crawl_jobs


+def _retry_connect_web3(
+    blockchain_type: AvailableBlockchainType,
+    retry_count: int = 10,
+    sleep_time: float = 5,
+) -> Web3:
+    """
+    Retry connecting to the blockchain.
+    """
+    while retry_count > 0:
+        retry_count -= 1
+        try:
+            web3 = connect(blockchain_type)
+            web3.eth.block_number
+            logger.info(f"Connected to {blockchain_type}")
+            return web3
+        except Exception as e:
+            if retry_count == 0:
+                error = e
+                break
+            logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
+            logger.info(f"Retrying in {sleep_time} seconds")
+            time.sleep(sleep_time)
+    raise Exception(
+        f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
+    )
+
+
 def continuous_crawler(
     db_session: Session,
     blockchain_type: AvailableBlockchainType,
-    web3: Web3,
+    web3: Optional[Web3],
     event_crawl_jobs: List[EventCrawlJob],
     function_call_crawl_jobs: List[FunctionCallCrawlJob],
     start_block: int,
@@ -111,6 +139,8 @@ def continuous_crawler(
     crawl_start_time = datetime.utcnow()
     jobs_refetchet_time = crawl_start_time

+    if web3 is None:
+        web3 = _retry_connect_web3(blockchain_type)

     network = (
         Network.ethereum
@@ -146,92 +176,99 @@ def continuous_crawler(
     try:
         while True:
-            # query db with limit 1, to avoid session closing
-            db_session.execute("SELECT 1")
-            time.sleep(min_sleep_time)
-
-            end_block = min(
-                web3.eth.blockNumber - confirmations,
-                start_block + max_blocks_batch,
-            )
-
-            if start_block + min_blocks_batch > end_block:
-                min_sleep_time *= 2
-                continue
-            min_sleep_time /= 2
-
-            logger.info(f"Crawling events from {start_block} to {end_block}")
-            all_events = _crawl_events(
-                db_session=db_session,
-                blockchain_type=blockchain_type,
-                web3=web3,
-                jobs=event_crawl_jobs,
-                from_block=start_block,
-                to_block=end_block,
-                blocks_cache=blocks_cache,
-                db_block_query_batch=min_blocks_batch * 2,
-            )
-            logger.info(
-                f"Crawled {len(all_events)} events from {start_block} to {end_block}."
-            )
-
-            if all_events:
-                save_labels(
-                    db_session,
-                    [_event_to_label(blockchain_type, event) for event in all_events],
-                )
-
-            logger.info(f"Crawling function calls from {start_block} to {end_block}")
-            all_function_calls = _crawl_functions(
-                blockchain_type,
-                ethereum_state_provider,
-                function_call_crawl_jobs,
-                start_block,
-                end_block,
-            )
-            logger.info(
-                f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
-            )
-
-            if all_function_calls:
-                save_labels(
-                    db_session,
-                    [
-                        _function_call_to_label(blockchain_type, function_call)
-                        for function_call in all_function_calls
-                    ],
-                )
-
-            current_time = datetime.utcnow()
-
-            if current_time - jobs_refetchet_time > timedelta(
-                seconds=new_jobs_refetch_interval
-            ):
-                logger.info(
-                    f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
-                )
-                event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
-                    event_crawl_jobs, function_call_crawl_jobs, blockchain_type
-                )
-                jobs_refetchet_time = current_time
-
-            if current_time - last_heartbeat_time > timedelta(
-                seconds=heartbeat_interval
-            ):
-                # Update heartbeat and send to humbug
-                heartbeat_template["last_block"] = end_block
-                heartbeat_template["current_time"] = current_time
-                heartbeat_template["current_event_jobs_length"] = len(event_crawl_jobs)
-                heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
-                heartbeat(
-                    crawler_type="event",
-                    blockchain_type=blockchain_type,
-                    crawler_status=heartbeat_template,
-                )
-                logger.info("Sending heartbeat to humbug.", heartbeat_template)
-                last_heartbeat_time = datetime.utcnow()
-
-            start_block = end_block + 1
+            try:
+                # query db with limit 1, to avoid session closing
+                db_session.execute("SELECT 1")
+                time.sleep(min_sleep_time)
+
+                end_block = min(
+                    web3.eth.blockNumber - confirmations,
+                    start_block + max_blocks_batch,
+                )
+
+                if start_block + min_blocks_batch > end_block:
+                    min_sleep_time *= 2
+                    logger.info(
+                        f"Sleeping for {min_sleep_time} seconds because of low block count"
+                    )
+                    continue
+                min_sleep_time = max(min_sleep_time, min_sleep_time / 2)
+
+                logger.info(f"Crawling events from {start_block} to {end_block}")
+                all_events = _crawl_events(
+                    db_session=db_session,
+                    blockchain_type=blockchain_type,
+                    web3=web3,
+                    jobs=event_crawl_jobs,
+                    from_block=start_block,
+                    to_block=end_block,
+                    blocks_cache=blocks_cache,
+                    db_block_query_batch=min_blocks_batch * 2,
+                )
+                logger.info(
+                    f"Crawled {len(all_events)} events from {start_block} to {end_block}."
+                )
+
+                save_events(db_session, all_events, blockchain_type)
+
+                logger.info(
+                    f"Crawling function calls from {start_block} to {end_block}"
+                )
+                all_function_calls = _crawl_functions(
+                    blockchain_type,
+                    ethereum_state_provider,
+                    function_call_crawl_jobs,
+                    start_block,
+                    end_block,
+                )
+                logger.info(
+                    f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
+                )
+
+                save_function_calls(db_session, all_function_calls, blockchain_type)
+
+                current_time = datetime.utcnow()
+
+                if current_time - jobs_refetchet_time > timedelta(
+                    seconds=new_jobs_refetch_interval
+                ):
+                    logger.info(
+                        f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
+                    )
+                    event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
+                        event_crawl_jobs, function_call_crawl_jobs, blockchain_type
+                    )
+                    jobs_refetchet_time = current_time
+
+                if current_time - last_heartbeat_time > timedelta(
+                    seconds=heartbeat_interval
+                ):
+                    # Update heartbeat and send to humbug
+                    heartbeat_template["last_block"] = end_block
+                    heartbeat_template["current_time"] = current_time
+                    heartbeat_template["current_event_jobs_length"] = len(
+                        event_crawl_jobs
+                    )
+                    heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
+                    heartbeat(
+                        crawler_type="event",
+                        blockchain_type=blockchain_type,
+                        crawler_status=heartbeat_template,
+                    )
+                    logger.info("Sending heartbeat to humbug.", heartbeat_template)
+                    last_heartbeat_time = datetime.utcnow()
+
+                start_block = end_block + 1
+            except Exception as e:
+                logger.error(f"Internal error: {e}")
+                logger.exception(e)
+
+                try:
+                    web3 = _retry_connect_web3(blockchain_type)
+                except Exception as err:
+                    logger.error(f"Failed to reconnect: {err}")
+                    logger.exception(err)
+                    raise err

     except BaseException as e:
         logger.error(f"!!!!Crawler Died!!!!")
         heartbeat_template["status"] = "dead"
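
The restructured loop above wraps each iteration in its own try/except: a failure inside a crawl step triggers a Web3 reconnect and the loop resumes from the same start_block, and only a failed reconnect escapes to the outer handler that marks the crawler dead. A stripped-down sketch of that recovery pattern, where crawl_step is a hypothetical stand-in for the loop body:

# Simplified per-iteration recovery (crawl_step is a hypothetical placeholder).
while True:
    try:
        crawl_step()  # one batch of event and function-call crawling
    except Exception as e:
        logger.error(f"Internal error: {e}")
        # Assume the failure was a dropped node connection and rebuild the
        # client; if _retry_connect_web3 also fails, it raises and the outer
        # BaseException handler reports the crawler as dead.
        web3 = _retry_connect_web3(blockchain_type)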

View file

@@ -315,29 +315,9 @@ def heartbeat(
     )


-def get_last_labeled_block_number(
-    db_session: Session, blockchain_type: AvailableBlockchainType
-) -> Optional[int]:
-    label_model = get_label_model(blockchain_type)
-    block_number = (
-        db_session.query(label_model.block_number)
-        .filter(label_model.label == CRAWLER_LABEL)
-        .order_by(label_model.block_number.desc())
-        .limit(1)
-        .one_or_none()
-    )
-
-    return block_number[0] if block_number else None
-
-
-def save_labels(db_session: Session, labels: List[Base]) -> None:
-    """
-    Save labels in the database.
-    """
-    try:
-        db_session.add_all(labels)
-        db_session.commit()
-    except Exception as e:
-        logger.error(f"Failed to save labels: {e}")
-        db_session.rollback()
-        raise e

View file

@@ -0,0 +1,172 @@
+import logging
+from typing import Any, Dict, List, Optional, Union
+
+from eth_typing.evm import ChecksumAddress
+from hexbytes.main import HexBytes
+from moonstreamdb.db import yield_db_session_ctx
+from moonstreamdb.models import (
+    Base,
+    EthereumLabel,
+    EthereumTransaction,
+    PolygonLabel,
+    PolygonTransaction,
+)
+from moonworm.crawler.function_call_crawler import (
+    ContractFunctionCall,
+)
+from sqlalchemy.orm import Session
+from sqlalchemy.sql.expression import label
+
+from ..blockchain import connect, get_block_model, get_label_model
+from ..data import AvailableBlockchainType
+from ..settings import CRAWLER_LABEL
+from .crawler import FunctionCallCrawlJob, _generate_reporter_callback
+from .event_crawler import Event
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base:
+    """
+    Creates a label model.
+    """
+    label_model = get_label_model(blockchain_type)
+    label = label_model(
+        label=CRAWLER_LABEL,
+        label_data={
+            "type": "event",
+            "name": event.event_name,
+            "args": event.args,
+        },
+        address=event.address,
+        block_number=event.block_number,
+        block_timestamp=event.block_timestamp,
+        transaction_hash=event.transaction_hash,
+        log_index=event.log_index,
+    )
+    return label
+
+
+def _function_call_to_label(
+    blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall
+) -> Base:
+    """
+    Creates a label model.
+    """
+    label_model = get_label_model(blockchain_type)
+    label = label_model(
+        label=CRAWLER_LABEL,
+        label_data={
+            "type": "tx_call",
+            "name": function_call.function_name,
+            "caller": function_call.caller_address,
+            "args": function_call.function_args,
+            "status": function_call.status,
+            "gasUsed": function_call.gas_used,
+        },
+        address=function_call.contract_address,
+        block_number=function_call.block_number,
+        transaction_hash=function_call.transaction_hash,
+        block_timestamp=function_call.block_timestamp,
+    )
+    return label
+
+
+def get_last_labeled_block_number(
+    db_session: Session, blockchain_type: AvailableBlockchainType
+) -> Optional[int]:
+    label_model = get_label_model(blockchain_type)
+    block_number = (
+        db_session.query(label_model.block_number)
+        .filter(label_model.label == CRAWLER_LABEL)
+        .order_by(label_model.block_number.desc())
+        .limit(1)
+        .one_or_none()
+    )
+
+    return block_number[0] if block_number else None
+
+
+def save_labels(db_session: Session, labels: List[Base]) -> None:
+    """
+    Save labels in the database.
+    """
+    try:
+        db_session.add_all(labels)
+        db_session.commit()
+    except Exception as e:
+        logger.error(f"Failed to save labels: {e}")
+        db_session.rollback()
+        raise e
+
+
+def save_events(
+    db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType
+) -> None:
+    label_model = get_label_model(blockchain_type)
+    events_hashes_to_save = [event.transaction_hash for event in events]
+
+    existing_labels = (
+        db_session.query(label_model.transaction_hash, label_model.log_index)
+        .filter(
+            label_model.label == CRAWLER_LABEL,
+            label_model.log_index != None,
+            label_model.transaction_hash.in_(events_hashes_to_save),
+        )
+        .all()
+    )
+
+    existing_labels_transactions = []
+    existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
+    for label in existing_labels:
+        if label[0] not in existing_labels_transactions:
+            existing_labels_transactions.append(label[0])
+            existing_log_index_by_tx_hash[label[0]] = []
+        existing_log_index_by_tx_hash[label[0]].append(label[1])
+
+    labels_to_save = []
+    for event in events:
+        if event.transaction_hash not in existing_labels_transactions:
+            labels_to_save.append(_event_to_label(blockchain_type, event))
+        elif (
+            event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
+        ):
+            labels_to_save.append(_event_to_label(blockchain_type, event))
+
+    save_labels(db_session, labels_to_save)
+
+
+def save_function_calls(
+    db_session: Session,
+    function_calls: List[ContractFunctionCall],
+    blockchain_type: AvailableBlockchainType,
+) -> None:
+    label_model = get_label_model(blockchain_type)
+    transactions_hashes_to_save = [
+        function_call.transaction_hash for function_call in function_calls
+    ]
+
+    existing_labels = (
+        db_session.query(label_model.transaction_hash)
+        .filter(
+            label_model.label == CRAWLER_LABEL,
+            label_model.log_index == None,
+            label_model.transaction_hash.in_(transactions_hashes_to_save),
+        )
+        .all()
+    )
+
+    existing_labels_transactions = [label[0] for label in existing_labels]
+
+    labels_to_save = [
+        _function_call_to_label(blockchain_type, function_call)
+        for function_call in function_calls
+        if function_call.transaction_hash not in existing_labels_transactions
+    ]
+
+    save_labels(db_session, labels_to_save)
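
Both savers above deduplicate against labels already in the database before writing: an event label is keyed by the (transaction_hash, log_index) pair, while a function-call label is stored with a NULL log_index and keyed by transaction_hash alone. A self-contained sketch of the event keying rule, with illustrative names and data:

from typing import List, Set, Tuple

EventKey = Tuple[str, int]  # (transaction_hash, log_index)


def select_unseen(events: List[EventKey], existing: Set[EventKey]) -> List[EventKey]:
    # Mirrors save_events: keep an event only if its (tx_hash, log_index)
    # pair has not been labeled yet.
    return [e for e in events if e not in existing]


# The second log of the same transaction is new; the first is already stored.
assert select_unseen([("0xaaa", 0), ("0xaaa", 1)], {("0xaaa", 0)}) == [("0xaaa", 1)]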

View file

@@ -1,12 +1,10 @@
 import logging
-from dataclasses import dataclass
 import traceback
+from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Union, cast

-from moonstreamdb.models import Base
 from eth_typing.evm import ChecksumAddress
+from moonstreamdb.models import Base
 from moonworm.crawler.log_scanner import _fetch_events_chunk  # type: ignore
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import and_
@@ -32,25 +30,7 @@ class Event:
     log_index: int


-def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base:
-    """
-    Creates a label model.
-    """
-    label_model = get_label_model(blockchain_type)
-    label = label_model(
-        label=CRAWLER_LABEL,
-        label_data={
-            "type": "event",
-            "name": event.event_name,
-            "args": event.args,
-        },
-        address=event.address,
-        block_number=event.block_number,
-        block_timestamp=event.block_timestamp,
-        transaction_hash=event.transaction_hash,
-        log_index=event.log_index,
-    )
-    return label
-
-
 def _get_block_timestamp_from_web3(

View file

@@ -5,6 +5,7 @@ from eth_typing.evm import ChecksumAddress
 from hexbytes.main import HexBytes
 from moonstreamdb.db import yield_db_session_ctx
 from moonstreamdb.models import (
+    Base,
     EthereumLabel,
     EthereumTransaction,
     PolygonLabel,
@@ -17,47 +18,20 @@ from moonworm.crawler.function_call_crawler import (
 from moonworm.crawler.moonstream_ethereum_state_provider import (
     MoonstreamEthereumStateProvider,
 )
-from moonworm.cu_watch import MockState
 from moonworm.crawler.networks import Network
+from moonworm.cu_watch import MockState
 from sqlalchemy.orm import Session
 from web3 import Web3
-from moonstreamdb.models import Base
-from ..data import AvailableBlockchainType
-from .crawler import FunctionCallCrawlJob, _generate_reporter_callback
 from ..blockchain import connect, get_block_model, get_label_model
+from ..data import AvailableBlockchainType
 from ..settings import CRAWLER_LABEL
+from .crawler import FunctionCallCrawlJob, _generate_reporter_callback

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

-
-def _function_call_to_label(
-    blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall
-) -> Base:
-    """
-    Creates a label model.
-    """
-    label_model = get_label_model(blockchain_type)
-    label = label_model(
-        label=CRAWLER_LABEL,
-        label_data={
-            "type": "tx_call",
-            "name": function_call.function_name,
-            "caller": function_call.caller_address,
-            "args": function_call.function_args,
-            "status": function_call.status,
-            "gasUsed": function_call.gas_used,
-        },
-        address=function_call.contract_address,
-        block_number=function_call.block_number,
-        transaction_hash=function_call.transaction_hash,
-        block_timestamp=function_call.block_timestamp,
-    )
-    return label
-
-
 def _crawl_functions(
     blockchain_type: AvailableBlockchainType,
     ethereum_state_provider: MoonstreamEthereumStateProvider,

View file

@@ -23,7 +23,7 @@ DOCS_TARGET_PATH = "docs"

 # Crawler label
-CRAWLER_LABEL = "moonworm"
+CRAWLER_LABEL = "moonworm-alpha"

 # Geth connection address
 MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.environ.get(