Merge branch 'revert-1095-revert-1085-add-moonworm-v3-db-crawler' of github.com:moonstream-to/api into revert-1095-revert-1085-add-moonworm-v3-db-crawler

pull/1097/head
Andrey 2024-06-21 13:58:12 +03:00
commit 2e6fd29fb5
5 changed files with 224 additions and 10 deletions

View file: mooncrawl/actions.py

@@ -5,11 +5,11 @@ import time
 import uuid
 from collections import OrderedDict
 from datetime import datetime
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, List
 
 import boto3  # type: ignore
 import requests  # type: ignore
-from bugout.data import BugoutResources
+from bugout.data import BugoutResources, BugoutSearchResult
 from bugout.exceptions import BugoutResponseException
 from moonstream.client import (  # type: ignore
     ENDPOINT_QUERIES,
@@ -170,3 +170,37 @@ def recive_S3_data_from_query(
             logger.info("Too many retries")
             break
     return data_response.json()
+
+
+def get_all_entries_from_search(
+    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
+) -> List[BugoutSearchResult]:
+    """
+    Get all required entries from a journal using the search interface.
+    """
+    offset = 0
+    results: List[BugoutSearchResult] = []
+
+    existing_methods = bc.search(
+        token=token,
+        journal_id=journal_id,
+        query=search_query,
+        content=content,
+        timeout=10.0,
+        limit=limit,
+        offset=offset,
+    )
+    results.extend(existing_methods.results)  # type: ignore
+
+    if len(results) != existing_methods.total_results:
+        for offset in range(limit, existing_methods.total_results, limit):
+            existing_methods = bc.search(
+                token=token,
+                journal_id=journal_id,
+                query=search_query,
+                content=content,
+                timeout=10.0,
+                limit=limit,
+                offset=offset,
+            )
+            results.extend(existing_methods.results)  # type: ignore
+
+    return results
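The helper pages through Bugout search results limit entries at a time until total_results is exhausted. A minimal usage sketch, assuming a valid Bugout access token and journal ID (both values below are placeholders, not from this commit):

import json

entries = get_all_entries_from_search(
    journal_id="<journal-uuid>",
    search_query="#state_job #blockchain:ethereum",
    limit=100,
    token="<bugout-access-token>",
    content=True,  # include entry bodies, needed to read job definitions
)
for entry in entries:
    print(entry.entry_url, entry.title)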

View file: mooncrawl/data.py

@@ -58,3 +58,12 @@ class TokenURIs(BaseModel):
     block_number: str
     block_timestamp: str
     address: str
+
+
+class ViewTasks(BaseModel):
+    type: str
+    stateMutability: str
+    inputs: Any
+    name: str
+    outputs: List[Dict[str, Any]]
+    address: str
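ViewTasks mirrors a single view function from a contract ABI, plus the target contract address. A sketch of a job payload that would pass validation (all values are illustrative, not taken from this commit):

example_job = {
    "type": "function",
    "stateMutability": "view",
    "inputs": [],
    "name": "totalSupply",
    "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
    "address": "0x0000000000000000000000000000000000000000",
}
ViewTasks(**example_job)  # raises pydantic.ValidationError if the shape is wrong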

View file: mooncrawl/settings.py

@@ -436,3 +436,13 @@ MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
 MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN = os.environ.get(
     "MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN", ""
 )
+
+# state crawler
+MOONSTREAM_STATE_CRAWLER_JOURNAL_ID = os.environ.get(
+    "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID", ""
+)
+if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
+    raise ValueError(
+        "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
+    )
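Because the guard runs at import time, every entry point that pulls in this settings module now fails fast when the variable is missing. A minimal sketch of the behavior, assuming the module is importable as mooncrawl.settings (the UUID is a placeholder):

import os

os.environ["MOONSTREAM_STATE_CRAWLER_JOURNAL_ID"] = "<journal-uuid>"  # placeholder

# The import below would raise ValueError if the variable were unset or empty.
from mooncrawl import settings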

View file: mooncrawl/state_crawler/cli.py

@@ -12,13 +12,20 @@ from uuid import UUID
 from moonstream.client import Moonstream  # type: ignore
 from moonstreamtypes.blockchain import AvailableBlockchainType
 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
 
-from ..actions import recive_S3_data_from_query
+from ..actions import recive_S3_data_from_query, get_all_entries_from_search
 from ..blockchain import connect
+from ..data import ViewTasks
 from ..db import PrePing_SessionLocal
-from ..settings import INFURA_PROJECT_ID, infura_networks, multicall_contracts
+from ..settings import (
+    bugout_client as bc,
+    INFURA_PROJECT_ID,
+    infura_networks,
+    multicall_contracts,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+)
 
 from .db import clean_labels, commit_session, view_call_to_label
 from .Multicall2_interface import Contract as Multicall2
 from .web3_util import FunctionSignature
@@ -508,10 +515,48 @@ def handle_crawl(args: argparse.Namespace) -> None:
     Read all view methods of the contracts and crawl
     """
-    with open(args.jobs_file, "r") as f:
-        jobs = json.load(f)
-
-    blockchain_type = AvailableBlockchainType(args.blockchain)
+    blockchain_type = AvailableBlockchainType(args.blockchain)
+
+    if args.jobs_file is not None:
+        with open(args.jobs_file, "r") as f:
+            jobs = json.load(f)
+    else:
+        logger.info("Reading jobs from the journal")
+        jobs = []
+
+        # Search the Bugout journal for state crawler jobs on this blockchain
+        query = f"#state_job #blockchain:{blockchain_type.value}"
+
+        existing_jobs = get_all_entries_from_search(
+            journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+            search_query=query,
+            limit=1000,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            content=True,
+        )
+
+        if len(existing_jobs) == 0:
+            logger.info("No jobs found in the journal")
+            return
+
+        for job in existing_jobs:
+            try:
+                if job.content is None:
+                    logger.error(f"Job content is None for entry {job.entry_url}")
+                    continue
+
+                # Parse the entry content as JSON
+                job_content = json.loads(job.content)
+
+                # Validate against the ViewTasks model
+                ViewTasks(**job_content)
+
+                jobs.append(job_content)
+            except Exception as e:
+                logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+                continue
 
     custom_web3_provider = args.web3_uri
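This else-branch is the read side of the migrate-jobs command added below: the migration serializes each job dict into a tab-indented entry body, and this loop deserializes and re-validates it. A sketch of that round trip in isolation, using only the standard library (the job dict is illustrative):

import json

job = {
    "type": "function",
    "stateMutability": "view",
    "inputs": [],
    "name": "symbol",
    "outputs": [{"internalType": "string", "name": "", "type": "string"}],
    "address": "0x0000000000000000000000000000000000000000",
}

# migrate-jobs writes the job as tab-indented JSON...
written = "\n".join("\t" + line for line in json.dumps(job, indent=4).splitlines())

# ...and handle_crawl reads it back; json.loads tolerates the leading tabs,
# since tabs are legal JSON whitespace between tokens.
assert json.loads(written) == job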
@@ -572,6 +617,100 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
         db_session.close()
 
 
+def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
+    # Read all tasks from the jobs file, e.g. jobs/ethereum-jobs.json
+    with open(args.jobs_file, "r") as f:
+        jobs = json.load(f)
+
+    blockchain_type = AvailableBlockchainType(args.blockchain)
+
+    migrated_blockchain = blockchain_type.value
+
+    # Get all existing tasks from the journal
+    query = f"#state_job #{migrated_blockchain}"
+
+    existing_jobs = get_all_entries_from_search(
+        journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+        search_query=query,
+        limit=1000,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        content=True,
+    )
+
+    existing_state_tasks_list = []
+
+    logger.info(f"Existing jobs: {len(existing_jobs)}")
+    logger.info(f"New jobs: {jobs}")
+
+    # Validate existing journal jobs
+    for bugout_job in existing_jobs:
+        try:
+            if bugout_job.content is None:
+                logger.error(f"Job content is None for entry {bugout_job.entry_url}")
+                continue
+
+            # Parse the entry content as JSON
+            job_content = json.loads(bugout_job.content)
+
+            # Validate against the ViewTasks model
+            ViewTasks(**job_content)
+        except Exception as e:
+            logger.error(f"Job validation of entry {bugout_job.entry_url} failed: {e}")
+            continue
+
+        # Recover blockchain, name, and address from the entry tags
+        for tag in bugout_job.tags:
+            if tag.startswith("blockchain"):
+                blockchain = tag.split(":")[1]
+            if tag.startswith("name"):
+                name = tag.split(":")[1]
+            if tag.startswith("address"):
+                address = tag.split(":")[1]
+
+        existing_state_tasks_list.append(f"{blockchain}:{name}:{address}")
+
+    # Push tasks from the file that are not yet in the journal
+    for job in jobs:
+        name = job["name"]
+        address = job["address"]
+
+        # Deduplicate tasks
+        if f"{migrated_blockchain}:{name}:{address}" not in existing_state_tasks_list:
+            # Create a new task entry
+            json_str = json.dumps(job, indent=4)
+
+            # Indent the JSON string for better readability in the journal
+            json_str_with_tabs = "\n".join(
+                "\t" + line for line in json_str.splitlines()
+            )
+
+            try:
+                bc.create_entry(
+                    title=f"{name}:{address}",
+                    token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                    journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+                    content=json_str_with_tabs,
+                    tags=[
+                        "state_job",
+                        f"blockchain:{migrated_blockchain}",
+                        f"name:{name}",
+                        f"address:{address}",
+                    ],
+                )
+            except Exception as e:
+                logger.error(f"Error creating entry: {e}")
+                continue
+
+
 def main() -> None:
     parser = argparse.ArgumentParser()
     parser.set_defaults(func=lambda _: parser.print_help())
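Deduplication above hinges on the blockchain:name:address triple reconstructed from entry tags, so re-running the migration is idempotent as long as the tags are intact. A compact sketch of the key logic (values illustrative, not from this commit):

existing = {"ethereum:totalSupply:0x0000000000000000000000000000000000000000"}
job = {"name": "totalSupply", "address": "0x0000000000000000000000000000000000000000"}

key = f"ethereum:{job['name']}:{job['address']}"
assert key in existing  # already migrated: no new journal entry is created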
@@ -614,7 +753,7 @@ def main() -> None:
         "-j",
         type=str,
         help="Path to json file with jobs",
-        required=True,
+        required=False,
     )
     view_state_crawler_parser.add_argument(
         "--batch-size",
@@ -625,6 +764,28 @@ def main() -> None:
     )
     view_state_crawler_parser.set_defaults(func=handle_crawl)
 
+    view_state_migration_parser = subparsers.add_parser(
+        "migrate-jobs",
+        help="Migrate jobs from local files to the Bugout journal",
+    )
+    view_state_migration_parser.add_argument(
+        "--jobs-file",
+        "-j",
+        type=str,
+        help="Path to json file with jobs",
+        required=True,
+    )
+    view_state_migration_parser.add_argument(
+        "--blockchain",
+        "-b",
+        type=str,
+        help="Type of blockchain which is written to the database",
+        required=True,
+    )
+    view_state_migration_parser.set_defaults(func=migrate_state_tasks_handler)
+
     view_state_cleaner = subparsers.add_parser(
         "clean-state-labels",
         help="Clean labels from database",