From 577df99a74b733f92b600f18657770d72a30a444 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Fri, 10 May 2024 23:36:38 +0300
Subject: [PATCH 1/4] Migrate state-crawler to moonworm.

---
 crawlers/mooncrawl/mooncrawl/actions.py       |  38 +++-
 crawlers/mooncrawl/mooncrawl/data.py          |   9 +
 crawlers/mooncrawl/mooncrawl/settings.py      |  11 ++
 .../mooncrawl/mooncrawl/state_crawler/cli.py  | 176 +++++++++++++++++-
 4 files changed, 224 insertions(+), 10 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py
index 05c265f5..3cf7e262 100644
--- a/crawlers/mooncrawl/mooncrawl/actions.py
+++ b/crawlers/mooncrawl/mooncrawl/actions.py
@@ -5,11 +5,11 @@ import time
 import uuid
 from collections import OrderedDict
 from datetime import datetime
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, List
 
 import boto3  # type: ignore
 import requests  # type: ignore
-from bugout.data import BugoutResources
+from bugout.data import BugoutResources, BugoutSearchResult
 from bugout.exceptions import BugoutResponseException
 from moonstream.client import (  # type: ignore
     ENDPOINT_QUERIES,
@@ -170,3 +170,37 @@ def recive_S3_data_from_query(
             logger.info("Too many retries")
             break
     return data_response.json()
+
+
+def get_all_entries_from_search(
+    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
+) -> List[BugoutSearchResult]:
+    """
+    Get all required entries from a journal using the search interface.
+    """
+    offset = 0
+    results: List[BugoutSearchResult] = []
+    existing_methods = bc.search(
+        token=token,
+        journal_id=journal_id,
+        query=search_query,
+        content=content,
+        timeout=10.0,
+        limit=limit,
+        offset=offset,
+    )
+    results.extend(existing_methods.results)  # type: ignore
+    if len(results) != existing_methods.total_results:
+        for offset in range(limit, existing_methods.total_results, limit):
+            existing_methods = bc.search(
+                token=token,
+                journal_id=journal_id,
+                query=search_query,
+                content=content,
+                timeout=10.0,
+                limit=limit,
+                offset=offset,
+            )
+            results.extend(existing_methods.results)  # type: ignore
+
+    return results
diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py
index 3cef9a75..3e76db46 100644
--- a/crawlers/mooncrawl/mooncrawl/data.py
+++ b/crawlers/mooncrawl/mooncrawl/data.py
@@ -58,3 +58,12 @@ class TokenURIs(BaseModel):
     block_number: str
     block_timestamp: str
     address: str
+
+
+class ViewTasks(BaseModel):
+    type: str
+    stateMutability: str
+    inputs: Any
+    name: str
+    outputs: List[Dict[str, Any]]
+    address: str
diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py
index 2cc24339..c6749d2b 100644
--- a/crawlers/mooncrawl/mooncrawl/settings.py
+++ b/crawlers/mooncrawl/mooncrawl/settings.py
@@ -393,3 +393,14 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
 
 MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
+
+
+# state crawler
+
+MOONSTREAM_STATE_CRAWLER_JOURNAL_ID = os.environ.get(
+    "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID", ""
+)
+if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
+    raise ValueError(
+        "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
+    )
diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
index ab6cf9a1..1cbcb9ce 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
@@ -12,14 +12,20 @@ from uuid import UUID
 
 from moonstream.client import Moonstream  # type: ignore
 from moonstreamdb.blockchain import AvailableBlockchainType
-from web3.middleware import geth_poa_middleware
-
 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
-from ..actions import recive_S3_data_from_query
+from ..actions import recive_S3_data_from_query, get_all_entries_from_search
 from ..blockchain import connect
+from ..data import ViewTasks
 from ..db import PrePing_SessionLocal
-from ..settings import INFURA_PROJECT_ID, infura_networks, multicall_contracts
+from ..settings import (
+    bugout_client as bc,
+    INFURA_PROJECT_ID,
+    infura_networks,
+    multicall_contracts,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+)
 from .db import clean_labels, commit_session, view_call_to_label
 from .Multicall2_interface import Contract as Multicall2
 from .web3_util import FunctionSignature
 
@@ -509,11 +515,49 @@ def handle_crawl(args: argparse.Namespace) -> None:
     """
     Read all view methods of the contracts and crawl
     """
-    with open(args.jobs_file, "r") as f:
-        jobs = json.load(f)
-
     blockchain_type = AvailableBlockchainType(args.blockchain)
 
+    if args.jobs_file is not None:
+        with open(args.jobs_file, "r") as f:
+            jobs = json.load(f)
+
+    else:
+
+        jobs = []
+
+        # Bugout
+        query = f"#state_job #blockchain:{blockchain_type.value}"
+
+        print(f"Query: {query}")
+
+        existing_jobs = get_all_entries_from_search(
+            journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+            search_query=query,
+            limit=1000,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            content=True,
+        )
+
+        if len(existing_jobs) == 0:
+            logger.info("No jobs found in the journal")
+            return
+
+    for job in existing_jobs:
+
+        try:
+            if job.content is None:
+                logger.error(f"Job content is None for entry {job.entry_url}")
+                continue
+            ### parse json
+            job_content = json.loads(job.content)
+            ### validate via ViewTasks
+            ViewTasks(**job_content)
+            jobs.append(job_content)
+        except Exception as e:
+
+            logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+            continue
+
     custom_web3_provider = args.web3_uri
 
     if args.infura and INFURA_PROJECT_ID is not None:
@@ -573,6 +617,100 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
         db_session.close()
 
 
+def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
+
+    ### Get all tasks from files
+    with open(args.jobs_file, "r") as f:
+        jobs = json.load(f)
+
+    # Example jobs file: jobs/ethereum-jobs.json
+
+    blockchain_type = AvailableBlockchainType(args.blockchain)
+
+    migrated_blockchain = blockchain_type.value
+
+    ### Get all tasks from the journal
+
+    query = f"#state_job #blockchain:{migrated_blockchain}"
+
+    existing_jobs = get_all_entries_from_search(
+        journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+        search_query=query,
+        limit=1000,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        content=True,
+    )
+
+    existing_state_tasks_list = []
+
+    print(f"Existing jobs: {len(existing_jobs)}")
+    print(f"New jobs: {jobs}")
+
+    ### validate existing jobs
+    for job in existing_jobs:
+
+        try:
+            if job.content is None:
+                logger.error(f"Job content is None for entry {job.entry_url}")
+                continue
+            ### parse json
+            job_content = json.loads(job.content)
+            ### validate via ViewTasks
+            ViewTasks(**job_content)
+        except Exception as e:
+
+            logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+            continue
+
+        ### from tags get blockchain, name and address
+
+        for tag in job.tags:
+            if tag.startswith("blockchain"):
+                blockchain = tag.split(":")[1]
+            if tag.startswith("name"):
+                name = tag.split(":")[1]
+            if tag.startswith("address"):
+                address = tag.split(":")[1]
+
+        existing_state_tasks_list.append(f"{blockchain}:{name}:{address}")
+
+    ### Process tasks from the jobs file
+
+    for job in jobs:
+
+        name = job["name"]
+
+        address = job["address"]
+
+        ### Deduplicate tasks
+        if f"{migrated_blockchain}:{name}:{address}" not in existing_state_tasks_list:
+            ### create new task
+
+            json_str = json.dumps(job, indent=4)
+
+            ### add tabs to json string for better readability
+            json_str_with_tabs = "\n".join(
+                "\t" + line for line in json_str.splitlines()
+            )
+
+            try:
+                bc.create_entry(
+                    title=f"{name}:{address}",
+                    token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                    journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+                    content=json_str_with_tabs,
+                    tags=[
+                        "state_job",
+                        f"blockchain:{migrated_blockchain}",
+                        f"name:{name}",
+                        f"address:{address}",
+                    ],
+                )
+            except Exception as e:
+                logger.error(f"Error creating entry: {e}")
+                continue
+
+
 def main() -> None:
     parser = argparse.ArgumentParser()
     parser.set_defaults(func=lambda _: parser.print_help())
@@ -615,7 +753,7 @@ def main() -> None:
         "-j",
         type=str,
         help="Path to json file with jobs",
-        required=True,
+        required=False,
     )
     view_state_crawler_parser.add_argument(
         "--batch-size",
@@ -626,6 +764,28 @@ def main() -> None:
     )
     view_state_crawler_parser.set_defaults(func=handle_crawl)
 
+    view_state_migration_parser = subparsers.add_parser(
+        "migrate-jobs",
+        help="Migrate jobs from a JSON file to a Bugout journal",
+    )
+    view_state_migration_parser.add_argument(
+        "--jobs-file",
+        "-j",
+        type=str,
+        help="Path to json file with jobs",
+        required=True,
+    )
+
+    view_state_migration_parser.add_argument(
+        "--blockchain",
+        "-b",
+        type=str,
+        help="Type of blockchain which is written to the database",
+        required=True,
+    )
+
+    view_state_migration_parser.set_defaults(func=migrate_state_tasks_handler)
+
     view_state_cleaner = subparsers.add_parser(
         "clean-state-labels",
         help="Clean labels from database",

From 291fd75df622893a3f81fcfef8348c3411f38efe Mon Sep 17 00:00:00 2001
From: Andrey
Date: Thu, 20 Jun 2024 21:25:40 +0300
Subject: [PATCH 2/4] Fix loop.
---
 .../mooncrawl/mooncrawl/state_crawler/cli.py  | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
index 1cbcb9ce..c4379ab2 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
@@ -523,13 +523,13 @@ def handle_crawl(args: argparse.Namespace) -> None:
 
     else:
 
+        logger.info("Reading jobs from the journal")
+
         jobs = []
 
         # Bugout
        query = f"#state_job #blockchain:{blockchain_type.value}"
 
-        print(f"Query: {query}")
-
         existing_jobs = get_all_entries_from_search(
             journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
             search_query=query,
@@ -542,21 +542,21 @@ def handle_crawl(args: argparse.Namespace) -> None:
             logger.info("No jobs found in the journal")
             return
 
-    for job in existing_jobs:
+        for job in existing_jobs:
 
-        try:
-            if job.content is None:
-                logger.error(f"Job content is None for entry {job.entry_url}")
+            try:
+                if job.content is None:
+                    logger.error(f"Job content is None for entry {job.entry_url}")
+                    continue
+                ### parse json
+                job_content = json.loads(job.content)
+                ### validate via ViewTasks
+                ViewTasks(**job_content)
+                jobs.append(job_content)
+            except Exception as e:
+
+                logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
                 continue
-            ### parse json
-            job_content = json.loads(job.content)
-            ### validate via ViewTasks
-            ViewTasks(**job_content)
-            jobs.append(job_content)
-        except Exception as e:
-
-            logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
-            continue
 
     custom_web3_provider = args.web3_uri

From b808b1aff9d1ff4fc9898891f8ea938b39fe4b99 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Thu, 20 Jun 2024 22:19:47 +0300
Subject: [PATCH 3/4] Fix lint.

---
 .../mooncrawl/mooncrawl/state_crawler/cli.py  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
index c4379ab2..154213cb 100644
--- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py
@@ -643,28 +643,28 @@ def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
 
     existing_state_tasks_list = []
 
-    print(f"Existing jobs: {len(existing_jobs)}")
-    print(f"New jobs: {jobs}")
+    logger.info(f"Existing jobs: {len(existing_jobs)}")
+    logger.info(f"New jobs: {jobs}")
 
     ### validate existing jobs
-    for job in existing_jobs:
+    for bugout_job in existing_jobs:
 
         try:
-            if job.content is None:
-                logger.error(f"Job content is None for entry {job.entry_url}")
+            if bugout_job.content is None:
+                logger.error(f"Job content is None for entry {bugout_job.entry_url}")
                 continue
             ### parse json
-            job_content = json.loads(job.content)
+            job_content = json.loads(bugout_job.content)
             ### validate via ViewTasks
             ViewTasks(**job_content)
         except Exception as e:
 
-            logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+            logger.error(f"Job validation of entry {bugout_job.entry_url} failed: {e}")
             continue
 
         ### from tags get blockchain, name and address
 
-        for tag in job.tags:
+        for tag in bugout_job.tags:
             if tag.startswith("blockchain"):
                 blockchain = tag.split(":")[1]
             if tag.startswith("name"):

From 77d1c3fe6f08dc8bbee4d93096914f97f99a97c2 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Thu, 20 Jun 2024 22:25:26 +0300
Subject: [PATCH 4/4] Bump version.
---
 crawlers/mooncrawl/mooncrawl/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py
index 240b2333..4984cb78 100644
--- a/crawlers/mooncrawl/mooncrawl/version.py
+++ b/crawlers/mooncrawl/mooncrawl/version.py
@@ -2,4 +2,4 @@
 Moonstream crawlers version.
 """
 
-MOONCRAWL_VERSION = "0.4.5"
+MOONCRAWL_VERSION = "0.4.6"
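
Note for reviewers: each journal entry created by `migrate-jobs` stores the JSON body
of one view-method job as its content, and `handle_crawl` loads it back through the new
`ViewTasks` model. A minimal sketch of a job that passes this validation — the ABI
fragment and address below are illustrative placeholders, not values taken from the
repository's jobs files:

    import json

    from mooncrawl.data import ViewTasks  # model added in PATCH 1/4

    # Hypothetical ERC-20-style view method job; the address is a placeholder.
    example_job = {
        "type": "function",
        "stateMutability": "view",
        "inputs": [],
        "name": "totalSupply",
        "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
        "address": "0x0000000000000000000000000000000000000000",
    }

    ViewTasks(**example_job)  # raises pydantic.ValidationError for a malformed job

    # migrate-jobs would store this JSON as the entry content, tagged so that
    # handle_crawl can find it with the query `#state_job #blockchain:<chain>`, e.g.:
    # ["state_job", "blockchain:ethereum", "name:totalSupply", "address:0x0000..."]
    print(json.dumps(example_job, indent=4))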