Hooked up /streams/ routes to providers

Also added a top-level providers interface which gets data from each
individual event provider in the background before returning data to the
API. Check `backend/moonstream/providers/__init__.py`.
pull/105/head
Neeraj Kashyap 2021-08-20 11:14:21 -07:00
rodzic 5db3c60769
commit 117cfa881e
4 zmienionych plików z 441 dodań i 38 usunięć

Wyświetl plik

@ -0,0 +1,267 @@
"""
Maps provider interface over all available providers.
Available providers are exposed through the `event_providers` dictionary.
Exposes all the standard event provider methods:
- get_events
- latest_events
- next_event
- previous_event
In addition to their standard arguments, adds the following arguments to each of these methods:
- `max_threads` - Since the mapper retrieves events from each provider in the background, this allows
the caller to specify how many threads they want to make available to the background executor. This
can be set to None, in which case it uses the Python default behaviour for the max_workers=None to
concurrent.futures.ThreadPoolExecutor. (Default: None)
- result_timeout - A float representing the number of seconds to wait for the background workers to get
events from the individual providers. (Default: 30.0)
- raise_on_error - Set this to True to raise an error if any of the individual event providers experiences an
error fulfilling its method. If set to False, ignores event providers which failed and still tries to
return data to the caller. (Default: False)
- sort_events - Set this to True to sort the events that come in from different providers. Set it to False
if the order does not matter and you would rather emphasize speed. Only available for methods which involve
lists of events. (Default: True)
"""
from concurrent.futures import Future, ThreadPoolExecutor
import logging
from typing import Any, Dict, List, Optional, Tuple
from bugout.app import Bugout
from bugout.data import BugoutResource
from sqlalchemy.orm import Session
from . import ethereum_blockchain
from .. import data
from ..stream_queries import StreamQuery
from moonstream import stream_queries
# Module-level logger; its level is set so that only warnings and above are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)
# Registry of available event providers, keyed by each provider's `event_type`.
# The mapper functions below iterate over this dictionary and call the
# same-named function on every registered provider module.
event_providers: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain}
def get_events(
    db_session: Session,
    bugout_client: Bugout,
    data_journal_id: str,
    data_access_token: str,
    stream_boundary: data.StreamBoundary,
    query: StreamQuery,
    user_subscriptions: Dict[str, List[BugoutResource]],
    max_threads: Optional[int] = None,
    result_timeout: float = 30.0,
    raise_on_error: bool = False,
    sort_events: bool = True,
) -> Tuple[data.StreamBoundary, List[data.Event]]:
    """
    Gets events from all providers and sends them back with the stream boundary.

    Each provider registered in `event_providers` has its `get_events` function submitted to a
    thread pool with the same positional arguments (note that the same `db_session` object is
    handed to every worker). Providers that return None contribute no events.

    Arguments specific to this mapper:
    - max_threads: max_workers for the ThreadPoolExecutor (None uses the executor default).
    - result_timeout: Timeout, in seconds, passed to each future's `result()` call.
    - raise_on_error: If True, the first provider error encountered is re-raised. If False, the
      error is logged as a warning and that provider is skipped.
    - sort_events: If True, events are sorted by `event_timestamp`, most recent first.

    Returns a tuple of the `stream_boundary` that was passed in (boundaries returned by individual
    providers are discarded) and the combined list of events.
    """
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(
        max_workers=max_threads, thread_name_prefix="event_providers_"
    ) as executor:
        for provider_name, provider in event_providers.items():
            futures[provider_name] = executor.submit(
                provider.get_events,
                db_session,
                bugout_client,
                data_journal_id,
                data_access_token,
                stream_boundary,
                query,
                user_subscriptions,
            )

    # NOTE(review): leaving the `with` block waits for every submitted call to finish, so the
    # futures read below are already done and `result_timeout` cannot expire here. Confirm
    # whether the timeout is meant to bound the wait on still-running workers.
    results: Dict[str, Tuple[data.StreamBoundary, List[data.Event]]] = {}
    for provider_name, future in futures.items():
        try:
            result = future.result(timeout=result_timeout)
        except Exception as e:
            if raise_on_error:
                raise
            logger.warning(
                "Error receiving events from provider: %s:\n%r", provider_name, e
            )
            continue
        if result is not None:
            results[provider_name] = result

    events = [event for _, event_list in results.values() for event in event_list]
    if sort_events:
        events.sort(key=lambda event: event.event_timestamp, reverse=True)
    return (stream_boundary, events)
def latest_events(
    db_session: Session,
    bugout_client: Bugout,
    data_journal_id: str,
    data_access_token: str,
    query: StreamQuery,
    num_events: int,
    user_subscriptions: Dict[str, List[BugoutResource]],
    max_threads: Optional[int] = None,
    result_timeout: float = 30.0,
    raise_on_error: bool = False,
    sort_events: bool = True,
) -> List[data.Event]:
    """
    Gets num_events most recent events from all providers, compiles them into a single list, and
    returns them to the caller.

    NOTE: Unlike simple event providers, the interpretation of num_events here is that we return
    num_events events per individual event provider! The combined list is not truncated.

    Arguments specific to this mapper:
    - max_threads: max_workers for the ThreadPoolExecutor (None uses the executor default).
    - result_timeout: Timeout, in seconds, passed to each future's `result()` call.
    - raise_on_error: If True, the first provider error encountered is re-raised. If False, the
      error is logged as a warning and that provider is skipped.
    - sort_events: If True, events are sorted by `event_timestamp`, most recent first.
    """
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(
        max_workers=max_threads, thread_name_prefix="event_providers_"
    ) as executor:
        for provider_name, provider in event_providers.items():
            futures[provider_name] = executor.submit(
                provider.latest_events,
                db_session,
                bugout_client,
                data_journal_id,
                data_access_token,
                query,
                num_events,
                user_subscriptions,
            )

    # Providers that return None (nothing to contribute) are left out of the results.
    results: Dict[str, List[data.Event]] = {}
    for provider_name, future in futures.items():
        try:
            result = future.result(timeout=result_timeout)
        except Exception as e:
            if raise_on_error:
                raise
            logger.warning(
                "Error receiving events from provider: %s:\n%r", provider_name, e
            )
            continue
        if result is not None:
            results[provider_name] = result

    events = [event for event_list in results.values() for event in event_list]
    if sort_events:
        events.sort(key=lambda event: event.event_timestamp, reverse=True)
    return events
def next_event(
    db_session: Session,
    bugout_client: Bugout,
    data_journal_id: str,
    data_access_token: str,
    stream_boundary: data.StreamBoundary,
    query: StreamQuery,
    user_subscriptions: Dict[str, List[BugoutResource]],
    max_threads: Optional[int] = None,
    result_timeout: float = 30.0,
    raise_on_error: bool = False,
) -> Optional[data.Event]:
    """
    Get earliest event after stream boundary across all available providers.

    Each provider's `next_event` is run in a thread pool; the candidate with the smallest
    `event_timestamp` is returned. If several candidates share that timestamp, the one from the
    provider registered first wins. Returns None if no provider produced an event.

    Arguments specific to this mapper:
    - max_threads: max_workers for the ThreadPoolExecutor (None uses the executor default).
    - result_timeout: Timeout, in seconds, passed to each future's `result()` call.
    - raise_on_error: If True, the first provider error encountered is re-raised. If False, the
      error is logged as a warning and that provider is skipped.
    """
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(
        max_workers=max_threads, thread_name_prefix="event_providers_"
    ) as executor:
        for provider_name, provider in event_providers.items():
            futures[provider_name] = executor.submit(
                provider.next_event,
                db_session,
                bugout_client,
                data_journal_id,
                data_access_token,
                stream_boundary,
                query,
                user_subscriptions,
            )

    # Providers that return None (no next event) are left out of the candidates.
    results: Dict[str, data.Event] = {}
    for provider_name, future in futures.items():
        try:
            result = future.result(timeout=result_timeout)
        except Exception as e:
            if raise_on_error:
                raise
            logger.warning(
                "Error receiving events from provider: %s:\n%r", provider_name, e
            )
            continue
        if result is not None:
            results[provider_name] = result

    # min() returns the first minimal item, matching a strict "replace only if earlier" scan.
    return min(
        results.values(),
        key=lambda candidate: candidate.event_timestamp,
        default=None,
    )
def previous_event(
    db_session: Session,
    bugout_client: Bugout,
    data_journal_id: str,
    data_access_token: str,
    stream_boundary: data.StreamBoundary,
    query: StreamQuery,
    user_subscriptions: Dict[str, List[BugoutResource]],
    max_threads: Optional[int] = None,
    result_timeout: float = 30.0,
    raise_on_error: bool = False,
) -> Optional[data.Event]:
    """
    Get latest event before stream boundary across all available providers.

    Each provider's `previous_event` is run in a thread pool; the candidate with the largest
    `event_timestamp` is returned. If several candidates share that timestamp, the one from the
    provider registered first wins. Returns None if no provider produced an event.

    Arguments specific to this mapper:
    - max_threads: max_workers for the ThreadPoolExecutor (None uses the executor default).
    - result_timeout: Timeout, in seconds, passed to each future's `result()` call.
    - raise_on_error: If True, the first provider error encountered is re-raised. If False, the
      error is logged as a warning and that provider is skipped.
    """
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(
        max_workers=max_threads, thread_name_prefix="event_providers_"
    ) as executor:
        for provider_name, provider in event_providers.items():
            futures[provider_name] = executor.submit(
                provider.previous_event,
                db_session,
                bugout_client,
                data_journal_id,
                data_access_token,
                stream_boundary,
                query,
                user_subscriptions,
            )

    # Providers that return None (no previous event) are left out of the candidates.
    results: Dict[str, data.Event] = {}
    for provider_name, future in futures.items():
        try:
            result = future.result(timeout=result_timeout)
        except Exception as e:
            if raise_on_error:
                raise
            logger.warning(
                "Error receiving events from provider: %s:\n%r", provider_name, e
            )
            continue
        if result is not None:
            results[provider_name] = result

    # max() returns the first maximal item, matching a strict "replace only if later" scan.
    return max(
        results.values(),
        key=lambda candidate: candidate.event_timestamp,
        default=None,
    )

Wyświetl plik

@ -19,6 +19,7 @@ from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)
event_type = "ethereum_blockchain"
@ -74,7 +75,7 @@ def default_filters(subscriptions: List[BugoutResource]) -> List[str]:
def parse_filters(
query: StreamQuery, user_subscriptions: Dict[str, List[Dict[str, Any]]]
query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
) -> Optional[Filters]:
"""
Passes raw filter strings into a Filters object which is used to construct a database query
@ -223,7 +224,7 @@ def get_events(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
user_subscriptions: Dict[str, List[BugoutResource]],
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
"""
Returns ethereum_blockchain events for the given addresses in the time period represented
@ -231,6 +232,7 @@ def get_events(
If the query does not require any data from this provider, returns None.
"""
logger.warn("WHAT THE HELL PARAKEET")
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None
@ -259,7 +261,7 @@ def latest_events(
data_access_token: str,
query: StreamQuery,
num_events: int,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
user_subscriptions: Dict[str, List[BugoutResource]],
) -> Optional[List[data.Event]]:
"""
Returns the num_events latest events from the current provider, subject to the constraints imposed
@ -291,7 +293,7 @@ def next_event(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
user_subscriptions: Dict[str, List[BugoutResource]],
) -> Optional[data.Event]:
"""
Returns the earliest event occuring after the given stream boundary corresponding to the given
@ -331,7 +333,7 @@ def previous_event(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
user_subscriptions: Dict[str, List[BugoutResource]],
) -> Optional[data.Event]:
"""
Returns the latest event occuring before the given stream boundary corresponding to the given

Wyświetl plik

@ -2,20 +2,24 @@
The Moonstream subscriptions HTTP API
"""
import logging
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from bugout.data import BugoutResource
from fastapi import FastAPI, HTTPException, Request, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb import db
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import user
from .. import data
from ..middleware import BroodAuthMiddleware
from ..providers import ethereum_blockchain
from ..providers import (
event_providers,
get_events,
latest_events,
next_event,
previous_event,
)
from ..settings import (
DOCS_TARGET_PATH,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -58,7 +62,7 @@ whitelist_paths.update(DOCS_PATHS)
app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths)
def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]:
def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]:
"""
Returns the given user's subscriptions grouped by subscription type.
"""
@ -71,7 +75,7 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]:
)
# TODO(andrey, kompotkot, zomglings): PAGINATION!!!
user_subscriptions: Dict[str, List[Dict[str, Any]]] = {}
user_subscriptions: Dict[str, List[BugoutResource]] = {}
for subscription in response.resources:
subscription_type = subscription.resource_data.get("subscription_type_id")
if subscription_type is None:
@ -83,11 +87,8 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]:
return user_subscriptions
EVENT_PROVIDERS: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain}
@app.get("/", tags=["streams"], response_model=data.GetEventsResponse)
async def search_transactions(
async def stream_handler(
request: Request,
q: str = Query(""),
start_time: int = Query(0),
@ -96,6 +97,16 @@ async def search_transactions(
include_end: bool = Query(False),
db_session: Session = Depends(db.yield_db_session),
) -> data.GetEventsResponse:
"""
Gets all events in the client's stream subject to the constraints defined by the following query
parameters:
- q: Query string which filters over subscriptions
- start_time, end_time, include_start, include_end: These define the window of time from which
we want to retrieve events.
All times must be given as seconds since the Unix epoch.
"""
stream_boundary = data.StreamBoundary(
start_time=start_time,
end_time=end_time,
@ -105,29 +116,152 @@ async def search_transactions(
user_subscriptions = get_user_subscriptions(request.state.token)
query = stream_queries.StreamQuery(
subscription_types=[subtype for subtype in EVENT_PROVIDERS], subscriptions=[]
subscription_types=[subtype for subtype in event_providers], subscriptions=[]
)
if q.strip() != "":
query = stream_queries.parse_query_string(q)
results = {
event_type: provider.get_events(
db_session,
bc,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
stream_boundary,
query,
user_subscriptions,
)
for event_type, provider in EVENT_PROVIDERS.items()
}
events = [
event
for _, event_list in results.values()
if event_list is not None
for event in event_list
]
events.sort(key=lambda event: event.event_timestamp, reverse=True)
_, events = get_events(
db_session,
bc,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
stream_boundary,
query,
user_subscriptions,
# 10 minute timeout on per-provider results
result_timeout=600.0,
raise_on_error=True,
)
response = data.GetEventsResponse(stream_boundary=stream_boundary, events=events)
return response
@app.get("/latest", tags=["streams"])
async def latest_events_handler(
    request: Request,
    q: str = Query(""),
    db_session: Session = Depends(db.yield_db_session),
) -> List[data.Event]:
    """
    Gets the latest events in the client's stream subject to the constraints defined by the following query
    parameters:
    - q: Query string which filters over subscriptions

    One event is requested from each event provider.
    """
    user_subscriptions = get_user_subscriptions(request.state.token)

    # Default query covers every registered provider; a non-blank `q` replaces it entirely.
    query = stream_queries.StreamQuery(
        subscription_types=[subtype for subtype in event_providers], subscriptions=[]
    )
    if q.strip() != "":
        query = stream_queries.parse_query_string(q)

    events = latest_events(
        db_session,
        bc,
        MOONSTREAM_DATA_JOURNAL_ID,
        MOONSTREAM_ADMIN_ACCESS_TOKEN,
        query,
        # num_events: applied per provider by the providers mapper
        1,
        user_subscriptions,
        # 10 minute timeout on per-provider results
        result_timeout=600.0,
        raise_on_error=True,
        sort_events=True,
    )
    return events
@app.get("/next", tags=["streams"])
async def next_event_handler(
    request: Request,
    q: str = Query(""),
    start_time: int = Query(0),
    end_time: Optional[int] = Query(None),
    include_start: bool = Query(False),
    include_end: bool = Query(False),
    db_session: Session = Depends(db.yield_db_session),
) -> Optional[data.Event]:
    """
    Gets the next event in the client's stream subject to the constraints defined by the following query
    parameters:
    - q: Query string which filters over subscriptions
    - start_time, end_time, include_start, include_end: These define the window of time after which
    we want to retrieve the next event.

    All times must be given as seconds since the Unix epoch.
    """
    stream_boundary = data.StreamBoundary(
        start_time=start_time,
        end_time=end_time,
        include_start=include_start,
        include_end=include_end,
    )
    user_subscriptions = get_user_subscriptions(request.state.token)

    # Default query covers every registered provider; a non-blank `q` replaces it entirely.
    query = stream_queries.StreamQuery(
        subscription_types=[subtype for subtype in event_providers], subscriptions=[]
    )
    if q.strip() != "":
        query = stream_queries.parse_query_string(q)

    event = next_event(
        db_session,
        bc,
        MOONSTREAM_DATA_JOURNAL_ID,
        MOONSTREAM_ADMIN_ACCESS_TOKEN,
        stream_boundary,
        query,
        user_subscriptions,
        # 10 minute timeout on per-provider results
        result_timeout=600.0,
        raise_on_error=True,
    )
    return event
@app.get("/previous", tags=["streams"])
async def previous_event_handler(
    request: Request,
    q: str = Query(""),
    start_time: int = Query(0),
    end_time: Optional[int] = Query(None),
    include_start: bool = Query(False),
    include_end: bool = Query(False),
    db_session: Session = Depends(db.yield_db_session),
) -> Optional[data.Event]:
    """
    Gets the previous event in the client's stream subject to the constraints defined by the following query
    parameters:
    - q: Query string which filters over subscriptions
    - start_time, end_time, include_start, include_end: These define the window of time before which
    we want to retrieve the previous event.

    All times must be given as seconds since the Unix epoch.
    """
    stream_boundary = data.StreamBoundary(
        start_time=start_time,
        end_time=end_time,
        include_start=include_start,
        include_end=include_end,
    )
    user_subscriptions = get_user_subscriptions(request.state.token)

    # Default query covers every registered provider; a non-blank `q` replaces it entirely.
    query = stream_queries.StreamQuery(
        subscription_types=[subtype for subtype in event_providers], subscriptions=[]
    )
    if q.strip() != "":
        query = stream_queries.parse_query_string(q)

    event = previous_event(
        db_session,
        bc,
        MOONSTREAM_DATA_JOURNAL_ID,
        MOONSTREAM_ADMIN_ACCESS_TOKEN,
        stream_boundary,
        query,
        user_subscriptions,
        # 10 minute timeout on per-provider results
        result_timeout=600.0,
        raise_on_error=True,
    )
    return event

Wyświetl plik

@ -14,8 +14,8 @@ MOONSTREAM_APPLICATION_ID = os.environ.get("MOONSTREAM_APPLICATION_ID", "")
if MOONSTREAM_APPLICATION_ID == "":
raise ValueError("MOONSTREAM_APPLICATION_ID environment variable must be set")
MOONSTREAM_DATA_JOURNAL_ID = os.environ.get("MOONSTREAM_DATA_JOURNAL_ID")
if MOONSTREAM_DATA_JOURNAL_ID is None:
MOONSTREAM_DATA_JOURNAL_ID = os.environ.get("MOONSTREAM_DATA_JOURNAL_ID", "")
if MOONSTREAM_DATA_JOURNAL_ID == "":
raise ValueError("MOONSTREAM_DATA_JOURNAL_ID environment variable must be set")
MOONSTREAM_ADMIN_ACCESS_TOKEN = os.environ.get("MOONSTREAM_ADMIN_ACCESS_TOKEN", "")