kopia lustrzana https://github.com/bugout-dev/moonstream
				
				
				
			Merge pull request #567 from bugout-dev/crawlers-access-id
Crawlers access id to work with nodebalancerpull/559/head
						commit
						8368fb0b9b
					
				|  | @ -8,5 +8,7 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks missing --blockchain ethereum -n | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     blocks missing --blockchain ethereum -n | ||||
| SyslogIdentifier=ethereum-missing | ||||
|  |  | |||
|  | @ -11,7 +11,9 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain ethereum -c 6 -j 2 | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     blocks synchronize --blockchain ethereum -c 6 -j 2 | ||||
| SyslogIdentifier=ethereum-synchronize | ||||
| 
 | ||||
| [Install] | ||||
|  |  | |||
|  | @ -8,5 +8,7 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler trending | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     trending | ||||
| SyslogIdentifier=ethereum-trending | ||||
|  |  | |||
|  | @ -11,7 +11,9 @@ WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool | |||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| Restart=on-failure | ||||
| RestartSec=15s | ||||
| ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain ethereum | ||||
| ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool \ | ||||
|     -blockchain ethereum \ | ||||
|     -access-id "${NB_CONTROLLER_ACCESS_ID}" | ||||
| SyslogIdentifier=ethereum-txpool | ||||
| 
 | ||||
| [Install] | ||||
|  |  | |||
|  | @ -8,5 +8,7 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks missing --blockchain polygon -n | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     blocks missing --blockchain polygon -n | ||||
| SyslogIdentifier=polygon-missing | ||||
|  |  | |||
|  | @ -11,7 +11,9 @@ WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | |||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| Restart=on-failure | ||||
| RestartSec=15s | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli crawl -b polygon | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     crawl -b polygon | ||||
| SyslogIdentifier=polygon-moonworm-crawler | ||||
| 
 | ||||
| [Install] | ||||
|  |  | |||
|  | @ -8,5 +8,7 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.stats_worker.dashboard generate --blockchain polygon | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.stats_worker.dashboard \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     generate --blockchain polygon | ||||
| SyslogIdentifier=polygon-statistics | ||||
|  |  | |||
|  | @ -11,7 +11,9 @@ User=ubuntu | |||
| Group=www-data | ||||
| WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler blocks synchronize --blockchain polygon -c 60 -j 2 | ||||
| ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.crawler \ | ||||
|     --access-id "${NB_CONTROLLER_ACCESS_ID}" \ | ||||
|     blocks synchronize --blockchain polygon -c 60 -j 2 | ||||
| SyslogIdentifier=polygon-synchronize | ||||
| 
 | ||||
| [Install] | ||||
|  |  | |||
|  | @ -11,7 +11,9 @@ WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool | |||
| EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||
| Restart=on-failure | ||||
| RestartSec=15s | ||||
| ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain polygon | ||||
| ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool \ | ||||
|     -blockchain polygon \ | ||||
|     -access-id "${NB_CONTROLLER_ACCESS_ID}" | ||||
| SyslogIdentifier=polygon-txpool | ||||
| 
 | ||||
| [Install] | ||||
|  |  | |||
|  | @ -23,6 +23,7 @@ from .settings import ( | |||
|     ORIGINS, | ||||
|     bugout_client as bc, | ||||
|     BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, | ||||
|     NB_CONTROLLER_ACCESS_ID, | ||||
|     MOONSTREAM_S3_QUERIES_BUCKET, | ||||
|     MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, | ||||
|     MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, | ||||
|  | @ -119,6 +120,7 @@ async def status_handler( | |||
|             timescales=stats_update.timescales, | ||||
|             dashboard=dashboard_resource, | ||||
|             subscription_by_id=subscription_by_id, | ||||
|             access_id=NB_CONTROLLER_ACCESS_ID, | ||||
|         ) | ||||
| 
 | ||||
|     except Exception as e: | ||||
|  |  | |||
|  | @ -1,6 +1,7 @@ | |||
| import logging | ||||
| from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor, wait | ||||
| from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from moonstreamdb.db import yield_db_session, yield_db_session_ctx | ||||
| from moonstreamdb.models import ( | ||||
|  | @ -22,6 +23,8 @@ from web3.types import BlockData | |||
| 
 | ||||
| from .data import AvailableBlockchainType, DateRange | ||||
| from .settings import ( | ||||
|     NB_ACCESS_ID_HEADER, | ||||
|     NB_DATA_SOURCE_HEADER, | ||||
|     MOONSTREAM_CRAWL_WORKERS, | ||||
|     MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI, | ||||
|     MOONSTREAM_POLYGON_WEB3_PROVIDER_URI, | ||||
|  | @ -37,9 +40,23 @@ class BlockCrawlError(Exception): | |||
|     """ | ||||
| 
 | ||||
| 
 | ||||
| def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None): | ||||
| def connect( | ||||
|     blockchain_type: AvailableBlockchainType, | ||||
|     web3_uri: Optional[str] = None, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> Web3: | ||||
|     web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() | ||||
| 
 | ||||
|     request_kwargs: Any = None | ||||
|     if access_id is not None: | ||||
|         request_kwargs = { | ||||
|             "headers": { | ||||
|                 NB_ACCESS_ID_HEADER: str(access_id), | ||||
|                 NB_DATA_SOURCE_HEADER: "blockchain", | ||||
|                 "Content-Type": "application/json", | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|     if web3_uri is None: | ||||
|         if blockchain_type == AvailableBlockchainType.ETHEREUM: | ||||
|             web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI | ||||
|  | @ -49,7 +66,7 @@ def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = | |||
|             raise Exception("Wrong blockchain type provided for web3 URI") | ||||
| 
 | ||||
|     if web3_uri.startswith("http://") or web3_uri.startswith("https://"): | ||||
|         web3_provider = Web3.HTTPProvider(web3_uri) | ||||
|         web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs) | ||||
|     else: | ||||
|         web3_provider = Web3.IPCProvider(web3_uri) | ||||
|     web3_client = Web3(web3_provider) | ||||
|  | @ -181,7 +198,9 @@ def add_block_transactions( | |||
| 
 | ||||
| 
 | ||||
| def get_latest_blocks( | ||||
|     blockchain_type: AvailableBlockchainType, confirmations: int = 0 | ||||
|     blockchain_type: AvailableBlockchainType, | ||||
|     confirmations: int = 0, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> Tuple[Optional[int], int]: | ||||
|     """ | ||||
|     Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method). | ||||
|  | @ -189,7 +208,7 @@ def get_latest_blocks( | |||
|     If confirmations > 0, and the latest block on the node has block number N, this returns the block | ||||
|     with block_number (N - confirmations) | ||||
|     """ | ||||
|     web3_client = connect(blockchain_type) | ||||
|     web3_client = connect(blockchain_type, access_id=access_id) | ||||
|     latest_block_number: int = web3_client.eth.block_number | ||||
|     if confirmations > 0: | ||||
|         latest_block_number -= confirmations | ||||
|  | @ -212,11 +231,12 @@ def crawl_blocks( | |||
|     blockchain_type: AvailableBlockchainType, | ||||
|     blocks_numbers: List[int], | ||||
|     with_transactions: bool = False, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> None: | ||||
|     """ | ||||
|     Open database and geth sessions and fetch block data from blockchain. | ||||
|     """ | ||||
|     web3_client = connect(blockchain_type) | ||||
|     web3_client = connect(blockchain_type, access_id=access_id) | ||||
|     with yield_db_session_ctx() as db_session: | ||||
|         pbar = tqdm(total=len(blocks_numbers)) | ||||
|         for block_number in blocks_numbers: | ||||
|  | @ -256,6 +276,7 @@ def check_missing_blocks( | |||
|     blockchain_type: AvailableBlockchainType, | ||||
|     blocks_numbers: List[int], | ||||
|     notransactions=False, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> List[int]: | ||||
|     """ | ||||
|     Query block from postgres. If block does not presented in database, | ||||
|  | @ -294,7 +315,7 @@ def check_missing_blocks( | |||
|                 [block[0], block[1]] for block in blocks_exist_raw_query.all() | ||||
|             ] | ||||
| 
 | ||||
|             web3_client = connect(blockchain_type) | ||||
|             web3_client = connect(blockchain_type, access_id=access_id) | ||||
| 
 | ||||
|             blocks_exist_len = len(blocks_exist) | ||||
|             pbar = tqdm(total=blocks_exist_len) | ||||
|  | @ -304,7 +325,7 @@ def check_missing_blocks( | |||
|                 block = web3_client.eth.get_block( | ||||
|                     block_in_db[0], full_transactions=True | ||||
|                 ) | ||||
|                 if len(block.transactions) != block_in_db[1]: | ||||
|                 if len(block.transactions) != block_in_db[1]:  # type: ignore | ||||
|                     corrupted_blocks.append(block_in_db[0]) | ||||
|                     # Delete existing corrupted block and add to missing list | ||||
|                     del_block = ( | ||||
|  | @ -336,6 +357,7 @@ def crawl_blocks_executor( | |||
|     block_numbers_list: List[int], | ||||
|     with_transactions: bool = False, | ||||
|     num_processes: int = MOONSTREAM_CRAWL_WORKERS, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> None: | ||||
|     """ | ||||
|     Execute crawler in processes. | ||||
|  | @ -363,14 +385,20 @@ def crawl_blocks_executor( | |||
|     results: List[Future] = [] | ||||
|     if num_processes == 1: | ||||
|         logger.warning("Executing block crawler in lazy mod") | ||||
|         return crawl_blocks(blockchain_type, block_numbers_list, with_transactions) | ||||
|         return crawl_blocks( | ||||
|             blockchain_type, block_numbers_list, with_transactions, access_id=access_id | ||||
|         ) | ||||
|     else: | ||||
|         with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: | ||||
|             for worker in worker_indices: | ||||
|                 block_chunk = worker_job_lists[worker] | ||||
|                 logger.info(f"Spawned process for {len(block_chunk)} blocks") | ||||
|                 result = executor.submit( | ||||
|                     crawl_blocks, blockchain_type, block_chunk, with_transactions | ||||
|                     crawl_blocks, | ||||
|                     blockchain_type, | ||||
|                     block_chunk, | ||||
|                     with_transactions, | ||||
|                     access_id, | ||||
|                 ) | ||||
|                 result.add_done_callback(record_error) | ||||
|                 results.append(result) | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ import json | |||
| import logging | ||||
| import time | ||||
| from typing import Optional | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from moonstreamdb.db import yield_db_session_ctx | ||||
| from sqlalchemy.orm.session import Session | ||||
|  | @ -11,6 +12,7 @@ from web3 import Web3 | |||
| from ..blockchain import connect | ||||
| from ..data import AvailableBlockchainType | ||||
| from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore | ||||
| from ..settings import NB_CONTROLLER_ACCESS_ID | ||||
| 
 | ||||
| logging.basicConfig(level=logging.INFO) | ||||
| logger = logging.getLogger(__name__) | ||||
|  | @ -117,7 +119,7 @@ def run_crawler_desc( | |||
| 
 | ||||
| def handle_parser(args: argparse.Namespace): | ||||
|     with yield_db_session_ctx() as session: | ||||
|         w3 = connect(AvailableBlockchainType.ETHEREUM) | ||||
|         w3 = connect(AvailableBlockchainType.ETHEREUM, access_id=args.access_id) | ||||
|         if args.order == "asc": | ||||
|             run_crawler_asc( | ||||
|                 w3=w3, | ||||
|  | @ -153,6 +155,14 @@ def generate_parser(): | |||
|     """ | ||||
| 
 | ||||
|     parser = argparse.ArgumentParser(description="Moonstream Deployment Crawler") | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--access-id", | ||||
|         default=NB_CONTROLLER_ACCESS_ID, | ||||
|         type=UUID, | ||||
|         help="User access ID", | ||||
|     ) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--start", "-s", type=int, default=None, help="block to start crawling from" | ||||
|     ) | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ import time | |||
| from datetime import datetime, timedelta, timezone | ||||
| from enum import Enum | ||||
| from typing import Iterator, List | ||||
| from uuid import UUID | ||||
| 
 | ||||
| import dateutil.parser | ||||
| 
 | ||||
|  | @ -22,7 +23,7 @@ from .blockchain import ( | |||
| ) | ||||
| from .data import AvailableBlockchainType | ||||
| from .publish import publish_json | ||||
| from .settings import MOONSTREAM_CRAWL_WORKERS | ||||
| from .settings import NB_CONTROLLER_ACCESS_ID, MOONSTREAM_CRAWL_WORKERS | ||||
| from .version import MOONCRAWL_VERSION | ||||
| 
 | ||||
| logging.basicConfig(level=logging.INFO) | ||||
|  | @ -93,7 +94,9 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None: | |||
|     """ | ||||
|     while True: | ||||
|         latest_stored_block_number, latest_block_number = get_latest_blocks( | ||||
|             AvailableBlockchainType(args.blockchain), args.confirmations | ||||
|             AvailableBlockchainType(args.blockchain), | ||||
|             args.confirmations, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
|         if latest_stored_block_number is None: | ||||
|             latest_stored_block_number = 0 | ||||
|  | @ -137,6 +140,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None: | |||
|                 block_numbers_list=blocks_numbers_list, | ||||
|                 with_transactions=True, | ||||
|                 num_processes=args.jobs, | ||||
|                 access_id=args.access_id, | ||||
|             ) | ||||
|         logger.info( | ||||
|             f"Synchronized blocks from {latest_stored_block_number} to {latest_block_number}" | ||||
|  | @ -155,6 +159,7 @@ def crawler_blocks_add_handler(args: argparse.Namespace) -> None: | |||
|             blockchain_type=AvailableBlockchainType(args.blockchain), | ||||
|             block_numbers_list=blocks_numbers_list, | ||||
|             with_transactions=True, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
| 
 | ||||
|     logger.info( | ||||
|  | @ -177,7 +182,9 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None: | |||
|         confirmations = 150 | ||||
|         shift = 2000 | ||||
|         _, latest_block_number = get_latest_blocks( | ||||
|             AvailableBlockchainType(args.blockchain), confirmations | ||||
|             AvailableBlockchainType(args.blockchain), | ||||
|             confirmations, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
|         block_range = f"{latest_block_number-shift}-{latest_block_number}" | ||||
| 
 | ||||
|  | @ -190,6 +197,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None: | |||
|             blockchain_type=AvailableBlockchainType(args.blockchain), | ||||
|             blocks_numbers=blocks_numbers_list, | ||||
|             notransactions=args.notransactions, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
|         if len(missing_blocks_numbers) > 0: | ||||
|             logger.info(f"Found {len(missing_blocks_numbers)} missing blocks") | ||||
|  | @ -206,6 +214,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None: | |||
|             block_numbers_list=missing_blocks_numbers_total, | ||||
|             with_transactions=True, | ||||
|             num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
|     logger.info( | ||||
|         f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers " | ||||
|  | @ -246,6 +255,13 @@ def main() -> None: | |||
| 
 | ||||
|     time_now = datetime.now(timezone.utc) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--access-id", | ||||
|         default=NB_CONTROLLER_ACCESS_ID, | ||||
|         type=UUID, | ||||
|         help="User access ID", | ||||
|     ) | ||||
| 
 | ||||
|     # Blockchain blocks parser | ||||
|     parser_crawler_blocks = subcommands.add_parser( | ||||
|         "blocks", description="Blockchain blocks commands" | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ import argparse | |||
| import logging | ||||
| import os | ||||
| import time | ||||
| from typing import Any | ||||
| 
 | ||||
| import requests | ||||
| from moonstreamdb.db import yield_db_session_ctx | ||||
|  | @ -25,7 +26,7 @@ def identities_cmc_add_handler(args: argparse.Namespace) -> None: | |||
|     """ | ||||
|     Parse metadata for Ethereum tokens. | ||||
|     """ | ||||
|     headers = { | ||||
|     headers: Any = { | ||||
|         "X-CMC_PRO_API_KEY": COINMARKETCAP_API_KEY, | ||||
|         "Accept": "application/json", | ||||
|         "Accept-Encoding": "deflate, gzip", | ||||
|  |  | |||
|  | @ -1,13 +1,14 @@ | |||
| import argparse | ||||
| import logging | ||||
| from typing import Optional | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from moonstreamdb.db import yield_db_session_ctx | ||||
| from web3 import Web3 | ||||
| from web3.middleware import geth_poa_middleware | ||||
| 
 | ||||
| from ..blockchain import AvailableBlockchainType | ||||
| from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client | ||||
| from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID | ||||
| from .continuous_crawler import _retry_connect_web3, continuous_crawler | ||||
| from .crawler import ( | ||||
|     SubscriptionTypes, | ||||
|  | @ -54,7 +55,7 @@ def handle_crawl(args: argparse.Namespace) -> None: | |||
|             logger.info( | ||||
|                 "No web3 provider URL provided, using default (blockchan.py: connect())" | ||||
|             ) | ||||
|             web3 = _retry_connect_web3(blockchain_type) | ||||
|             web3 = _retry_connect_web3(blockchain_type, access_id=args.access_id) | ||||
|         else: | ||||
|             logger.info(f"Using web3 provider URL: {args.web3}") | ||||
|             web3 = Web3( | ||||
|  | @ -109,11 +110,21 @@ def handle_crawl(args: argparse.Namespace) -> None: | |||
|             args.min_sleep_time, | ||||
|             args.heartbeat_interval, | ||||
|             args.new_jobs_refetch_interval, | ||||
|             access_id=args.access_id, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     parser = argparse.ArgumentParser() | ||||
|     parser.set_defaults(func=lambda _: parser.print_help()) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--access-id", | ||||
|         default=NB_CONTROLLER_ACCESS_ID, | ||||
|         type=UUID, | ||||
|         help="User access ID", | ||||
|     ) | ||||
| 
 | ||||
|     subparsers = parser.add_subparsers() | ||||
| 
 | ||||
|     crawl_parser = subparsers.add_parser("crawl") | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ import time | |||
| import traceback | ||||
| from datetime import datetime, timedelta | ||||
| from typing import Dict, List, Optional, Tuple | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from moonworm.crawler.moonstream_ethereum_state_provider import (  # type: ignore | ||||
|     MoonstreamEthereumStateProvider, | ||||
|  | @ -85,6 +86,7 @@ def _retry_connect_web3( | |||
|     blockchain_type: AvailableBlockchainType, | ||||
|     retry_count: int = 10, | ||||
|     sleep_time: float = 5, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> Web3: | ||||
|     """ | ||||
|     Retry connecting to the blockchain. | ||||
|  | @ -92,7 +94,7 @@ def _retry_connect_web3( | |||
|     while retry_count > 0: | ||||
|         retry_count -= 1 | ||||
|         try: | ||||
|             web3 = connect(blockchain_type) | ||||
|             web3 = connect(blockchain_type, access_id=access_id) | ||||
|             web3.eth.block_number | ||||
|             logger.info(f"Connected to {blockchain_type}") | ||||
|             return web3 | ||||
|  | @ -121,6 +123,7 @@ def continuous_crawler( | |||
|     min_sleep_time: float = 0.1, | ||||
|     heartbeat_interval: float = 60, | ||||
|     new_jobs_refetch_interval: float = 120, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ): | ||||
|     crawler_type = "continuous" | ||||
|     assert ( | ||||
|  | @ -139,7 +142,7 @@ def continuous_crawler( | |||
| 
 | ||||
|     jobs_refetchet_time = crawl_start_time | ||||
|     if web3 is None: | ||||
|         web3 = _retry_connect_web3(blockchain_type) | ||||
|         web3 = _retry_connect_web3(blockchain_type, access_id=access_id) | ||||
| 
 | ||||
|     network = ( | ||||
|         Network.ethereum | ||||
|  | @ -281,7 +284,7 @@ def continuous_crawler( | |||
|                     logger.error("Too many failures, exiting") | ||||
|                     raise e | ||||
|                 try: | ||||
|                     web3 = _retry_connect_web3(blockchain_type) | ||||
|                     web3 = _retry_connect_web3(blockchain_type, access_id=access_id) | ||||
|                 except Exception as err: | ||||
|                     logger.error(f"Failed to reconnect: {err}") | ||||
|                     logger.exception(err) | ||||
|  |  | |||
|  | @ -8,6 +8,7 @@ import sys | |||
| import time | ||||
| from datetime import datetime, timezone | ||||
| from typing import Any, Dict, Optional, cast | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from bugout.app import Bugout | ||||
| from bugout.journal import SearchOrder | ||||
|  | @ -20,6 +21,7 @@ from ..blockchain import connect | |||
| from ..data import AvailableBlockchainType | ||||
| from ..publish import publish_json | ||||
| from ..settings import ( | ||||
|     NB_CONTROLLER_ACCESS_ID, | ||||
|     MOONSTREAM_ADMIN_ACCESS_TOKEN, | ||||
|     MOONSTREAM_DATA_JOURNAL_ID, | ||||
|     MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI, | ||||
|  | @ -55,7 +57,11 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3: | |||
|         raise ValueError( | ||||
|             "Could not find Web3 connection information in arguments or in MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable" | ||||
|         ) | ||||
|     return connect(AvailableBlockchainType.ETHEREUM, web3_connection_string) | ||||
|     return connect( | ||||
|         AvailableBlockchainType.ETHEREUM, | ||||
|         web3_connection_string, | ||||
|         access_id=args.access_id, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def get_latest_block_from_node(web3_client: Web3): | ||||
|  | @ -256,6 +262,14 @@ def ethereum_summary_handler(args: argparse.Namespace) -> None: | |||
| def main() -> None: | ||||
|     parser = argparse.ArgumentParser(description="Moonstream NFT crawlers") | ||||
|     parser.set_defaults(func=lambda _: parser.print_help()) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--access-id", | ||||
|         default=NB_CONTROLLER_ACCESS_ID, | ||||
|         type=UUID, | ||||
|         help="User access ID", | ||||
|     ) | ||||
| 
 | ||||
|     subcommands = parser.add_subparsers(description="Subcommands") | ||||
| 
 | ||||
|     parser_ethereum = subcommands.add_parser( | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| import os | ||||
| from typing import cast | ||||
| from typing import cast, Optional | ||||
| from uuid import UUID | ||||
| 
 | ||||
| from bugout.app import Bugout | ||||
| 
 | ||||
|  | @ -98,3 +99,16 @@ if MOONSTREAM_S3_QUERIES_BUCKET_PREFIX == "": | |||
|     raise ValueError( | ||||
|         "MOONSTREAM_S3_QUERIES_BUCKET_PREFIX environment variable must be set" | ||||
|     ) | ||||
| 
 | ||||
| # Node balancer | ||||
| NB_ACCESS_ID_HEADER = os.environ.get("NB_ACCESS_ID_HEADER", "x-node-balancer-access-id") | ||||
| NB_DATA_SOURCE_HEADER = os.environ.get( | ||||
|     "NB_DATA_SOURCE_HEADER", "x-node-balancer-data-source" | ||||
| ) | ||||
| 
 | ||||
| NB_CONTROLLER_ACCESS_ID: Optional[UUID] = None | ||||
| NB_CONTROLLER_ACCESS_ID_RAW = os.environ.get("NB_CONTROLLER_ACCESS_ID", "") | ||||
| try: | ||||
|     NB_CONTROLLER_ACCESS_ID = UUID(NB_CONTROLLER_ACCESS_ID_RAW) | ||||
| except: | ||||
|     pass | ||||
|  |  | |||
|  | @ -8,7 +8,7 @@ import logging | |||
| import time | ||||
| from datetime import datetime, timedelta | ||||
| from enum import Enum | ||||
| from typing import Any, Callable, Dict, List | ||||
| from typing import Any, Callable, Dict, List, Optional | ||||
| from uuid import UUID | ||||
| 
 | ||||
| import boto3  # type: ignore | ||||
|  | @ -28,6 +28,7 @@ from ..data import AvailableBlockchainType | |||
| from ..reporter import reporter | ||||
| from ..settings import ( | ||||
|     CRAWLER_LABEL, | ||||
|     NB_CONTROLLER_ACCESS_ID, | ||||
|     MOONSTREAM_ADMIN_ACCESS_TOKEN, | ||||
|     MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, | ||||
| ) | ||||
|  | @ -337,7 +338,9 @@ def generate_list_of_names( | |||
| 
 | ||||
| 
 | ||||
| def process_external( | ||||
|     abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType | ||||
|     abi_external_calls: List[Dict[str, Any]], | ||||
|     blockchain: AvailableBlockchainType, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ): | ||||
|     """ | ||||
|     Request all required external data | ||||
|  | @ -383,7 +386,7 @@ def process_external( | |||
|             logger.error(f"Error processing external call: {e}") | ||||
| 
 | ||||
|     if external_calls: | ||||
|         web3_client = connect(blockchain) | ||||
|         web3_client = connect(blockchain, access_id=access_id) | ||||
| 
 | ||||
|     for extcall in external_calls: | ||||
|         try: | ||||
|  | @ -434,6 +437,7 @@ def generate_web3_metrics( | |||
|     address: str, | ||||
|     crawler_label: str, | ||||
|     abi_json: Any, | ||||
|     access_id: Optional[UUID] = None, | ||||
| ) -> List[Any]: | ||||
|     """ | ||||
|     Generate stats for cards components | ||||
|  | @ -446,6 +450,7 @@ def generate_web3_metrics( | |||
|     extention_data = process_external( | ||||
|         abi_external_calls=abi_external_calls, | ||||
|         blockchain=blockchain_type, | ||||
|         access_id=access_id, | ||||
|     ) | ||||
| 
 | ||||
|     extention_data.append( | ||||
|  | @ -611,6 +616,7 @@ def stats_generate_handler(args: argparse.Namespace): | |||
|                         address=address, | ||||
|                         crawler_label=crawler_label, | ||||
|                         abi_json=abi_json, | ||||
|                         access_id=args.access_id, | ||||
|                     ) | ||||
| 
 | ||||
|                     # Generate blocks state information | ||||
|  | @ -693,6 +699,7 @@ def stats_generate_api_task( | |||
|     timescales: List[str], | ||||
|     dashboard: BugoutResource, | ||||
|     subscription_by_id: Dict[str, BugoutResource], | ||||
|     access_id: Optional[UUID] = None, | ||||
| ): | ||||
|     """ | ||||
|     Start crawler with generate. | ||||
|  | @ -770,6 +777,7 @@ def stats_generate_api_task( | |||
|                     address=address, | ||||
|                     crawler_label=crawler_label, | ||||
|                     abi_json=abi_json, | ||||
|                     access_id=access_id, | ||||
|                 ) | ||||
| 
 | ||||
|                 # Generate blocks state information | ||||
|  | @ -843,6 +851,14 @@ def stats_generate_api_task( | |||
| def main() -> None: | ||||
|     parser = argparse.ArgumentParser(description="Command Line Interface") | ||||
|     parser.set_defaults(func=lambda _: parser.print_help()) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--access-id", | ||||
|         default=NB_CONTROLLER_ACCESS_ID, | ||||
|         type=UUID, | ||||
|         help="User access ID", | ||||
|     ) | ||||
| 
 | ||||
|     subcommands = parser.add_subparsers( | ||||
|         description="Drone dashboard statistics commands" | ||||
|     ) | ||||
|  |  | |||
|  | @ -2,4 +2,4 @@ | |||
| Moonstream crawlers version. | ||||
| """ | ||||
| 
 | ||||
| MOONCRAWL_VERSION = "0.1.4" | ||||
| MOONCRAWL_VERSION = "0.1.5" | ||||
|  |  | |||
|  | @ -1,20 +1,30 @@ | |||
| # Bugout environment variables | ||||
| export BUGOUT_BROOD_URL="https://auth.bugout.dev" | ||||
| export BUGOUT_SPIRE_URL="https://spire.bugout.dev" | ||||
| export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>" | ||||
| 
 | ||||
| # Moonstream environment variables | ||||
| export MOONSTREAM_CORS_ALLOWED_ORIGINS="http://localhost:3000,https://moonstream.to,https://www.moonstream.to" | ||||
| export MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI="https://<connection_path_uri_to_ethereum_node>" | ||||
| export MOONSTREAM_POLYGON_WEB3_PROVIDER_URI="https://<connection_path_uri_to_polygon_node>" | ||||
| export MOONSTREAM_CRAWL_WORKERS=4 | ||||
| export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>" | ||||
| export MOONSTREAM_DB_URI_READ_ONLY="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>" | ||||
| export MOONSTREAM_ETHERSCAN_TOKEN="<Token_for_etherscan>" | ||||
| export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_for_smart_contracts>" | ||||
| export MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX="<Previx_for_AWS_S3_bucket_(prod,dev,..)>" | ||||
| export MOONSTREAM_CRAWL_WORKERS=4 | ||||
| export MOONSTREAM_HUMBUG_TOKEN="<Token_for_crawlers_store_data_via_Humbug>" | ||||
| export COINMARKETCAP_API_KEY="<API_key_to_parse_conmarketcap>" | ||||
| export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>" | ||||
| export MOONSTREAM_DATA_JOURNAL_ID="<Bugout_journal_id_for_moonstream>" | ||||
| export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>" | ||||
| export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Bugout_access_token_for_moonstream>" | ||||
| export NFT_HUMBUG_TOKEN="<Token_for_nft_crawler>" | ||||
| export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>" | ||||
| 
 | ||||
| # Blockchain nodes environment variables | ||||
| export MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI="https://<connection_path_uri_to_ethereum_node>" | ||||
| export MOONSTREAM_POLYGON_WEB3_PROVIDER_URI="https://<connection_path_uri_to_polygon_node>" | ||||
| export NB_CONTROLLER_ACCESS_ID="<access_uuid_for_moonstream_nodebalancer>" | ||||
| 
 | ||||
| # AWS environment variables | ||||
| export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_for_smart_contracts>" | ||||
| export MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX="<Previx_for_AWS_S3_bucket_(prod,dev,..)>" | ||||
| export MOONSTREAM_S3_QUERIES_BUCKET="<AWS_S3_bucket_to_store_sql_queries>" | ||||
| export MOONSTREAM_S3_QUERIES_BUCKET_PREFIX="dev" | ||||
| 
 | ||||
| # 3rd parties environment variables | ||||
| export MOONSTREAM_ETHERSCAN_TOKEN="<Token_for_etherscan>" | ||||
| export COINMARKETCAP_API_KEY="<API_key_to_parse_conmarketcap>" | ||||
|  |  | |||
|  | @ -143,8 +143,10 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu | |||
| } | ||||
| 
 | ||||
| func InitTxPool() { | ||||
| 	var accessID string | ||||
| 	var blockchain string | ||||
| 	var intervalSeconds int | ||||
| 	flag.StringVar(&accessID, "access-id", "", "Access ID for Moonstream node balancer") | ||||
| 	flag.StringVar(&blockchain, "blockchain", "", "Blockchain to crawl") | ||||
| 	flag.IntVar(&intervalSeconds, "interval", 1, "Number of seconds to wait between RPC calls to query the transaction pool (default: 1)") | ||||
| 	flag.Parse() | ||||
|  | @ -180,6 +182,10 @@ func InitTxPool() { | |||
| 	if err != nil { | ||||
| 		panic(fmt.Sprintf("Could not connect to geth: %s", err.Error())) | ||||
| 	} | ||||
| 	if accessID != "" { | ||||
| 		gethClient.SetHeader("X-Node-Balancer-Access-Id", accessID) | ||||
| 		gethClient.SetHeader("X-Node-Balancer-Data-Source", "blockchain") | ||||
| 	} | ||||
| 	defer gethClient.Close() | ||||
| 
 | ||||
| 	// Humbug client to be able write data in Bugout journal
 | ||||
|  |  | |||
|  | @ -0,0 +1,10 @@ | |||
| #!/usr/bin/env sh | ||||
| 
 | ||||
| # Colpile application and run with provided arguments | ||||
| set -e | ||||
| 
 | ||||
| PROGRAM_NAME="txpool" | ||||
| 
 | ||||
| go build -o "$PROGRAM_NAME" . | ||||
| 
 | ||||
| ./"$PROGRAM_NAME" "$@" | ||||
|  | @ -1,5 +1,9 @@ | |||
| export MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI="https://<connection_path_uri_to_ethereum_node>" | ||||
| export MOONSTREAM_POLYGON_WEB3_PROVIDER_URI="https://<connection_path_uri_to_polygon_node>" | ||||
| export HUMBUG_TXPOOL_CLIENT_ID="<client id for the crawling machine>" | ||||
| export HUMBUG_TXPOOL_TOKEN="<Generate an integration and a Humbug token from https://bugout.dev/account/teams>" | ||||
| export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout Humbug token for crash reports>" | ||||
| 
 | ||||
| # Nodes environment variables | ||||
| export MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI="https://<connection_path_uri_to_ethereum_node>" | ||||
| export MOONSTREAM_POLYGON_WEB3_PROVIDER_URI="https://<connection_path_uri_to_polygon_node>" | ||||
| # Moonstream nodebalancer | ||||
| export NB_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_crawlers>" | ||||
|  |  | |||
		Ładowanie…
	
		Reference in New Issue
	
	 Sergei Sumarokov
						Sergei Sumarokov