Add initial version.

pull/789/head
Andrey 2023-05-11 17:20:34 +03:00
parent 4a72ec0b5f
commit 07ad71fd9c
3 changed files with 124 additions and 38 deletions

View file

@@ -503,41 +503,25 @@ def apply_moonworm_tasks(
subscription_type: str,
abi: Any,
address: str,
entries_limit: int = 100,
) -> None:
"""
Get the list of subscriptions, load their ABIs, and apply them as moonworm tasks if they do not already exist
"""
entries_pack = []
moonworm_abi_tasks_entries_pack = []
try:
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type}",
limit=100,
limit=entries_limit, # load per request
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
# create historical crawl task in journal
bc.create_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_HISTORICAL_CRAWL_JOURNAL,
title=address,
content=json.dumps(
{
"address": address,
"subscription_type": subscription_type,
"abi": abi,
}
),
tags=[
f"address:{address}",
f"subscription_type:{subscription_type}",
f"status:active",
f"task_type:historical_crawl",
],
)
# Use create_entries_pack to create the journal entries in a single request
existing_tags = [entry.tags for entry in entries]
@@ -556,7 +540,7 @@ def apply_moonworm_tasks(
for hash in abi_hashes_dict:
if hash not in existing_hashes:
entries_pack.append(
moonworm_abi_tasks_entries_pack.append(
{
"title": address,
"content": json.dumps(abi_hashes_dict[hash], indent=4),
@@ -564,21 +548,26 @@ def apply_moonworm_tasks(
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_method_hash:{hash}",
f"abi_selector:{Web3.keccak(text=abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}",
f"subscription_type:{subscription_type}",
f"abi_name:{abi_hashes_dict[hash]['name']}",
f"status:active",
f"task_type:moonworm",
f"moonworm_task_pikedup:False",  # True once the task is picked up by moonworm-crawler (polls every 120 sec by default)
f"historical_crawl_status:pending", # pending, in_progress, done
f"progress:0", # 0-100 %
],
}
)
except Exception as e:
reporter.error_report(e)
if len(entries_pack) > 0:
if len(moonworm_abi_tasks_entries_pack) > 0:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=entries_pack,
timeout=15,
entries=moonworm_abi_tasks_entries_pack,
timeout=25,
)
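
Note on the new abi_selector tag above: it stores the 4-byte selector of the ABI item's canonical signature. A minimal sketch of how that value is derived, using a hypothetical ERC-20 transfer ABI entry (the dict below is illustrative, not part of this changeset):

from web3 import Web3

# Hypothetical ABI item, for illustration only
abi_item = {
    "type": "function",
    "name": "transfer",
    "inputs": [{"type": "address"}, {"type": "uint256"}],
}

# Canonical signature: name(type1,type2,...)
signature = abi_item["name"] + "(" + ",".join(i["type"] for i in abi_item["inputs"]) + ")"

# First four bytes of the keccak hash, hex-encoded, as stored in the tag
selector = Web3.keccak(text=signature)[:4].hex()
print(f"abi_selector:{selector}")  # abi_selector:0xa9059cbb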

View file

@@ -8,7 +8,10 @@ from web3 import Web3
from web3.middleware import geth_poa_middleware
from ..db import yield_db_session_ctx
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID
from ..settings import (
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
NB_CONTROLLER_ACCESS_ID,
)
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (
SubscriptionTypes,
@@ -16,6 +19,7 @@ from .crawler import (
get_crawl_job_entries,
make_event_crawl_jobs,
make_function_call_crawl_jobs,
find_all_deployed_blocks,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@@ -33,7 +37,8 @@ def handle_crawl(args: argparse.Namespace) -> None:
subscription_type,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
),
moonworm=True,
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
@@ -42,7 +47,8 @@ def handle_crawl(args: argparse.Namespace) -> None:
subscription_type,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
),
moonworm=True,
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
@@ -125,20 +131,34 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
extend_tags = []
addresses_filter = []
if args.address is not None:
addresses_filter = [Web3.toChecksumAddress(args.address)]
if args.tasks_journal:
addresses_filter = []
extend_tags.extend(
[
"moonworm_task_pikedup:True",
"historical_crawl_status:pending",
"progress:0",
]
)
all_event_jobs = make_event_crawl_jobs(
get_crawl_job_entries(
subscription_type,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
extend_tags=extend_tags,
)
)
filtered_event_jobs = []
for job in all_event_jobs:
if addresses_filter:
if addresses_filter and not args.tasks_journal:
intersection = [
address for address in job.contracts if address in addresses_filter
]
@@ -155,17 +175,17 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
subscription_type,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
extend_tags=extend_tags,
)
)
if addresses_filter:
filtered_function_call_jobs = [
job
for job in all_function_call_jobs
if job.contract_address in addresses_filter
]
filtered_function_call_jobs = [job for job in all_function_call_jobs]
else:
filtered_function_call_jobs = all_function_call_jobs
# get set of addresses from event jobs and function call jobs
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
@@ -174,6 +194,12 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
)
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
@@ -198,7 +224,15 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
logger.info(f"Last labeled block: {last_labeled_block}")
start_block = args.start
if args.tasks_journal:
start_block = int(web3.eth.blockNumber) - 1
end_block = min(
find_all_deployed_blocks(blockchain_type, list(addresses_set))
)
else:
start_block = args.start
end_block = args.end
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
@@ -226,9 +260,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
else:
logger.info(f"Using start block: {start_block}")
if start_block < args.end:
if start_block < end_block:
raise ValueError(
f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction."
f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction."
)
historical_crawler(
@@ -238,7 +272,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
filtered_event_jobs,
filtered_function_call_jobs,
start_block,
args.end,
end_block,
args.max_blocks_batch,
args.min_sleep_time,
access_id=args.access_id,
@@ -420,6 +454,12 @@ def main() -> None:
default=False,
help="Only crawl events",
)
historical_crawl_parser.add_argument(
"--tasks-journal",
action="store_true",
default=False,
help="Use the tasks journal, which fills in all required fields for the historical crawl",
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
args = parser.parse_args()
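
Note on the new --tasks-journal mode above: the crawl range comes from chain state rather than the --start/--end arguments. A short sketch of that selection with hypothetical block numbers, assuming find_all_deployed_blocks returns deployment block numbers:

latest_block = 17_200_000                    # assumed web3.eth.blockNumber
deployed_blocks = [16_950_000, 17_100_000]   # assumed find_all_deployed_blocks(...) output

start_block = latest_block - 1    # begin just below the chain head
end_block = min(deployed_blocks)  # stop at the earliest contract deployment

# The crawler walks backwards, so start_block must not be below end_block
assert start_block >= end_block, "This crawler crawls in the reverse direction."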

View file

@@ -12,6 +12,7 @@ from bugout.data import BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from web3.main import Web3
from moonworm.deployment import find_deployment_block
from ..blockchain import connect
from ..reporter import reporter
@@ -161,6 +162,7 @@ def get_crawl_job_entries(
journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
created_at_filter: Optional[int] = None,
limit: int = 200,
extend_tags: Optional[List[str]] = None,
) -> List[BugoutSearchResult]:
"""
Get all event ABIs from bugout journal
@@ -172,6 +174,10 @@
"""
query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}"
if extend_tags is not None:
for tag in extend_tags:
query += f" #{tag}"
if created_at_filter is not None:
# Filtering by created_at
# Filtering not by strictly greater than
@@ -201,6 +207,32 @@
return entries
def find_all_deployed_blocks(
    blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress]
) -> List[int]:
    """
    Find the deployment block number for each of the given contract addresses.
    """
    web3 = _retry_connect_web3(blockchain_type)
    all_deployed_blocks = []
    for address in addresses_set:
        try:
            code = web3.eth.getCode(address)
            # getCode returns HexBytes; "0x" means no contract code at the address
            if code.hex() != "0x":
                block = find_deployment_block(
                    web3_client=web3,
                    contract_address=address,
                    web3_interval=0.5,
                )
                if block is not None:
                    # Collect the deployment block number, not the address
                    all_deployed_blocks.append(block)
        except Exception as e:
            logger.error(f"Failed to get code for {address}: {e}")
    return all_deployed_blocks
def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
for entry_tag in entry.tags:
if entry_tag.startswith(tag):
@@ -208,7 +240,9 @@ def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
raise ValueError(f"Tag {tag} not found in {entry}")
def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
def make_event_crawl_jobs(
entries: List[BugoutSearchResult], moonworm: bool = False
) -> List[EventCrawlJob]:
"""
Create EventCrawlJob objects from bugout entries.
"""
@@ -219,6 +253,17 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
abi_hash = _get_tag(entry, "abi_method_hash")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
# Mark the task as picked up if it is not already tagged moonworm_task_pikedup:True
if "moonworm_task_pikedup:True" not in entry.tags and moonworm:
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=entry.entry_url.split("/")[-1],
tags=["moonworm_task_pikedup:True"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
existing_crawl_job = crawl_job_by_hash.get(abi_hash)
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
@@ -238,6 +283,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
def make_function_call_crawl_jobs(
entries: List[BugoutSearchResult],
moonworm: bool = False,
) -> List[FunctionCallCrawlJob]:
"""
Create FunctionCallCrawlJob objects from bugout entries.
@@ -252,6 +298,17 @@ def make_function_call_crawl_jobs(
method_signature = encode_function_signature(abi)
if method_signature is None:
raise ValueError(f"{abi} is not a function ABI")
if "moonworm_task_pikedup:True" not in entry.tags and moonworm:
# Mark the task as picked up
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=entry.entry_url.split("/")[-1],
tags=["moonworm_task_pikedup:True"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if contract_address not in crawl_job_by_address:
crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
contract_abi=[abi],
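
For reference, the extend_tags parameter added to get_crawl_job_entries appends extra #tag filters to the journal search query. A minimal sketch of the resulting query string; the subscription type value is assumed for illustration:

crawler_type = "event"
subscription_type_value = "ethereum_smartcontract"  # assumed value, illustrative only
extend_tags = [
    "moonworm_task_pikedup:True",
    "historical_crawl_status:pending",
    "progress:0",
]

query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type_value}"
for tag in extend_tags:
    query += f" #{tag}"

print(query)
# #status:active #type:event #subscription_type:ethereum_smartcontract #moonworm_task_pikedup:True #historical_crawl_status:pending #progress:0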