Mirror of https://github.com/bugout-dev/moonstream
Merge pull request #1065 from moonstream-to/add-bugout-metadata-tasks
Migrate state-crawler to bugout (pull/1098/head^2)
commit 344335a40b
@@ -5,11 +5,11 @@ import time
 import uuid
 from collections import OrderedDict
 from datetime import datetime
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, List
 
 import boto3  # type: ignore
 import requests  # type: ignore
-from bugout.data import BugoutResources
+from bugout.data import BugoutResources, BugoutSearchResult
 from bugout.exceptions import BugoutResponseException
 from moonstream.client import (  # type: ignore
     ENDPOINT_QUERIES,
@@ -170,3 +170,37 @@ def recive_S3_data_from_query(
             logger.info("Too many retries")
             break
     return data_response.json()
+
+
+def get_all_entries_from_search(
+    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
+) -> List[BugoutSearchResult]:
+    """
+    Get all required entries from journal using search interface
+    """
+    offset = 0
+    results: List[BugoutSearchResult] = []
+    existing_methods = bc.search(
+        token=token,
+        journal_id=journal_id,
+        query=search_query,
+        content=content,
+        timeout=10.0,
+        limit=limit,
+        offset=offset,
+    )
+    results.extend(existing_methods.results)  # type: ignore
+    if len(results) != existing_methods.total_results:
+        for offset in range(limit, existing_methods.total_results, limit):
+            existing_methods = bc.search(
+                token=token,
+                journal_id=journal_id,
+                query=search_query,
+                content=content,
+                timeout=10.0,
+                limit=limit,
+                offset=offset,
+            )
+            results.extend(existing_methods.results)  # type: ignore
+
+    return results
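Note: the helper above pages through the Bugout search API `limit` entries at a time, issuing follow-up calls at increasing offsets until `total_results` entries have been collected. A minimal usage sketch; the journal ID and token below are placeholders, not values from this commit:

    entries = get_all_entries_from_search(
        journal_id="<journal-uuid>",          # placeholder
        search_query="#state_job #blockchain:ethereum",
        limit=100,
        token="<bugout-access-token>",        # placeholder
        content=True,
    )
    for entry in entries:
        print(entry.entry_url)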
@@ -58,3 +58,12 @@ class TokenURIs(BaseModel):
     block_number: str
     block_timestamp: str
     address: str
+
+
+class ViewTasks(BaseModel):
+    type: str
+    stateMutability: str
+    inputs: Any
+    name: str
+    outputs: List[Dict[str, Any]]
+    address: str
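Note: `ViewTasks` mirrors a single view-function entry of a contract ABI plus the contract `address`, so journal entries can be validated before they are crawled. A sketch of a payload that would pass validation; the field values are illustrative, not taken from this commit:

    job_content = {
        "type": "function",
        "stateMutability": "view",
        "inputs": [],
        "name": "totalSupply",
        "outputs": [{"name": "", "type": "uint256"}],
        "address": "0x0000000000000000000000000000000000000000",  # illustrative
    }
    ViewTasks(**job_content)  # pydantic raises ValidationError on a malformed job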
@@ -393,3 +393,14 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
 
 MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
+
+
+# state crawler
+
+MOONSTREAM_STATE_CRAWLER_JOURNAL_ID = os.environ.get(
+    "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID", ""
+)
+if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
+    raise ValueError(
+        "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
+    )
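Note: the new setting follows the same fail-fast pattern as the leaderboard generator's journal ID above; deployments picking up this change must export MOONSTREAM_STATE_CRAWLER_JOURNAL_ID before starting the crawler, since the settings module raises ValueError at import time when the variable is unset.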
@@ -12,14 +12,20 @@ from uuid import UUID
 
 from moonstream.client import Moonstream  # type: ignore
 from moonstreamdb.blockchain import AvailableBlockchainType
 from web3.middleware import geth_poa_middleware
 
 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
 
-from ..actions import recive_S3_data_from_query
+from ..actions import recive_S3_data_from_query, get_all_entries_from_search
 from ..blockchain import connect
+from ..data import ViewTasks
 from ..db import PrePing_SessionLocal
-from ..settings import INFURA_PROJECT_ID, infura_networks, multicall_contracts
+from ..settings import (
+    bugout_client as bc,
+    INFURA_PROJECT_ID,
+    infura_networks,
+    multicall_contracts,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+)
 from .db import clean_labels, commit_session, view_call_to_label
 from .Multicall2_interface import Contract as Multicall2
 from .web3_util import FunctionSignature
@@ -509,11 +515,49 @@ def handle_crawl(args: argparse.Namespace) -> None:
     Read all view methods of the contracts and crawl
     """
 
-    with open(args.jobs_file, "r") as f:
-        jobs = json.load(f)
-
     blockchain_type = AvailableBlockchainType(args.blockchain)
 
+    if args.jobs_file is not None:
+        with open(args.jobs_file, "r") as f:
+            jobs = json.load(f)
+
+    else:
+        logger.info("Reading jobs from the journal")
+
+        jobs = []
+
+        # Bugout
+        query = f"#state_job #blockchain:{blockchain_type.value}"
+
+        existing_jobs = get_all_entries_from_search(
+            journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+            search_query=query,
+            limit=1000,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            content=True,
+        )
+
+        if len(existing_jobs) == 0:
+            logger.info("No jobs found in the journal")
+            return
+
+        for job in existing_jobs:
+            try:
+                if job.content is None:
+                    logger.error(f"Job content is None for entry {job.entry_url}")
+                    continue
+                ### parse json
+                job_content = json.loads(job.content)
+                ### validate via ViewTasks
+                ViewTasks(**job_content)
+                jobs.append(job_content)
+            except Exception as e:
+                logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+                continue
+
     custom_web3_provider = args.web3_uri
 
     if args.infura and INFURA_PROJECT_ID is not None:
@@ -573,6 +617,100 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
     db_session.close()
 
 
+def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
+    ### Get all tasks from files
+    with open(args.jobs_file, "r") as f:
+        jobs = json.load(f)
+
+    # file example jobs/ethereum-jobs.json
+
+    blockchain_type = AvailableBlockchainType(args.blockchain)
+
+    migrated_blockchain = blockchain_type.value
+
+    ### Get all tasks from the journal
+
+    query = f"#state_job #{migrated_blockchain}"
+
+    existing_jobs = get_all_entries_from_search(
+        journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+        search_query=query,
+        limit=1000,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        content=True,
+    )
+
+    existing_state_tasks_list = []
+
+    logger.info(f"Existing jobs: {len(existing_jobs)}")
+    logger.info(f"New jobs: {jobs}")
+
+    ### validate existing jobs
+    for bugout_job in existing_jobs:
+        try:
+            if bugout_job.content is None:
+                logger.error(f"Job content is None for entry {bugout_job.entry_url}")
+                continue
+            ### parse json
+            job_content = json.loads(bugout_job.content)
+            ### validate via ViewTasks
+            ViewTasks(**job_content)
+        except Exception as e:
+            logger.error(f"Job validation of entry {bugout_job.entry_url} failed: {e}")
+            continue
+
+        ### from tags get blockchain, name and address
+        for tag in bugout_job.tags:
+            if tag.startswith("blockchain"):
+                blockchain = tag.split(":")[1]
+            if tag.startswith("name"):
+                name = tag.split(":")[1]
+            if tag.startswith("address"):
+                address = tag.split(":")[1]
+
+        existing_state_tasks_list.append(f"{blockchain}:{name}:{address}")
+
+    ### Get all tasks from files
+    for job in jobs:
+        name = job["name"]
+        address = job["address"]
+
+        ### Deduplicate tasks
+        if f"{migrated_blockchain}:{name}:{address}" not in existing_state_tasks_list:
+            ### create new task
+            json_str = json.dumps(job, indent=4)
+
+            ### add tabs to json string for better readability
+            json_str_with_tabs = "\n".join(
+                "\t" + line for line in json_str.splitlines()
+            )
+
+            try:
+                bc.create_entry(
+                    title=f"{name}:{address}",
+                    token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                    journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+                    content=json_str_with_tabs,
+                    tags=[
+                        "state_job",
+                        f"blockchain:{migrated_blockchain}",
+                        f"name:{name}",
+                        f"address:{address}",
+                    ],
+                )
+            except Exception as e:
+                logger.error(f"Error creating entry: {e}")
+                continue
+
+
 def main() -> None:
     parser = argparse.ArgumentParser()
     parser.set_defaults(func=lambda _: parser.print_help())
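Note: deduplication in the migration handler hinges on `blockchain:name:address` keys rebuilt from each journal entry's tags. A condensed illustration of the key construction (not the diff's exact loop; tag values are illustrative):

    tags = ["state_job", "blockchain:ethereum", "name:WETH", "address:0xabc"]
    parsed = dict(tag.split(":", 1) for tag in tags if ":" in tag)
    key = f"{parsed['blockchain']}:{parsed['name']}:{parsed['address']}"
    # key == "ethereum:WETH:0xabc"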
@@ -615,7 +753,7 @@ def main() -> None:
         "-j",
         type=str,
         help="Path to json file with jobs",
-        required=True,
+        required=False,
     )
     view_state_crawler_parser.add_argument(
         "--batch-size",
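Note: `--jobs-file` becomes optional for the crawl command; when it is omitted, `handle_crawl` falls back to reading jobs from the state-crawler journal, as shown in the `handle_crawl` hunk above.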
@@ -626,6 +764,28 @@ def main() -> None:
     )
     view_state_crawler_parser.set_defaults(func=handle_crawl)
 
+    view_state_migration_parser = subparsers.add_parser(
+        "migrate-jobs",
+        help="Migrate jobs from files to bugout",
+    )
+    view_state_migration_parser.add_argument(
+        "--jobs-file",
+        "-j",
+        type=str,
+        help="Path to json file with jobs",
+        required=True,
+    )
+    view_state_migration_parser.add_argument(
+        "--blockchain",
+        "-b",
+        type=str,
+        help="Type of blockchain which is written to the database",
+        required=True,
+    )
+    view_state_migration_parser.set_defaults(func=migrate_state_tasks_handler)
+
    view_state_cleaner = subparsers.add_parser(
        "clean-state-labels",
        help="Clean labels from database",
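Note: an example invocation of the new subcommand, assuming the CLI module path (the entry point itself is not shown in this diff): `python -m mooncrawl.state_crawler.cli migrate-jobs --jobs-file jobs/ethereum-jobs.json --blockchain ethereum`.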
@@ -2,4 +2,4 @@
 Moonstream crawlers version.
 """
 
-MOONCRAWL_VERSION = "0.4.5"
+MOONCRAWL_VERSION = "0.4.6"