pull/481/head
yhtiyar 2021-12-09 20:05:21 +03:00
parent 49d4a2cb27
commit cc5532370a
3 changed files with 134 additions and 67 deletions

View file

@@ -1,10 +1,11 @@
import web3
from .crawler import *
from .event_crawler import continious_event_crawler
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
from moonstreamdb.db import yield_db_session_ctx
from web3.middleware import geth_poa_middleware
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
from .crawler import *
from .event_crawler import continuous_event_crawler
def crawl_events():
initial_jobs = make_event_crawl_jobs(
@@ -23,7 +24,7 @@ def crawl_events():
)
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
continious_event_crawler(
continuous_event_crawler(
db_session=session,
blockchain_type=AvailableBlockchainType.POLYGON,
web3=web3,

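For context: injecting geth_poa_middleware at layer 0 is what lets a web3.py client read Polygon's proof-of-authority block headers, whose extraData field is longer than the Ethereum mainnet format allows. A minimal, self-contained sketch of that setup (the RPC URL is a placeholder, not part of this change):

from web3 import Web3
from web3.middleware import geth_poa_middleware

# Placeholder endpoint; substitute a real Polygon JSON-RPC node.
web3 = Web3(Web3.HTTPProvider("https://polygon-rpc.example"))
# Without this middleware, web3.py rejects PoA blocks while validating extraData.
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
print(web3.eth.block_number)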
View file

@@ -1,12 +1,12 @@
from datetime import datetime
from enum import Enum
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, cast
from bugout.data import BugoutSearchResult
from bugout.data import BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.models import Base
from sqlalchemy.orm.session import Session
@@ -136,7 +136,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
for entry in entries:
# TODO in entries there is misspelling of 'abi_method_hash'
abi_hash = _get_tag(entry, "abi_metod_hash")
abi_hash = _get_tag(entry, "abi_method_hash")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
existing_crawl_job = crawl_job_by_hash.get(abi_hash)
@@ -186,7 +186,7 @@ def _get_heartbeat_entry_id(
entries = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
query=f"#{crawler_type} #heartbeat #{blockchain_type.value}",
query=f"#{crawler_type} #heartbeat #{blockchain_type.value} !#dead",
limit=1,
)
if entries.results:
@@ -207,6 +207,7 @@ def heartbeat(
crawler_type: str,
blockchain_type: AvailableBlockchainType,
crawler_status: Dict[str, Any],
is_dead: bool = False,
) -> None:
"""
Periodically crawler will update the status in bugout entry:
@@ -227,6 +228,13 @@
title=f"{crawler_type} Heartbeat - {blockchain_type.value}",
content=f"{json.dumps(crawler_status, indent=2)}",
)
if is_dead:
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=heartbeat_entry_id,
tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
)
def save_labels(db_session: Session, labels: List[Base]) -> None:

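The "!#dead" search filter and the new is_dead flag work together: when a crawler dies, its heartbeat entry is tagged "dead", so _get_heartbeat_entry_id skips it and a restarted crawler reports into a fresh entry. One plausible call site for the flag, sketched here as an assumption (the hunk above only adds the parameter; run_crawler is a stand-in for the actual crawl loop):

try:
    run_crawler()  # hypothetical entry point standing in for continuous_event_crawler
except Exception as err:
    heartbeat(
        crawler_type="event",
        blockchain_type=AvailableBlockchainType.POLYGON,
        crawler_status={"status": "dead", "die_reason": f"{err.__class__.__name__}: {err}"},
        is_dead=True,  # tags the entry "dead" so it is excluded by the "!#dead" query
    )
    raise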
View file

@@ -1,11 +1,12 @@
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union, cast
from moonstreamdb.models import Base
from eth_typing.evm import ChecksumAddress
from moonworm.crawler.log_scanner import _fetch_events_chunk
from moonstreamdb.models import Base
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import and_
from web3 import Web3
@@ -15,12 +16,12 @@ from ..data import AvailableBlockchainType
from ..settings import CRAWLER_LABEL
from .crawler import (
EventCrawlJob,
blockchain_type_to_subscription_type,
get_crawl_job_entries,
heartbeat,
make_event_crawl_jobs,
save_labels,
blockchain_type_to_subscription_type,
merge_event_crawl_jobs,
save_labels,
)
logging.basicConfig(level=logging.INFO)
@@ -176,8 +177,11 @@ def _get_max_created_at_of_jobs(jobs: List[EventCrawlJob]) -> int:
return max(job.created_at for job in jobs)
# FIx type
def continious_event_crawler(
def _date_to_str(date: datetime) -> str:
return date.strftime("%Y-%m-%d %H:%M:%S")
def continuous_event_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
@@ -187,81 +191,135 @@ def continious_event_crawler(
min_blocks_batch: int = 10,
confirmations: int = 60,
min_sleep_time: float = 1,
heartbeat_interval: float = 60,
new_jobs_refetch_interval: float = 120,
):
assert min_blocks_batch < max_blocks_batch
assert (
min_blocks_batch < max_blocks_batch
), "min_blocks_batch must be less than max_blocks_batch"
assert min_blocks_batch > 0, "min_blocks_batch must be greater than 0"
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert confirmations > 0, "confirmations must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
assert heartbeat_interval > 0, "heartbeat_interval must be greater than 0"
assert (
new_jobs_refetch_interval > 0
), "new_jobs_refetch_interval must be greater than 0"
crawl_start_time = int(time.time())
crawl_start_time = datetime.utcnow()
jobs_refetchet_time = crawl_start_time
heartbeat_template = {
"status": "crawling",
"start_block": start_block,
"last_block": start_block,
"crawl_start_time": crawl_start_time,
"current_time": time.time(),
"crawl_start_time": _date_to_str(crawl_start_time),
"current_time": _date_to_str(crawl_start_time),
"current_jobs_length": len(crawl_jobs),
"jobs_last_refetched_at": _date_to_str(jobs_refetchet_time),
# "current_jobs": crawl_jobs,
}
logger.info(f"Starting continuous event crawler start_block={start_block}")
logger.info("Sending initial heartbeat")
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
last_heartbeat_time = datetime.utcnow()
blocks_cache: Dict[int, int] = {}
while True:
# query db with limit 1, to avoid session closing
db_session.execute("SELECT 1")
time.sleep(min_sleep_time)
try:
while True:
# query db with limit 1, to avoid session closing
db_session.execute("SELECT 1")
time.sleep(min_sleep_time)
end_block = min(
web3.eth.blockNumber - confirmations,
start_block + max_blocks_batch,
)
if start_block + min_blocks_batch > end_block:
min_sleep_time *= 2
continue
min_sleep_time /= 2
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=crawl_jobs,
from_block=start_block,
to_block=end_block,
blocks_cache=blocks_cache,
db_block_query_batch=min_blocks_batch * 2,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
)
if all_events:
save_labels(
db_session,
[_event_to_label(blockchain_type, event) for event in all_events],
end_block = min(
web3.eth.blockNumber - confirmations,
start_block + max_blocks_batch,
)
old_jobs_length = len(crawl_jobs)
if start_block + min_blocks_batch > end_block:
min_sleep_time *= 2
continue
min_sleep_time /= 2
new_entries = get_crawl_job_entries(
subscription_type=blockchain_type_to_subscription_type(blockchain_type),
crawler_type="event",
created_at_filter=_get_max_created_at_of_jobs(crawl_jobs),
)
new_jobs = make_event_crawl_jobs(new_entries)
crawl_jobs = merge_event_crawl_jobs(crawl_jobs, new_jobs)
logger.info(f"Crawling events from {start_block} to {end_block}")
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=crawl_jobs,
from_block=start_block,
to_block=end_block,
blocks_cache=blocks_cache,
db_block_query_batch=min_blocks_batch * 2,
)
logger.info(f"Found {len(crawl_jobs) - old_jobs_length} new crawl jobs. ")
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
)
# Update heartbeat and send to humbug
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = time.time()
if all_events:
save_labels(
db_session,
[_event_to_label(blockchain_type, event) for event in all_events],
)
current_time = datetime.utcnow()
if current_time - jobs_refetchet_time > timedelta(
seconds=new_jobs_refetch_interval
):
logger.info("Looking for new jobs.")
old_jobs_length = len(crawl_jobs)
new_entries = get_crawl_job_entries(
subscription_type=blockchain_type_to_subscription_type(
blockchain_type
),
crawler_type="event",
created_at_filter=_get_max_created_at_of_jobs(crawl_jobs),
)
new_jobs = make_event_crawl_jobs(new_entries)
crawl_jobs = merge_event_crawl_jobs(crawl_jobs, new_jobs)
logger.info(
f"Found {len(crawl_jobs) - old_jobs_length} new crawl jobs. "
)
if current_time - last_heartbeat_time > timedelta(
seconds=heartbeat_interval
):
# Update heartbeat and send to humbug
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = current_time
heartbeat_template["current_jobs_length"] = len(crawl_jobs)
heartbeat_template["jobs_last_refetched_at"] = jobs_refetchet_time
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
logger.info("Sending heartbeat to humbug.", heartbeat_template)
last_heartbeat_time = datetime.utcnow()
start_block = end_block + 1
except BaseException as e:
logger.error(f"!!!!Crawler Died!!!!")
heartbeat_template["status"] = "dead"
heartbeat_template["current_time"] = _date_to_str(datetime.utcnow())
heartbeat_template["current_jobs_length"] = len(crawl_jobs)
# heartbeat_template["current_jobs"] = crawl_jobs
heartbeat_template["jobs_last_refetched_at"] = _date_to_str(jobs_refetchet_time)
heartbeat_template["die_reason"] = f"{e.__class__.__name__}: {e}"
heartbeat_template["last_block"] = end_block
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
start_block = end_block + 1
logger.exception(e)
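The loop above adapts its polling rate: when fewer than min_blocks_batch new confirmed blocks are available, min_sleep_time doubles and the iteration is skipped; once enough blocks arrive it is halved again. A standalone sketch of just that backoff, assuming a get_latest_block callable that stands in for web3.eth.blockNumber:

import time

def block_ranges(get_latest_block, start_block, min_blocks_batch=10,
                 max_blocks_batch=100, confirmations=60, min_sleep_time=1.0):
    """Yield (from_block, to_block) ranges, backing off while the chain is slow."""
    while True:
        time.sleep(min_sleep_time)
        end_block = min(get_latest_block() - confirmations,
                        start_block + max_blocks_batch)
        if start_block + min_blocks_batch > end_block:
            # Not enough confirmed blocks yet: wait twice as long next time.
            min_sleep_time *= 2
            continue
        # Enough work is available: poll faster again.
        min_sleep_time /= 2
        yield start_block, end_block
        start_block = end_block + 1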