Add initial working state.

pull/789/head
Andrey 2023-05-23 13:56:38 +03:00
parent 07ad71fd9c
commit cf93f99fb1
5 changed files with 401 additions and 48 deletions

View file

@@ -20,6 +20,7 @@ from .crawler import (
make_event_crawl_jobs,
make_function_call_crawl_jobs,
find_all_deployed_blocks,
update_job_state_with_filters,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@@ -37,23 +38,45 @@ def handle_crawl(args: argparse.Namespace) -> None:
subscription_type,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
),
moonworm=True,
)
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
if len(initial_event_jobs) > 0:
initial_event_jobs = update_job_state_with_filters( # type: ignore
events=initial_event_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
initial_function_call_jobs = make_function_call_crawl_jobs(
get_crawl_job_entries(
subscription_type,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
),
moonworm=True,
)
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
if len(initial_function_call_jobs) > 0:
initial_function_call_jobs = update_job_state_with_filters(  # type: ignore
events=initial_function_call_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
@@ -143,7 +166,6 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
[
"moonworm_task_pikedup:True",
"historical_crawl_status:pending",
"progress:0",
]
)
@@ -190,16 +212,46 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
if args.only_functions:
filtered_event_jobs = []
logger.info(
f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
)
if args.only_events and args.only_functions:
raise ValueError(
"--only-events and --only-functions cannot be set at the same time"
)
if args.tasks_journal:
if len(filtered_event_jobs) > 0:
filtered_event_jobs = update_job_state_with_filters( # type: ignore
events=filtered_event_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
)
if len(filtered_function_call_jobs) > 0:
filtered_function_call_jobs = update_job_state_with_filters( # type: ignore
events=filtered_function_call_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
)
logger.info(
f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
)
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
@@ -224,14 +276,23 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
logger.info(f"Last labeled block: {last_labeled_block}")
if args.tasks_journal:
start_block = int(web3.eth.blockNumber) - 1
end_block = min(
find_all_deployed_blocks(blockchain_type, list(addresses_set))
addresses_deployment_blocks = None
# get set of addresses from event jobs and function call jobs
if args.find_deployed_blocks:
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
if args.start is None:
start_block = web3.eth.blockNumber - 1
addresses_deployment_blocks = find_all_deployed_blocks(
blockchain_type, list(addresses_set)
)
else:
start_block = args.start
end_block = args.end
end_block = min(addresses_deployment_blocks.values())
if start_block is None:
logger.info("No start block provided")
@@ -276,6 +337,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
args.max_blocks_batch,
args.min_sleep_time,
access_id=args.access_id,
addresses_deployment_blocks=addresses_deployment_blocks,
)
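The new addresses_deployment_blocks argument carries the mapping returned by find_all_deployed_blocks (contract address to the first block at which the contract has code). A hypothetical value, for illustration only:

from web3 import Web3

addresses_deployment_blocks = {
    # assumed address and block number, purely illustrative
    Web3().toChecksumAddress("0x" + "00" * 20): 16_000_000,
}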
@@ -454,6 +516,18 @@ def main() -> None:
default=False,
help="Only crawl events",
)
historical_crawl_parser.add_argument(
"--only-functions",
action="store_true",
default=False,
help="Only crawl function calls",
)
historical_crawl_parser.add_argument(
"--find-deployed-blocks",
action="store_true",
default=False,
help="Find all deployed blocks",
)
historical_crawl_parser.add_argument(
"--tasks-journal",
action="store_true",

View file

@@ -220,6 +220,30 @@ def continuous_crawler(
event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
event_crawl_jobs, function_call_crawl_jobs, blockchain_type
)
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_job_state_with_filters( # type: ignore
events=event_crawl_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_job_state_with_filters( # type: ignore
events=function_call_crawl_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
jobs_refetchet_time = current_time
if current_time - last_heartbeat_time > timedelta(

View file

@@ -5,7 +5,7 @@ import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, cast
from typing import Any, Callable, Dict, List, Optional, cast, Union
from uuid import UUID
from bugout.data import BugoutSearchResult
@@ -21,6 +21,8 @@ from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
bugout_client,
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
HISTORICAL_CRAWLER_STATUSES,
)
logging.basicConfig(level=logging.INFO)
@@ -146,6 +148,7 @@ class EventCrawlJob:
event_abi_hash: str
event_abi: Dict[str, Any]
contracts: List[ChecksumAddress]
entries_ids: Dict[ChecksumAddress, Dict[UUID, List[str]]]
created_at: int
@@ -153,6 +156,7 @@ class EventCrawlJob:
class FunctionCallCrawlJob:
contract_abi: List[Dict[str, Any]]
contract_address: ChecksumAddress
entries_tags: Dict[UUID, List[str]]
created_at: int
@@ -209,13 +213,13 @@ def get_crawl_job_entries(
def find_all_deployed_blocks(
blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress]
):
) -> Dict[ChecksumAddress, int]:
"""
Find all deployed blocks for the given addresses.
"""
web3 = _retry_connect_web3(blockchain_type)
all_deployed_blocks = []
all_deployed_blocks = {}
for address in addresses_set:
try:
code = web3.eth.getCode(address)
@@ -226,8 +230,7 @@ def find_all_deployed_blocks(
web3_interval=0.5,
)
if block is not None:
all_deployed_blocks.append(address)
all_deployed_blocks[address] = block
except Exception as e:
logger.error(f"Failed to get code for {address}: {e}")
return all_deployed_blocks
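For reference, a minimal sketch of the per-address lookup that find_all_deployed_blocks aggregates into its address-to-block dict. The binary search below is an assumption about how a deployment block can be located with plain web3 calls, not a copy of moonworm's find_deployment_block, and historical getCode queries need an archive node.

from typing import Optional

from eth_typing.evm import ChecksumAddress
from web3 import Web3


def naive_find_deployment_block(web3: Web3, address: ChecksumAddress) -> Optional[int]:
    # Not a contract (or self-destructed): nothing to find.
    if len(web3.eth.getCode(address)) == 0:
        return None
    low, high = 0, web3.eth.blockNumber
    # Binary search for the first block at which the address has code.
    while low < high:
        mid = (low + high) // 2
        if len(web3.eth.getCode(address, block_identifier=mid)) == 0:
            low = mid + 1
        else:
            high = mid
    return low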
@@ -240,9 +243,7 @@ def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
raise ValueError(f"Tag {tag} not found in {entry}")
def make_event_crawl_jobs(
entries: List[BugoutSearchResult], moonworm: bool = False
) -> List[EventCrawlJob]:
def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
"""
Create EventCrawlJob objects from bugout entries.
"""
@@ -253,27 +254,23 @@ def make_event_crawl_jobs(
abi_hash = _get_tag(entry, "abi_method_hash")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
# if entry.tags not contain moonworm_task_pikedup:True
if "moonworm_task_pikedup:True" not in entry.tags and moonworm:
# Update the tag to pickedup
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=entry.entry_url.split("/")[-1],
tags=["moonworm_task_pikedup:True"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
existing_crawl_job = crawl_job_by_hash.get(abi_hash)
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
existing_crawl_job.contracts.append(contract_address)
existing_crawl_job.entries_ids[contract_address] = {
entry_id: entry.tags
}
else:
abi = cast(str, entry.content)
new_crawl_job = EventCrawlJob(
event_abi_hash=abi_hash,
event_abi=json.loads(abi),
contracts=[contract_address],
entries_ids={contract_address: {entry_id: entry.tags}},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
crawl_job_by_hash[abi_hash] = new_crawl_job
@@ -283,7 +280,6 @@ def make_event_crawl_jobs(
def make_function_call_crawl_jobs(
entries: List[BugoutSearchResult],
moonworm: bool = False,
) -> List[FunctionCallCrawlJob]:
"""
Create FunctionCallCrawlJob objects from bugout entries.
@@ -293,26 +289,18 @@ def make_function_call_crawl_jobs(
method_signature_by_address: Dict[str, List[str]] = {}
for entry in entries:
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
abi = json.loads(cast(str, entry.content))
method_signature = encode_function_signature(abi)
if method_signature is None:
raise ValueError(f"{abi} is not a function ABI")
if "moonworm_task_pikedup:True" not in entry.tags and moonworm:
# Update the tag to pickedup
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=entry.entry_url.split("/")[-1],
tags=["moonworm_task_pikedup:True"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if contract_address not in crawl_job_by_address:
crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
contract_abi=[abi],
contract_address=contract_address,
entries_tags={entry_id: entry.tags},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
method_signature_by_address[contract_address] = [method_signature]
@@ -321,6 +309,9 @@ def make_function_call_crawl_jobs(
if method_signature not in method_signature_by_address[contract_address]:
crawl_job_by_address[contract_address].contract_abi.append(abi)
method_signature_by_address[contract_address].append(method_signature)
crawl_job_by_address[contract_address].entries_tags[
entry_id
] = entry.tags
return [crawl_job for crawl_job in crawl_job_by_address.values()]
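Illustration of the bookkeeping the two job dataclasses now carry. The entry UUID, address, and tag values below are made up, but the tag strings mirror the ones used elsewhere in this change.

from uuid import uuid4

from web3 import Web3

entry_id = uuid4()
address = Web3().toChecksumAddress("0x" + "00" * 20)

# EventCrawlJob.entries_ids: contract address -> Bugout entry UUID -> entry tags
entries_ids = {
    address: {entry_id: ["moonworm_task_pikedup:False", "historical_crawl_status:pending"]}
}

# FunctionCallCrawlJob.entries_tags: Bugout entry UUID -> entry tags
entries_tags = {entry_id: ["moonworm_task_pikedup:False", "historical_crawl_status:pending"]}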
@@ -449,3 +440,212 @@ def heartbeat(
tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
def bugout_state_update(
entries_tags_add: List[Dict[str, Any]],
entries_tags_delete: List[Dict[str, Any]],
) -> Any:
if len(entries_tags_add) > 0:
new_entries_state = bugout_client.update_entries_tags(  # type: ignore
entries_tags=entries_tags_add,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if len(entries_tags_delete) > 0:
new_entries_state = bugout_client.delete_entries_tags(  # type: ignore
entries_tags=entries_tags_delete,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
return new_entries_state
def update_job_tags(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
new_entries_state: Any,
):
for entry in new_entries_state:
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
for entry_id, tags in entries_ids.items():
if entry_id == entry["journal_entry_id"]:
event.entries_ids[contract_address][entry_id] = tags
if isinstance(event, FunctionCallCrawlJob):
for entry_id, tags in event.entries_tags.items():
if entry_id == entry["journal_entry_id"]:
event.entries_tags[entry_id] = tags
return events
def update_job_state_with_filters(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
address_filter: List[ChecksumAddress],
required_tags: List[str],
tags_to_add: List[str] = [],
tags_to_delete: List[str] = [],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Function that updates the state of the job in bugout.
"""
entries_ids_to_update: List[UUID] = []
### TODO: refactor this function
if len(tags_to_add) == 0 and len(tags_to_delete) == 0:
return events
for event in events:
# event jobs
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
if address_filter and contract_address not in address_filter:
continue
for entry_id, tags in entries_ids.items():
if set(required_tags).issubset(set(tags)):
entries_ids_to_update.append(entry_id)
event.entries_ids[contract_address][entry_id].extend(
tags_to_add
)
# function call jobs
if isinstance(event, FunctionCallCrawlJob):
if address_filter and event.contract_address not in address_filter:
continue
for entry_id, tags in event.entries_tags.items():
if set(required_tags).issubset(set(tags)):
entries_ids_to_update.append(entry_id)
if len(entries_ids_to_update) == 0:
return events
new_entries_state = bugout_state_update(
entries_tags_add=[
{"journal_entry_id": entry_id, "tags": tags_to_add}
for entry_id in entries_ids_to_update
],
entries_tags_delete=[
{"journal_entry_id": entry_id, "tags": tags_to_delete}
for entry_id in entries_ids_to_update
],
)
events = update_job_tags(events, new_entries_state)
return events
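A worked example of the filter logic above, using the same tag transition the CLI applies when it picks up pending tasks; the entry UUID and its tag list are hypothetical.

from uuid import uuid4

entry_id = uuid4()
tags = ["historical_crawl_status:pending", "moonworm_task_pikedup:False"]
required_tags = ["historical_crawl_status:pending", "moonworm_task_pikedup:False"]

# The subset check selects this entry for the state update...
assert set(required_tags).issubset(set(tags))

# ...which yields one add payload and one delete payload for bugout_state_update:
entries_tags_add = [{"journal_entry_id": entry_id, "tags": ["moonworm_task_pikedup:True"]}]
entries_tags_delete = [{"journal_entry_id": entry_id, "tags": ["moonworm_task_pikedup:False"]}]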
def update_entries_status_and_proggress(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
progess_map: Dict[ChecksumAddress, float],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Update entries status and progress in the mooncrawl Bugout journal.
"""
entries_tags_delete = []
entries_tags_add = []
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
proggress = int(progess_map.get(contract_address, 0) * 100)
for entry_id, tags in entries_ids.items():
# progress
if (
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
in tags
):
continue
entries_tags_delete.append(
{
"journal_entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{proggress}"
],
}
)
if proggress >= 100:
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
if isinstance(event, FunctionCallCrawlJob):
proggress = int(progess_map.get(event.contract_address, 0) * 100)
for entry_id, tags in event.entries_tags.items():
if (
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
in tags
):
continue
# progress
entries_tags_delete.append(
{
"journal_entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{proggress}"
],
}
)
if proggress >= 100:
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
new_entries_state = bugout_state_update(
entries_tags_add=entries_tags_add,
entries_tags_delete=entries_tags_delete,
)
events = update_job_tags(events, new_entries_state)
return events
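For a single entry at 42% completion, the payloads built above look roughly like this (entry UUID hypothetical, tag prefixes resolved from the settings constants):

from uuid import uuid4

entry_id = uuid4()
old_tags = ["historical_crawl_status:pending", "progress:17"]

# Stale progress tags are queued for deletion, the fresh one for addition.
entries_tags_delete = [{"journal_entry_id": entry_id, "tags": ["progress:17"]}]
entries_tags_add = [{"journal_entry_id": entry_id, "tags": ["progress:42"]}]

# Once progress reaches 100, a second add payload marks the entry finished:
# {"journal_entry_id": entry_id, "tags": ["historical_crawl_status:finished"]}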

View file

@@ -3,6 +3,7 @@ import time
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
@@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network # type: ignore
from sqlalchemy.orm.session import Session
from web3 import Web3
from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3
from .crawler import (
EventCrawlJob,
FunctionCallCrawlJob,
_retry_connect_web3,
update_entries_status_and_proggress,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events, _autoscale_crawl_events
from .function_call_crawler import _crawl_functions
@@ -31,6 +37,7 @@ def historical_crawler(
max_blocks_batch: int = 100,
min_sleep_time: float = 0.1,
access_id: Optional[UUID] = None,
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@@ -60,6 +67,10 @@ def historical_crawler(
blocks_cache: Dict[int, int] = {}
failed_count = 0
original_start_block = start_block
progess_map: Dict[ChecksumAddress, float] = {}
while start_block >= end_block:
try:
time.sleep(min_sleep_time)
@@ -119,6 +130,27 @@ def historical_crawler(
db_session, all_function_calls, blockchain_type
)
if addresses_deployment_blocks:
for address, deployment_block in addresses_deployment_blocks.items():
current_position = end_block
progess = (original_start_block - current_position) / (
original_start_block - deployment_block
)
progess_map[address] = progess
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_entries_status_and_proggress( # type: ignore
events=function_call_crawl_jobs,
progess_map=progess_map,
)
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_entries_status_and_proggress( # type: ignore
events=event_crawl_jobs,
progess_map=progess_map,
)
# Committing to db
commit_session(db_session)
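A worked example of the per-address progress fraction tracked above (block numbers are made up): with original_start_block = 17,000,000, deployment_block = 16,000,000, and the crawl currently down to end_block = 16,400,000, the address is 60% done.

original_start_block = 17_000_000
deployment_block = 16_000_000
current_position = 16_400_000  # end_block of the batch that was just crawled

progress = (original_start_block - current_position) / (
    original_start_block - deployment_block
)
assert progress == 0.6  # later surfaced as a "progress:60" tag on the Bugout entry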

View file

@@ -279,3 +279,26 @@ infura_networks = {
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
# Historical crawler status config
HISTORICAL_CRAWLER_STATUSES = {
"pending": "pending",
"running": "running",
"finished": "finished",
}
# Historical crawler moonworm status config
HISTORICAL_CRAWLER_MOONWORM_STATUSES = {
"pickedup": True,
}
# Status tag prefixes
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
"moonworm_status": "moonworm_task_pickedup",
"historical_crawl_status": "historical_crawl_status",
"progress_status": "progress",
}
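The prefixes and statuses above compose into the journal tags used throughout this change, for example:

# Assuming the constants defined above are in scope:
pending_tag = (
    f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}"
    f":{HISTORICAL_CRAWLER_STATUSES['pending']}"
)  # -> "historical_crawl_status:pending"
progress_tag = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:60"  # -> "progress:60"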