diff --git a/.github/workflows/test.clients.python.yml b/.github/workflows/test.clients.python.yml index 7a454b44..65494ea7 100644 --- a/.github/workflows/test.clients.python.yml +++ b/.github/workflows/test.clients.python.yml @@ -31,8 +31,8 @@ jobs: - name: Check that versions are synchronized working-directory: ./clients/python run: | - CLIENT_VERSION=$(python -c "from moonstream.client import CLIENT_VERSION; print(CLIENT_VERSION)") + MOONSTREAM_CLIENT_VERSION=$(python -c "from moonstream.client import MOONSTREAM_CLIENT_VERSION; print(MOONSTREAM_CLIENT_VERSION)") SETUP_PY_VERSION=$(python setup.py --version) - echo "Client version: $CLIENT_VERSION" + echo "Client version: $MOONSTREAM_CLIENT_VERSION" echo "setup.py version: $SETUP_PY_VERSION" - test "$CLIENT_VERSION" = "$SETUP_PY_VERSION" + test "$MOONSTREAM_CLIENT_VERSION" = "$SETUP_PY_VERSION" diff --git a/backend/.isort.cfg b/backend/.isort.cfg new file mode 100644 index 00000000..81d54de1 --- /dev/null +++ b/backend/.isort.cfg @@ -0,0 +1,3 @@ +[settings] +profile = black +multi_line_output = 3 \ No newline at end of file diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py index 597af7d5..f489c81f 100644 --- a/backend/moonstreamapi/actions.py +++ b/backend/moonstreamapi/actions.py @@ -6,7 +6,6 @@ import uuid import boto3 # type: ignore from bugout.data import BugoutSearchResults -from bugout.exceptions import BugoutResponseException from bugout.journal import SearchOrder from ens.utils import is_valid_ens_name # type: ignore from eth_utils.address import is_address # type: ignore diff --git a/backend/moonstreamapi/api.py b/backend/moonstreamapi/api.py index 5e281725..a625b7dd 100644 --- a/backend/moonstreamapi/api.py +++ b/backend/moonstreamapi/api.py @@ -11,7 +11,6 @@ from fastapi.middleware.cors import CORSMiddleware from . import actions from . 
import data from .routes.address_info import router as addressinfo_router -from .routes.nft import router as nft_router from .routes.streams import router as streams_router from .routes.subscriptions import router as subscriptions_router from .routes.txinfo import router as txinfo_router @@ -32,7 +31,6 @@ tags_metadata = [ "name": "labels", "description": "Labels for transactions, addresses with additional information.", }, - {"name": "nft", "description": "NFT market summaries."}, {"name": "streams", "description": "Operations with data streams and filters."}, {"name": "subscriptions", "description": "Operations with user subscriptions."}, {"name": "time", "description": "Server timestamp endpoints."}, @@ -124,7 +122,6 @@ async def status_handler() -> data.StatusResponse: app.include_router(addressinfo_router) -app.include_router(nft_router) app.include_router(streams_router) app.include_router(subscriptions_router) app.include_router(txinfo_router) diff --git a/backend/moonstreamapi/data.py b/backend/moonstreamapi/data.py index 1ee1fb39..155d0329 100644 --- a/backend/moonstreamapi/data.py +++ b/backend/moonstreamapi/data.py @@ -1,13 +1,12 @@ """ Pydantic schemas for the Moonstream HTTP API """ +from datetime import datetime from enum import Enum -from typing import List, Optional, Dict, Any, Union +from typing import Any, Dict, List, Optional, Union from uuid import UUID - from pydantic import BaseModel, Field -from datetime import datetime USER_ONBOARDING_STATE = "onboarding_state" @@ -154,6 +153,7 @@ class StreamBoundary(BaseModel): end_time: Optional[int] = None include_start: bool = False include_end: bool = False + reversed_time: bool = False class Event(BaseModel): @@ -230,6 +230,10 @@ class OnboardingState(BaseModel): steps: Dict[str, int] +class SubdcriptionsAbiResponse(BaseModel): + url: str + + class DashboardMeta(BaseModel): subscription_id: UUID generic: Optional[List[Dict[str, str]]] @@ -249,3 +253,8 @@ class DashboardResource(BaseModel): class 
DashboardCreate(BaseModel): name: str subscriptions: List[DashboardMeta] + + +class DashboardUpdate(BaseModel): + name: Optional[str] + subscriptions: List[DashboardMeta] = Field(default_factory=list) diff --git a/backend/moonstreamapi/providers/__init__.py b/backend/moonstreamapi/providers/__init__.py index f884fe1b..32a93206 100644 --- a/backend/moonstreamapi/providers/__init__.py +++ b/backend/moonstreamapi/providers/__init__.py @@ -24,17 +24,17 @@ if the order does not matter and you would rather emphasize speed. Only availabl lists of events. (Default: True) """ -from concurrent.futures import Future, ThreadPoolExecutor import logging +from concurrent.futures import Future, ThreadPoolExecutor from typing import Any, Dict, List, Optional, Tuple from bugout.app import Bugout from bugout.data import BugoutResource from sqlalchemy.orm import Session -from . import bugout, ethereum_blockchain from .. import data from ..stream_queries import StreamQuery +from . import bugout, ethereum_blockchain logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) @@ -74,7 +74,14 @@ def get_events( with ThreadPoolExecutor( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: - for provider_name, provider in event_providers.items(): + # Filter our not queried event_types + event_providers_filtered = { + key: value + for (key, value) in event_providers.items() + if value.event_type in query.subscription_types + } + + for provider_name, provider in event_providers_filtered.items(): futures[provider_name] = executor.submit( provider.get_events, db_session, @@ -100,9 +107,15 @@ def get_events( else: raise ReceivingEventsException(e) + stream_boundary = [boundary for boundary, _ in results.values()][0] events = [event for _, event_list in results.values() for event in event_list] if sort_events: - events.sort(key=lambda event: event.event_timestamp, reverse=True) + # If stream_boundary time was reversed, so do not reverse by timestamp, + # it is 
already in correct oreder + events.sort( + key=lambda event: event.event_timestamp, + reverse=not stream_boundary.reversed_time, + ) return (stream_boundary, events) @@ -132,7 +145,14 @@ def latest_events( with ThreadPoolExecutor( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: - for provider_name, provider in event_providers.items(): + # Filter our not queried event_types + event_providers_filtered = { + key: value + for (key, value) in event_providers.items() + if value.event_type in query.subscription_types + } + + for provider_name, provider in event_providers_filtered.items(): futures[provider_name] = executor.submit( provider.latest_events, db_session, @@ -157,7 +177,6 @@ def latest_events( ) else: raise ReceivingEventsException(e) - events = [event for event_list in results.values() for event in event_list] if sort_events: events.sort(key=lambda event: event.event_timestamp, reverse=True) @@ -185,7 +204,14 @@ def next_event( with ThreadPoolExecutor( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: - for provider_name, provider in event_providers.items(): + # Filter our not queried event_types + event_providers_filtered = { + key: value + for (key, value) in event_providers.items() + if value.event_type in query.subscription_types + } + + for provider_name, provider in event_providers_filtered.items(): futures[provider_name] = executor.submit( provider.next_event, db_session, @@ -241,7 +267,14 @@ def previous_event( with ThreadPoolExecutor( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: - for provider_name, provider in event_providers.items(): + # Filter our not queried event_types + event_providers_filtered = { + key: value + for (key, value) in event_providers.items() + if value.event_type in query.subscription_types + } + + for provider_name, provider in event_providers_filtered.items(): futures[provider_name] = executor.submit( provider.previous_event, db_session, 
diff --git a/backend/moonstreamapi/providers/bugout.py b/backend/moonstreamapi/providers/bugout.py index f7b29f76..df73711c 100644 --- a/backend/moonstreamapi/providers/bugout.py +++ b/backend/moonstreamapi/providers/bugout.py @@ -327,34 +327,6 @@ class EthereumTXPoolProvider(BugoutEventProvider): return subscriptions_filters -class PublicDataProvider(BugoutEventProvider): - def __init__( - self, - event_type: str, - description: str, - default_time_interval_seconds: int, - estimated_events_per_time_interval: float, - tags: Optional[List[str]] = None, - batch_size: int = 100, - timeout: float = 30.0, - ): - - super().__init__( - event_type=event_type, - description=description, - default_time_interval_seconds=default_time_interval_seconds, - estimated_events_per_time_interval=estimated_events_per_time_interval, - tags=tags, - batch_size=batch_size, - timeout=timeout, - ) - - def parse_filters( - self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]] - ) -> Optional[List[str]]: - return [] - - whalewatch_description = """Event provider for Ethereum whale watch. Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories: @@ -364,7 +336,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour 4. Amount (in WEI) received To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint.""" -whalewatch_provider = PublicDataProvider( +whalewatch_provider = BugoutEventProvider( event_type="ethereum_whalewatch", description=whalewatch_description, default_time_interval_seconds=310, @@ -384,21 +356,3 @@ ethereum_txpool_provider = EthereumTXPoolProvider( estimated_events_per_time_interval=50, tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"], ) - -nft_summary_description = """Event provider for NFT market summaries. 
- -This provider periodically generates NFT market summaries for the last hour of market activity. - -Currently, it summarizes the activities on the following NFT markets: -1. The Ethereum market - -This provider is currently not accessible for subscription. The data from this provider is publicly -available at the /nft endpoint.""" -nft_summary_provider = PublicDataProvider( - event_type="nft_summary", - description=nft_summary_description, - # 40 blocks per summary, 15 seconds per block + 2 seconds wiggle room. - default_time_interval_seconds=40 * 17, - estimated_events_per_time_interval=1, - tags=["crawl_type:nft_ethereum"], -) diff --git a/backend/moonstreamapi/providers/ethereum_blockchain.py b/backend/moonstreamapi/providers/ethereum_blockchain.py index b2c173bc..4114120a 100644 --- a/backend/moonstreamapi/providers/ethereum_blockchain.py +++ b/backend/moonstreamapi/providers/ethereum_blockchain.py @@ -1,25 +1,18 @@ -from dataclasses import dataclass, field import logging -from typing import cast, Dict, Any, List, Optional, Tuple +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple, cast from bugout.app import Bugout from bugout.data import BugoutResource - -from moonstreamdb.models import ( - EthereumBlock, - EthereumTransaction, - EthereumLabel, -) -from sqlalchemy import or_, and_, text -from sqlalchemy.orm import Session, Query +from moonstreamdb.models import EthereumBlock, EthereumLabel, EthereumTransaction +from sqlalchemy import and_, or_, text +from sqlalchemy.orm import Query, Session from sqlalchemy.sql.functions import user - from .. 
import data from ..stream_boundaries import validate_stream_boundary from ..stream_queries import StreamQuery - logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) @@ -59,7 +52,9 @@ def validate_subscription( return True, errors -def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None: +def stream_boundary_validator( + stream_boundary: data.StreamBoundary, +) -> data.StreamBoundary: """ Stream boundary validator for the ethereum_blockchain event provider. @@ -68,9 +63,10 @@ def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None: Raises an error for invalid stream boundaries, else returns None. """ valid_period_seconds = 2 * 60 * 60 - validate_stream_boundary( + _, stream_boundary = validate_stream_boundary( stream_boundary, valid_period_seconds, raise_when_invalid=True ) + return stream_boundary @dataclass @@ -298,7 +294,7 @@ def get_events( If the query does not require any data from this provider, returns None. """ - stream_boundary_validator(stream_boundary) + stream_boundary = stream_boundary_validator(stream_boundary) parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: diff --git a/backend/moonstreamapi/routes/dashboards.py b/backend/moonstreamapi/routes/dashboards.py index 4711a2f5..7cfccfc2 100644 --- a/backend/moonstreamapi/routes/dashboards.py +++ b/backend/moonstreamapi/routes/dashboards.py @@ -7,7 +7,7 @@ from uuid import UUID import boto3 # type: ignore from bugout.data import BugoutResource, BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import APIRouter, Request, Query +from fastapi import APIRouter, Request, Query, Body from .. import actions from .. 
import data @@ -41,7 +41,7 @@ blockchain_by_subscription_id = { @router.post("/", tags=["dashboards"], response_model=BugoutResource) async def add_dashboard_handler( - request: Request, dashboard: data.DashboardCreate + request: Request, dashboard: data.DashboardCreate = Body(...) ) -> BugoutResource: """ Add subscription to blockchain stream data for user. @@ -231,10 +231,7 @@ async def get_dashboard_handler( @router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource) async def update_dashboard_handler( - request: Request, - dashboard_id: str, - name: Optional[str], - subscriptions: List[data.DashboardMeta], + request: Request, dashboard_id: str, dashboard: data.DashboardUpdate = Body(...) ) -> BugoutResource: """ Update dashboards mainly fully overwrite name and subscription metadata @@ -244,7 +241,7 @@ async def update_dashboard_handler( user = request.state.user - dashboard_subscriptions = subscriptions + dashboard_subscriptions = dashboard.subscriptions params = { "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, @@ -277,7 +274,7 @@ async def update_dashboard_handler( "bucket" ] abi_path = available_subscriptions[dashboard_subscription.subscription_id][ - "abi_path" + "s3_path" ] if bucket is None or abi_path is None: @@ -306,8 +303,7 @@ async def update_dashboard_handler( internal_error=e, detail=f"We can't access the abi for subscription with id:{dashboard_subscription.subscription_id}.", ) - - abi = data.DashboardMeta(**response["Body"].read().decode("utf-8")) + abi = json.loads(response["Body"].read()) actions.dashboards_abi_validation( dashboard_subscription, abi, s3_path=s3_path @@ -321,23 +317,25 @@ async def update_dashboard_handler( dashboard_resource: Dict[str, Any] = {} - if subscriptions: + if dashboard_subscriptions: - dashboard_resource["subscriptions"] = subscriptions + dashboard_resource["dashboard_subscriptions"] = json.loads(dashboard.json())[ + "subscriptions" + ] - if name is not None: - dashboard_resource["name"] = name + 
if dashboard.name is not None: + dashboard_resource["name"] = dashboard.name try: resource: BugoutResource = bc.update_resource( token=token, resource_id=dashboard_id, - resource_data=dashboard_resource, + resource_data=data.SubscriptionUpdate(update=dashboard_resource).dict(), ) except BugoutResponseException as e: raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: - logger.error(f"Error creating subscription resource: {str(e)}") + logger.error(f"Error updating subscription resource: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) return resource @@ -411,12 +409,11 @@ async def get_dashboard_data_links_handler( for subscription in dashboard_subscriptions: - hash = subscription.resource_data["abi_hash"] available_timescales = [timescale.value for timescale in data.TimeScale] stats[subscription.id] = {} for timescale in available_timescales: try: - result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' + result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json' stats_presigned_url = s3_client.generate_presigned_url( "get_object", Params={ diff --git a/backend/moonstreamapi/routes/nft.py b/backend/moonstreamapi/routes/nft.py deleted file mode 100644 index 9ad48e2e..00000000 --- a/backend/moonstreamapi/routes/nft.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -Moonstream's /nft endpoints. - -These endpoints provide public access to NFT market summaries. No authentication required. 
-""" -import logging -from typing import Optional - -from fastapi import APIRouter, Depends, Query -from fastapi.middleware.cors import CORSMiddleware -from moonstreamdb import db -from sqlalchemy.orm import Session - -from .. import data -from ..providers.bugout import nft_summary_provider -from ..settings import ( - bugout_client, - MOONSTREAM_ADMIN_ACCESS_TOKEN, - MOONSTREAM_DATA_JOURNAL_ID, -) -from ..stream_queries import StreamQuery - -logger = logging.getLogger(__name__) - -router = APIRouter(prefix="/nft") - - -@router.get("/", tags=["streams"], response_model=data.GetEventsResponse) -async def stream_handler( - start_time: int = Query(0), - end_time: Optional[int] = Query(None), - include_start: bool = Query(False), - include_end: bool = Query(False), - db_session: Session = Depends(db.yield_db_session), -) -> data.GetEventsResponse: - stream_boundary = data.StreamBoundary( - start_time=start_time, - end_time=end_time, - include_start=include_start, - include_end=include_end, - ) - - result = nft_summary_provider.get_events( - db_session=db_session, - bugout_client=bugout_client, - data_journal_id=MOONSTREAM_DATA_JOURNAL_ID, - data_access_token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - stream_boundary=stream_boundary, - user_subscriptions={nft_summary_provider.event_type: []}, - query=StreamQuery(subscription_types=[nft_summary_provider.event_type]), - ) - - if result is None: - return data.GetEventsResponse(stream_boundary=stream_boundary, events=[]) - - provider_stream_boundary, events = result - return data.GetEventsResponse( - stream_boundary=provider_stream_boundary, events=events - ) diff --git a/backend/moonstreamapi/routes/subscriptions.py b/backend/moonstreamapi/routes/subscriptions.py index c818f37b..c81014a2 100644 --- a/backend/moonstreamapi/routes/subscriptions.py +++ b/backend/moonstreamapi/routes/subscriptions.py @@ -314,6 +314,51 @@ async def update_subscriptions_handler( ) +@router.get( + "/{subscription_id}/abi", + tags=["subscriptions"], + 
response_model=data.SubdcriptionsAbiResponse, +) +async def get_subscription_abi_handler( + request: Request, + subscription_id: str, +) -> data.SubdcriptionsAbiResponse: + """Return a presigned S3 GET URL (valid for 300 seconds) for the object at the subscription's s3_path; 404 if no abi is set.""" + token = request.state.token + + try: + subscription_resource: BugoutResource = bc.get_resource( + token=token, + resource_id=subscription_id, + ) + except BugoutResponseException as e: + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + logger.error(f"Error getting subscription resource: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + if subscription_resource.resource_data.get("abi") is None: + raise MoonstreamHTTPException( + status_code=404, + detail="Subscription abi not exists.", + ) + + s3_client = boto3.client("s3") + + result_key = f"{subscription_resource.resource_data['s3_path']}" + presigned_url = s3_client.generate_presigned_url( + "get_object", + Params={ + "Bucket": subscription_resource.resource_data["bucket"], + "Key": result_key, + }, + ExpiresIn=300, + HttpMethod="GET", + ) + + return data.SubdcriptionsAbiResponse(url=presigned_url) + + @router.get( "/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse ) diff --git a/backend/moonstreamapi/stream_boundaries.py b/backend/moonstreamapi/stream_boundaries.py index b285fd01..66a282fa 100644 --- a/backend/moonstreamapi/stream_boundaries.py +++ b/backend/moonstreamapi/stream_boundaries.py @@ -2,6 +2,7 @@ Utilities to work with stream boundaries. """ import time +from typing import Tuple from .data import StreamBoundary @@ -16,7 +17,7 @@ def validate_stream_boundary( stream_boundary: StreamBoundary, time_difference_seconds: int, raise_when_invalid: bool = False, -) -> bool: +) -> Tuple[bool, StreamBoundary]: """ This function can be used by event providers to check if a stream boundary is valid according to their requirements.
@@ -33,6 +34,16 @@ def validate_stream_boundary( f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}" ) else: - return False + return False, stream_boundary - return True + # If required reversed time stream of events + if start_time > end_time: + include_start = stream_boundary.include_start + include_end = stream_boundary.include_end + stream_boundary.start_time = end_time + stream_boundary.end_time = start_time + stream_boundary.include_start = include_end + stream_boundary.include_end = include_start + stream_boundary.reversed_time = True + + return True, stream_boundary diff --git a/backend/moonstreamapi/test_stream_boundaries.py b/backend/moonstreamapi/test_stream_boundaries.py index 44a2ca09..bf0adbed 100644 --- a/backend/moonstreamapi/test_stream_boundaries.py +++ b/backend/moonstreamapi/test_stream_boundaries.py @@ -3,8 +3,8 @@ Tests for stream boundary utilities. """ import unittest -from .data import StreamBoundary from . 
import stream_boundaries +from .data import StreamBoundary class TestValidateStreamBoundary(unittest.TestCase): @@ -12,45 +12,42 @@ class TestValidateStreamBoundary(unittest.TestCase): stream_boundary = StreamBoundary( start_time=1, end_time=5, include_start=True, include_end=True ) - self.assertTrue( - stream_boundaries.validate_stream_boundary( - stream_boundary, 10, raise_when_invalid=False - ) + valid, _ = stream_boundaries.validate_stream_boundary( + stream_boundary, 10, raise_when_invalid=False ) + self.assertTrue(valid) def test_invalid_stream_boundary(self): stream_boundary = StreamBoundary( start_time=1, end_time=5, include_start=True, include_end=True ) - self.assertFalse( - stream_boundaries.validate_stream_boundary( - stream_boundary, 1, raise_when_invalid=False - ) + valid, _ = stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=False ) + self.assertFalse(valid) def test_invalid_stream_boundary_error(self): stream_boundary = StreamBoundary( start_time=1, end_time=5, include_start=True, include_end=True ) with self.assertRaises(stream_boundaries.InvalidStreamBoundary): stream_boundaries.validate_stream_boundary( stream_boundary, 1, raise_when_invalid=True ) def test_unconstrainted_invalid_stream_boundary(self): stream_boundary = StreamBoundary() - self.assertFalse( - stream_boundaries.validate_stream_boundary( - stream_boundary, 1, raise_when_invalid=False - ) + valid, _ = stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=False ) + self.assertFalse(valid) def test_unconstrained_invalid_stream_boundary_error(self): stream_boundary = StreamBoundary() with self.assertRaises(stream_boundaries.InvalidStreamBoundary): stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True ) if __name__ == "__main__": diff --git a/clients/python/.isort.cfg b/clients/python/.isort.cfg new file mode 100644 index 00000000..81d54de1 --- /dev/null +++ b/clients/python/.isort.cfg @@ -0,0 +1,3 @@ +[settings] +profile = black +multi_line_output = 3 \ No newline at end of file diff --git a/clients/python/README.md b/clients/python/README.md index 7d68db78..f0bfac67 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -11,3 +11,93 @@ Install using `pip`: ```bash pip install moonstream ``` + +## Usage + +- Read your Moonstream access token from an environment variable. You can create a token at https://moonstream.to/account/tokens/ + +```python +access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN") +``` + +- Create a Moonstream client object and authorize it + +```python +mc = Moonstream() +mc.authorize(access_token) +``` + +## create_stream method + +Returns a stream of events for a time range. + +**From timestamp to None, from bottom to top** + +When `end_time` is not set. + +```python +for events in mc.create_stream( + start_time=1637834400, end_time=None, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +In this case we receive events from the bottom of history up to the most recent time, in the following order: + +```python +[1637836177, ..., 1637834440] +[1637837980, ..., 1637836226] +# Until we reach the latest event, +# then we receive empty lists +[] +[] +# until new events become available +[1637839306, 1637839306, 1637839306, 1637839306] +[] +# Continuing... +``` + +**From timestamp to timestamp, from top to bottom** + +When `start_time` is greater than `end_time`.
+ +```python +for events in mc.create_stream( + start_time=1637839281, end_time=1637830890, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +Event packs are generated from the most recent timestamp to the oldest, and the inner list of events in each pack is ordered from most recent to oldest: + +```python +[1637839280, ..., 1637838094] +[1637838086, ..., 1637836340] +... +[1637834488, ..., 1637832699] +[1637832676, ..., 1637830903] +``` + +**From timestamp to timestamp, from bottom to top** + +When `start_time` is less than `end_time`. + +```python +for events in mc.create_stream( + start_time=1637830890, end_time=1637839281, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +You receive event packs starting from the oldest part of the range and moving toward the newest: + +```python +[1637832676, ..., 1637830903] +[1637834488, ..., 1637832699] +...
+[1637838086, ..., 1637836340] +[1637839280, ..., 1637838094] +``` diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index b2d0211f..827a6a49 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -1,20 +1,19 @@ -from dataclasses import dataclass, field import logging import os -from typing import Any, Dict, List, Optional +import time +from dataclasses import dataclass, field +from typing import Any, Dict, Generator, List, Optional, Tuple import requests +from .version import MOONSTREAM_CLIENT_VERSION + logger = logging.getLogger(__name__) log_level = logging.INFO if os.environ.get("DEBUG", "").lower() in ["true", "1"]: log_level = logging.DEBUG logger.setLevel(log_level) - -# Keep this synchronized with the version in setup.py -CLIENT_VERSION = "0.0.2" - ENDPOINT_PING = "/ping" ENDPOINT_VERSION = "/version" ENDPOINT_NOW = "/now" @@ -100,7 +99,9 @@ class Moonstream: self.timeout = timeout self._session = requests.Session() self._session.headers.update( - {"User-Agent": f"Moonstream Python client (version {CLIENT_VERSION})"} + { + "User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})" + } ) def ping(self) -> Dict[str, Any]: @@ -388,6 +389,94 @@ class Moonstream: r.raise_for_status() return r.json() + def create_stream( + self, + start_time: int, + end_time: Optional[int] = None, + q: str = "", + ) -> Generator[Dict[str, Any], None, None]: + """ + Return a stream of events. Event packs are generated in 30 minute time ranges. + + Arguments: + - start_time - One boundary of the time range. + - end_time - Other boundary of the time range; if set to None the stream goes forward endlessly. + - q - Optional query to filter over your available subscriptions and subscription types. + + Returns: A generator of dictionaries representing the results of your query.
+ """ + # TODO(kompotkot): Add tests + shift_two_hours = 2 * 60 * 60 # 2 hours + shift_half_hour = 30 * 60 # 30 min + + def fetch_events( + modified_start_time: int, modified_end_time: int + ) -> Generator[Tuple[Dict[str, Any], bool], None, None]: + # If it is going from top to bottom in history, + # then time_range will be reversed + reversed_time = False + if modified_start_time > modified_end_time: + reversed_time = True + max_boundary = max(modified_start_time, modified_end_time) + min_boundary = min(modified_start_time, modified_end_time) + + time_range_list = [] + # 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}] + if max_boundary - min_boundary > shift_half_hour: + for i in range(min_boundary, max_boundary, shift_half_hour): + end_i = ( + i + shift_half_hour - 1 + if i + shift_half_hour < max_boundary + else max_boundary + ) + time_range_list.append({"start_time": i, "end_time": end_i}) + else: + time_range_list.append( + {"start_time": min_boundary, "end_time": max_boundary} + ) + if reversed_time: + time_range_list.reverse() + + for time_range in time_range_list: + r_json = self.events( + start_time=time_range["start_time"], + end_time=time_range["end_time"], + include_start=True, + include_end=True, + q=q, + ) + + yield r_json, reversed_time + + if end_time is None: + float_start_time = start_time + + while True: + end_time = int(self.server_time()) + # If the time range is greater than 2 hours, + # shift float_start_time to 2 hours before end_time to prevent stream block + if end_time - float_start_time > shift_two_hours: + float_start_time = end_time - shift_two_hours + for r_json, reversed_time in fetch_events(float_start_time, end_time): + + yield r_json + + events = r_json.get("events", []) + if len(events) > 0: + # Updating float_start_time after first iteration to last event time + if reversed_time: + float_start_time = events[-1].get("event_timestamp") - 1 + else: +
float_start_time = events[0].get("event_timestamp") + 1 + + else: + # If there are no events in response, wait + # until new will be added + time.sleep(5) + else: + for r_json, reversed_time in fetch_events(start_time, end_time): + yield r_json + def client_from_env() -> Moonstream: """ diff --git a/clients/python/moonstream/version.py b/clients/python/moonstream/version.py new file mode 100644 index 00000000..96c1d3e8 --- /dev/null +++ b/clients/python/moonstream/version.py @@ -0,0 +1 @@ +MOONSTREAM_CLIENT_VERSION = "0.0.3" diff --git a/clients/python/setup.py b/clients/python/setup.py index b12b4518..f75e6813 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -1,12 +1,14 @@ from setuptools import find_packages, setup +from moonstream.version import MOONSTREAM_CLIENT_VERSION + long_description = "" with open("README.md") as ifp: long_description = ifp.read() setup( name="moonstream", - version="0.0.2", + version=MOONSTREAM_CLIENT_VERSION, packages=find_packages(), package_data={"moonstream": ["py.typed"]}, install_requires=["requests", "dataclasses; python_version=='3.6'"], @@ -14,6 +16,7 @@ setup( "dev": [ "black", "mypy", + "isort", "wheel", "types-requests", "types-dataclasses", diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash index 37aa750f..75f5d593 100755 --- a/crawlers/deploy/deploy.bash +++ b/crawlers/deploy/deploy.bash @@ -44,6 +44,7 @@ POLYGON_MISSING_SERVICE_FILE="polygon-missing.service" POLYGON_MISSING_TIMER_FILE="polygon-missing.timer" POLYGON_STATISTICS_SERVICE_FILE="polygon-statistics.service" POLYGON_STATISTICS_TIMER_FILE="polygon-statistics.timer" +POLYGON_TXPOOL_SERVICE_FILE="polygon-txpool.service" set -eu @@ -52,8 +53,8 @@ echo echo echo -e "${PREFIX_INFO} Building executable Ethereum transaction pool crawler script with Go" EXEC_DIR=$(pwd) -cd "${APP_CRAWLERS_DIR}/ethtxpool" -HOME=/root /usr/local/go/bin/go build -o "${APP_CRAWLERS_DIR}/ethtxpool/ethtxpool" "${APP_CRAWLERS_DIR}/ethtxpool/main.go" +cd 
"${APP_CRAWLERS_DIR}/txpool" +HOME=/root /usr/local/go/bin/go build -o "${APP_CRAWLERS_DIR}/txpool/txpool" "${APP_CRAWLERS_DIR}/txpool/main.go" cd "${EXEC_DIR}" echo @@ -141,3 +142,11 @@ cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_SERVICE_FILE}" "/etc/systemd/system/${POL cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_TIMER_FILE}" "/etc/systemd/system/${POLYGON_STATISTICS_TIMER_FILE}" systemctl daemon-reload systemctl restart "${POLYGON_STATISTICS_TIMER_FILE}" + +# echo +# echo +# echo -e "${PREFIX_INFO} Replacing existing Polygon transaction pool crawler service definition with ${POLYGON_TXPOOL_SERVICE_FILE}" +# chmod 644 "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" +# cp "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_TXPOOL_SERVICE_FILE}" +# systemctl daemon-reload +# systemctl restart "${POLYGON_TXPOOL_SERVICE_FILE}" diff --git a/crawlers/deploy/ethereum-txpool.service b/crawlers/deploy/ethereum-txpool.service index c8b45ea5..d4221a6f 100644 --- a/crawlers/deploy/ethereum-txpool.service +++ b/crawlers/deploy/ethereum-txpool.service @@ -5,9 +5,9 @@ After=network.target [Service] User=ubuntu Group=www-data -WorkingDirectory=/home/ubuntu/moonstream/crawlers/ethtxpool +WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream/crawlers/ethtxpool/ethtxpool +ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain ethereum SyslogIdentifier=ethereum-txpool [Install] diff --git a/crawlers/deploy/polygon-txpool.service b/crawlers/deploy/polygon-txpool.service new file mode 100644 index 00000000..cf4dae92 --- /dev/null +++ b/crawlers/deploy/polygon-txpool.service @@ -0,0 +1,14 @@ +[Unit] +Description=Polygon txpool crawler +After=network.target + +[Service] +User=ubuntu +Group=www-data +WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env 
+ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain polygon +SyslogIdentifier=polygon-txpool + +[Install] +WantedBy=multi-user.target diff --git a/crawlers/ethtxpool/sample.env b/crawlers/ethtxpool/sample.env deleted file mode 100644 index 5b375379..00000000 --- a/crawlers/ethtxpool/sample.env +++ /dev/null @@ -1,5 +0,0 @@ -export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1" -export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545" -export ETHTXPOOL_HUMBUG_CLIENT_ID="" -export ETHTXPOOL_HUMBUG_TOKEN="" -export HUMBUG_REPORTER_CRAWLERS_TOKEN="" diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py index 5c1109b8..f415c35c 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/dashboard.py @@ -9,11 +9,12 @@ import time from datetime import datetime, timedelta from enum import Enum from typing import Any, Callable, Dict, List +from uuid import UUID import boto3 # type: ignore from bugout.data import BugoutResources from moonstreamdb.db import yield_db_session_ctx -from sqlalchemy import Column, and_, func, text +from sqlalchemy import Column, and_, func, text, distinct from sqlalchemy.orm import Query, Session from sqlalchemy.sql.operators import in_op @@ -29,6 +30,7 @@ from ..settings import ( MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, CRAWLER_LABEL, ) +from ..reporter import reporter from ..settings import bugout_client as bc from web3 import Web3 @@ -49,6 +51,8 @@ blockchain_by_subscription_id = { class TimeScale(Enum): + # TODO(Andrey) Unlock when we be sure about perfomanse of agregation on transactions table. 
+ # Right now it can be hungs # year = "year" month = "month" week = "week" @@ -69,6 +73,9 @@ timescales_delta: Dict[str, Dict[str, timedelta]] = { "day": {"timedelta": timedelta(hours=24)}, } + +abi_type_to_dashboards_type = {"function": "methods", "event": "events"} + BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription" BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards" @@ -78,11 +85,11 @@ def push_statistics( subscription: Any, timescale: str, bucket: str, - hash: str, + dashboard_id: UUID, ) -> None: result_bytes = json.dumps(statistics_data).encode("utf-8") - result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json' + result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json' s3 = boto3.client("s3") s3.put_object( @@ -336,8 +343,7 @@ def generate_data( label_counts_subquery = ( label_counts.group_by( - text("timeseries_points"), - label_model.label_data["name"].astext, + text("timeseries_points"), label_model.label_data["name"].astext ) .order_by(text("timeseries_points desc")) .subquery(name="label_counts") @@ -405,10 +411,97 @@ def get_unique_address( ) +def generate_list_of_names( + type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any +): + + """ + Generate list of names for select from database by name field + """ + + if read_abi: + names = [item["name"] for item in abi_json if item["type"] == type] + else: + + names = [ + item["name"] + for item in subscription_filters[abi_type_to_dashboards_type[type]] + ] + + return names + + +def process_external( + abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType +): + """ + Request all required external data + TODO:(Andrey) Check 
posibility do it via AsyncHttpProvider(not supported for some of middlewares). + """ + + extention_data = [] + + external_calls = [] + + for external_call in abi_external_calls: + try: + func_input_abi = [] + input_args = [] + for func_input in external_call["inputs"]: + func_input_abi.append( + {"name": func_input["name"], "type": func_input["type"]} + ) + input_args.append( + cast_to_python_type(func_input["type"])(func_input["value"]) + ) + + func_abi = [ + { + "name": external_call["name"], + "inputs": func_input_abi, + "outputs": external_call["outputs"], + "type": "function", + "stateMutability": "view", + } + ] + + external_calls.append( + { + "display_name": external_call["display_name"], + "address": Web3.toChecksumAddress(external_call["address"]), + "name": external_call["name"], + "abi": func_abi, + "input_args": input_args, + } + ) + except Exception as e: + print(f"Error processing external call: {e}") + + web3_client = connect(blockchain) + + for extcall in external_calls: + try: + contract = web3_client.eth.contract( + address=extcall["address"], abi=extcall["abi"] + ) + response = contract.functions[extcall["name"]]( + *extcall["input_args"] + ).call() + + extention_data.append( + {"display_name": extcall["display_name"], "value": response} + ) + except Exception as e: + print(f"Failed to call {extcall['name']} error: {e}") + + return extention_data + + def get_count( name: str, type: str, db_session: Session, + select_expression: Any, blockchain_type: AvailableBlockchainType, address: str, ): @@ -418,7 +511,7 @@ def get_count( label_model = get_label_model(blockchain_type) return ( - db_session.query(label_model) + db_session.query(select_expression) .filter(label_model.address == address) .filter(label_model.label == CRAWLER_LABEL) .filter(label_model.label_data["type"].astext == type) @@ -435,222 +528,223 @@ def stats_generate_handler(args: argparse.Namespace): with yield_db_session_ctx() as db_session: # read all subscriptions - 
required_subscriptions: BugoutResources = bc.list_resources( + + # ethereum_blockchain + + start_time = time.time() + blockchain_type = AvailableBlockchainType(args.blockchain) + + # polygon_blockchain + dashboard_resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD}, + timeout=10, + ) + + print(f"Amount of dashboards: {len(dashboard_resources.resources)}") + + # Create subscriptions dict for get subscriptions by id. + blockchain_subscriptions: BugoutResources = bc.list_resources( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params={ "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, - "abi": "true", "subscription_type_id": subscription_id_by_blockchain[args.blockchain], }, timeout=10, ) - print(f"Subscriptions for processing: {len(required_subscriptions.resources)}") + print( + f"Amount of blockchain subscriptions: {len(blockchain_subscriptions.resources)}" + ) + + subscription_by_id = { + str(blockchain_subscription.id): blockchain_subscription + for blockchain_subscription in blockchain_subscriptions.resources + } s3_client = boto3.client("s3") - # Already processed - already_processed = [] + for dashboard in dashboard_resources.resources: - for subscription in required_subscriptions.resources: - bucket = subscription.resource_data["bucket"] - key = subscription.resource_data["s3_path"] - address = subscription.resource_data["address"] + for dashboard_subscription_filters in dashboard.resource_data[ + "dashboard_subscriptions" + ]: - print(f"Expected bucket: s3://{bucket}/{key}") - - abi = s3_client.get_object( - Bucket=bucket, - Key=key, - ) - abi_json = json.loads(abi["Body"].read()) - - abi_string = json.dumps(abi_json, sort_keys=True, indent=2) - - hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest() - - if f"{address}/{hash}" in already_processed: - continue - - s3_data_object = {} - - abi_functions = [item for item in abi_json if item["type"] == "function"] - abi_events = [item for 
item in abi_json if item["type"] == "event"] - - abi_external_calls = [ - item for item in abi_json if item["type"] == "external_call" - ] - - external_calls = [] - - for external_call in abi_external_calls: try: - func_input_abi = [] - input_args = [] - for func_input in external_call["inputs"]: - func_input_abi.append( - {"name": func_input["name"], "type": func_input["type"]} - ) - input_args.append( - cast_to_python_type(func_input["type"])(func_input["value"]) - ) + subscription_id = dashboard_subscription_filters["subscription_id"] - func_abi = [ - { - "name": external_call["name"], - "inputs": func_input_abi, - "outputs": external_call["outputs"], - "type": "function", - "stateMutability": "view", - } + if subscription_id not in subscription_by_id: + # Meen it's are different blockchain type + continue + + s3_data_object = {} + + extention_data = [] + + address = subscription_by_id[subscription_id].resource_data[ + "address" ] - external_calls.append( - { - "display_name": external_call["display_name"], - "address": Web3.toChecksumAddress(external_call["address"]), - "name": external_call["name"], - "abi": func_abi, - "input_args": input_args, - } - ) - except Exception as e: - print(f"Error processing external call: {e}") + generic = dashboard_subscription_filters["generic"] - web3_client = connect(blockchain_type) - # { - # "type": "external_call" - # "display_name": "Total weth earned" - # "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a", - # "name": "balanceOf", - # "inputs": [ - # { - # "name": "owner", - # "type": "address" - # "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88" - # } - # ], - # "outputs": [ - # { - # "internalType": "uint256", - # "name": "", - # "type": "uint256" - # } - # } + if not subscription_by_id[subscription_id].resource_data["abi"]: - extention_data = [] - for extcall in external_calls: - try: - contract = web3_client.eth.contract( - address=extcall["address"], abi=extcall["abi"] - ) - response = 
contract.functions[extcall["name"]]( - *extcall["input_args"] - ).call() + methods = [] + events = [] + + else: + + bucket = subscription_by_id[subscription_id].resource_data[ + "bucket" + ] + key = subscription_by_id[subscription_id].resource_data[ + "s3_path" + ] + + abi = s3_client.get_object( + Bucket=bucket, + Key=key, + ) + abi_json = json.loads(abi["Body"].read()) + + methods = generate_list_of_names( + type="function", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters["all_methods"], + abi_json=abi_json, + ) + + events = generate_list_of_names( + type="event", + subscription_filters=dashboard_subscription_filters, + read_abi=dashboard_subscription_filters["all_events"], + abi_json=abi_json, + ) + + abi_external_calls = [ + item for item in abi_json if item["type"] == "external_call" + ] + + extention_data = process_external( + abi_external_calls=abi_external_calls, + blockchain=blockchain_type, + ) extention_data.append( - {"display_name": extcall["display_name"], "value": response} + { + "display_name": "Overall unique token owners.", + "value": get_unique_address( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + ), + } ) - except Exception as e: - print(f"Failed to call {extcall['name']} error: {e}") - extention_data.append( - { - "display_name": "Overall unique token owners.", - "value": get_unique_address( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - ), - } - ) + if "HatchStartedEvent" in events: - abi_functions_names = [item["name"] for item in abi_functions] + extention_data.append( + { + "display_name": "Number of hatches started.", + "value": get_count( + name="HatchStartedEvent", + type="event", + db_session=db_session, + select_expression=get_label_model(blockchain_type), + blockchain_type=blockchain_type, + address=address, + ), + } + ) - abi_events_names = [item["name"] for item in abi_events] + if "HatchFinishedEvent" in events: - 
if "HatchStartedEvent" in abi_events_names: + extention_data.append( + { + "display_name": "Number of hatches finished.", + "value": get_count( + name="HatchFinishedEvent", + type="event", + db_session=db_session, + select_expression=distinct( + get_label_model(blockchain_type).label_data[ + "args" + ]["tokenId"] + ), + blockchain_type=blockchain_type, + address=address, + ), + } + ) - extention_data.append( - { - "display_name": "Number of hatches started.", - "value": get_count( - name="HatchStartedEvent", - type="event", + for timescale in [timescale.value for timescale in TimeScale]: + + start_date = ( + datetime.utcnow() - timescales_delta[timescale]["timedelta"] + ) + + print(f"Timescale: {timescale}") + + s3_data_object["web3_metric"] = extention_data + + functions_calls_data = generate_data( db_session=db_session, blockchain_type=blockchain_type, address=address, - ), - } - ) + timescale=timescale, + functions=methods, + start=start_date, + metric_type="tx_call", + ) - if "HatchFinishedEvent" in abi_events_names: + s3_data_object["functions"] = functions_calls_data - extention_data.append( - { - "display_name": "Number of hatches finished.", - "value": get_count( - name="HatchFinishedEvent", - type="event", + events_data = generate_data( db_session=db_session, blockchain_type=blockchain_type, address=address, - ), - } - ) + timescale=timescale, + functions=events, + start=start_date, + metric_type="event", + ) - for timescale in [timescale.value for timescale in TimeScale]: + s3_data_object["events"] = events_data - start_date = ( - datetime.utcnow() - timescales_delta[timescale]["timedelta"] - ) + s3_data_object["generic"] = generate_metrics( + db_session=db_session, + blockchain_type=blockchain_type, + address=address, + timescale=timescale, + metrics=generic, + start=start_date, + ) - print(f"Timescale: {timescale}") + push_statistics( + statistics_data=s3_data_object, + subscription=subscription_by_id[subscription_id], + timescale=timescale, + 
bucket=bucket, + dashboard_id=dashboard.id, + ) + except Exception as err: + reporter.error_report( + err, + [ + "dashboard", + "statistics", + f"blockchain:{args.blockchain}" + f"subscriptions:{subscription_id}", + f"dashboard:{dashboard.id}", + ], + ) + print(err) - s3_data_object["web3_metric"] = extention_data - - functions_calls_data = generate_data( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - functions=abi_functions_names, - start=start_date, - metric_type="tx_call", - ) - - s3_data_object["functions"] = functions_calls_data - # generate data - - events_data = generate_data( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - functions=abi_events_names, - start=start_date, - metric_type="event", - ) - - s3_data_object["events"] = events_data - - s3_data_object["generic"] = generate_metrics( - db_session=db_session, - blockchain_type=blockchain_type, - address=address, - timescale=timescale, - metrics=abi_events_names, - start=start_date, - ) - - push_statistics( - statistics_data=s3_data_object, - subscription=subscription, - timescale=timescale, - bucket=bucket, - hash=hash, - ) - already_processed.append(f"{address}/{hash}") + reporter.custom_report( + title=f"Dashboard stats generated.", + content=f"Generate statistics for {args.blockchain}. 
\n Generation time: {time.time() - start_time}.", + tags=["dashboard", "statistics", f"blockchain:{args.blockchain}"], + ) def main() -> None: diff --git a/crawlers/ethtxpool/.gitignore b/crawlers/txpool/.gitignore similarity index 100% rename from crawlers/ethtxpool/.gitignore rename to crawlers/txpool/.gitignore diff --git a/crawlers/txpool/cmd/data.go b/crawlers/txpool/cmd/data.go new file mode 100644 index 00000000..f87b98fd --- /dev/null +++ b/crawlers/txpool/cmd/data.go @@ -0,0 +1,40 @@ +package cmd + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type Transaction struct { + Type hexutil.Uint64 `json:"type"` + + // Common transaction fields: + Nonce *hexutil.Uint64 `json:"nonce"` + GasPrice *hexutil.Big `json:"gasPrice"` + MaxPriorityFeePerGas *hexutil.Big `json:"maxPriorityFeePerGas"` + MaxFeePerGas *hexutil.Big `json:"maxFeePerGas"` + Gas *hexutil.Uint64 `json:"gas"` + Value *hexutil.Big `json:"value"` + Data *hexutil.Bytes `json:"input"` + V *hexutil.Big `json:"v"` + R *hexutil.Big `json:"r"` + S *hexutil.Big `json:"s"` + To *common.Address `json:"to"` + + // Access list transaction fields: + ChainID *hexutil.Big `json:"chainId,omitempty"` + // AccessList *AccessList `json:"accessList,omitempty"` + + // Only used for encoding: + Hash common.Hash `json:"hash"` +} + +type PendingTransaction struct { + From string `json:"from"` + Nonce uint64 `json:"nonce"` + Transaction *Transaction `json:"transaction"` +} + +type PendingTransactions struct { + Transactions PendingTransaction `json:"transactions"` +} diff --git a/crawlers/ethtxpool/main.go b/crawlers/txpool/cmd/txpool.go similarity index 63% rename from crawlers/ethtxpool/main.go rename to crawlers/txpool/cmd/txpool.go index 6a4c9bc7..3692c4f7 100644 --- a/crawlers/ethtxpool/main.go +++ b/crawlers/txpool/cmd/txpool.go @@ -2,22 +2,23 @@ Ethereum blockchain transaction pool crawler. 
Execute: -go run main.go -geth http://127.0.0.1:8545 +go run main.go -blockchain ethereum -interval 1 */ -package main +package cmd import ( "encoding/json" "flag" "fmt" - "math/big" + "log" "os" + "strings" "time" + settings "github.com/bugout-dev/moonstream/crawlers/txpool/configs" + humbug "github.com/bugout-dev/humbug/go/pkg" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/google/uuid" ) @@ -29,40 +30,6 @@ func humbugClient(sessionID string, clientID string, humbugToken string) (*humbu return reporter, err } -type Transaction struct { - Type hexutil.Uint64 `json:"type"` - - // Common transaction fields: - Nonce *hexutil.Uint64 `json:"nonce"` - GasPrice *hexutil.Big `json:"gasPrice"` - MaxPriorityFeePerGas *hexutil.Big `json:"maxPriorityFeePerGas"` - MaxFeePerGas *hexutil.Big `json:"maxFeePerGas"` - Gas *hexutil.Uint64 `json:"gas"` - Value *hexutil.Big `json:"value"` - Data *hexutil.Bytes `json:"input"` - V *hexutil.Big `json:"v"` - R *hexutil.Big `json:"r"` - S *hexutil.Big `json:"s"` - To *common.Address `json:"to"` - - // Access list transaction fields: - ChainID *hexutil.Big `json:"chainId,omitempty"` - // AccessList *AccessList `json:"accessList,omitempty"` - - // Only used for encoding: - Hash common.Hash `json:"hash"` -} - -type PendingTransaction struct { - From string `json:"from"` - Nonce uint64 `json:"nonce"` - Transaction *Transaction `json:"transaction"` -} - -type PendingTransactions struct { - Transactions PendingTransaction `json:"transactions"` -} - // Split list of reports on nested lists func generateChunks(xs []humbug.Report, chunkSize int) [][]humbug.Report { if len(xs) == 0 { @@ -83,8 +50,7 @@ func generateChunks(xs []humbug.Report, chunkSize int) [][]humbug.Report { } // Fetch list of transactions form Ethereum TxPool -func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter 
*humbug.HumbugReporter) { - initPoll := true +func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.HumbugReporter, blockchain string) { currentTransactions := make(map[common.Hash]bool) // Structure of the map: @@ -92,17 +58,17 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu var result map[string]map[string]map[uint64]*Transaction for { - fmt.Println("Checking pending transactions in node:") + log.Println("Checking pending transactions in node") gethClient.Call(&result, "txpool_content") pendingTransactions := result["pending"] // Mark all transactions from previous iteration as false - cacheSize := 0 + cacheSizeCounter := 0 for transactionHash := range currentTransactions { currentTransactions[transactionHash] = false - cacheSize++ + cacheSizeCounter++ } - fmt.Printf("\tSize of pending transactions cache at the beginning: %d\n", cacheSize) + log.Printf("Size of pending transactions cache at the beginning: %d\n", cacheSizeCounter) reports := []humbug.Report{} @@ -112,6 +78,7 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu for nonce, transaction := range transactionsByNonce { pendingTx := PendingTransaction{From: fromAddress, Nonce: nonce, Transaction: transaction} + // Check if transaction already exist in our currentTransactions list and pass this transaction transactionHash := transaction.Hash _, transactionProcessed := currentTransactions[transactionHash] if !transactionProcessed { @@ -121,7 +88,7 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu continue } - ReportTitle := "Ethereum: Pending transaction: " + transactionHash.String() + ReportTitle := fmt.Sprintf("%s: Pending transaction: ", strings.Title(blockchain)) + transactionHash.String() ReportTags := []string{ "hash:" + transactionHash.String(), "from_address:" + fromAddress, @@ -130,8 +97,8 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu 
fmt.Sprintf("max_priority_fee_per_gas:%d", pendingTx.Transaction.MaxPriorityFeePerGas.ToInt()), fmt.Sprintf("max_fee_per_gas:%d", pendingTx.Transaction.MaxFeePerGas.ToInt()), fmt.Sprintf("gas:%d", pendingTx.Transaction.Gas), - fmt.Sprintf("value:%d", new(big.Float).Quo(new(big.Float).SetInt(transaction.Value.ToInt()), big.NewFloat(params.Ether))), - "crawl_type:ethereum_txpool", + fmt.Sprintf("value:%d", transaction.Value.ToInt()), + fmt.Sprintf("crawl_type:%s_txpool", blockchain), } report := humbug.Report{ Title: ReportTitle, @@ -145,10 +112,12 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu } } - if !initPoll { + // TODO(kompotkot): Passing txs is wrong solution, but for end user + // it is similar like this txs even not passed through this node. + if len(reports) < 10000 { reportChunks := generateChunks(reports, 500) for _, chunk := range reportChunks { - fmt.Printf("\tPublishing chunk with: %d/%d reports\n", len(chunk), addedTransactionsCounter) + log.Printf("Published chunk with: %d/%d reports\n", len(chunk), addedTransactionsCounter) reporter.PublishBulk(chunk) time.Sleep(time.Duration(interval) * time.Second) } @@ -163,30 +132,36 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu droppedTransactionsCounter++ } } - fmt.Printf("\tDropped transactions: %d\n", droppedTransactionsCounter) + log.Printf("Dropped transactions: %d\n", droppedTransactionsCounter) - fmt.Printf("Sleeping for %d seconds\n", interval) + log.Printf("Sleeping for %d seconds\n", interval) time.Sleep(time.Duration(interval) * time.Second) } else { - fmt.Printf("Initial start of crawler, too many transactions: %d, passing them...\n", addedTransactionsCounter) - initPoll = false + log.Printf("Too many transactions: %d, passing them...\n", addedTransactionsCounter) } } } -func main() { +func InitTxPool() { + var blockchain string var intervalSeconds int + flag.StringVar(&blockchain, "blockchain", "", "Blockchain to crawl") 
flag.IntVar(&intervalSeconds, "interval", 1, "Number of seconds to wait between RPC calls to query the transaction pool (default: 1)") flag.Parse() - var MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_ADDR") - var MOONSTREAM_NODE_ETHEREUM_IPC_PORT = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_PORT") - var MOONSTREAM_IPC_PATH = fmt.Sprintf("http://%s:%s", MOONSTREAM_NODE_ETHEREUM_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_IPC_PORT) + switch blockchain { + case "ethereum", "polygon": + log.Printf("%s blockchain\n", strings.Title(blockchain)) + default: + panic(fmt.Sprintln("Invalid blockchain provided")) + } + + MOONSTREAM_IPC_PATH := settings.GetIpcPath(blockchain) sessionID := uuid.New().String() // Humbug crash client to collect errors - crashReporter, err := humbugClient(sessionID, "moonstream-crawlers", os.Getenv("HUMBUG_REPORTER_CRAWLERS_TOKEN")) + crashReporter, err := humbugClient(sessionID, "moonstream-crawlers", settings.HUMBUG_REPORTER_CRAWLERS_TOKEN) if err != nil { panic(fmt.Sprintf("Invalid Humbug Crash configuration: %s", err.Error())) } @@ -208,10 +183,10 @@ func main() { defer gethClient.Close() // Humbug client to be able write data in Bugout journal - reporter, err := humbugClient(sessionID, os.Getenv("ETHTXPOOL_HUMBUG_CLIENT_ID"), os.Getenv("ETHTXPOOL_HUMBUG_TOKEN")) + reporter, err := humbugClient(sessionID, settings.HUMBUG_TXPOOL_CLIENT_ID, settings.HUMBUG_TXPOOL_TOKEN) if err != nil { panic(fmt.Sprintf("Invalid Humbug configuration: %s", err.Error())) } - PollTxpoolContent(gethClient, intervalSeconds, reporter) + PollTxpoolContent(gethClient, intervalSeconds, reporter, blockchain) } diff --git a/crawlers/txpool/configs/settings.go b/crawlers/txpool/configs/settings.go new file mode 100644 index 00000000..018cb9de --- /dev/null +++ b/crawlers/txpool/configs/settings.go @@ -0,0 +1,20 @@ +package settings + +import ( + "fmt" + "os" + "strings" +) + +// Internal crash journal to collect errors +var HUMBUG_REPORTER_CRAWLERS_TOKEN = 
os.Getenv("HUMBUG_REPORTER_CRAWLERS_TOKEN") + +var HUMBUG_TXPOOL_CLIENT_ID = os.Getenv("HUMBUG_TXPOOL_CLIENT_ID") +var HUMBUG_TXPOOL_TOKEN = os.Getenv("HUMBUG_TXPOOL_TOKEN") + +// Geth connection URL +func GetIpcPath(blockchain string) string { + MOONSTREAM_NODE_IPC_ADDR := os.Getenv(fmt.Sprintf("MOONSTREAM_NODE_%s_IPC_ADDR", strings.ToUpper(blockchain))) + MOONSTREAM_NODE_IPC_PORT := os.Getenv(fmt.Sprintf("MOONSTREAM_NODE_%s_IPC_PORT", strings.ToUpper(blockchain))) + return fmt.Sprintf("http://%s:%s", MOONSTREAM_NODE_IPC_ADDR, MOONSTREAM_NODE_IPC_PORT) +} diff --git a/crawlers/ethtxpool/go.mod b/crawlers/txpool/go.mod similarity index 73% rename from crawlers/ethtxpool/go.mod rename to crawlers/txpool/go.mod index 912776e8..2e9e3ed7 100644 --- a/crawlers/ethtxpool/go.mod +++ b/crawlers/txpool/go.mod @@ -1,4 +1,4 @@ -module github.com/bugout-dev/moonstream/crawlers/ethtxpool +module github.com/bugout-dev/moonstream/crawlers/txpool go 1.16 diff --git a/crawlers/ethtxpool/go.sum b/crawlers/txpool/go.sum similarity index 100% rename from crawlers/ethtxpool/go.sum rename to crawlers/txpool/go.sum diff --git a/crawlers/txpool/main.go b/crawlers/txpool/main.go new file mode 100644 index 00000000..2a9594b1 --- /dev/null +++ b/crawlers/txpool/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/bugout-dev/moonstream/crawlers/txpool/cmd" +) + +func main() { + cmd.InitTxPool() +} diff --git a/crawlers/txpool/sample.env b/crawlers/txpool/sample.env new file mode 100644 index 00000000..55fb4818 --- /dev/null +++ b/crawlers/txpool/sample.env @@ -0,0 +1,7 @@ +export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1" +export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545" +export MOONSTREAM_NODE_POLYGON_IPC_ADDR="127.0.0.1" +export MOONSTREAM_NODE_POLYGON_IPC_PORT="8545" +export HUMBUG_TXPOOL_CLIENT_ID="" +export HUMBUG_TXPOOL_TOKEN="" +export HUMBUG_REPORTER_CRAWLERS_TOKEN="" diff --git a/frontend/pages/dashboard/[dashboardId].js b/frontend/pages/dashboard/[dashboardId].js index 
8abbb6cb..67630496 100644 --- a/frontend/pages/dashboard/[dashboardId].js +++ b/frontend/pages/dashboard/[dashboardId].js @@ -259,8 +259,7 @@ const Analytics = () => { diff --git a/frontend/src/components/NewDashboard.js b/frontend/src/components/NewDashboard.js index 9c45ff1e..840fb26c 100644 --- a/frontend/src/components/NewDashboard.js +++ b/frontend/src/components/NewDashboard.js @@ -65,6 +65,30 @@ const NewDashboard = (props) => { subscriptions.subscriptionsCache.data?.subscriptions ); + useEffect(() => { + newDashboardForm.subscriptions.forEach((element, idx) => { + const subscription = + subscriptions.subscriptionsCache.data?.subscriptions.find( + (subscription_item) => + element.subscription_id === subscription_item.id + ); + + if ( + element.subscription_id && + subscription && + newDashboardForm.subscriptions[idx].abi !== subscription?.abi + ) { + const newestDashboardForm = { ...newDashboardForm }; + newestDashboardForm.subscriptions[idx].abi = subscription.abi; + setNewDashboardForm(newestDashboardForm); + } + }); + }, [ + subscriptions.subscriptionsCache.data, + newDashboardForm, + setNewDashboardForm, + ]); + useEffect(() => { if (!subscriptions.subscriptionsCache.isLoading) { const massaged = subscriptions.subscriptionsCache.data?.subscriptions.map( @@ -195,7 +219,6 @@ const NewDashboard = (props) => { }; setNewDashboardForm(newState); }} - // isOpen={showSuggestions} itemToString={(item) => (item ? item.label : "")} initialSelectedItem={subscibedItem ?? undefined} > @@ -208,7 +231,6 @@ const NewDashboard = (props) => { isOpen, inputValue, highlightedIndex, - // selectedItem, getRootProps, }) => { const labelColor = @@ -217,7 +239,6 @@ const NewDashboard = (props) => { return ( { placeholder="Subscription to use in dashboard" isTruncated fontSize="sm" - // defaultValue={ - // subscibedItem?.label ?? "yoyoy" - // } {...getInputProps({ defaultValue: subscibedItem?.label ?? 
"iha", @@ -271,18 +289,6 @@ const NewDashboard = (props) => { - {/* */} {isOpen ? ( { }} - // - // - // - // {pickerItems?.map((subscription, oid) => ( - // - // {/* */} - // {subscription.label} - // - // ))} - // {/* - // {({ value }) => ( - // New Subscription: {value} - // )} - // */} - // - // )} @@ -460,7 +437,7 @@ const NewDashboard = (props) => { onClick={() => overlay.toggleModal({ type: MODAL_TYPES.UPLOAD_ABI, - props: { id: subscibedItem.id }, + props: { id: subscibedItem.subscription_id }, }) } > @@ -620,14 +597,6 @@ const NewDashboard = (props) => { - - {/* - ABI -