From aeae2264914bbe58abc8d0aa4d75621c3df805f3 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Thu, 16 Sep 2021 09:25:22 -0700 Subject: [PATCH] Added /streams/info endpoint This gives information about available event providers on the API. --- backend/moonstream/providers/bugout.py | 44 +++++++++++++++++-- .../providers/ethereum_blockchain.py | 10 ++++- backend/moonstream/routes/streams.py | 18 +++++++- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/backend/moonstream/providers/bugout.py b/backend/moonstream/providers/bugout.py index 020733be..c1b08497 100644 --- a/backend/moonstream/providers/bugout.py +++ b/backend/moonstream/providers/bugout.py @@ -35,6 +35,9 @@ class BugoutEventProvider: def __init__( self, event_type: str, + description: str, + default_time_interval_seconds: int, + estimated_events_per_time_interval: float, tags: Optional[List[str]] = None, batch_size: int = 100, timeout: float = 30.0, @@ -47,6 +50,9 @@ class BugoutEventProvider: - timeout: Request timeout for Bugout requests """ self.event_type = event_type + self.description = description + self.default_time_interval_seconds = default_time_interval_seconds + self.estimated_events_per_time_interval = estimated_events_per_time_interval self.batch_size = batch_size self.timeout = timeout if tags is None: @@ -273,12 +279,23 @@ class EthereumTXPoolProvider(BugoutEventProvider): def __init__( self, event_type: str, + description: str, + default_time_interval_seconds: int, + estimated_events_per_time_interval: float, tags: Optional[List[str]] = None, batch_size: int = 100, timeout: float = 30.0, ): - super().__init__(event_type, tags, batch_size, timeout) + super().__init__( + event_type=event_type, + description=description, + default_time_interval_seconds=default_time_interval_seconds, + estimated_events_per_time_interval=estimated_events_per_time_interval, + tags=tags, + batch_size=batch_size, + timeout=timeout, + ) def parse_filters( self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]] @@ -304,11 +321,32 @@ class EthereumTXPoolProvider(BugoutEventProvider): return subscriptions_filters +whalewatch_description = """Event provider for Ethereum whale watch. + +Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories: +1. Number of transactions sent +2. Number of transactions received +3. Amount (in WEI) sent +4. Amount (in WEI) received + +To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint.""" whalewatch_provider = BugoutEventProvider( - event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"] + event_type="ethereum_whalewatch", + description=whalewatch_description, + default_time_interval_seconds=310, + estimated_events_per_time_interval=1, + tags=["crawl_type:ethereum_trending"], ) +ethereum_txpool_description = """Event provider for Ethereum transaction pool. +Shows the latest events (from the previous hour) in the Ethereum transaction pool. + +To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint.""" ethereum_txpool_provider = EthereumTXPoolProvider( - event_type="ethereum_txpool", tags=["client:ethereum-txpool-crawler-0"] + event_type="ethereum_txpool", + description=ethereum_txpool_description, + default_time_interval_seconds=5, + estimated_events_per_time_interval=50, + tags=["client:ethereum-txpool-crawler-0"], ) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 5a478a84..1eb87bde 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -14,7 +14,6 @@ from sqlalchemy.orm import Session, Query from sqlalchemy.sql.functions import user from .. import data -from ..settings import DEFAULT_STREAM_TIMEINTERVAL from ..stream_boundaries import validate_stream_boundary from ..stream_queries import StreamQuery @@ -25,6 +24,15 @@ logger.setLevel(logging.WARN) event_type = "ethereum_blockchain" +description = f"""Event provider for transactions from the Ethereum blockchain. + +To restrict your queries to this provider, add a filter of \"type:{event_type}\" to your query (query parameter: \"q\") on the /streams endpoint.""" + +default_time_interval_seconds: int = 5 * 60 + +# 200 transactions per block, 4 blocks per minute. +estimated_events_per_time_interval: float = 5 * 800 + def validate_subscription( subscription_resource_data: data.SubscriptionResourceData, diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 10d0426c..14fab19e 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -2,7 +2,7 @@ The Moonstream subscriptions HTTP API """ import logging -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from bugout.data import BugoutResource from fastapi import FastAPI, Request, Query, Depends @@ -58,8 +58,9 @@ app.add_middleware( allow_headers=["*"], ) -whitelist_paths: Dict[str, str] = {} +whitelist_paths: Dict[str, str] = {"/streams/info": "GET"} whitelist_paths.update(DOCS_PATHS) +whitelist_paths.update() app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths) @@ -88,6 +89,19 @@ def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]: return user_subscriptions +@app.get("/info", tags=["streams"]) +async def info_handler() -> Dict[str, Any]: + info = { + event_type: { + "description": provider.description, + "default_time_interval_seconds": provider.default_time_interval_seconds, + "estimated_events_per_time_interval": provider.estimated_events_per_time_interval, + } + for event_type, provider in event_providers.items() + } + return info + + @app.get("/", tags=["streams"], response_model=data.GetEventsResponse) async def stream_handler( request: Request,