moonstream/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py

import json
import logging
import re
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from uuid import UUID

from bugout.data import BugoutJournalEntries, BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonworm.deployment import find_deployment_block  # type: ignore
from web3.main import Web3

from ..blockchain import connect
from ..reporter import reporter
from ..settings import (
    BUGOUT_REQUEST_TIMEOUT_SECONDS,
    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
    HISTORICAL_CRAWLER_STATUSES,
    MOONSTREAM_ADMIN_ACCESS_TOKEN,
    MOONSTREAM_MOONWORM_TASKS_JOURNAL,
    bugout_client,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SubscriptionTypes(Enum):
    POLYGON_BLOCKCHAIN = "polygon_smartcontract"
    ETHEREUM_BLOCKCHAIN = "ethereum_smartcontract"
    MUMBAI_BLOCKCHAIN = "mumbai_smartcontract"
    XDAI_BLOCKCHAIN = "xdai_smartcontract"
    WYRM_BLOCKCHAIN = "wyrm_smartcontract"
    ZKSYNC_ERA_TESTNET_BLOCKCHAIN = "zksync_era_testnet_smartcontract"
    ZKSYNC_ERA_BLOCKCHAIN = "zksync_era_smartcontract"
    ARBITRUM_NOVA_BLOCKCHAIN = "arbitrum_nova_smartcontract"
    ARBITRUM_SEPOLIA_BLOCKCHAIN = "arbitrum_sepolia_smartcontract"


def abi_input_signature(input_abi: Dict[str, Any]) -> str:
    """
    Stringifies a function ABI input object according to the ABI specification:
    https://docs.soliditylang.org/en/v0.5.3/abi-spec.html
    """
    input_type = input_abi["type"]
    if input_type.startswith("tuple"):
        component_types = [
            abi_input_signature(component) for component in input_abi["components"]
        ]
        input_type = f"({','.join(component_types)}){input_type[len('tuple'):]}"
    return input_type
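
# Example: for a tuple input such as
#     {"type": "tuple[]", "components": [{"type": "uint256"}, {"type": "address"}]}
# abi_input_signature recursively expands the components and returns "(uint256,address)[]".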


def abi_function_signature(function_abi: Dict[str, Any]) -> str:
    """
    Stringifies a function ABI according to the ABI specification:
    https://docs.soliditylang.org/en/v0.5.3/abi-spec.html
    """
    function_name = function_abi["name"]
    function_arg_types = [
        abi_input_signature(input_item) for input_item in function_abi["inputs"]
    ]
    function_signature = f"{function_name}({','.join(function_arg_types)})"
    return function_signature


def encode_function_signature(function_abi: Dict[str, Any]) -> Optional[str]:
    """
    Encodes the given function (from ABI) with arguments arg_1, ..., arg_n into its 4-byte selector
    by calculating:
    keccak256("<function_name>(<arg_1_type>,...,<arg_n_type>)")
    If function_abi is not actually a function ABI (detected by checking whether
    function_abi["type"] == "function"), returns None.
    """
    if function_abi["type"] != "function":
        return None
    function_signature = abi_function_signature(function_abi)
    encoded_signature = Web3.keccak(text=function_signature)[:4]
    return encoded_signature.hex()
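
# Example: for the ERC-20 transfer ABI entry
#     {"type": "function", "name": "transfer",
#      "inputs": [{"type": "address"}, {"type": "uint256"}]}
# abi_function_signature returns "transfer(address,uint256)" and encode_function_signature
# returns the first four keccak256 bytes of that string as hex (the familiar a9059cbb selector).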


def _generate_reporter_callback(
    crawler_type: str, blockchain_type: AvailableBlockchainType
) -> Callable[[Exception], None]:
    def reporter_callback(error: Exception) -> None:
        reporter.error_report(
            error,
            [
                "moonworm",
                "crawler",
                "decode_error",
                crawler_type,
                blockchain_type.value,
            ],
        )

    return reporter_callback


def _retry_connect_web3(
    blockchain_type: AvailableBlockchainType,
    retry_count: int = 10,
    sleep_time: float = 5,
    access_id: Optional[UUID] = None,
) -> Web3:
    """
    Retry connecting to the blockchain.
    """
    error: Optional[Exception] = None
    while retry_count > 0:
        retry_count -= 1
        try:
            web3 = connect(blockchain_type, access_id=access_id)
            web3.eth.block_number  # cheap request to verify that the connection works
            logger.info(f"Connected to {blockchain_type}")
            return web3
        except Exception as e:
            if retry_count == 0:
                error = e
                break
            logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
            logger.info(f"Retrying in {sleep_time} seconds")
            time.sleep(sleep_time)
    raise Exception(
        f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
    )


def blockchain_type_to_subscription_type(
    blockchain_type: AvailableBlockchainType,
) -> SubscriptionTypes:
    if blockchain_type == AvailableBlockchainType.ETHEREUM:
        return SubscriptionTypes.ETHEREUM_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.POLYGON:
        return SubscriptionTypes.POLYGON_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.MUMBAI:
        return SubscriptionTypes.MUMBAI_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.XDAI:
        return SubscriptionTypes.XDAI_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.WYRM:
        return SubscriptionTypes.WYRM_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA_TESTNET:
        return SubscriptionTypes.ZKSYNC_ERA_TESTNET_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.ZKSYNC_ERA:
        return SubscriptionTypes.ZKSYNC_ERA_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.ARBITRUM_NOVA:
        return SubscriptionTypes.ARBITRUM_NOVA_BLOCKCHAIN
    elif blockchain_type == AvailableBlockchainType.ARBITRUM_SEPOLIA:
        return SubscriptionTypes.ARBITRUM_SEPOLIA_BLOCKCHAIN
    else:
        raise ValueError(f"Unknown blockchain type: {blockchain_type}")


@dataclass
class EventCrawlJob:
    event_abi_hash: str
    event_abi: Dict[str, Any]
    contracts: List[ChecksumAddress]
    address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
    created_at: int


@dataclass
class FunctionCallCrawlJob:
    contract_abi: List[Dict[str, Any]]
    contract_address: ChecksumAddress
    entries_tags: Dict[UUID, List[str]]
    created_at: int


def get_crawl_job_entries(
    subscription_type: SubscriptionTypes,
    crawler_type: str,
    journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
    created_at_filter: Optional[int] = None,
    limit: int = 200,
    extend_tags: Optional[List[str]] = None,
) -> List[BugoutSearchResult]:
    """
    Get all crawl job entries (event or function ABIs) from the Bugout journal
    where the tags are:
    - #type:<crawler_type> (either "event" or "function")
    - #status:active
    - #subscription_type:<subscription_type> (e.g. polygon_smartcontract or ethereum_smartcontract)
    """
    query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}"

    if extend_tags is not None:
        for tag in extend_tags:
            query += f" {tag.rstrip()}"

    if created_at_filter is not None:
        # Filter by created_at.
        # The comparison is >= rather than strictly greater than because the previous
        # query may not have returned every job with the last created_at value;
        # the tradeoff is that duplicate jobs may be returned and must be filtered out later.
        query += f" created_at:>={created_at_filter}"

    current_offset = 0
    entries: List[BugoutSearchResult] = []
    while True:
        search_result = bugout_client.search(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            journal_id=journal_id,
            query=query,
            offset=current_offset,
            limit=limit,
            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
        )
        search_results = cast(List[BugoutSearchResult], search_result.results)
        entries.extend(search_results)

        # if len(entries) >= search_result.total_results:
        if len(search_results) == 0:
            break
        current_offset += limit
    return entries
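
# Illustrative call (values are examples): fetch all active Polygon event jobs created at or
# after a given UNIX timestamp, with an extra tag filter appended to the query:
#
#     entries = get_crawl_job_entries(
#         subscription_type=SubscriptionTypes.POLYGON_BLOCKCHAIN,
#         crawler_type="event",
#         created_at_filter=1700000000,
#         extend_tags=["#moonworm_task_pickedup:False"],
#     )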


def find_all_deployed_blocks(
    web3: Web3, addresses_set: List[ChecksumAddress]
) -> Dict[ChecksumAddress, int]:
    """
    Find the deployment block for each of the given contract addresses.
    """
    all_deployed_blocks: Dict[ChecksumAddress, int] = {}
    for address in addresses_set:
        try:
            code = web3.eth.getCode(address)
            # getCode returns the contract bytecode; externally owned accounts have empty code.
            if len(code) > 0:
                block = find_deployment_block(
                    web3_client=web3,
                    contract_address=address,
                    web3_interval=0.5,
                )
                if block is not None:
                    all_deployed_blocks[address] = block
                else:
                    logger.error(f"Failed to get deployment block for {address}")
        except Exception as e:
            logger.error(f"Failed to get code for {address}: {e}")
    return all_deployed_blocks


def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
    for entry_tag in entry.tags:
        if entry_tag.startswith(tag):
            return entry_tag.split(":")[1]
    raise ValueError(f"Tag {tag} not found in {entry}")


def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
    """
    Create EventCrawlJob objects from bugout entries.
    """
    crawl_job_by_hash: Dict[str, EventCrawlJob] = {}

    for entry in entries:
        abi_hash = _get_tag(entry, "abi_method_hash")
        contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))

        entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji

        existing_crawl_job = crawl_job_by_hash.get(abi_hash)
        if existing_crawl_job is not None:
            if contract_address not in existing_crawl_job.contracts:
                existing_crawl_job.contracts.append(contract_address)
                existing_crawl_job.address_entries[contract_address] = {
                    entry_id: entry.tags
                }
        else:
            abi = cast(str, entry.content)
            new_crawl_job = EventCrawlJob(
                event_abi_hash=abi_hash,
                event_abi=json.loads(abi),
                contracts=[contract_address],
                address_entries={contract_address: {entry_id: entry.tags}},
                created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
            )
            crawl_job_by_hash[abi_hash] = new_crawl_job

    return [crawl_job for crawl_job in crawl_job_by_hash.values()]
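
# Illustrative shape of a journal entry consumed above (tag values are examples):
#     tags:    ["status:active", "type:event", "abi_method_hash:<hash>", "address:0x..."]
#     content: JSON of a single event ABI, e.g. {"type": "event", "name": "Transfer", "inputs": [...]}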


def make_function_call_crawl_jobs(
    entries: List[BugoutSearchResult],
) -> List[FunctionCallCrawlJob]:
    """
    Create FunctionCallCrawlJob objects from bugout entries.
    """
    crawl_job_by_address: Dict[str, FunctionCallCrawlJob] = {}
    method_signature_by_address: Dict[str, List[str]] = {}

    for entry in entries:
        entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji
        contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
        abi = json.loads(cast(str, entry.content))
        method_signature = encode_function_signature(abi)
        if method_signature is None:
            raise ValueError(f"{abi} is not a function ABI")

        if contract_address not in crawl_job_by_address:
            crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
                contract_abi=[abi],
                contract_address=contract_address,
                entries_tags={entry_id: entry.tags},
                created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
            )
            method_signature_by_address[contract_address] = [method_signature]
        else:
            if method_signature not in method_signature_by_address[contract_address]:
                crawl_job_by_address[contract_address].contract_abi.append(abi)
                method_signature_by_address[contract_address].append(method_signature)
                crawl_job_by_address[contract_address].entries_tags[
                    entry_id
                ] = entry.tags

    return [crawl_job for crawl_job in crawl_job_by_address.values()]


def merge_event_crawl_jobs(
    old_crawl_jobs: List[EventCrawlJob], new_event_crawl_jobs: List[EventCrawlJob]
) -> List[EventCrawlJob]:
    """
    Merge new event crawl jobs with old ones.
    If a new event crawl job has the same event_abi_hash as an existing one,
    their contracts are merged into a single job.
    Otherwise a new job is created.

    Important:
        old_crawl_jobs will be modified
    Returns:
        Merged list of event crawl jobs
    """
    for new_crawl_job in new_event_crawl_jobs:
        for old_crawl_job in old_crawl_jobs:
            if new_crawl_job.event_abi_hash == old_crawl_job.event_abi_hash:
                old_crawl_job.contracts.extend(
                    [
                        contract
                        for contract in new_crawl_job.contracts
                        if contract not in old_crawl_job.contracts
                    ]
                )

                for contract_address, entries in new_crawl_job.address_entries.items():
                    if contract_address in old_crawl_job.address_entries:
                        old_crawl_job.address_entries[contract_address].update(entries)
                    else:
                        old_crawl_job.address_entries[contract_address] = entries
                break
        else:
            old_crawl_jobs.append(new_crawl_job)
    return old_crawl_jobs


def merge_function_call_crawl_jobs(
    old_crawl_jobs: List[FunctionCallCrawlJob],
    new_function_call_crawl_jobs: List[FunctionCallCrawlJob],
) -> List[FunctionCallCrawlJob]:
    """
    Merge new function call crawl jobs with old ones.
    If a new function call crawl job has the same contract_address as an existing one,
    their ABIs are merged into a single job.
    Otherwise a new job is created.

    Important:
        old_crawl_jobs will be modified
    Returns:
        Merged list of function call crawl jobs
    """
    for new_crawl_job in new_function_call_crawl_jobs:
        for old_crawl_job in old_crawl_jobs:
            if new_crawl_job.contract_address == old_crawl_job.contract_address:
                old_selectors = [
                    encode_function_signature(function_abi)
                    for function_abi in old_crawl_job.contract_abi
                ]
                old_crawl_job.contract_abi.extend(
                    [
                        function_abi
                        for function_abi in new_crawl_job.contract_abi
                        if encode_function_signature(function_abi) not in old_selectors
                    ]
                )
                break
        else:
            old_crawl_jobs.append(new_crawl_job)
    return old_crawl_jobs
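
# A typical refresh cycle (illustrative, variable names are examples) combines the helpers above:
#
#     new_entries = get_crawl_job_entries(
#         subscription_type, "event", created_at_filter=last_created_at
#     )
#     event_crawl_jobs = merge_event_crawl_jobs(
#         event_crawl_jobs, make_event_crawl_jobs(new_entries)
#     )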


def _get_heartbeat_entry_id(
    crawler_type: str, blockchain_type: AvailableBlockchainType
) -> str:
    entries = bugout_client.search(
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
        query=f"#{crawler_type} #heartbeat #{blockchain_type.value} !#dead",
        limit=1,
        timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
    )
    search_results = cast(List[BugoutSearchResult], entries.results)
    if search_results:
        return search_results[0].entry_url.split("/")[-1]
    else:
        logger.info(f"No {crawler_type} heartbeat entry found, creating one")
        entry = bugout_client.create_entry(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
            title=f"{crawler_type} Heartbeat - {blockchain_type.value}",
            tags=[crawler_type, "heartbeat", blockchain_type.value],
            content="",
            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
        )
        return str(entry.id)


def heartbeat(
    crawler_type: str,
    blockchain_type: AvailableBlockchainType,
    crawler_status: Dict[str, Any],
    is_dead: bool = False,
) -> None:
    """
    The crawler periodically updates its status in a Bugout entry:
    - Started at timestamp
    - Started at block number
    - Status: Running/Dead
    - Last crawled block number
    - Number of current jobs
    - Time taken to crawl last crawl_step and speed per block
    Other information may be added later.
    """
    heartbeat_entry_id = _get_heartbeat_entry_id(crawler_type, blockchain_type)
    bugout_client.update_entry_content(
        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
        entry_id=heartbeat_entry_id,
        title=f"{crawler_type} Heartbeat - {blockchain_type.value}. Status: {crawler_status['status']} - {crawler_status['current_time']}",
        content=f"{json.dumps(crawler_status, indent=2)}",
        timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
    )

    if is_dead:
        bugout_client.update_tags(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
            entry_id=heartbeat_entry_id,
            tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
        )
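
# Illustrative crawler_status payload (only "status" and "current_time" are read above;
# the remaining fields are assumed from the docstring):
#
#     {
#         "status": "running",
#         "current_time": "2024-01-01T00:00:00",
#         "start_block": 1000000,
#         "last_crawled_block": 1000500,
#         "jobs_count": 12,
#     }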


def bugout_state_update(
    entries_tags_add: List[Dict[str, Any]],
    entries_tags_delete: List[Dict[str, Any]],
) -> BugoutJournalEntries:
    """
    Run an update of entry tags in Bugout:
    first add tags to entries, then delete tags from entries.
    If the add step fails, the delete step is not executed.
    """
    new_entries_state = BugoutJournalEntries(entries=[])

    if len(entries_tags_add) > 0:
        try:
            new_entries_state = bugout_client.create_entries_tags(
                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
                entries_tags=entries_tags_add,
                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
            )
        except Exception as e:
            logger.error(f"Failed to add tags to entries: {e}")

    if len(entries_tags_delete) > 0 and (
        len(entries_tags_add) == 0 or len(new_entries_state.entries) > 0
    ):
        try:
            new_entries_state = bugout_client.delete_entries_tags(
                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
                entries_tags=entries_tags_delete,
                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
            )
        except Exception as e:
            logger.error(f"Failed to delete tags from entries: {e}")

    return new_entries_state
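
# Both payloads are lists of {"entry_id": ..., "tags": [...]} dictionaries, e.g. (illustrative):
#
#     bugout_state_update(
#         entries_tags_add=[{"entry_id": entry_id, "tags": ["moonworm_task_pickedup:True"]}],
#         entries_tags_delete=[{"entry_id": entry_id, "tags": ["moonworm_task_pickedup:False"]}],
#     )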


def moonworm_crawler_update_job_as_pickedup(
    event_crawl_jobs: List[EventCrawlJob],
    function_call_crawl_jobs: List[FunctionCallCrawlJob],
) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
    """
    Mark moonworm jobs as picked up for processing.
    """
    if len(event_crawl_jobs) > 0:
        event_crawl_jobs = update_job_state_with_filters(  # type: ignore
            events=event_crawl_jobs,
            address_filter=[],
            required_tags=[
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
            ],
            tags_to_add=["moonworm_task_pickedup:True"],
            tags_to_delete=["moonworm_task_pickedup:False"],
        )

    if len(function_call_crawl_jobs) > 0:
        function_call_crawl_jobs = update_job_state_with_filters(  # type: ignore
            events=function_call_crawl_jobs,
            address_filter=[],
            required_tags=[
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
            ],
            tags_to_add=[
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True"
            ],
            tags_to_delete=[
                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False"
            ],
        )

    return event_crawl_jobs, function_call_crawl_jobs


def update_job_tags(
    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
    new_entries_state: BugoutJournalEntries,
):
    """
    Update the tags stored on the job objects from the latest Bugout entries state.
    """
    entry_tags_by_id = {entry.id: entry.tags for entry in new_entries_state.entries}

    for event in events:
        if isinstance(event, EventCrawlJob):
            for contract_address, entries_ids in event.address_entries.items():
                for entry_id in entries_ids.keys():
                    if entry_id in entry_tags_by_id:
                        event.address_entries[contract_address][entry_id] = (
                            entry_tags_by_id[entry_id]
                        )

        if isinstance(event, FunctionCallCrawlJob):
            for entry_id in event.entries_tags.keys():
                if entry_id in entry_tags_by_id:
                    event.entries_tags[entry_id] = entry_tags_by_id[entry_id]

    return events


def update_job_state_with_filters(
    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
    address_filter: List[ChecksumAddress],
    required_tags: List[str],
    tags_to_add: List[str] = [],
    tags_to_delete: List[str] = [],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
    """
    Function that updates the state of the job in bugout.
    """
    entries_ids_to_update: List[UUID] = []

    ### TODO: refactor this function

    if len(tags_to_add) == 0 and len(tags_to_delete) == 0:
        return events

    for event in events:
        # events
        if isinstance(event, EventCrawlJob):
            for contract_address, entries_ids in event.address_entries.items():
                if address_filter and contract_address not in address_filter:
                    continue
                for entry_id, tags in entries_ids.items():
                    if set(required_tags).issubset(set(tags)):
                        entries_ids_to_update.append(entry_id)

        # functions
        if isinstance(event, FunctionCallCrawlJob):
            if address_filter and event.contract_address not in address_filter:
                continue
            for entry_id, tags in event.entries_tags.items():
                if set(required_tags).issubset(set(tags)):
                    entries_ids_to_update.append(entry_id)

    if len(entries_ids_to_update) == 0:
        return events

    new_entries_state = bugout_state_update(
        entries_tags_add=[
            {"entry_id": entry_id, "tags": tags_to_add}
            for entry_id in entries_ids_to_update
        ],
        entries_tags_delete=[
            {"entry_id": entry_id, "tags": tags_to_delete}
            for entry_id in entries_ids_to_update
        ],
    )

    events = update_job_tags(events, new_entries_state)

    return events


def update_entries_status_and_progress(
    events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
    progess_map: Dict[ChecksumAddress, float],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
    """
    Update entries status and progress in mooncrawl bugout journal
    """
    entries_tags_delete: List[Dict[str, Any]] = []
    entries_tags_add: List[Dict[str, Any]] = []

    for event in events:
        if isinstance(event, EventCrawlJob):
            for contract_address, entries_ids in event.address_entries.items():
                progress = round(progess_map.get(contract_address, 0), 4) * 100

                (
                    entries_tags_delete,
                    entries_tags_add,
                ) = add_progress_to_tags(
                    entries=entries_ids,
                    contract_progress=progress,
                    entries_tags_delete=entries_tags_delete,
                    entries_tags_add=entries_tags_add,
                )

        if isinstance(event, FunctionCallCrawlJob):
            progress = round(progess_map.get(event.contract_address, 0), 4) * 100

            (
                entries_tags_delete,
                entries_tags_add,
            ) = add_progress_to_tags(
                entries=event.entries_tags,
                contract_progress=progress,
                entries_tags_delete=entries_tags_delete,
                entries_tags_add=entries_tags_add,
            )

    new_entries_state = bugout_state_update(
        entries_tags_add=entries_tags_add,
        entries_tags_delete=entries_tags_delete,
    )

    events = update_job_tags(events, new_entries_state)

    return events
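
# Note: progess_map values are fractional completion ratios (0-1); they are scaled to
# percentages here, and add_progress_to_tags marks an entry as finished once the
# percentage reaches 100.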


def add_progress_to_tags(
    entries: Dict[UUID, List[str]],
    contract_progress: float,
    entries_tags_delete: List[Dict[str, Any]],
    entries_tags_add: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Calculate the progress tag and add the finished tag if progress reaches 100.
    """
    new_progress = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{contract_progress}"

    for entry_id, tags in entries.items():
        # Skip entries that are already marked as finished.
        if (
            f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
            in tags
        ):
            continue

        if new_progress not in tags:
            # Replace any stale progress tags with the new progress value.
            entries_tags_delete.append(
                {
                    "entry_id": entry_id,
                    "tags": [
                        tag
                        for tag in tags
                        if tag.startswith(
                            f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
                        )
                    ],
                }
            )

            entries_tags_add.append(
                {
                    "entry_id": entry_id,
                    "tags": [new_progress],
                }
            )

        if contract_progress >= 100:
            entries_tags_add.append(
                {
                    "entry_id": entry_id,
                    "tags": [
                        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
                    ],
                }
            )

    return entries_tags_delete, entries_tags_add