Added /streams/info endpoint

This gives information about available event providers on the API.
pull/265/head
Neeraj Kashyap 2021-09-16 09:25:22 -07:00
rodzic d73c8d2a95
commit aeae226491
3 zmienionych plików z 66 dodań i 6 usunięć

Wyświetl plik

@ -35,6 +35,9 @@ class BugoutEventProvider:
def __init__( def __init__(
self, self,
event_type: str, event_type: str,
description: str,
default_time_interval_seconds: int,
estimated_events_per_time_interval: float,
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
batch_size: int = 100, batch_size: int = 100,
timeout: float = 30.0, timeout: float = 30.0,
@ -47,6 +50,9 @@ class BugoutEventProvider:
- timeout: Request timeout for Bugout requests - timeout: Request timeout for Bugout requests
""" """
self.event_type = event_type self.event_type = event_type
self.description = description
self.default_time_interval_seconds = default_time_interval_seconds
self.estimated_events_per_time_interval = estimated_events_per_time_interval
self.batch_size = batch_size self.batch_size = batch_size
self.timeout = timeout self.timeout = timeout
if tags is None: if tags is None:
@ -273,12 +279,23 @@ class EthereumTXPoolProvider(BugoutEventProvider):
def __init__( def __init__(
self, self,
event_type: str, event_type: str,
description: str,
default_time_interval_seconds: int,
estimated_events_per_time_interval: float,
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
batch_size: int = 100, batch_size: int = 100,
timeout: float = 30.0, timeout: float = 30.0,
): ):
super().__init__(event_type, tags, batch_size, timeout) super().__init__(
event_type=event_type,
description=description,
default_time_interval_seconds=default_time_interval_seconds,
estimated_events_per_time_interval=estimated_events_per_time_interval,
tags=tags,
batch_size=batch_size,
timeout=timeout,
)
def parse_filters( def parse_filters(
self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]] self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
@ -304,11 +321,32 @@ class EthereumTXPoolProvider(BugoutEventProvider):
return subscriptions_filters return subscriptions_filters
whalewatch_description = """Event provider for Ethereum whale watch.
Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories:
1. Number of transactions sent
2. Number of transactions received
3. Amount (in WEI) sent
4. Amount (in WEI) received
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
whalewatch_provider = BugoutEventProvider( whalewatch_provider = BugoutEventProvider(
event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"] event_type="ethereum_whalewatch",
description=whalewatch_description,
default_time_interval_seconds=310,
estimated_events_per_time_interval=1,
tags=["crawl_type:ethereum_trending"],
) )
ethereum_txpool_description = """Event provider for Ethereum transaction pool.
Shows the latest events (from the previous hour) in the Ethereum transaction pool.
To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint."""
ethereum_txpool_provider = EthereumTXPoolProvider( ethereum_txpool_provider = EthereumTXPoolProvider(
event_type="ethereum_txpool", tags=["client:ethereum-txpool-crawler-0"] event_type="ethereum_txpool",
description=ethereum_txpool_description,
default_time_interval_seconds=5,
estimated_events_per_time_interval=50,
tags=["client:ethereum-txpool-crawler-0"],
) )

Wyświetl plik

@ -14,7 +14,6 @@ from sqlalchemy.orm import Session, Query
from sqlalchemy.sql.functions import user from sqlalchemy.sql.functions import user
from .. import data from .. import data
from ..settings import DEFAULT_STREAM_TIMEINTERVAL
from ..stream_boundaries import validate_stream_boundary from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery from ..stream_queries import StreamQuery
@ -25,6 +24,15 @@ logger.setLevel(logging.WARN)
event_type = "ethereum_blockchain" event_type = "ethereum_blockchain"
description = f"""Event provider for transactions from the Ethereum blockchain.
To restrict your queries to this provider, add a filter of \"type:{event_type}\" to your query (query parameter: \"q\") on the /streams endpoint."""
default_time_interval_seconds: int = 5 * 60
# 200 transactions per block, 4 blocks per minute.
estimated_events_per_time_interval: float = 5 * 800
def validate_subscription( def validate_subscription(
subscription_resource_data: data.SubscriptionResourceData, subscription_resource_data: data.SubscriptionResourceData,

Wyświetl plik

@ -2,7 +2,7 @@
The Moonstream subscriptions HTTP API The Moonstream subscriptions HTTP API
""" """
import logging import logging
from typing import Dict, List, Optional from typing import Any, Dict, List, Optional
from bugout.data import BugoutResource from bugout.data import BugoutResource
from fastapi import FastAPI, Request, Query, Depends from fastapi import FastAPI, Request, Query, Depends
@ -58,8 +58,9 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
whitelist_paths: Dict[str, str] = {} whitelist_paths: Dict[str, str] = {"/streams/info": "GET"}
whitelist_paths.update(DOCS_PATHS) whitelist_paths.update(DOCS_PATHS)
whitelist_paths.update()
app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths) app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths)
@ -88,6 +89,19 @@ def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]:
return user_subscriptions return user_subscriptions
@app.get("/info", tags=["streams"])
async def info_handler() -> Dict[str, Any]:
info = {
event_type: {
"description": provider.description,
"default_time_interval_seconds": provider.default_time_interval_seconds,
"estimated_events_per_time_interval": provider.estimated_events_per_time_interval,
}
for event_type, provider in event_providers.items()
}
return info
@app.get("/", tags=["streams"], response_model=data.GetEventsResponse) @app.get("/", tags=["streams"], response_model=data.GetEventsResponse)
async def stream_handler( async def stream_handler(
request: Request, request: Request,