kopia lustrzana https://github.com/bugout-dev/moonstream
working crawler
rodzic
8a7413500c
commit
c9ea6b85ff
|
@ -8,7 +8,7 @@ from web3.middleware import geth_poa_middleware
|
|||
|
||||
from ..blockchain import AvailableBlockchainType
|
||||
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
|
||||
from .continuous_crawler import continuous_crawler, _retry_connect_web3
|
||||
from .continuous_crawler import _retry_connect_web3, continuous_crawler
|
||||
from .crawler import (
|
||||
SubscriptionTypes,
|
||||
get_crawl_job_entries,
|
||||
|
@ -42,15 +42,19 @@ def handle_crawl(args: argparse.Namespace) -> None:
|
|||
logger.info(
|
||||
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
|
||||
)
|
||||
print(initial_function_call_jobs)
|
||||
|
||||
# Couldn't figure out how to convert from string to AvailableBlockchainType
|
||||
# AvailableBlockchainType(args.blockchain_type) is not working
|
||||
blockchain_type = AvailableBlockchainType(args.blockchain_type)
|
||||
|
||||
logger.info(f"Blockchain type: {blockchain_type.value}")
|
||||
with yield_db_session_ctx() as db_session:
|
||||
web3: Optional[Web3] = None
|
||||
if args.web3 is None:
|
||||
logger.info(
|
||||
"No web3 provider URL provided, using default (blockchan.py: connect())"
|
||||
)
|
||||
web3 = _retry_connect_web3(args.blockchain_type)
|
||||
web3 = _retry_connect_web3(blockchain_type)
|
||||
else:
|
||||
logger.info(f"Using web3 provider URL: {args.web3}")
|
||||
web3 = Web3(
|
||||
|
@ -65,7 +69,7 @@ def handle_crawl(args: argparse.Namespace) -> None:
|
|||
last_labeled_block = get_last_labeled_block_number(db_session, blockchain_type)
|
||||
logger.info(f"Last labeled block: {last_labeled_block}")
|
||||
|
||||
start_block = args.start_block
|
||||
start_block = args.start
|
||||
if start_block is None:
|
||||
logger.info("No start block provided")
|
||||
if last_labeled_block is not None:
|
||||
|
|
|
@ -173,6 +173,7 @@ def continuous_crawler(
|
|||
last_heartbeat_time = datetime.utcnow()
|
||||
blocks_cache: Dict[int, int] = {}
|
||||
|
||||
failed_count = 0
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
|
@ -242,25 +243,38 @@ def continuous_crawler(
|
|||
if current_time - last_heartbeat_time > timedelta(
|
||||
seconds=heartbeat_interval
|
||||
):
|
||||
# Update heartbeat and send to humbug
|
||||
# Update heartbeat
|
||||
heartbeat_template["last_block"] = end_block
|
||||
heartbeat_template["current_time"] = current_time
|
||||
heartbeat_template["current_time"] = _date_to_str(current_time)
|
||||
heartbeat_template["current_event_jobs_length"] = len(
|
||||
event_crawl_jobs
|
||||
)
|
||||
heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
|
||||
heartbeat_template["jobs_last_refetched_at"] = _date_to_str(
|
||||
jobs_refetchet_time
|
||||
)
|
||||
heartbeat_template["current_function_call_jobs_length"] = len(
|
||||
function_call_crawl_jobs
|
||||
)
|
||||
heartbeat_template[
|
||||
"function_call metrics"
|
||||
] = ethereum_state_provider.metrics
|
||||
heartbeat(
|
||||
crawler_type="event",
|
||||
blockchain_type=blockchain_type,
|
||||
crawler_status=heartbeat_template,
|
||||
)
|
||||
logger.info("Sending heartbeat to humbug.", heartbeat_template)
|
||||
logger.info("Sending heartbeat.", heartbeat_template)
|
||||
last_heartbeat_time = datetime.utcnow()
|
||||
|
||||
start_block = end_block + 1
|
||||
failed_count = 0
|
||||
except Exception as e:
|
||||
logger.error(f"Internal error: {e}")
|
||||
logger.exception(e)
|
||||
failed_count += 1
|
||||
if failed_count > 10:
|
||||
logger.error("Too many failures, exiting")
|
||||
raise e
|
||||
try:
|
||||
web3 = _retry_connect_web3(blockchain_type)
|
||||
except Exception as err:
|
||||
|
@ -292,6 +306,7 @@ def continuous_crawler(
|
|||
crawler_type="event",
|
||||
blockchain_type=blockchain_type,
|
||||
crawler_status=heartbeat_template,
|
||||
is_dead=True,
|
||||
)
|
||||
|
||||
logger.exception(e)
|
||||
|
|
|
@ -17,7 +17,6 @@ from mooncrawl.data import AvailableBlockchainType
|
|||
|
||||
from ..reporter import reporter
|
||||
from ..settings import (
|
||||
CRAWLER_LABEL,
|
||||
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
|
||||
bugout_client,
|
||||
|
@ -28,8 +27,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class SubscriptionTypes(Enum):
|
||||
POLYGON_BLOCKCHAIN = "polygon_blockchain"
|
||||
ETHEREUM_BLOCKCHAIN = "ethereum_blockchain"
|
||||
POLYGON_BLOCKCHAIN = "polygon_smartcontract"
|
||||
ETHEREUM_BLOCKCHAIN = "ethereum_smartcontract"
|
||||
|
||||
|
||||
def _generate_reporter_callback(
|
||||
|
|
Ładowanie…
Reference in New Issue