pull/789/head
Andrey 2023-05-29 16:29:45 +03:00
rodzic 6f86ad6a02
commit 43e0367f17
2 zmienionych plików z 106 dodań i 98 usunięć

Wyświetl plik

@ -277,6 +277,10 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
addresses_deployment_blocks = find_all_deployed_blocks( addresses_deployment_blocks = find_all_deployed_blocks(
web3, list(addresses_set) web3, list(addresses_set)
) )
if len(addresses_deployment_blocks) == 0:
raise ValueError(
"No addresses found in the blockchain. Please check your addresses and try again"
)
end_block = min(addresses_deployment_blocks.values()) end_block = min(addresses_deployment_blocks.values())
if start_block is None: if start_block is None:

Wyświetl plik

@ -231,9 +231,7 @@ def find_all_deployed_blocks(
if block is not None: if block is not None:
all_deployed_blocks[address] = block all_deployed_blocks[address] = block
if block is None: if block is None:
logger.warning( logger.error(f"Failed to get deployment block for {address}")
f"Failed to find deployment block for {address}, code: {code}"
)
except Exception as e: except Exception as e:
logger.error(f"Failed to get code for {address}: {e}") logger.error(f"Failed to get code for {address}: {e}")
return all_deployed_blocks return all_deployed_blocks
@ -451,24 +449,36 @@ def bugout_state_update(
) -> BugoutJournalEntries: ) -> BugoutJournalEntries:
""" """
Run update of entries tags in bugout Run update of entries tags in bugout
First delete tags, then add tags First add tags to entries
Second delete tags from entries
With condition that if first step failed, second step will not be executed
""" """
if len(entries_tags_delete) > 0: new_entreis_state = BugoutJournalEntries(entries=[])
new_entreis_state = bugout_client.delete_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_delete,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if len(entries_tags_add) > 0: if len(entries_tags_add) > 0:
new_entreis_state = bugout_client.create_entries_tags( try:
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, new_entreis_state = bugout_client.create_entries_tags(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
entries_tags=entries_tags_add, journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, entries_tags=entries_tags_add,
) timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e:
logger.error(f"Failed to add tags to entries: {e}")
if len(entries_tags_delete) > 0 and (
len(entries_tags_add) < 0 or len(new_entreis_state.entries) > 0
):
try:
new_entreis_state = bugout_client.delete_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_delete,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e:
logger.error(f"Failed to delete tags from entries: {e}")
return new_entreis_state return new_entreis_state
@ -601,98 +611,37 @@ def update_entries_status_and_progress(
Update entries status and progress in mooncrawl bugout journal Update entries status and progress in mooncrawl bugout journal
""" """
entries_tags_delete = [] entries_tags_delete: List[Dict[str, Any]] = []
entries_tags_add = [] entries_tags_add: List[Dict[str, Any]] = []
for event in events: for event in events:
if isinstance(event, EventCrawlJob): if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.address_entries.items(): for contract_address, entries_ids in event.address_entries.items():
progress = round(progess_map.get(contract_address, 0), 4) * 100 progress = round(progess_map.get(contract_address, 0), 4) * 100
for entry_id, tags in entries_ids.items(): (
# progress entries_tags_delete,
entries_tags_add,
if ( ) = add_progress_to_tags(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" entries=entries_ids,
in tags contract_progress=progress,
): entries_tags_delete=entries_tags_delete,
continue entries_tags_add=entries_tags_add,
)
entries_tags_delete.append(
{
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
],
}
)
if progress >= 100:
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
if isinstance(event, FunctionCallCrawlJob): if isinstance(event, FunctionCallCrawlJob):
progress = round(progess_map.get(event.contract_address, 0), 4) * 100 progress = round(progess_map.get(event.contract_address, 0), 4) * 100
for entry_id, tags in event.entries_tags.items(): (
if ( entries_tags_delete,
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" entries_tags_add,
in tags ) = add_progress_to_tags(
): entries=entries_ids,
continue contract_progress=progress,
entries_tags_delete=entries_tags_delete,
# progress entries_tags_add=entries_tags_add,
entries_tags_delete.append( )
{
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
],
}
)
if progress >= 100:
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
new_entries_state = bugout_state_update( new_entries_state = bugout_state_update(
entries_tags_add=entries_tags_add, entries_tags_add=entries_tags_add,
@ -702,3 +651,58 @@ def update_entries_status_and_progress(
events = update_job_tags(events, new_entries_state) events = update_job_tags(events, new_entries_state)
return events return events
def add_progress_to_tags(
entries: Dict[UUID, List[str]],
contract_progress: float,
entries_tags_delete: List[Dict[str, Any]],
entries_tags_add: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Calculate progress and add finished tag if progress is 100
"""
new_progress = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{contract_progress}"
for entry_id, tags in entries.items():
# progress
if (
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
in tags
):
continue
if new_progress not in tags:
entries_tags_delete.append(
{
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [new_progress],
}
)
if contract_progress >= 100:
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
return entries_tags_delete, entries_tags_add