Merge pull request #1117 from moonstream-to/fix-HC-v3

Fix jobs merging.
pull/1121/head
Andrey Dolgolev 2024-07-31 12:25:57 +03:00 committed by GitHub
commit b682fb3d42
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 26 additions and 21 deletions

View file

@@ -1,4 +1,5 @@
 import json
+from hexbytes import HexBytes
 import logging
 import re
 import time
@@ -721,7 +722,7 @@ def get_event_crawl_job_records(
     if len(addresses) != 0:
         query = query.filter(
-            AbiJobs.address.in_([binascii.unhexlify(address) for address in addresses])
+            AbiJobs.address.in_([HexBytes(address) for address in addresses])
         )

     crawl_job_records = query.all()
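
The switch from binascii.unhexlify to HexBytes matters because unhexlify rejects the "0x" prefix that on-chain addresses usually carry (the "x" is not a hex digit), while HexBytes strips the prefix and normalizes casing before producing the raw bytes stored in the AbiJobs.address column. A minimal sketch of the difference, assuming 0x-prefixed checksummed input:

    from binascii import unhexlify
    from hexbytes import HexBytes

    address = "0xdAC17F958D2ee523a2206206994597C13D831ec7"

    # unhexlify chokes on the "0x" prefix:
    # binascii.Error: Non-hexadecimal digit found
    # unhexlify(address)

    # HexBytes accepts the prefix and lowercases the hex digits:
    print(HexBytes(address))  # HexBytes('0xdac17f958d2ee523a2206206994597c13d831ec7')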
@@ -731,25 +732,25 @@ def get_event_crawl_job_records(
     for crawl_job_record in crawl_job_records:
+        selector = str(crawl_job_record.abi_selector)

         str_address = "0x" + crawl_job_record.address.hex()
         checksummed_address = Web3.toChecksumAddress(str_address)

-        if crawl_job_record.abi_selector in existing_crawl_job_records:
+        if selector in existing_crawl_job_records:
             if (
                 checksummed_address
-                not in existing_crawl_job_records[
-                    crawl_job_record.abi_selector
-                ].contracts
+                not in existing_crawl_job_records[selector].contracts
             ):
-                existing_crawl_job_records[
-                    crawl_job_record.abi_selector
-                ].contracts.append(checksummed_address)
+                existing_crawl_job_records[selector].contracts.append(
+                    checksummed_address
+                )
         else:
             new_crawl_job = EventCrawlJob(
-                event_abi_hash=str(crawl_job_record.abi_selector),
+                event_abi_hash=selector,
                 event_abi=json.loads(str(crawl_job_record.abi)),
                 contracts=[checksummed_address],
                 address_entries={
@@ -762,9 +763,7 @@ def get_event_crawl_job_records(
                 },
                 created_at=int(crawl_job_record.created_at.timestamp()),
             )
-            existing_crawl_job_records[str(crawl_job_record.abi_selector)] = (
-                new_crawl_job
-            )
+            existing_crawl_job_records[selector] = new_crawl_job

     return existing_crawl_job_records
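
This is the merging fix itself: records are now grouped by the stringified abi_selector, so two AbiJobs rows for the same event on different contracts collapse into a single EventCrawlJob whose contracts list holds both addresses. A condensed sketch of that behavior, using a hypothetical stand-in for EventCrawlJob:

    from dataclasses import dataclass, field
    from typing import Dict, List, Tuple

    @dataclass
    class EventJob:  # hypothetical stand-in for EventCrawlJob
        event_abi_hash: str
        contracts: List[str] = field(default_factory=list)

    def merge(records: List[Tuple[str, str]]) -> Dict[str, EventJob]:
        jobs: Dict[str, EventJob] = {}
        for selector, address in records:  # (abi_selector, checksummed address)
            if selector in jobs:
                if address not in jobs[selector].contracts:
                    jobs[selector].contracts.append(address)
            else:
                jobs[selector] = EventJob(selector, [address])
        return jobs

    # Two rows with the same selector merge into one job covering both contracts:
    jobs = merge([("0xddf252ad", "0xAaa"), ("0xddf252ad", "0xBbb"), ("0x8c5be1e5", "0xAaa")])
    assert jobs["0xddf252ad"].contracts == ["0xAaa", "0xBbb"]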
@@ -805,6 +804,8 @@ def get_function_call_crawl_job_records(
     for crawl_job_record in crawl_job_records:
         str_address = "0x" + crawl_job_record.address.hex()
+        selector = str(crawl_job_record.abi_selector)
+
         if str_address not in existing_crawl_job_records:
             existing_crawl_job_records[str_address] = FunctionCallCrawlJob(
                 contract_abi=[json.loads(str(crawl_job_record.abi))],
@@ -816,18 +817,18 @@ def get_function_call_crawl_job_records(
                     ]
                 },
                 created_at=int(crawl_job_record.created_at.timestamp()),
-                existing_selectors=[str(crawl_job_record.abi_selector)],
+                existing_selectors=[selector],
             )
         else:
             if (
-                crawl_job_record.abi_selector
+                selector
                 not in existing_crawl_job_records[str_address].existing_selectors
             ):
                 existing_crawl_job_records[str_address].contract_abi.append(
                     json.loads(str(crawl_job_record.abi))
                 )
                 existing_crawl_job_records[str_address].existing_selectors.append(
-                    str(crawl_job_record.abi_selector)
+                    selector
                 )

     return existing_crawl_job_records
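
Function-call jobs merge in the mirror-image direction: one job per contract address, accumulating distinct function selectors and ABIs. In both functions the real fix is hoisting selector = str(crawl_job_record.abi_selector): the old code mixed the raw column value with its str() form in membership tests, and if the two ever differ in type the comparison silently fails. A hypothetical illustration of that pitfall (the actual column type is not shown in this diff):

    # Membership tests compare by equality, and bytes-like values
    # never equal their str form in Python 3.
    existing = {"0xddf252ad": "job"}               # keys were built with str(...)
    raw_selector = b"0xddf252ad"                   # stand-in for a non-str column value
    print(raw_selector in existing)                # False -> record treated as new
    print(str(raw_selector, "ascii") in existing)  # True once normalized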

View file

@@ -145,12 +145,15 @@ def get_last_labeled_block_number(
 def get_first_labeled_block_number(
     db_session: Session,
     blockchain_type: AvailableBlockchainType,
-    address: str,
+    address: Union[str, HexBytes],
     label_name: str = CRAWLER_LABEL,
     only_events: bool = False,
     db_version: int = 2,
 ) -> Optional[int]:
+    if db_version == 3:
+        address = HexBytes(address)
+
     label_model = get_label_model(blockchain_type, version=db_version)

     base_query = (
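
The version-3 label tables apparently store addresses as raw bytes rather than hex strings (consistent with the AbiJobs.address handling above), so the helper now coerces its input into HexBytes before building the query. The unconditional coercion is safe because HexBytes normalizes all of its accepted input types to the same value; a quick check, assuming the usual hexbytes package:

    from hexbytes import HexBytes

    # A prefixed str, raw bytes, and an existing HexBytes all normalize identically:
    assert HexBytes("0xdeadbeef") == HexBytes(bytes.fromhex("deadbeef")) == HexBytes(HexBytes("0xdeadbeef"))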

View file

@@ -189,11 +189,12 @@ def _autoscale_crawl_events(
                 from_block=from_block,
                 to_block=to_block,
                 batch_size=batch_size,
-                contract_address=job.contracts[0],
-                max_blocks_batch=3000,
+                contract_address=job.contracts,
+                max_blocks_batch=5000,
             )
         except Exception as e:
-            breakpoint()
+            logger.error(f"Error while fetching events: {e}")
+            raise e
         for raw_event in raw_events:
             raw_event["blockTimestamp"] = get_block_timestamp(
                 db_session,
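
Three fixes in one hunk: the crawl now filters on every contract in the job rather than only the first (which would drop events from the extra contracts a merged job carries, presumably the visible symptom of the merging bug), the block batch ceiling grows from 3000 to 5000, and a stray breakpoint() is replaced with log-and-reraise. A breakpoint() left in an except clause stalls the whole crawler waiting for an interactive debugger. A runnable sketch of the handler pattern, with fetch_events as a hypothetical stand-in for the moonworm call:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def fetch_events(from_block: int, to_block: int) -> list:
        # Hypothetical stand-in for the moonworm crawl call.
        raise ConnectionError("node unavailable")

    try:
        raw_events = fetch_events(1, 100)
    except Exception as e:
        # Log with context, then propagate. A bare raise preserves the original
        # traceback; the diff's `raise e` is equivalent here.
        logger.error(f"Error while fetching events: {e}")
        raise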

View file

@@ -2,4 +2,4 @@
 Moonstream crawlers version.
 """
-MOONCRAWL_VERSION = "0.4.10"
+MOONCRAWL_VERSION = "0.4.11"

View file

@@ -41,7 +41,7 @@ setup(
         "moonstreamdb-v3>=0.0.13",
         "moonstream-types>=0.0.4",
         "moonstream>=0.1.1",
-        "moonworm[moonstream]>=0.9.2",
+        "moonworm[moonstream]>=0.9.3",
         "humbug",
         "pydantic==1.9.2",
         "python-dateutil",