From 4a72ec0b5f828ae7e2d3754ff091b7609c9477e6 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 9 May 2023 15:29:17 +0300 Subject: [PATCH 01/14] Add init logic. --- backend/moonstreamapi/actions.py | 22 ++++++++++++++++++++++ backend/moonstreamapi/settings.py | 10 ++++++++++ 2 files changed, 32 insertions(+) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 70e84e9b..8e4de50f 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -41,6 +41,7 @@ from .settings import ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_MOONWORM_TASKS_JOURNAL, MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_HISTORICAL_CRAWL_JOURNAL, ) from .settings import bugout_client as bc, entity_client as ec @@ -517,6 +518,27 @@ def apply_moonworm_tasks( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, ) + # create historical crawl task in journal + + bc.create_entry( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_HISTORICAL_CRAWL_JOURNAL, + title=address, + content=json.dumps( + { + "address": address, + "subscription_type": subscription_type, + "abi": abi, + } + ), + tags=[ + f"address:{address}", + f"subscription_type:{subscription_type}", + f"status:active", + f"task_type:historical_crawl", + ], + ) + existing_tags = [entry.tags for entry in entries] existing_hashes = [ diff --git a/backend/moonstreamapi/settings.py b/backend/moonstreamapi/settings.py index b988a85c..2c1d2f3d 100644 --- a/backend/moonstreamapi/settings.py +++ b/backend/moonstreamapi/settings.py @@ -102,6 +102,16 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "": "MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set" ) +# Historical crawl journal +MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get( + "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", "" +) +if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "": + raise ValueError( + "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set" + ) + + # Web3 MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get( "MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", "" From 07ad71fd9c209af6b17c88ffa40038cf3f08bfd2 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 11 May 2023 17:20:34 +0300 Subject: [PATCH 02/14] Add initial version. 
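Move historical-crawl bookkeeping into the moonworm tasks journal. Instead of a
separate journal entry per address, every task entry now carries lifecycle tags
(task_type:moonworm, moonworm_task_pikedup:False, historical_crawl_status:pending,
progress:0) plus an abi_selector tag, and the historical crawler gains a
--tasks-journal mode that derives its address set and block range from those
entries rather than from CLI arguments.

A sketch of the selector derivation behind the new abi_selector tag (it mirrors
the expression added to actions.py; the ABI entry below is a made-up example,
and web3.py requires the text= keyword for string input):

    from web3 import Web3

    # Hypothetical function ABI entry, e.g. ERC20's transfer(address,uint256)
    abi_entry = {
        "name": "transfer",
        "inputs": [{"type": "address"}, {"type": "uint256"}],
    }

    # Canonical signature, then the first 4 bytes of its keccak256 hash
    signature = abi_entry["name"] + "(" + ",".join(i["type"] for i in abi_entry["inputs"]) + ")"
    selector = Web3.keccak(text=signature)[:4].hex()  # "0xa9059cbb"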
--- backend/moonstreamapi/actions.py | 37 ++++------- .../mooncrawl/moonworm_crawler/cli.py | 66 +++++++++++++++---- .../mooncrawl/moonworm_crawler/crawler.py | 59 ++++++++++++++++- 3 files changed, 124 insertions(+), 38 deletions(-) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 8e4de50f..9dde3817 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -503,41 +503,25 @@ def apply_moonworm_tasks( subscription_type: str, abi: Any, address: str, + entries_limit: int = 100, ) -> None: """ Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist """ - entries_pack = [] + moonworm_abi_tasks_entries_pack = [] try: entries = get_all_entries_from_search( journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, search_query=f"tag:address:{address} tag:subscription_type:{subscription_type}", - limit=100, + limit=entries_limit, # load per request token=MOONSTREAM_ADMIN_ACCESS_TOKEN, ) # create historical crawl task in journal - bc.create_entry( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_HISTORICAL_CRAWL_JOURNAL, - title=address, - content=json.dumps( - { - "address": address, - "subscription_type": subscription_type, - "abi": abi, - } - ), - tags=[ - f"address:{address}", - f"subscription_type:{subscription_type}", - f"status:active", - f"task_type:historical_crawl", - ], - ) + # will use create_entries_pack for creating entries in journal existing_tags = [entry.tags for entry in entries] @@ -556,7 +540,7 @@ def apply_moonworm_tasks( for hash in abi_hashes_dict: if hash not in existing_hashes: - entries_pack.append( + moonworm_abi_tasks_entries_pack.append( { "title": address, "content": json.dumps(abi_hashes_dict[hash], indent=4), @@ -564,21 +548,26 @@ def apply_moonworm_tasks( f"address:{address}", f"type:{abi_hashes_dict[hash]['type']}", f"abi_method_hash:{hash}", + f"abi_selector:{Web3.keccak(abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}", f"subscription_type:{subscription_type}", f"abi_name:{abi_hashes_dict[hash]['name']}", f"status:active", + f"task_type:moonworm", + f"moonworm_task_pikedup:False", # True if task picked up by moonworm-crawler(default each 120 sec) + f"historical_crawl_status:pending", # pending, in_progress, done + f"progress:0", # 0-100 % ], } ) except Exception as e: reporter.error_report(e) - if len(entries_pack) > 0: + if len(moonworm_abi_tasks_entries_pack) > 0: bc.create_entries_pack( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entries=entries_pack, - timeout=15, + entries=moonworm_abi_tasks_entries_pack, + timeout=25, ) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 43d435d9..a2852daf 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -8,7 +8,10 @@ from web3 import Web3 from web3.middleware import geth_poa_middleware from ..db import yield_db_session_ctx -from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID +from ..settings import ( + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + NB_CONTROLLER_ACCESS_ID, +) from .continuous_crawler import _retry_connect_web3, continuous_crawler from .crawler import ( SubscriptionTypes, @@ -16,6 +19,7 @@ from .crawler import ( get_crawl_job_entries, make_event_crawl_jobs, make_function_call_crawl_jobs, + find_all_deployed_blocks, ) from .db 
import get_first_labeled_block_number, get_last_labeled_block_number from .historical_crawler import historical_crawler @@ -33,7 +37,8 @@ def handle_crawl(args: argparse.Namespace) -> None: subscription_type, "event", MOONSTREAM_MOONWORM_TASKS_JOURNAL, - ) + ), + moonworm=True, ) logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}") @@ -42,7 +47,8 @@ def handle_crawl(args: argparse.Namespace) -> None: subscription_type, "function", MOONSTREAM_MOONWORM_TASKS_JOURNAL, - ) + ), + moonworm=True, ) logger.info( f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}" @@ -125,20 +131,34 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: blockchain_type = AvailableBlockchainType(args.blockchain_type) subscription_type = blockchain_type_to_subscription_type(blockchain_type) + extend_tags = [] + addresses_filter = [] if args.address is not None: addresses_filter = [Web3.toChecksumAddress(args.address)] + if args.tasks_journal: + addresses_filter = [] + extend_tags.extend( + [ + "moonworm_task_pikedup:True", + "historical_crawl_status:pending", + "progress:0", + ] + ) + all_event_jobs = make_event_crawl_jobs( get_crawl_job_entries( subscription_type, "event", MOONSTREAM_MOONWORM_TASKS_JOURNAL, + extend_tags=extend_tags, ) ) + filtered_event_jobs = [] for job in all_event_jobs: - if addresses_filter: + if addresses_filter and not args.tasks_journal: intersection = [ address for address in job.contracts if address in addresses_filter ] @@ -155,17 +175,17 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: subscription_type, "function", MOONSTREAM_MOONWORM_TASKS_JOURNAL, + extend_tags=extend_tags, ) ) + if addresses_filter: - filtered_function_call_jobs = [ - job - for job in all_function_call_jobs - if job.contract_address in addresses_filter - ] + filtered_function_call_jobs = [job for job in all_function_call_jobs] else: filtered_function_call_jobs = all_function_call_jobs + # get set of addresses from event jobs and function call jobs + if args.only_events: filtered_function_call_jobs = [] logger.info(f"Removing function call crawl jobs since --only-events is set") @@ -174,6 +194,12 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" ) + addresses_set = set() + for job in filtered_event_jobs: + addresses_set.update(job.contracts) + for function_job in filtered_function_call_jobs: + addresses_set.add(function_job.contract_address) + logger.info(f"Blockchain type: {blockchain_type.value}") with yield_db_session_ctx() as db_session: web3: Optional[Web3] = None @@ -198,7 +224,15 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: ) logger.info(f"Last labeled block: {last_labeled_block}") - start_block = args.start + if args.tasks_journal: + start_block = int(web3.eth.blockNumber) - 1 + end_block = min( + find_all_deployed_blocks(blockchain_type, list(addresses_set)) + ) + else: + start_block = args.start + end_block = args.end + if start_block is None: logger.info("No start block provided") if last_labeled_block is not None: @@ -226,9 +260,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: else: logger.info(f"Using start block: {start_block}") - if start_block < args.end: + if start_block < end_block: raise ValueError( - f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction." + f"Start block {start_block} is less than end block {end_block}. 
This crawler crawls in the reverse direction." ) historical_crawler( @@ -238,7 +272,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: filtered_event_jobs, filtered_function_call_jobs, start_block, - args.end, + end_block, args.max_blocks_batch, args.min_sleep_time, access_id=args.access_id, @@ -420,6 +454,12 @@ def main() -> None: default=False, help="Only crawl events", ) + historical_crawl_parser.add_argument( + "--tasks-journal", + action="store_true", + default=False, + help="Use tasks journal wich will fill all required fields for historical crawl", + ) historical_crawl_parser.set_defaults(func=handle_historical_crawl) args = parser.parse_args() diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 9f21fefe..1adb9d66 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -12,6 +12,7 @@ from bugout.data import BugoutSearchResult from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from web3.main import Web3 +from moonworm.deployment import find_deployment_block from ..blockchain import connect from ..reporter import reporter @@ -161,6 +162,7 @@ def get_crawl_job_entries( journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL, created_at_filter: Optional[int] = None, limit: int = 200, + extend_tags: Optional[List[str]] = None, ) -> List[BugoutSearchResult]: """ Get all event ABIs from bugout journal @@ -172,6 +174,10 @@ def get_crawl_job_entries( """ query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}" + if extend_tags is not None: + for tag in extend_tags: + query += f" #{tag}" + if created_at_filter is not None: # Filtering by created_at # Filtering not by strictly greater than @@ -201,6 +207,32 @@ def get_crawl_job_entries( return entries +def find_all_deployed_blocks( + blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress] +): + """ + find all deployed blocks for given addresses + """ + + web3 = _retry_connect_web3(blockchain_type) + all_deployed_blocks = [] + for address in addresses_set: + try: + code = web3.eth.getCode(address) + if code != "0x": + block = find_deployment_block( + web3_client=web3, + contract_address=address, + web3_interval=0.5, + ) + if block is not None: + all_deployed_blocks.append(address) + + except Exception as e: + logger.error(f"Failed to get code for {address}: {e}") + return all_deployed_blocks + + def _get_tag(entry: BugoutSearchResult, tag: str) -> str: for entry_tag in entry.tags: if entry_tag.startswith(tag): @@ -208,7 +240,9 @@ def _get_tag(entry: BugoutSearchResult, tag: str) -> str: raise ValueError(f"Tag {tag} not found in {entry}") -def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]: +def make_event_crawl_jobs( + entries: List[BugoutSearchResult], moonworm: bool = False +) -> List[EventCrawlJob]: """ Create EventCrawlJob objects from bugout entries. 
""" @@ -219,6 +253,17 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ abi_hash = _get_tag(entry, "abi_method_hash") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) + # if entry.tags not contain moonworm_task_pikedup:True + if "moonworm_task_pikedup:True" not in entry.tags and moonworm: + # Update the tag to pickedup + bugout_client.update_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entry_id=entry.entry_url.split("/")[-1], + tags=["moonworm_task_pikedup:True"], + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + existing_crawl_job = crawl_job_by_hash.get(abi_hash) if existing_crawl_job is not None: if contract_address not in existing_crawl_job.contracts: @@ -238,6 +283,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ def make_function_call_crawl_jobs( entries: List[BugoutSearchResult], + moonworm: bool = False, ) -> List[FunctionCallCrawlJob]: """ Create FunctionCallCrawlJob objects from bugout entries. @@ -252,6 +298,17 @@ def make_function_call_crawl_jobs( method_signature = encode_function_signature(abi) if method_signature is None: raise ValueError(f"{abi} is not a function ABI") + + if "moonworm_task_pikedup:True" not in entry.tags and moonworm: + # Update the tag to pickedup + bugout_client.update_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entry_id=entry.entry_url.split("/")[-1], + tags=["moonworm_task_pikedup:True"], + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + if contract_address not in crawl_job_by_address: crawl_job_by_address[contract_address] = FunctionCallCrawlJob( contract_abi=[abi], From cf93f99fb15293d1cf2ce41c53090bfacb3540a0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 23 May 2023 13:56:38 +0300 Subject: [PATCH 03/14] Add initial working state. 
--- .../mooncrawl/moonworm_crawler/cli.py | 110 ++++++-- .../moonworm_crawler/continuous_crawler.py | 24 ++ .../mooncrawl/moonworm_crawler/crawler.py | 258 ++++++++++++++++-- .../moonworm_crawler/historical_crawler.py | 34 ++- crawlers/mooncrawl/mooncrawl/settings.py | 23 ++ 5 files changed, 401 insertions(+), 48 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index a2852daf..9f0d3749 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -20,6 +20,7 @@ from .crawler import ( make_event_crawl_jobs, make_function_call_crawl_jobs, find_all_deployed_blocks, + update_job_state_with_filters, ) from .db import get_first_labeled_block_number, get_last_labeled_block_number from .historical_crawler import historical_crawler @@ -37,23 +38,45 @@ def handle_crawl(args: argparse.Namespace) -> None: subscription_type, "event", MOONSTREAM_MOONWORM_TASKS_JOURNAL, - ), - moonworm=True, + ) ) logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}") + if len(initial_event_jobs) > 0: + initial_event_jobs = update_job_state_with_filters( # type: ignore + events=initial_event_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:False", + ], + tags_to_add=["moonworm_task_pikedup:True"], + tags_to_delete=["moonworm_task_pikedup:False"], + ) + initial_function_call_jobs = make_function_call_crawl_jobs( get_crawl_job_entries( subscription_type, "function", MOONSTREAM_MOONWORM_TASKS_JOURNAL, - ), - moonworm=True, + ) ) logger.info( f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}" ) + if len(initial_function_call_jobs) > 0: + initial_event_jobs = update_job_state_with_filters( # type: ignore + events=initial_event_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:False", + ], + tags_to_add=["moonworm_task_pikedup:True"], + tags_to_delete=["moonworm_task_pikedup:False"], + ) + logger.info(f"Blockchain type: {blockchain_type.value}") with yield_db_session_ctx() as db_session: web3: Optional[Web3] = None @@ -143,7 +166,6 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: [ "moonworm_task_pikedup:True", "historical_crawl_status:pending", - "progress:0", ] ) @@ -190,16 +212,46 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: filtered_function_call_jobs = [] logger.info(f"Removing function call crawl jobs since --only-events is set") + if args.only_functions: + filtered_event_jobs = [] + logger.info( + f"Removing event crawl jobs since --only-functions is set. 
Function call jobs count: {len(filtered_function_call_jobs)}" + ) + + if args.only_events and args.only_functions: + raise ValueError( + "--only-events and --only-functions cannot be set at the same time" + ) + + if args.tasks_journal: + if len(filtered_event_jobs) > 0: + filtered_event_jobs = update_job_state_with_filters( # type: ignore + events=filtered_event_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:True", + ], + tags_to_add=["historical_crawl_status:in_progress"], + tags_to_delete=["historical_crawl_status:pending"], + ) + + if len(filtered_function_call_jobs) > 0: + filtered_function_call_jobs = update_job_state_with_filters( # type: ignore + function_calls=filtered_function_call_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:True", + ], + tags_to_add=["historical_crawl_status:in_progress"], + tags_to_delete=["historical_crawl_status:pending"], + ) + logger.info( f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" ) - addresses_set = set() - for job in filtered_event_jobs: - addresses_set.update(job.contracts) - for function_job in filtered_function_call_jobs: - addresses_set.add(function_job.contract_address) - logger.info(f"Blockchain type: {blockchain_type.value}") with yield_db_session_ctx() as db_session: web3: Optional[Web3] = None @@ -224,14 +276,23 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: ) logger.info(f"Last labeled block: {last_labeled_block}") - if args.tasks_journal: - start_block = int(web3.eth.blockNumber) - 1 - end_block = min( - find_all_deployed_blocks(blockchain_type, list(addresses_set)) + addresses_deployment_blocks = None + + # get set of addresses from event jobs and function call jobs + if args.find_deployed_blocks: + addresses_set = set() + for job in filtered_event_jobs: + addresses_set.update(job.contracts) + for function_job in filtered_function_call_jobs: + addresses_set.add(function_job.contract_address) + + if args.start is None: + start_block = web3.eth.blockNumber - 1 + + addresses_deployment_blocks = find_all_deployed_blocks( + blockchain_type, list(addresses_set) ) - else: - start_block = args.start - end_block = args.end + end_block = min(addresses_deployment_blocks.values()) if start_block is None: logger.info("No start block provided") @@ -276,6 +337,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: args.max_blocks_batch, args.min_sleep_time, access_id=args.access_id, + addresses_deployment_blocks=addresses_deployment_blocks, ) @@ -454,6 +516,18 @@ def main() -> None: default=False, help="Only crawl events", ) + historical_crawl_parser.add_argument( + "--only-functions", + action="store_true", + default=False, + help="Only crawl function calls", + ) + historical_crawl_parser.add_argument( + "--find-deployed-blocks", + action="store_true", + default=False, + help="Find all deployed blocks", + ) historical_crawl_parser.add_argument( "--tasks-journal", action="store_true", diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index d33c8b35..8657754e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -220,6 +220,30 @@ def continuous_crawler( event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs( event_crawl_jobs, function_call_crawl_jobs, 
blockchain_type ) + if len(event_crawl_jobs) > 0: + event_crawl_jobs = update_job_state_with_filters( # type: ignore + events=event_crawl_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:False", + ], + tags_to_add=["moonworm_task_pikedup:True"], + tags_to_delete=["moonworm_task_pikedup:False"], + ) + + if len(function_call_crawl_jobs) > 0: + function_call_crawl_jobs = update_job_state_with_filters( # type: ignore + events=function_call_crawl_jobs, + address_filter=[], + required_tags=[ + "historical_crawl_status:pending", + "moonworm_task_pikedup:False", + ], + tags_to_add=["moonworm_task_pikedup:True"], + tags_to_delete=["moonworm_task_pikedup:False"], + ) + jobs_refetchet_time = current_time if current_time - last_heartbeat_time > timedelta( diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 1adb9d66..f4d58432 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -5,7 +5,7 @@ import time from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Callable, Dict, List, Optional, cast +from typing import Any, Callable, Dict, List, Optional, cast, Union from uuid import UUID from bugout.data import BugoutSearchResult @@ -21,6 +21,8 @@ from ..settings import ( MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client, + HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES, + HISTORICAL_CRAWLER_STATUSES, ) logging.basicConfig(level=logging.INFO) @@ -146,6 +148,7 @@ class EventCrawlJob: event_abi_hash: str event_abi: Dict[str, Any] contracts: List[ChecksumAddress] + entries_ids: Dict[ChecksumAddress, Dict[UUID, List[str]]] created_at: int @@ -153,6 +156,7 @@ class EventCrawlJob: class FunctionCallCrawlJob: contract_abi: List[Dict[str, Any]] contract_address: ChecksumAddress + entries_tags: Dict[UUID, List[str]] created_at: int @@ -209,13 +213,13 @@ def get_crawl_job_entries( def find_all_deployed_blocks( blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress] -): +) -> Dict[ChecksumAddress, int]: """ find all deployed blocks for given addresses """ web3 = _retry_connect_web3(blockchain_type) - all_deployed_blocks = [] + all_deployed_blocks = {} for address in addresses_set: try: code = web3.eth.getCode(address) @@ -226,8 +230,7 @@ def find_all_deployed_blocks( web3_interval=0.5, ) if block is not None: - all_deployed_blocks.append(address) - + all_deployed_blocks[address] = block except Exception as e: logger.error(f"Failed to get code for {address}: {e}") return all_deployed_blocks @@ -240,9 +243,7 @@ def _get_tag(entry: BugoutSearchResult, tag: str) -> str: raise ValueError(f"Tag {tag} not found in {entry}") -def make_event_crawl_jobs( - entries: List[BugoutSearchResult], moonworm: bool = False -) -> List[EventCrawlJob]: +def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]: """ Create EventCrawlJob objects from bugout entries. 
""" @@ -253,27 +254,23 @@ def make_event_crawl_jobs( abi_hash = _get_tag(entry, "abi_method_hash") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) - # if entry.tags not contain moonworm_task_pikedup:True - if "moonworm_task_pikedup:True" not in entry.tags and moonworm: - # Update the tag to pickedup - bugout_client.update_tags( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entry_id=entry.entry_url.split("/")[-1], - tags=["moonworm_task_pikedup:True"], - timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, - ) + entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji existing_crawl_job = crawl_job_by_hash.get(abi_hash) if existing_crawl_job is not None: if contract_address not in existing_crawl_job.contracts: existing_crawl_job.contracts.append(contract_address) + existing_crawl_job.entries_ids[contract_address] = { + entry_id: entry.tags + } + else: abi = cast(str, entry.content) new_crawl_job = EventCrawlJob( event_abi_hash=abi_hash, event_abi=json.loads(abi), contracts=[contract_address], + entries_ids={contract_address: {entry_id: entry.tags}}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) crawl_job_by_hash[abi_hash] = new_crawl_job @@ -283,7 +280,6 @@ def make_event_crawl_jobs( def make_function_call_crawl_jobs( entries: List[BugoutSearchResult], - moonworm: bool = False, ) -> List[FunctionCallCrawlJob]: """ Create FunctionCallCrawlJob objects from bugout entries. @@ -293,26 +289,18 @@ def make_function_call_crawl_jobs( method_signature_by_address: Dict[str, List[str]] = {} for entry in entries: + entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) abi = json.loads(cast(str, entry.content)) method_signature = encode_function_signature(abi) if method_signature is None: raise ValueError(f"{abi} is not a function ABI") - if "moonworm_task_pikedup:True" not in entry.tags and moonworm: - # Update the tag to pickedup - bugout_client.update_tags( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entry_id=entry.entry_url.split("/")[-1], - tags=["moonworm_task_pikedup:True"], - timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, - ) - if contract_address not in crawl_job_by_address: crawl_job_by_address[contract_address] = FunctionCallCrawlJob( contract_abi=[abi], contract_address=contract_address, + entries_tags={entry_id: entry.tags}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) method_signature_by_address[contract_address] = [method_signature] @@ -321,6 +309,9 @@ def make_function_call_crawl_jobs( if method_signature not in method_signature_by_address[contract_address]: crawl_job_by_address[contract_address].contract_abi.append(abi) method_signature_by_address[contract_address].append(method_signature) + crawl_job_by_address[contract_address].entries_tags[ + entry_id + ] = entry.tags return [crawl_job for crawl_job in crawl_job_by_address.values()] @@ -449,3 +440,212 @@ def heartbeat( tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"], timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) + + +def bugout_state_update( + entries_tags_add: List[Dict[str, Any]], + entries_tags_delete: List[Dict[str, Any]], +) -> Any: + if len(entries_tags_add) > 0: + new_entreis_state = bugout_client.update_entries_tags( # type: ignore + entries_tags=entries_tags_add, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + if len(entries_tags_delete) > 0: + new_entreis_state = 
bugout_client.delete_entries_tags( # type: ignore + entries_tags=entries_tags_delete, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + return new_entreis_state + + +def update_job_tags( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + new_entreis_state: Any, +): + for entry in new_entreis_state: + for event in events: + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.entries_ids.items(): + for entry_id, tags in entries_ids.items(): + if entry_id == entry["journal_entry_id"]: + event.entries_ids[contract_address][entry_id] = tags + + if isinstance(event, FunctionCallCrawlJob): + for entry_id, tags in event.entries_tags.items(): + if entry_id == entry["journal_entry_id"]: + event.entries_tags[entry_id] = tags + + return events + + +def update_job_state_with_filters( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + address_filter: List[ChecksumAddress], + required_tags: List[str], + tags_to_add: List[str] = [], + tags_to_delete: List[str] = [], +) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]: + """ + Function that updates the state of the job in bugout. + """ + + entries_ids_to_update: List[UUID] = [] + + ### TODO: refactor this function + + if len(tags_to_add) == 0 and len(tags_to_delete) == 0: + return events + + for event in events: + # functions + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.entries_ids.items(): + if address_filter and contract_address not in address_filter: + continue + for entry_id, tags in entries_ids.items(): + if set(required_tags).issubset(set(tags)): + entries_ids_to_update.append(entry_id) + + event.entries_ids[contract_address][entry_id].extend( + tags_to_add + ) + + # events + if isinstance(event, FunctionCallCrawlJob): + if address_filter and event.contract_address not in address_filter: + continue + for entry_id, tags in event.entries_tags.items(): + if set(required_tags).issubset(set(tags)): + entries_ids_to_update.append(entry_id) + + if len(entries_ids_to_update) == 0: + return events + + new_entries_state = bugout_state_update( + entries_tags_add=[ + {"journal_entry_id": entry_id, "tags": tags_to_add} + for entry_id in entries_ids_to_update + ], + entries_tags_delete=[ + {"journal_entry_id": entry_id, "tags": tags_to_delete} + for entry_id in entries_ids_to_update + ], + ) + + events = update_job_tags(events, new_entries_state) + + return events + + +def update_entries_status_and_proggress( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + progess_map: Dict[ChecksumAddress, float], +) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]: + """ + Update entries status and proggress in mooncrawl bugout journal + """ + + entries_tags_delete = [] + + entries_tags_add = [] + + for event in events: + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.entries_ids.items(): + proggress = int(progess_map.get(contract_address, 0)) * 100 + + for entry_id, tags in entries_ids.items(): + # proggress + + if ( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + in tags + ): + continue + + entries_tags_delete.append( + { + "journal_entry_id": entry_id, + "tags": [ + tag + for tag in tags + if tag.startswith( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}" + ) + ], + } + ) + + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}" + ], + 
} + ) + + if proggress >= 100: + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + ], + } + ) + + if isinstance(event, FunctionCallCrawlJob): + proggress = int(progess_map.get(event.contract_address, 0)) * 100 + + for entry_id, tags in event.entries_tags.items(): + if ( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + in tags + ): + continue + + # proggress + entries_tags_delete.append( + { + "journal_entry_id": entry_id, + "tags": [ + tag + for tag in tags + if tag.startswith( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}" + ) + ], + } + ) + + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}" + ], + } + ) + + if proggress >= 100: + entries_tags_add.append( + { + "journal_entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + ], + } + ) + + new_entries_state = bugout_state_update( + entries_tags_add=entries_tags_add, + entries_tags_delete=entries_tags_delete, + ) + + events = update_job_tags(events, new_entries_state) + + return events diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 564f1dab..aeca1659 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -3,6 +3,7 @@ import time from typing import Dict, List, Optional, Tuple from uuid import UUID +from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, @@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network # type: ignore from sqlalchemy.orm.session import Session from web3 import Web3 -from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3 +from .crawler import ( + EventCrawlJob, + FunctionCallCrawlJob, + _retry_connect_web3, + update_entries_status_and_proggress, +) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events, _autoscale_crawl_events from .function_call_crawler import _crawl_functions @@ -31,6 +37,7 @@ def historical_crawler( max_blocks_batch: int = 100, min_sleep_time: float = 0.1, access_id: Optional[UUID] = None, + addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None, ): assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" assert min_sleep_time > 0, "min_sleep_time must be greater than 0" @@ -60,6 +67,10 @@ def historical_crawler( blocks_cache: Dict[int, int] = {} failed_count = 0 + original_start_block = start_block + + progess_map: Dict[ChecksumAddress, float] = {} + while start_block >= end_block: try: time.sleep(min_sleep_time) @@ -119,6 +130,27 @@ def historical_crawler( db_session, all_function_calls, blockchain_type ) + if addresses_deployment_blocks: + for address, deployment_block in addresses_deployment_blocks.items(): + current_position = end_block + + progess = original_start_block - current_position / ( + original_start_block - deployment_block + ) + progess_map[address] = progess + + if 
len(function_call_crawl_jobs) > 0: + function_call_crawl_jobs = update_entries_status_and_proggress( # type: ignore + events=function_call_crawl_jobs, + progess_map=progess_map, + ) + + if len(event_crawl_jobs) > 0: + event_crawl_jobs = update_entries_status_and_proggress( # type: ignore + events=event_crawl_jobs, + progess_map=progess_map, + ) + # Commiting to db commit_session(db_session) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index cd697b0f..d9a24f70 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -279,3 +279,26 @@ infura_networks = { BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" + + +# Historical crawler status config + +HISTORICAL_CRAWLER_STATUSES = { + "pending": "pending", + "running": "running", + "finished": "finished", +} + +# Historical crawler moonworm status config + +HISTORICAL_CRAWLER_MOONWORM_STATUSES = { + "pickedup": True, +} + +# Statuses tags prefixes + +HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = { + "moonworm_status": "moonworm_task_pickedup", + "historical_crawl_status": "historical_crawl_status", + "progress_status": "progress", +} From bcc9897fb179de5dd851a911d2a262d66895e4ff Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 25 May 2023 16:06:33 +0300 Subject: [PATCH 04/14] Add fixes. --- backend/moonstreamapi/actions.py | 24 ++- backend/moonstreamapi/settings.py | 16 +- crawlers/mooncrawl/mooncrawl/crawler.py | 2 +- .../mooncrawl/moonworm_crawler/cli.py | 41 ++--- .../moonworm_crawler/continuous_crawler.py | 34 ++-- .../mooncrawl/moonworm_crawler/crawler.py | 161 ++++++++++++------ .../moonworm_crawler/historical_crawler.py | 10 +- 7 files changed, 161 insertions(+), 127 deletions(-) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 9dde3817..d32ea565 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -41,7 +41,6 @@ from .settings import ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_MOONWORM_TASKS_JOURNAL, MOONSTREAM_ADMIN_ACCESS_TOKEN, - MOONSTREAM_HISTORICAL_CRAWL_JOURNAL, ) from .settings import bugout_client as bc, entity_client as ec @@ -519,6 +518,8 @@ def apply_moonworm_tasks( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, ) + print(f"Found {len(entries)} tasks for address {address}") + # create historical crawl task in journal # will use create_entries_pack for creating entries in journal @@ -548,27 +549,32 @@ def apply_moonworm_tasks( f"address:{address}", f"type:{abi_hashes_dict[hash]['type']}", f"abi_method_hash:{hash}", - f"abi_selector:{Web3.keccak(abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}", + f"abi_selector:{Web3.keccak(text=abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}", f"subscription_type:{subscription_type}", f"abi_name:{abi_hashes_dict[hash]['name']}", f"status:active", f"task_type:moonworm", - f"moonworm_task_pikedup:False", # True if task picked up by moonworm-crawler(default each 120 sec) + f"moonworm_task_pickedup:False", # True if task picked up by moonworm-crawler(default each 120 sec) f"historical_crawl_status:pending", # pending, in_progress, done f"progress:0", # 0-100 % ], } ) except Exception as e: + logger.error(f"Error get moonworm tasks: {str(e)}") reporter.error_report(e) if 
len(moonworm_abi_tasks_entries_pack) > 0: - bc.create_entries_pack( - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, - entries=moonworm_abi_tasks_entries_pack, - timeout=25, - ) + try: + bc.create_entries_pack( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entries=moonworm_abi_tasks_entries_pack, + timeout=25, + ) + except Exception as e: + logger.error(f"Error create moonworm tasks: {str(e)}") + reporter.error_report(e) def name_normalization(query_name: str) -> str: diff --git a/backend/moonstreamapi/settings.py b/backend/moonstreamapi/settings.py index 2c1d2f3d..038653c5 100644 --- a/backend/moonstreamapi/settings.py +++ b/backend/moonstreamapi/settings.py @@ -102,14 +102,14 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "": "MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set" ) -# Historical crawl journal -MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get( - "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", "" -) -if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "": - raise ValueError( - "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set" - ) +# # Historical crawl journal +# MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get( +# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", "" +# ) +# if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "": +# raise ValueError( +# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set" +# ) # Web3 diff --git a/crawlers/mooncrawl/mooncrawl/crawler.py b/crawlers/mooncrawl/mooncrawl/crawler.py index b733ad89..0dcd7f69 100644 --- a/crawlers/mooncrawl/mooncrawl/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/crawler.py @@ -13,7 +13,7 @@ from typing import Iterator, List from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType -import dateutil.parser +import dateutil.parser # type: ignore from .blockchain import ( DateRange, diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 9f0d3749..131a328e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -21,6 +21,7 @@ from .crawler import ( make_function_call_crawl_jobs, find_all_deployed_blocks, update_job_state_with_filters, + moonworm_crawler_update_job_as_pickedup, ) from .db import get_first_labeled_block_number, get_last_labeled_block_number from .historical_crawler import historical_crawler @@ -42,18 +43,6 @@ def handle_crawl(args: argparse.Namespace) -> None: ) logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}") - if len(initial_event_jobs) > 0: - initial_event_jobs = update_job_state_with_filters( # type: ignore - events=initial_event_jobs, - address_filter=[], - required_tags=[ - "historical_crawl_status:pending", - "moonworm_task_pikedup:False", - ], - tags_to_add=["moonworm_task_pikedup:True"], - tags_to_delete=["moonworm_task_pikedup:False"], - ) - initial_function_call_jobs = make_function_call_crawl_jobs( get_crawl_job_entries( subscription_type, @@ -65,17 +54,13 @@ def handle_crawl(args: argparse.Namespace) -> None: f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}" ) - if len(initial_function_call_jobs) > 0: - initial_event_jobs = update_job_state_with_filters( # type: ignore - events=initial_event_jobs, - address_filter=[], - required_tags=[ - "historical_crawl_status:pending", - "moonworm_task_pikedup:False", - ], - tags_to_add=["moonworm_task_pikedup:True"], - 
tags_to_delete=["moonworm_task_pikedup:False"], - ) + ( + initial_event_jobs, + initial_function_call_jobs, + ) = moonworm_crawler_update_job_as_pickedup( + event_crawl_jobs=initial_event_jobs, + function_call_crawl_jobs=initial_function_call_jobs, + ) logger.info(f"Blockchain type: {blockchain_type.value}") with yield_db_session_ctx() as db_session: @@ -164,7 +149,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: addresses_filter = [] extend_tags.extend( [ - "moonworm_task_pikedup:True", + "moonworm_task_pickedup:True", "historical_crawl_status:pending", ] ) @@ -230,7 +215,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: address_filter=[], required_tags=[ "historical_crawl_status:pending", - "moonworm_task_pikedup:True", + "moonworm_task_pickedup:True", ], tags_to_add=["historical_crawl_status:in_progress"], tags_to_delete=["historical_crawl_status:pending"], @@ -238,11 +223,11 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: if len(filtered_function_call_jobs) > 0: filtered_function_call_jobs = update_job_state_with_filters( # type: ignore - function_calls=filtered_function_call_jobs, + events=filtered_function_call_jobs, address_filter=[], required_tags=[ "historical_crawl_status:pending", - "moonworm_task_pikedup:True", + "moonworm_task_pickedup:True", ], tags_to_add=["historical_crawl_status:in_progress"], tags_to_delete=["historical_crawl_status:pending"], @@ -290,7 +275,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: start_block = web3.eth.blockNumber - 1 addresses_deployment_blocks = find_all_deployed_blocks( - blockchain_type, list(addresses_set) + web3, list(addresses_set) ) end_block = min(addresses_deployment_blocks.values()) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 8657754e..fa8249b6 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -24,10 +24,15 @@ from .crawler import ( make_function_call_crawl_jobs, merge_event_crawl_jobs, merge_function_call_crawl_jobs, + moonworm_crawler_update_job_as_pickedup, ) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events from .function_call_crawler import _crawl_functions +from ..settings import ( + HISTORICAL_CRAWLER_STATUSES, + HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -220,29 +225,14 @@ def continuous_crawler( event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs( event_crawl_jobs, function_call_crawl_jobs, blockchain_type ) - if len(event_crawl_jobs) > 0: - event_crawl_jobs = update_job_state_with_filters( # type: ignore - events=event_crawl_jobs, - address_filter=[], - required_tags=[ - "historical_crawl_status:pending", - "moonworm_task_pikedup:False", - ], - tags_to_add=["moonworm_task_pikedup:True"], - tags_to_delete=["moonworm_task_pikedup:False"], - ) - if len(function_call_crawl_jobs) > 0: - function_call_crawl_jobs = update_job_state_with_filters( # type: ignore - events=function_call_crawl_jobs, - address_filter=[], - required_tags=[ - "historical_crawl_status:pending", - "moonworm_task_pikedup:False", - ], - tags_to_add=["moonworm_task_pikedup:True"], - tags_to_delete=["moonworm_task_pikedup:False"], - ) + ( + event_crawl_jobs, + function_call_crawl_jobs, + ) = 
moonworm_crawler_update_job_as_pickedup( + event_crawl_jobs=event_crawl_jobs, + function_call_crawl_jobs=function_call_crawl_jobs, + ) jobs_refetchet_time = current_time diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index f4d58432..17b554f7 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -5,10 +5,10 @@ import time from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Callable, Dict, List, Optional, cast, Union +from typing import Any, Callable, Dict, List, Optional, cast, Union, Tuple from uuid import UUID -from bugout.data import BugoutSearchResult +from bugout.data import BugoutSearchResult, BugoutJournalEntries from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from web3.main import Web3 @@ -148,7 +148,7 @@ class EventCrawlJob: event_abi_hash: str event_abi: Dict[str, Any] contracts: List[ChecksumAddress] - entries_ids: Dict[ChecksumAddress, Dict[UUID, List[str]]] + address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]] created_at: int @@ -212,13 +212,12 @@ def get_crawl_job_entries( def find_all_deployed_blocks( - blockchain_type: AvailableBlockchainType, addresses_set: List[ChecksumAddress] + web3: Web3, addresses_set: List[ChecksumAddress] ) -> Dict[ChecksumAddress, int]: """ find all deployed blocks for given addresses """ - web3 = _retry_connect_web3(blockchain_type) all_deployed_blocks = {} for address in addresses_set: try: @@ -231,6 +230,10 @@ def find_all_deployed_blocks( ) if block is not None: all_deployed_blocks[address] = block + if block is None: + logger.warning( + f"Failed to find deployment block for {address}, code: {code}" + ) except Exception as e: logger.error(f"Failed to get code for {address}: {e}") return all_deployed_blocks @@ -260,7 +263,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ if existing_crawl_job is not None: if contract_address not in existing_crawl_job.contracts: existing_crawl_job.contracts.append(contract_address) - existing_crawl_job.entries_ids[contract_address] = { + existing_crawl_job.address_entries[contract_address] = { entry_id: entry.tags } @@ -270,7 +273,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ event_abi_hash=abi_hash, event_abi=json.loads(abi), contracts=[contract_address], - entries_ids={contract_address: {entry_id: entry.tags}}, + address_entries={contract_address: {entry_id: entry.tags}}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) crawl_job_by_hash[abi_hash] = new_crawl_job @@ -445,38 +448,92 @@ def heartbeat( def bugout_state_update( entries_tags_add: List[Dict[str, Any]], entries_tags_delete: List[Dict[str, Any]], -) -> Any: - if len(entries_tags_add) > 0: - new_entreis_state = bugout_client.update_entries_tags( # type: ignore - entries_tags=entries_tags_add, +) -> BugoutJournalEntries: + """ + Run update of entries tags in bugout + First delete tags, then add tags + """ + + if len(entries_tags_delete) > 0: + new_entreis_state = bugout_client.delete_entries_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entries_tags=entries_tags_delete, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) - if len(entries_tags_delete) > 0: - new_entreis_state = bugout_client.delete_entries_tags( # type: ignore - 
entries_tags=entries_tags_delete, + if len(entries_tags_add) > 0: + new_entreis_state = bugout_client.create_entries_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + entries_tags=entries_tags_add, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) return new_entreis_state +def moonworm_crawler_update_job_as_pickedup( + event_crawl_jobs: List[EventCrawlJob], + function_call_crawl_jobs: List[FunctionCallCrawlJob], +) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]: + """ + Apply jobs of moonworm as taked to process + """ + + if len(event_crawl_jobs) > 0: + event_crawl_jobs = update_job_state_with_filters( # type: ignore + events=event_crawl_jobs, + address_filter=[], + required_tags=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}", + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False", + ], + tags_to_add=["moonworm_task_pickedup:True"], + tags_to_delete=["moonworm_task_pickedup:False"], + ) + + if len(function_call_crawl_jobs) > 0: + function_call_crawl_jobs = update_job_state_with_filters( # type: ignore + events=function_call_crawl_jobs, + address_filter=[], + required_tags=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}", + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False", + ], + tags_to_add=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True" + ], + tags_to_delete=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False" + ], + ) + + return event_crawl_jobs, function_call_crawl_jobs + + def update_job_tags( events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], - new_entreis_state: Any, + new_entreis_state: BugoutJournalEntries, ): - for entry in new_entreis_state: - for event in events: - if isinstance(event, EventCrawlJob): - for contract_address, entries_ids in event.entries_ids.items(): - for entry_id, tags in entries_ids.items(): - if entry_id == entry["journal_entry_id"]: - event.entries_ids[contract_address][entry_id] = tags + """ + Update tags of the jobs in job object + """ + entry_tags_by_id = {entry.id: entry.tags for entry in new_entreis_state.entries} - if isinstance(event, FunctionCallCrawlJob): - for entry_id, tags in event.entries_tags.items(): - if entry_id == entry["journal_entry_id"]: - event.entries_tags[entry_id] = tags + for event in events: + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.address_entries.items(): + for entry_id in entries_ids.keys(): + if entry_id in entry_tags_by_id: + event.address_entries[contract_address][ + entry_id + ] = entry_tags_by_id[entry_id] + + if isinstance(event, FunctionCallCrawlJob): + for entry_id in event.entries_tags.keys(): + if entry_id in entry_tags_by_id: + event.entries_tags[entry_id] = entry_tags_by_id[entry_id] return events @@ -500,20 +557,16 @@ def update_job_state_with_filters( return events for event in events: - # functions + # events if isinstance(event, EventCrawlJob): - for contract_address, entries_ids in event.entries_ids.items(): + for contract_address, entries_ids in event.address_entries.items(): if address_filter and contract_address not in address_filter: continue for entry_id, tags in entries_ids.items(): if set(required_tags).issubset(set(tags)): entries_ids_to_update.append(entry_id) - event.entries_ids[contract_address][entry_id].extend( - tags_to_add - ) - - # events + # functions if isinstance(event, 
FunctionCallCrawlJob): if address_filter and event.contract_address not in address_filter: continue @@ -526,11 +579,11 @@ def update_job_state_with_filters( new_entries_state = bugout_state_update( entries_tags_add=[ - {"journal_entry_id": entry_id, "tags": tags_to_add} + {"entry_id": entry_id, "tags": tags_to_add} for entry_id in entries_ids_to_update ], entries_tags_delete=[ - {"journal_entry_id": entry_id, "tags": tags_to_delete} + {"entry_id": entry_id, "tags": tags_to_delete} for entry_id in entries_ids_to_update ], ) @@ -540,12 +593,12 @@ def update_job_state_with_filters( return events -def update_entries_status_and_proggress( +def update_entries_status_and_progress( events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], progess_map: Dict[ChecksumAddress, float], ) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]: """ - Update entries status and proggress in mooncrawl bugout journal + Update entries status and progress in mooncrawl bugout journal """ entries_tags_delete = [] @@ -554,11 +607,11 @@ def update_entries_status_and_proggress( for event in events: if isinstance(event, EventCrawlJob): - for contract_address, entries_ids in event.entries_ids.items(): - proggress = int(progess_map.get(contract_address, 0)) * 100 + for contract_address, entries_ids in event.address_entries.items(): + progress = round(progess_map.get(contract_address, 0), 4) * 100 for entry_id, tags in entries_ids.items(): - # proggress + # progress if ( f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" @@ -568,12 +621,12 @@ def update_entries_status_and_proggress( entries_tags_delete.append( { - "journal_entry_id": entry_id, + "entry_id": entry_id, "tags": [ tag for tag in tags if tag.startswith( - f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}" + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}" ) ], } @@ -581,17 +634,17 @@ def update_entries_status_and_proggress( entries_tags_add.append( { - "journal_entry_id": entry_id, + "entry_id": entry_id, "tags": [ - f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}" + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}" ], } ) - if proggress >= 100: + if progress >= 100: entries_tags_add.append( { - "journal_entry_id": entry_id, + "entry_id": entry_id, "tags": [ f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" ], @@ -599,7 +652,7 @@ def update_entries_status_and_proggress( ) if isinstance(event, FunctionCallCrawlJob): - proggress = int(progess_map.get(event.contract_address, 0)) * 100 + progress = round(progess_map.get(event.contract_address, 0), 4) * 100 for entry_id, tags in event.entries_tags.items(): if ( @@ -608,15 +661,15 @@ def update_entries_status_and_proggress( ): continue - # proggress + # progress entries_tags_delete.append( { - "journal_entry_id": entry_id, + "entry_id": entry_id, "tags": [ tag for tag in tags if tag.startswith( - f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}" + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}" ) ], } @@ -624,17 +677,17 @@ def update_entries_status_and_proggress( entries_tags_add.append( { - "journal_entry_id": entry_id, + "entry_id": entry_id, "tags": [ - f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['proggress']}:{proggress}" + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}" ], } ) - if proggress >= 100: + if progress >= 100: entries_tags_add.append( { - "journal_entry_id": 
entry_id, + "entry_id": entry_id, "tags": [ f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" ], diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index aeca1659..0cedf625 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -16,7 +16,7 @@ from .crawler import ( EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3, - update_entries_status_and_proggress, + update_entries_status_and_progress, ) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events, _autoscale_crawl_events @@ -132,21 +132,21 @@ def historical_crawler( if addresses_deployment_blocks: for address, deployment_block in addresses_deployment_blocks.items(): - current_position = end_block + current_position = batch_end_block - progess = original_start_block - current_position / ( + progess = (original_start_block - current_position) / ( original_start_block - deployment_block ) progess_map[address] = progess if len(function_call_crawl_jobs) > 0: - function_call_crawl_jobs = update_entries_status_and_proggress( # type: ignore + function_call_crawl_jobs = update_entries_status_and_progress( # type: ignore events=function_call_crawl_jobs, progess_map=progess_map, ) if len(event_crawl_jobs) > 0: - event_crawl_jobs = update_entries_status_and_proggress( # type: ignore + event_crawl_jobs = update_entries_status_and_progress( # type: ignore events=event_crawl_jobs, progess_map=progess_map, ) From de39b35f696c1bb3dde4405b6178372d7ad44a44 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 25 May 2023 16:44:10 +0300 Subject: [PATCH 05/14] Bump versions. --- crawlers/mooncrawl/mooncrawl/version.py | 2 +- crawlers/mooncrawl/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 47f45764..581a985f 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.3.0" +MOONCRAWL_VERSION = "0.3.1" diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index 96271e8d..8104d35c 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -34,7 +34,7 @@ setup( zip_safe=False, install_requires=[ "boto3", - "bugout>=0.1.19", + "bugout>=0.2.8", "chardet", "fastapi", "moonstreamdb>=0.3.3", From 6f86ad6a02a9109cc40f76d1dd4235e0b39e7563 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 25 May 2023 17:00:20 +0300 Subject: [PATCH 06/14] Add jobs endpoint. 
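Expose the moonworm tasks for a subscription over the API: a new
get_moonworm_jobs action searches the tasks journal by address and subscription
type, and a GET /{subscription_id}/jobs route on the subscriptions router
resolves the subscription entity and returns the matching journal entries.

A sketch of a client call (the /subscriptions route prefix, host, and
bearer-token header are assumptions for illustration; none of them appear in
this diff):

    import requests

    MOONSTREAM_API_URL = "https://api.moonstream.to"  # assumed host
    subscription_id = "..."  # an existing subscription entity id
    token = "..."  # Moonstream access token

    response = requests.get(
        f"{MOONSTREAM_API_URL}/subscriptions/{subscription_id}/jobs",
        headers={"Authorization": f"Bearer {token}"},
    )
    response.raise_for_status()
    jobs = response.json()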
--- backend/moonstreamapi/actions.py | 15 +++++ backend/moonstreamapi/routes/subscriptions.py | 56 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index d32ea565..7edd18e3 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -729,3 +729,18 @@ def query_parameter_hash(params: Dict[str, Any]) -> str: ).hexdigest() return hash + + +def get_moonworm_jobs( + address: str, + subscription_type_id: str, + entries_limit: int = 100, +): + entries = get_all_entries_from_search( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}", + limit=entries_limit, # load per request + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + ) + + return entries diff --git a/backend/moonstreamapi/routes/subscriptions.py b/backend/moonstreamapi/routes/subscriptions.py index 3fc020cf..6c5ddb34 100644 --- a/backend/moonstreamapi/routes/subscriptions.py +++ b/backend/moonstreamapi/routes/subscriptions.py @@ -15,6 +15,7 @@ from ..actions import ( apply_moonworm_tasks, get_entity_subscription_collection_id, EntityCollectionNotFoundException, + get_moonworm_jobs, ) from ..admin import subscription_types from .. import data @@ -22,7 +23,7 @@ from ..admin import subscription_types from ..middleware import MoonstreamHTTPException from ..reporter import reporter from ..settings import bugout_client as bc, entity_client as ec -from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN +from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL from ..web3_provider import yield_web3_provider @@ -479,6 +480,59 @@ async def get_subscription_abi_handler( ) +@router.get( + "/{subscription_id}/jobs", + tags=["subscriptions"], + response_model=data.SubdcriptionsAbiResponse, +) +async def get_subscription_jobs_handler( + request: Request, + subscription_id: str, +) -> Any: + token = request.state.token + user = request.state.user + + try: + collection_id = get_entity_subscription_collection_id( + resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + user_id=user.id, + ) + + # get subscription entity + subscription_resource = ec.get_entity( + token=token, + collection_id=collection_id, + entity_id=subscription_id, + ) + + except EntityCollectionNotFoundException as e: + raise MoonstreamHTTPException( + status_code=404, + detail="User subscriptions collection not found", + internal_error=e, + ) + except Exception as e: + logger.error( + f"Error get subscriptions for user ({user}) with token ({token}), error: {str(e)}" + ) + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + for field in subscription_resource.required_fields: + if "subscription_type_id" in field: + subscription_type_id = field["subscription_type_id"] + + if "address" in field: + subscription_address = field["address"] + + get_moonworm_jobs_response = get_moonworm_jobs( + subscription_type_id=subscription_type_id, + address=subscription_address, + ) + + return get_moonworm_jobs_response + + @router.get( "/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse ) From 43e0367f179fb2e1b0521ed7dcdea0a17f3ba1ec Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 29 May 2023 16:29:45 +0300 Subject: [PATCH 07/14] Add changes. 
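
Three changes in this patch: handle_historical_crawl fails fast when --find-deployed-blocks resolves no deployment blocks; bugout_state_update now adds tags before deleting them and skips the delete step when the add step fails; and the per-entry progress/status bookkeeping shared by event and function-call jobs moves into a new add_progress_to_tags helper.

For reference, the value written into each progress tag is the crawled share of the historical block range, rounded before scaling to a percentage. A worked example (numbers illustrative):

    # Fraction of [deployment_block, original_start_block] already crawled,
    # keyed by contract address in the real code.
    fraction = 0.456789

    # Rounding to 4 places before scaling keeps at most two decimals in the tag:
    progress = round(fraction, 4) * 100  # -> 45.68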
--- .../mooncrawl/moonworm_crawler/cli.py | 4 +
 .../mooncrawl/moonworm_crawler/crawler.py | 200 +++++++++---------
 2 files changed, 106 insertions(+), 98 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index 131a328e..6ec3e61d 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -277,6 +277,10 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
         addresses_deployment_blocks = find_all_deployed_blocks(
             web3, list(addresses_set)
         )
+        if len(addresses_deployment_blocks) == 0:
+            raise ValueError(
+                "No addresses found in the blockchain. Please check your addresses and try again"
+            )
 
         end_block = min(addresses_deployment_blocks.values())
 
         if start_block is None:
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 17b554f7..f959fdf0 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -231,9 +231,7 @@ def find_all_deployed_blocks(
             if block is not None:
                 all_deployed_blocks[address] = block
             if block is None:
-                logger.warning(
-                    f"Failed to find deployment block for {address}, code: {code}"
-                )
+                logger.error(f"Failed to get deployment block for {address}")
     except Exception as e:
         logger.error(f"Failed to get code for {address}: {e}")
     return all_deployed_blocks
@@ -451,24 +449,36 @@ def bugout_state_update(
 ) -> BugoutJournalEntries:
     """
     Run update of entries tags in bugout
-    First delete tags, then add tags
+    Tags are added to entries first,
+    then tags are deleted from entries;
+    if the add step fails, the delete step is skipped.
     """
-    if len(entries_tags_delete) > 0:
-        new_entreis_state = bugout_client.delete_entries_tags(
-            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
-            journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
-            entries_tags=entries_tags_delete,
-            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
-        )
+    new_entreis_state = BugoutJournalEntries(entries=[])
 
     if len(entries_tags_add) > 0:
-        new_entreis_state = bugout_client.create_entries_tags(
-            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
-            journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
-            entries_tags=entries_tags_add,
-            timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
-        )
+        try:
+            new_entreis_state = bugout_client.create_entries_tags(
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                entries_tags=entries_tags_add,
+                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+            )
+        except Exception as e:
+            logger.error(f"Failed to add tags to entries: {e}")
+
+    if len(entries_tags_delete) > 0 and (
+        len(entries_tags_add) == 0 or len(new_entreis_state.entries) > 0
+    ):
+        try:
+            new_entreis_state = bugout_client.delete_entries_tags(
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                entries_tags=entries_tags_delete,
+                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+            )
+        except Exception as e:
+            logger.error(f"Failed to delete tags from entries: {e}")
 
     return new_entreis_state
 
@@ -601,98 +611,37 @@ def update_entries_status_and_progress(
     Update entries status and progress in mooncrawl bugout journal
     """
 
-    entries_tags_delete = []
+    entries_tags_delete: List[Dict[str, Any]] = []
 
-    entries_tags_add = []
+    entries_tags_add: List[Dict[str, Any]] = []
 
     for event in events:
         if isinstance(event, EventCrawlJob):
             for contract_address, entries_ids in event.address_entries.items():
                 progress = round(progess_map.get(contract_address, 0), 4) * 100
 
-                for entry_id, tags in entries_ids.items():
-                    # progress
-
-                    if (
-                        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
-                        in tags
-                    ):
-                        continue
-
-                    entries_tags_delete.append(
-                        {
-                            "entry_id": entry_id,
-                            "tags": [
-                                tag
-                                for tag in tags
-                                if tag.startswith(
-                                    f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
-                                )
-                            ],
-                        }
-                    )
-
-                    entries_tags_add.append(
-                        {
-                            "entry_id": entry_id,
-                            "tags": [
-                                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
-                            ],
-                        }
-                    )
-
-                    if progress >= 100:
-                        entries_tags_add.append(
-                            {
-                                "entry_id": entry_id,
-                                "tags": [
-                                    f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
-                                ],
-                            }
-                        )
+                (
+                    entries_tags_delete,
+                    entries_tags_add,
+                ) = add_progress_to_tags(
+                    entries=entries_ids,
+                    contract_progress=progress,
+                    entries_tags_delete=entries_tags_delete,
+                    entries_tags_add=entries_tags_add,
+                )
 
         if isinstance(event, FunctionCallCrawlJob):
            progress = round(progess_map.get(event.contract_address, 0), 4) * 100
 
-            for entry_id, tags in event.entries_tags.items():
-                if (
-                    f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
-                    in tags
-                ):
-                    continue
-
-                # progress
-                entries_tags_delete.append(
-                    {
-                        "entry_id": entry_id,
-                        "tags": [
-                            tag
-                            for tag in tags
-                            if tag.startswith(
-                                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
-                            )
-                        ],
-                    }
-                )
-
-                entries_tags_add.append(
-                    {
-                        "entry_id": entry_id,
-                        "tags": [
-                            f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{progress}"
-                        ],
-                    }
-                )
-
-                if progress >= 100:
-                    entries_tags_add.append(
-                        {
-                            "entry_id": entry_id,
-                            "tags": [
-                                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
-                            ],
-                        }
-                    )
+            (
+                entries_tags_delete,
+                entries_tags_add,
+            ) = add_progress_to_tags(
+                entries=event.entries_tags,
+                contract_progress=progress,
+                entries_tags_delete=entries_tags_delete,
+                entries_tags_add=entries_tags_add,
+            )
 
         new_entries_state = bugout_state_update(
            entries_tags_add=entries_tags_add,
@@ -702,3 +651,58 @@
     events = update_job_tags(events, new_entries_state)
 
     return events
+
+
+def add_progress_to_tags(
+    entries: Dict[UUID, List[str]],
+    contract_progress: float,
+    entries_tags_delete: List[Dict[str, Any]],
+    entries_tags_add: List[Dict[str, Any]],
+) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
+    """
+    Refresh the progress tag and add the finished tag once progress reaches 100
+    """
+
+    new_progress = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{contract_progress}"
+
+    for entry_id, tags in entries.items():
+        # progress
+
+        if (
+            f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
+            in tags
+        ):
+            continue
+
+        if new_progress not in tags:
+            entries_tags_delete.append(
+                {
+                    "entry_id": entry_id,
+                    "tags": [
+                        tag
+                        for tag in tags
+                        if tag.startswith(
+                            f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
+                        )
+                    ],
+                }
+            )
+
+            entries_tags_add.append(
+                {
+                    "entry_id": entry_id,
+                    "tags": [new_progress],
+                }
+            )
+
+        if contract_progress >= 100:
+            entries_tags_add.append(
+                {
+                    "entry_id": entry_id,
+                    "tags": [
+                        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
+                    ],
+                }
+            )
+
+    return 
entries_tags_delete, entries_tags_add From ad2045dd80c18f5cc7f86cd78615c1a2bf4abd2c Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 29 May 2023 16:46:11 +0300 Subject: [PATCH 08/14] Add changes. --- backend/moonstreamapi/actions.py | 13 ++++++++++--- backend/moonstreamapi/settings.py | 10 ---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 7edd18e3..fa7ae4d7 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -518,8 +518,6 @@ def apply_moonworm_tasks( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, ) - print(f"Found {len(entries)} tasks for address {address}") - # create historical crawl task in journal # will use create_entries_pack for creating entries in journal @@ -541,6 +539,15 @@ def apply_moonworm_tasks( for hash in abi_hashes_dict: if hash not in existing_hashes: + abi_selector = Web3.keccak( + text=abi_hashes_dict[hash]["name"] + + "(" + + ",".join( + map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"]) + ) + + ")" + )[:4].hex() + moonworm_abi_tasks_entries_pack.append( { "title": address, @@ -549,7 +556,7 @@ def apply_moonworm_tasks( f"address:{address}", f"type:{abi_hashes_dict[hash]['type']}", f"abi_method_hash:{hash}", - f"abi_selector:{Web3.keccak(text=abi_hashes_dict[hash]['name'] + '(' + ','.join(map(lambda x: x['type'], abi_hashes_dict[hash]['inputs'])) + ')')[:4].hex()}", + f"abi_selector:{abi_selector}", f"subscription_type:{subscription_type}", f"abi_name:{abi_hashes_dict[hash]['name']}", f"status:active", diff --git a/backend/moonstreamapi/settings.py b/backend/moonstreamapi/settings.py index 038653c5..b988a85c 100644 --- a/backend/moonstreamapi/settings.py +++ b/backend/moonstreamapi/settings.py @@ -102,16 +102,6 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "": "MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set" ) -# # Historical crawl journal -# MOONSTREAM_HISTORICAL_CRAWL_JOURNAL = os.environ.get( -# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL", "" -# ) -# if MOONSTREAM_HISTORICAL_CRAWL_JOURNAL == "": -# raise ValueError( -# "MOONSTREAM_HISTORICAL_CRAWL_JOURNAL environment variable must be set" -# ) - - # Web3 MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get( "MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", "" From 839622df10109b75550d26fd01e7b0eb6d791ce5 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 29 May 2023 17:12:57 +0300 Subject: [PATCH 09/14] Add changes. --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index f959fdf0..75b1154b 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -180,7 +180,7 @@ def get_crawl_job_entries( if extend_tags is not None: for tag in extend_tags: - query += f" #{tag}" + query += f" #{tag.rstrip()}" if created_at_filter is not None: # Filtering by created_at From 13fc900bc1bf1f66275fde87f9fb864cd21cf6b3 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 30 May 2023 17:10:03 +0300 Subject: [PATCH 10/14] Add deployments. 
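
Deploy the historical crawlers as systemd user units: every chain (Ethereum, Polygon, Mumbai, xDai, Wyrm) gets a service/timer pair for events and another for transactions. Each timer starts its crawl 60 seconds after boot and then every 10 minutes (OnBootSec/OnUnitActiveSec), and each service runs historical-crawl with --find-deployed-blocks --end 0 --tasks-journal. deploy.bash installs the unit files into /home/ubuntu/.config/systemd/user and enables the timers.

A quick way to confirm the timers after a deploy (illustrative; the glob patterns match the unit names added here):

    XDG_RUNTIME_DIR="/run/user/1000" systemctl --user list-timers '*historical-crawl*' '*history-crawl*'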
--- crawlers/deploy/deploy.bash | 113 +++++++++++++++++- .../ethereum-historical-crawl-events.service | 17 +++ .../ethereum-historical-crawl-events.timer | 9 ++ ...reum-historical-crawl-transactions.service | 17 +++ ...hereum-historical-crawl-transactions.timer | 9 ++ .../mumbai-history-crawl-events.service | 17 +++ .../deploy/mumbai-history-crawl-events.timer | 9 ++ .../mumbai-history-crawl-transactions.service | 17 +++ .../mumbai-history-crawl-transactions.timer | 9 ++ .../polygon-historical-crawl-events.service | 17 +++ .../polygon-historical-crawl-events.timer | 9 ++ ...ygon-historical-crawl-transactions.service | 17 +++ ...olygon-historical-crawl-transactions.timer | 9 ++ .../wyrm-historical-crawl-events.service | 17 +++ .../deploy/wyrm-historical-crawl-events.timer | 9 ++ ...wyrm-historical-crawl-transactions.service | 17 +++ .../wyrm-historical-crawl-transactions.timer | 9 ++ .../xdai-historical-crawl-events.service | 17 +++ .../deploy/xdai-historical-crawl-events.timer | 9 ++ ...xdai-historical-crawl-transactions.service | 17 +++ .../xdai-historical-crawl-transactions.timer | 9 ++ .../mooncrawl/moonworm_crawler/crawler.py | 2 +- 22 files changed, 373 insertions(+), 2 deletions(-) create mode 100644 crawlers/deploy/ethereum-historical-crawl-events.service create mode 100644 crawlers/deploy/ethereum-historical-crawl-events.timer create mode 100644 crawlers/deploy/ethereum-historical-crawl-transactions.service create mode 100644 crawlers/deploy/ethereum-historical-crawl-transactions.timer create mode 100644 crawlers/deploy/mumbai-history-crawl-events.service create mode 100644 crawlers/deploy/mumbai-history-crawl-events.timer create mode 100644 crawlers/deploy/mumbai-history-crawl-transactions.service create mode 100644 crawlers/deploy/mumbai-history-crawl-transactions.timer create mode 100644 crawlers/deploy/polygon-historical-crawl-events.service create mode 100644 crawlers/deploy/polygon-historical-crawl-events.timer create mode 100644 crawlers/deploy/polygon-historical-crawl-transactions.service create mode 100644 crawlers/deploy/polygon-historical-crawl-transactions.timer create mode 100644 crawlers/deploy/wyrm-historical-crawl-events.service create mode 100644 crawlers/deploy/wyrm-historical-crawl-events.timer create mode 100644 crawlers/deploy/wyrm-historical-crawl-transactions.service create mode 100644 crawlers/deploy/wyrm-historical-crawl-transactions.timer create mode 100644 crawlers/deploy/xdai-historical-crawl-events.service create mode 100644 crawlers/deploy/xdai-historical-crawl-events.timer create mode 100644 crawlers/deploy/xdai-historical-crawl-transactions.service create mode 100644 crawlers/deploy/xdai-historical-crawl-transactions.timer diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash index fab521a6..907f0046 100755 --- a/crawlers/deploy/deploy.bash +++ b/crawlers/deploy/deploy.bash @@ -37,6 +37,10 @@ ETHEREUM_MISSING_TIMER_FILE="ethereum-missing.timer" ETHEREUM_MOONWORM_CRAWLER_SERVICE_FILE="ethereum-moonworm-crawler.service" ETHEREUM_ORANGE_DAO_REPORTS_TOKENONOMICS_SERVICE_FILE="ethereum-orange-dao-reports-tokenonomics.service" ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE="ethereum-orange-dao-reports-tokenonomics.timer" +ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="ethereum-historical-crawl-transactions.service" +ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="ethereum-historical-crawl-transactions.timer" +ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="ethereum-historical-crawl-events.service" 
+ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="ethereum-historical-crawl-events.timer" # Polygon service files POLYGON_SYNCHRONIZE_SERVICE="polygon-synchronize.service" @@ -56,6 +60,10 @@ POLYGON_CU_REPORTS_TOKENONOMICS_SERVICE_FILE="polygon-cu-reports-tokenonomics.se POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE="polygon-cu-reports-tokenonomics.timer" POLYGON_CU_NFT_DASHBOARD_SERVICE_FILE="polygon-cu-nft-dashboard.service" POLYGON_CU_NFT_DASHBOARD_TIMER_FILE="polygon-cu-nft-dashboard.timer" +POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="polygon-historical-crawl-transactions.service" +POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="polygon-historical-crawl-transactions.timer" +POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="polygon-historical-crawl-events.service" +POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="polygon-historical-crawl-events.timer" # Mumbai service files MUMBAI_SYNCHRONIZE_SERVICE="mumbai-synchronize.service" @@ -68,6 +76,11 @@ MUMBAI_STATE_CLEAN_SERVICE_FILE="mumbai-state-clean.service" MUMBAI_STATE_CLEAN_TIMER_FILE="mumbai-state-clean.timer" MUMBAI_METADATA_SERVICE_FILE="mumbai-metadata.service" MUMBAI_METADATA_TIMER_FILE="mumbai-metadata.timer" +MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE="mumbai-history-crawl-transactions.service" +MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE="mumbai-history-crawl-transactions.timer" +MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE="mumbai-history-crawl-events.service" +MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE="mumbai-history-crawl-events.timer" + # XDai service files XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service" @@ -76,6 +89,10 @@ XDAI_MISSING_TIMER_FILE="xdai-missing.timer" XDAI_STATISTICS_SERVICE_FILE="xdai-statistics.service" XDAI_STATISTICS_TIMER_FILE="xdai-statistics.timer" XDAI_MOONWORM_CRAWLER_SERVICE_FILE="xdai-moonworm-crawler.service" +XDai_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="xdai-historical-crawl-transactions.service" +XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="xdai-historical-crawl-transactions.timer" +XDai_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="xdai-historical-crawl-events.service" +XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="xdai-historical-crawl-events.timer" # Wyrm service files WYRM_SYNCHRONIZE_SERVICE="wyrm-synchronize.service" @@ -84,6 +101,10 @@ WYRM_MISSING_TIMER_FILE="wyrm-missing.timer" WYRM_STATISTICS_SERVICE_FILE="wyrm-statistics.service" WYRM_STATISTICS_TIMER_FILE="wyrm-statistics.timer" WYRM_MOONWORM_CRAWLER_SERVICE_FILE="wyrm-moonworm-crawler.service" +WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="wyrm-historical-crawl-transactions.service" +WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="wyrm-historical-crawl-transactions.timer" +WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="wyrm-historical-crawl-events.service" +WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="wyrm-historical-crawl-events.timer" set -eu @@ -181,6 +202,24 @@ cp "${SCRIPT_DIR}/${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}" "/home/ubuntu/ XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}" +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Ethereum historical transactions crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +cp 
"${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Ethereum historical events crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" + echo echo @@ -269,6 +308,24 @@ cp "${SCRIPT_DIR}/${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}" "/home/ubuntu/.config/ XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}" +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Polygon historical transactions crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Polygon historical events crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" + echo echo echo -e "${PREFIX_INFO} Replacing existing Mumbai block with transactions syncronizer service definition with ${MUMBAI_SYNCHRONIZE_SERVICE}" @@ -321,6 +378,24 @@ cp "${SCRIPT_DIR}/${MUMBAI_METADATA_TIMER_FILE}" 
"/home/ubuntu/.config/systemd/u XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${MUMBAI_METADATA_TIMER_FILE}" +echo +echo +echo -e "${PREFIX_INFO} Replacing existing MUMBAI historical transactions crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing MUMBAI historical events crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" + echo echo echo -e "${PREFIX_INFO} Replacing existing XDai block with transactions syncronizer service definition with ${XDAI_SYNCHRONIZE_SERVICE}" @@ -355,6 +430,24 @@ cp "${SCRIPT_DIR}/${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/s XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}" +echo +echo +echo -e "${PREFIX_INFO} Replacing existing xDai historical transactions crawler service and timer with: ${XDai_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${XDai_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing xDai historical events crawler service and timer with: ${XDai_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" 
"/home/ubuntu/.config/systemd/user/${XDai_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${XDai_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" + echo echo echo -e "${PREFIX_INFO} Replacing existing Wyrm block with transactions syncronizer service definition with ${WYRM_SYNCHRONIZE_SERVICE}" @@ -387,4 +480,22 @@ echo -e "${PREFIX_INFO} Replacing existing Wyrm moonworm crawler service definit chmod 644 "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" cp "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload -XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" \ No newline at end of file +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Wyrm historical transactions crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" + +echo +echo +echo -e "${PREFIX_INFO} Replacing existing Wyrm historical events crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" +cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" +XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload +XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" diff --git a/crawlers/deploy/ethereum-historical-crawl-events.service b/crawlers/deploy/ethereum-historical-crawl-events.service new file mode 100644 index 00000000..006a03c4 --- /dev/null +++ b/crawlers/deploy/ethereum-historical-crawl-events.service @@ -0,0 +1,17 @@ +[Unit] +Description=Ethereum historical crawler events +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 
--tasks-journal --only-events +CPUWeight=70 +SyslogIdentifier=ethereum-historical-crawler-events + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/ethereum-historical-crawl-events.timer b/crawlers/deploy/ethereum-historical-crawl-events.timer new file mode 100644 index 00000000..57801df3 --- /dev/null +++ b/crawlers/deploy/ethereum-historical-crawl-events.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs events historical crawler on ethereum + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/ethereum-historical-crawl-transactions.service b/crawlers/deploy/ethereum-historical-crawl-transactions.service new file mode 100644 index 00000000..1b6b4c2a --- /dev/null +++ b/crawlers/deploy/ethereum-historical-crawl-transactions.service @@ -0,0 +1,17 @@ +[Unit] +Description=Ethereum historical crawler transactions +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-transactions +CPUWeight=70 +SyslogIdentifier=ethereum-historical-crawler-transactions + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/ethereum-historical-crawl-transactions.timer b/crawlers/deploy/ethereum-historical-crawl-transactions.timer new file mode 100644 index 00000000..4038c649 --- /dev/null +++ b/crawlers/deploy/ethereum-historical-crawl-transactions.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs transactions historical crawler on ethereum + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/mumbai-history-crawl-events.service b/crawlers/deploy/mumbai-history-crawl-events.service new file mode 100644 index 00000000..488a1a94 --- /dev/null +++ b/crawlers/deploy/mumbai-history-crawl-events.service @@ -0,0 +1,17 @@ +[Unit] +Description=Mumbai historical crawler events +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-events +CPUWeight=70 +SyslogIdentifier=mumbai-historical-crawler-events + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/mumbai-history-crawl-events.timer b/crawlers/deploy/mumbai-history-crawl-events.timer new file mode 100644 index 00000000..8fd46b46 --- /dev/null +++ b/crawlers/deploy/mumbai-history-crawl-events.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs events historical crawler on mumbai + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/mumbai-history-crawl-transactions.service b/crawlers/deploy/mumbai-history-crawl-transactions.service new file mode 100644 index 00000000..1efb0082 --- /dev/null +++ b/crawlers/deploy/mumbai-history-crawl-transactions.service @@ -0,0 +1,17 @@ +[Unit] 
+Description=Mumbai historical crawler transactions +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-transactions +CPUWeight=70 +SyslogIdentifier=mumbai-historical-crawler-transactions + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/mumbai-history-crawl-transactions.timer b/crawlers/deploy/mumbai-history-crawl-transactions.timer new file mode 100644 index 00000000..b1c8d824 --- /dev/null +++ b/crawlers/deploy/mumbai-history-crawl-transactions.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs transactions historical crawler on mumbai + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/polygon-historical-crawl-events.service b/crawlers/deploy/polygon-historical-crawl-events.service new file mode 100644 index 00000000..c8017a93 --- /dev/null +++ b/crawlers/deploy/polygon-historical-crawl-events.service @@ -0,0 +1,17 @@ +[Unit] +Description=Polygon historical crawler events +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-events +CPUWeight=70 +SyslogIdentifier=polygon-historical-crawler-events + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/polygon-historical-crawl-events.timer b/crawlers/deploy/polygon-historical-crawl-events.timer new file mode 100644 index 00000000..f0560ac8 --- /dev/null +++ b/crawlers/deploy/polygon-historical-crawl-events.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs events historical crawler on polygon + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.service b/crawlers/deploy/polygon-historical-crawl-transactions.service new file mode 100644 index 00000000..c7be1251 --- /dev/null +++ b/crawlers/deploy/polygon-historical-crawl-transactions.service @@ -0,0 +1,17 @@ +[Unit] +Description=Polygon historical crawler transactions +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-transactions +CPUWeight=70 +SyslogIdentifier=mumbai-historical-crawler-transactions + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.timer b/crawlers/deploy/polygon-historical-crawl-transactions.timer new file mode 100644 index 00000000..4a2cdd21 --- /dev/null 
+++ b/crawlers/deploy/polygon-historical-crawl-transactions.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs transactions historical crawler on polygon + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/wyrm-historical-crawl-events.service b/crawlers/deploy/wyrm-historical-crawl-events.service new file mode 100644 index 00000000..cd73d8c0 --- /dev/null +++ b/crawlers/deploy/wyrm-historical-crawl-events.service @@ -0,0 +1,17 @@ +[Unit] +Description=Wyrm historical crawler events +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-events +CPUWeight=70 +SyslogIdentifier=wyrm-historical-crawler-events + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/wyrm-historical-crawl-events.timer b/crawlers/deploy/wyrm-historical-crawl-events.timer new file mode 100644 index 00000000..bd402c01 --- /dev/null +++ b/crawlers/deploy/wyrm-historical-crawl-events.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs events historical crawler on wyrm + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/wyrm-historical-crawl-transactions.service b/crawlers/deploy/wyrm-historical-crawl-transactions.service new file mode 100644 index 00000000..5c496583 --- /dev/null +++ b/crawlers/deploy/wyrm-historical-crawl-transactions.service @@ -0,0 +1,17 @@ +[Unit] +Description=Wyrm historical crawler transactions +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-transactions +CPUWeight=70 +SyslogIdentifier=wyrm-historical-crawler-transactions + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/wyrm-historical-crawl-transactions.timer b/crawlers/deploy/wyrm-historical-crawl-transactions.timer new file mode 100644 index 00000000..40ea12bd --- /dev/null +++ b/crawlers/deploy/wyrm-historical-crawl-transactions.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs transactions historical crawler on wyrm + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/xdai-historical-crawl-events.service b/crawlers/deploy/xdai-historical-crawl-events.service new file mode 100644 index 00000000..11b0fa5a --- /dev/null +++ b/crawlers/deploy/xdai-historical-crawl-events.service @@ -0,0 +1,17 @@ +[Unit] +Description=XDai historical crawler events +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id 
"${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-events +CPUWeight=70 +SyslogIdentifier=xdai-historical-crawler-events + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/xdai-historical-crawl-events.timer b/crawlers/deploy/xdai-historical-crawl-events.timer new file mode 100644 index 00000000..153cfaa8 --- /dev/null +++ b/crawlers/deploy/xdai-historical-crawl-events.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs events historical crawler on xdai + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/deploy/xdai-historical-crawl-transactions.service b/crawlers/deploy/xdai-historical-crawl-transactions.service new file mode 100644 index 00000000..8fb6303b --- /dev/null +++ b/crawlers/deploy/xdai-historical-crawl-transactions.service @@ -0,0 +1,17 @@ +[Unit] +Description=XDai historical crawler transactions +After=network.target +StartLimitIntervalSec=300 +StartLimitBurst=3 + +[Service] +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +Restart=on-failure +RestartSec=15s +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-transactions +CPUWeight=70 +SyslogIdentifier=xdai-historical-crawler-transactions + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/crawlers/deploy/xdai-historical-crawl-transactions.timer b/crawlers/deploy/xdai-historical-crawl-transactions.timer new file mode 100644 index 00000000..89dbcb8f --- /dev/null +++ b/crawlers/deploy/xdai-historical-crawl-transactions.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Runs transactions historical crawler on xdai + +[Timer] +OnBootSec=60s +OnUnitActiveSec=10m + +[Install] +WantedBy=timers.target diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 75b1154b..4aabb0ea 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -12,7 +12,7 @@ from bugout.data import BugoutSearchResult, BugoutJournalEntries from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from web3.main import Web3 -from moonworm.deployment import find_deployment_block +from moonworm.deployment import find_deployment_block # type: ignore from ..blockchain import connect from ..reporter import reporter From ee9fba74694b8069a2e292c9fab713bbe1263e07 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 30 May 2023 17:13:18 +0300 Subject: [PATCH 11/14] Add fix for end_block. 
--- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index 6ec3e61d..f90cb666 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -263,6 +263,8 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
 
     addresses_deployment_blocks = None
 
+    end_block = args.end
+
     # get set of addresses from event jobs and function call jobs
     if args.find_deployed_blocks:
         addresses_set = set()
@@ -456,7 +458,7 @@ def main() -> None:
         "--end",
         "-e",
         type=int,
-        required=True,
+        required=False,
     )
     historical_crawl_parser.add_argument(
         "--blockchain-type",

From 9f82dc3b7f54188227de04fe8a7c764cf1010938 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Tue, 30 May 2023 17:34:49 +0300
Subject: [PATCH 12/14] Add changes.

--- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 4 ++--
 crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index f90cb666..d593c9c2 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -149,8 +149,8 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
         addresses_filter = []
         extend_tags.extend(
             [
-                "moonworm_task_pickedup:True",
-                "historical_crawl_status:pending",
+                "#moonworm_task_pickedup:True",
+                "!#historical_crawl_status:finsihed",
             ]
         )
 
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 4aabb0ea..3418959d 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -180,7 +180,7 @@ def get_crawl_job_entries(
 
     if extend_tags is not None:
         for tag in extend_tags:
-            query += f" #{tag.rstrip()}"
+            query += f" {tag.rstrip()}"
 
     if created_at_filter is not None:
         # Filtering by created_at
From 96af5d950cdae0ca3c73e8e9789c2938939c64fd Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 31 May 2023 16:59:59 +0300
Subject: [PATCH 13/14] Fix typo.

--- crawlers/deploy/polygon-historical-crawl-transactions.service | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.service b/crawlers/deploy/polygon-historical-crawl-transactions.service
index c7be1251..30a245b9 100644
--- a/crawlers/deploy/polygon-historical-crawl-transactions.service
+++ b/crawlers/deploy/polygon-historical-crawl-transactions.service
@@ -11,7 +11,7 @@ Restart=on-failure
 RestartSec=15s
 ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-transactions
 CPUWeight=70
-SyslogIdentifier=mumbai-historical-crawler-transactions
+SyslogIdentifier=polygon-historical-crawler-transactions
 
 [Install]
 WantedBy=multi-user.target
\ No newline at end of file
From 2e211a6b187fa612f5f1d426c55f38e402b3da4c Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 31 May 2023 17:06:49 +0300
Subject: [PATCH 14/14] Use mappings instead of hardcoded tags.
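
The filter tags added in PATCH 12 were hand-typed strings (including the misspelled finsihed status); build them from the shared mappings in mooncrawl settings instead, so the CLI filter and the tag writers in crawler.py cannot drift apart.

For context, the imported mappings are assumed to look roughly like this (shape inferred from the tag formats used in this series; exact values are assumptions):

    # Assumed definitions in mooncrawl.settings:
    HISTORICAL_CRAWLER_STATUSES = {
        "pending": "pending",
        "running": "running",
        "finished": "finished",
    }
    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
        "historical_crawl_status": "historical_crawl_status",
        "progress_status": "progress",
    }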
--- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index d593c9c2..d83e41a0 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -11,6 +11,8 @@ from ..db import yield_db_session_ctx from ..settings import ( MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID, + HISTORICAL_CRAWLER_STATUSES, + HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES, ) from .continuous_crawler import _retry_connect_web3, continuous_crawler from .crawler import ( @@ -150,7 +152,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: extend_tags.extend( [ "#moonworm_task_pickedup:True", - "!#historical_crawl_status:finsihed", + f"!#{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}", ] )
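
With the mappings in place, the extended tags reach get_crawl_job_entries ready to append verbatim (PATCH 12 changed it to stop prefixing them with "#"). The resulting Bugout search query looks roughly like this (sketch; the base query text is illustrative):

    # How the CLI's extend_tags end up in the journal search query.
    query = "#task_type:moonworm #blockchain:polygon"  # illustrative base
    extend_tags = [
        "#moonworm_task_pickedup:True",
        "!#historical_crawl_status:finished",
    ]
    for tag in extend_tags:
        query += f" {tag.rstrip()}"

    # query == "#task_type:moonworm #blockchain:polygon "
    #          "#moonworm_task_pickedup:True !#historical_crawl_status:finished"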