pull/789/head
Andrey 2023-05-25 16:06:33 +03:00
parent cf93f99fb1
commit bcc9897fb1
7 changed files with 161 additions and 127 deletions

View file

@@ -41,7 +41,6 @@ from .settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_HISTORICAL_CRAWL_JOURNAL,
)
from .settings import bugout_client as bc, entity_client as ec
@@ -519,6 +518,8 @@ def apply_moonworm_tasks(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
print(f"Found {len(entries)} tasks for address {address}")
# Create historical crawl tasks in the journal,
# using create_entries_pack to create the entries in a single request
@@ -548,27 +549,32 @@ def apply_moonworm_tasks(
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_method_hash:{hash}",
f"abi_selector:{Web3.keccak(abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}",
f"abi_selector:{Web3.keccak(text=abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}",
f"subscription_type:{subscription_type}",
f"abi_name:{abi_hashes_dict[hash]['name']}",
f"status:active",
f"task_type:moonworm",
f"moonworm_task_pikedup:False", # True if task picked up by moonworm-crawler(default each 120 sec)
f"moonworm_task_pickedup:False", # True if task picked up by moonworm-crawler(default each 120 sec)
f"historical_crawl_status:pending", # pending, in_progress, done
f"progress:0", # 0-100 %
],
}
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
reporter.error_report(e)
if len(moonworm_abi_tasks_entries_pack) > 0:
try:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=moonworm_abi_tasks_entries_pack,
timeout=25,
)
except Exception as e:
logger.error(f"Error create moonworm tasks: {str(e)}")
reporter.error_report(e)
def name_normalization(query_name: str) -> str:
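The abi_selector tag added above is the standard 4-byte function selector: the first four bytes of the keccak-256 hash of the canonical signature string. A minimal sketch of the fixed computation, with a hypothetical abi_entry shaped like abi_hashes_dict[hash] (the text= keyword is the actual fix here, since Web3.keccak does not accept a plain str as its positional argument):

from web3 import Web3

# Hypothetical ABI item, same shape as abi_hashes_dict[hash]
abi_entry = {
    "name": "transfer",
    "type": "function",
    "inputs": [{"type": "address"}, {"type": "uint256"}],
}

# Canonical signature: name(type1,type2,...)
signature = abi_entry["name"] + "(" + ",".join(i["type"] for i in abi_entry["inputs"]) + ")"

# Web3.keccak needs text= for string input; the selector is the first 4 bytes
selector = Web3.keccak(text=signature)[:4].hex()
print(signature, selector)  # transfer(address,uint256) a9059cbb (hexbytes may add a 0x prefix)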

View file

@@ -102,14 +102,14 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)
# Historical crawl journal
MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get(
"MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", ""
)
if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "":
raise ValueError(
"MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set"
)
# # Historical crawl journal
# MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get(
# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", ""
# )
# if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "":
# raise ValueError(
# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set"
# )
# Web3

View file

@@ -13,7 +13,7 @@ from typing import Iterator, List
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
import dateutil.parser
import dateutil.parser # type: ignore
from .blockchain import (
DateRange,

View file

@@ -21,6 +21,7 @@ from .crawler import (
make_function_call_crawl_jobs,
find_all_deployed_blocks,
update_job_state_with_filters,
moonworm_crawler_update_job_as_pickedup,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@@ -42,18 +43,6 @@ def handle_crawl(args: argparse.Namespace) -> None:
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
if len(initial_event_jobs) > 0:
initial_event_jobs = update_job_state_with_filters( # type: ignore
events=initial_event_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
initial_function_call_jobs = make_function_call_crawl_jobs(
get_crawl_job_entries(
subscription_type,
@@ -65,16 +54,12 @@ def handle_crawl(args: argparse.Namespace) -> None:
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
if len(initial_function_call_jobs) > 0:
initial_event_jobs = update_job_state_with_filters( # type: ignore
events=initial_event_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
(
initial_event_jobs,
initial_function_call_jobs,
) = moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs=initial_event_jobs,
function_call_crawl_jobs=initial_function_call_jobs,
)
logger.info(f"Blockchain type: {blockchain_type.value}")
@@ -164,7 +149,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
addresses_filter = []
extend_tags.extend(
[
"moonworm_task_pikedup:True",
"moonworm_task_pickedup:True",
"historical_crawl_status:pending",
]
)
@@ -230,7 +215,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:True",
"moonworm_task_pickedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
@@ -238,11 +223,11 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
if len(filtered_function_call_jobs) > 0:
filtered_function_call_jobs = update_job_state_with_filters( # type: ignore
function_calls=filtered_function_call_jobs,
events=filtered_function_call_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:True",
"moonworm_task_pickedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
@@ -290,7 +275,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
start_block = web3.eth.blockNumber - 1
addresses_deployment_blocks = find_all_deployed_blocks(
blockchain_type, list(addresses_set)
web3, list(addresses_set)
)
end_block = min(addresses_deployment_blocks.values())

View file

@@ -24,10 +24,15 @@ from .crawler import (
make_function_call_crawl_jobs,
merge_event_crawl_jobs,
merge_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
from ..settings import (
HISTORICAL_CRAWLER_STATUSES,
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -220,28 +225,13 @@ def continuous_crawler(
event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
event_crawl_jobs, function_call_crawl_jobs, blockchain_type
)
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_job_state_with_filters( # type: ignore
events=event_crawl_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
)
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_job_state_with_filters( # type: ignore
events=function_call_crawl_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pikedup:False",
],
tags_to_add=["moonworm_task_pikedup:True"],
tags_to_delete=["moonworm_task_pikedup:False"],
(
event_crawl_jobs,
function_call_crawl_jobs,
) = moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs=event_crawl_jobs,
function_call_crawl_jobs=function_call_crawl_jobs,
)
jobs_refetchet_time = current_time

View file

@@ -5,10 +5,10 @@ import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, cast, Union
from typing import Any, Callable, Dict, List, Optional, cast, Union, Tuple
from uuid import UUID
from bugout.data import BugoutSearchResult
from bugout.data import BugoutSearchResult, BugoutJournalEntries
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from web3.main import Web3
@@ -148,7 +148,7 @@ class EventCrawlJob:
event_abi_hash: str
event_abi: Dict[str, Any]
contracts: List[ChecksumAddress]
entries_ids: Dict[ChecksumAddress, Dict[UUID, List[str]]]
address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
created_at: int
@@ -212,13 +212,12 @@ def get_crawl_job_entries(
def find_all_deployed_blocks(
blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress]
web3: Web3, addresses_set: List[ChecksumAddress]
) -> Dict[ChecksumAddress, int]:
"""
find all deployed blocks for given addresses
"""
web3 = _retry_connect_web3(blockchain_type)
all_deployed_blocks = {}
for address in addresses_set:
try:
@@ -231,6 +230,10 @@ def find_all_deployed_blocks(
)
if block is not None:
all_deployed_blocks[address] = block
if block is None:
logger.warning(
f"Failed to find deployment block for {address}, code: {code}"
)
except Exception as e:
logger.error(f"Failed to get code for {address}: {e}")
return all_deployed_blocks
@@ -260,7 +263,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
existing_crawl_job.contracts.append(contract_address)
existing_crawl_job.entries_ids[contract_address] = {
existing_crawl_job.address_entries[contract_address] = {
entry_id: entry.tags
}
@@ -270,7 +273,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
event_abi_hash=abi_hash,
event_abi=json.loads(abi),
contracts=[contract_address],
entries_ids={contract_address: {entry_id: entry.tags}},
address_entries={contract_address: {entry_id: entry.tags}},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
crawl_job_by_hash[abi_hash] = new_crawl_job
@@ -445,38 +448,92 @@ def heartbeat(
def bugout_state_update(
entries_tags_add: List[Dict[str, Any]],
entries_tags_delete: List[Dict[str, Any]],
) -> Any:
if len(entries_tags_add) > 0:
new_entreis_state = bugout_client.update_entries_tags( # type: ignore
entries_tags=entries_tags_add,
) -> BugoutJournalEntries:
"""
Run update of entries tags in bugout
First delete tags, then add tags
"""
if len(entries_tags_delete) > 0:
new_entreis_state = bugout_client.delete_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_delete,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if len(entries_tags_delete) > 0:
new_entreis_state = bugout_client.delete_entries_tags( # type: ignore
entries_tags=entries_tags_delete,
if len(entries_tags_add) > 0:
new_entreis_state = bugout_client.create_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_add,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
return new_entreis_state
def moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Apply jobs of moonworm as taked to process
"""
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_job_state_with_filters( # type: ignore
events=event_crawl_jobs,
address_filter=[],
required_tags=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
],
tags_to_add=["moonworm_task_pickedup:True"],
tags_to_delete=["moonworm_task_pickedup:False"],
)
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_job_state_with_filters( # type: ignore
events=function_call_crawl_jobs,
address_filter=[],
required_tags=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
],
tags_to_add=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True"
],
tags_to_delete=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False"
],
)
return event_crawl_jobs, function_call_crawl_jobs
def update_job_tags(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
new_entreis_state: Any,
new_entreis_state: BugoutJournalEntries,
):
for entry in new_entreis_state:
"""
Update tags of the jobs in job object
"""
entry_tags_by_id = {entry.id: entry.tags for entry in new_entreis_state.entries}
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
for entry_id, tags in entries_ids.items():
if entry_id == entry["journal_entry_id"]:
event.entries_ids[contract_address][entry_id] = tags
for contract_address, entries_ids in event.address_entries.items():
for entry_id in entries_ids.keys():
if entry_id in entry_tags_by_id:
event.address_entries[contract_address][
entry_id
] = entry_tags_by_id[entry_id]
if isinstance(event, FunctionCallCrawlJob):
for entry_id, tags in event.entries_tags.items():
if entry_id == entry["journal_entry_id"]:
event.entries_tags[entry_id] = tags
for entry_id in event.entries_tags.keys():
if entry_id in entry_tags_by_id:
event.entries_tags[entry_id] = entry_tags_by_id[entry_id]
return events
@@ -500,20 +557,16 @@ def update_job_state_with_filters(
return events
for event in events:
# functions
# events
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
for contract_address, entries_ids in event.address_entries.items():
if address_filter and contract_address not in address_filter:
continue
for entry_id, tags in entries_ids.items():
if set(required_tags).issubset(set(tags)):
entries_ids_to_update.append(entry_id)
event.entries_ids[contract_address][entry_id].extend(
tags_to_add
)
# events
# functions
if isinstance(event, FunctionCallCrawlJob):
if address_filter and event.contract_address not in address_filter:
continue
@@ -526,11 +579,11 @@ def update_job_state_with_filters(
new_entries_state = bugout_state_update(
entries_tags_add=[
{"journal_entry_id": entry_id, "tags": tags_to_add}
{"entry_id": entry_id, "tags": tags_to_add}
for entry_id in entries_ids_to_update
],
entries_tags_delete=[
{"journal_entry_id": entry_id, "tags": tags_to_delete}
{"entry_id": entry_id, "tags": tags_to_delete}
for entry_id in entries_ids_to_update
],
)
@@ -540,12 +593,12 @@ def update_entries_status_and_proggress(
return events
def update_entries_status_and_proggress(
def update_entries_status_and_progress(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
progess_map: Dict[ChecksumAddress, float],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Update entries status and proggress in mooncrawl bugout journal
Update entries status and progress in mooncrawl bugout journal
"""
entries_tags_delete = []
@@ -554,11 +607,11 @@ def update_entries_status_and_proggress(
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.entries_ids.items():
proggress = int(progess_map.get(contract_address, 0)) * 100
for contract_address, entries_ids in event.address_entries.items():
progress = round(progess_map.get(contract_address, 0), 4) * 100
for entry_id, tags in entries_ids.items():
# proggress
# progress
if (
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
@@ -568,12 +621,12 @@ def update_entries_status_and_proggress(
entries_tags_delete.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}"
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
@@ -581,17 +634,17 @@ def update_entries_status_and_proggress(
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}"
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
],
}
)
if proggress >= 100:
if progress >= 100:
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
@@ -599,7 +652,7 @@ def update_entries_status_and_proggress(
)
if isinstance(event, FunctionCallCrawlJob):
proggress = int(progess_map.get(event.contract_address, 0)) * 100
progress = round(progess_map.get(event.contract_address, 0), 4) * 100
for entry_id, tags in event.entries_tags.items():
if (
@@ -608,15 +661,15 @@ def update_entries_status_and_proggress(
):
continue
# proggress
# progress
entries_tags_delete.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}"
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
@@ -624,17 +677,17 @@ def update_entries_status_and_proggress(
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}"
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
],
}
)
if proggress >= 100:
if progress >= 100:
entries_tags_add.append(
{
"journal_entry_id": entry_id,
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
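
A consequence of the renames above: every tag payload handed to bugout_state_update now uses the entry_id key instead of journal_entry_id, and stale tags are deleted before the refreshed ones are added, so each entry keeps a single progress tag rather than accumulating them. A minimal sketch with hypothetical values (using a literal "progress" prefix in place of HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']):

from uuid import uuid4

# Hypothetical journal entry currently tagged with an old progress value
entry_id = uuid4()
old_tags = ["progress:40.0", "historical_crawl_status:in_progress"]
new_progress = 60.0

# Payload shape after this change: "entry_id" instead of "journal_entry_id"
entries_tags_delete = [
    {"entry_id": entry_id, "tags": [t for t in old_tags if t.startswith("progress")]}
]
entries_tags_add = [
    {"entry_id": entry_id, "tags": [f"progress:{new_progress}"]}
]

# bugout_state_update() removes the stale progress tag first, then adds the new one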

View file

@@ -16,7 +16,7 @@ from .crawler import (
EventCrawlJob,
FunctionCallCrawlJob,
_retry_connect_web3,
update_entries_status_and_proggress,
update_entries_status_and_progress,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events, _autoscale_crawl_events
@@ -132,21 +132,21 @@ def historical_crawler(
if addresses_deployment_blocks:
for address, deployment_block in addresses_deployment_blocks.items():
current_position = end_block
current_position = batch_end_block
progess = original_start_block - current_position / (
progess = (original_start_block - current_position) / (
original_start_block - deployment_block
)
progess_map[address] = progess
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_entries_status_and_proggress( # type: ignore
function_call_crawl_jobs = update_entries_status_and_progress( # type: ignore
events=function_call_crawl_jobs,
progess_map=progess_map,
)
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_entries_status_and_proggress( # type: ignore
event_crawl_jobs = update_entries_status_and_progress( # type: ignore
events=event_crawl_jobs,
progess_map=progess_map,
)
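
The parenthesization fix above is easy to sanity-check with hypothetical block numbers: the historical crawler walks backwards from original_start_block toward each contract's deployment block, so progress is the fraction of that range already covered.

# Hypothetical block numbers for one address
original_start_block = 17_000_000  # where the backwards historical crawl started
deployment_block = 16_000_000      # block at which the contract was deployed
batch_end_block = 16_400_000       # current position after the latest batch

# Fixed expression: fraction of the backwards range already covered
progress = (original_start_block - batch_end_block) / (
    original_start_block - deployment_block
)
print(progress)  # 0.6, reported as 60.0 after round(progress, 4) * 100 in crawler.py

# The old, unparenthesized expression divided only current_position by the range:
# 17_000_000 - 16_400_000 / 1_000_000 == 16_999_983.6 instead of 0.6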