From 2886aed32e2fd14d41a079baa1ec49dadc7e0e18 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Fri, 20 Aug 2021 14:14:36 -0700 Subject: [PATCH] Added ethereum_whalewatch provider to API --- backend/moonstream/providers/__init__.py | 7 +- backend/moonstream/providers/bugout.py | 266 ++++++++++++++++++ .../providers/ethereum_blockchain.py | 2 +- backend/moonstream/routes/streams.py | 6 +- backend/requirements.txt | 2 +- backend/setup.py | 2 +- 6 files changed, 277 insertions(+), 8 deletions(-) create mode 100644 backend/moonstream/providers/bugout.py diff --git a/backend/moonstream/providers/__init__.py b/backend/moonstream/providers/__init__.py index 606e809b..23d6f6b7 100644 --- a/backend/moonstream/providers/__init__.py +++ b/backend/moonstream/providers/__init__.py @@ -32,7 +32,7 @@ from bugout.app import Bugout from bugout.data import BugoutResource from sqlalchemy.orm import Session -from . import ethereum_blockchain +from . import bugout, ethereum_blockchain from .. import data from ..stream_queries import StreamQuery from moonstream import stream_queries @@ -40,7 +40,10 @@ from moonstream import stream_queries logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) -event_providers: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain} +event_providers: Dict[str, Any] = { + ethereum_blockchain.event_type: ethereum_blockchain, + bugout.whalewatch_provider.event_type: bugout.whalewatch_provider, +} def get_events( diff --git a/backend/moonstream/providers/bugout.py b/backend/moonstream/providers/bugout.py new file mode 100644 index 00000000..4867c36c --- /dev/null +++ b/backend/moonstream/providers/bugout.py @@ -0,0 +1,266 @@ +""" +Event providers powered by Bugout journals. +""" +import json +import logging +import time +from typing import Dict, List, Optional, Tuple + +from bugout.app import Bugout +from bugout.data import BugoutResource, BugoutSearchResult +from bugout.journal import SearchOrder +from sqlalchemy.orm import Session + +from .. import data +from ..stream_queries import StreamQuery + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARN) + + +class BugoutEventProviderError(Exception): + """ + Catch-all error for BugoutEventProvider instances. + """ + + +class BugoutEventProvider: + """ + Provides events (specified by a conjunction of tags) from a Bugout journal. + """ + + def __init__( + self, + event_type: str, + tags: Optional[List[str]] = None, + batch_size: int = 100, + timeout: float = 30.0, + ): + """ + Args: + - event_type: Name of event this instance provides + - tags: Tags which define events that this provider works with + - batch_size: Number of events to read from journal at a time + - timeout: Request timeout for Bugout requests + """ + self.event_type = event_type + self.batch_size = batch_size + self.timeout = timeout + if tags is None: + tags = [] + self.tags: List[str] = tags + self.query = [f"#{tag}" for tag in self.tags] + + def validate_subscription( + self, subscription_resource_data: data.SubscriptionResourceData + ) -> bool: + """ + This implementation is maximally permissive and returns True for all subscriptions as long as + their subscription_type_id is the configured event_type. + Subclasses of this provider can impose stricter criteria on submissions to the relevant event types. + """ + return subscription_resource_data.subscription_type_id == self.event_type + + def entry_event(self, entry: BugoutSearchResult) -> data.Event: + """ + Load an event from a Bugout journal entry. Assumes that the entry content is a JSON string + with no additional markdown formatting. + """ + event_data = {} + if entry.content is not None: + event_data = json.loads(entry.content) + created_at = int(time.time()) + return data.Event( + event_type=self.event_type, + event_timestamp=created_at, + event_data=event_data, + ) + + def parse_filters( + self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]] + ) -> Optional[List[str]]: + """ + Subclasses can provide additional constraints to apply to the journal search. + + If None is returned, signals that no data should be returned from the provider at all. + """ + relevant_subscriptions = user_subscriptions.get(self.event_type) + if not relevant_subscriptions: + return None + return [] + + def get_events( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]: + """ + Uses journal search endpoint to retrieve events for the given stream boundary and query constraints + from the connected journal. + """ + additional_constraints = self.parse_filters(query, user_subscriptions) + if additional_constraints is None: + return None + + time_constraints: List[str] = [] + if stream_boundary.start_time > 0: + operator = ">" + if stream_boundary.include_start: + operator = ">=" + time_constraints.append( + f"created_at:{operator}{stream_boundary.start_time}" + ) + + if stream_boundary.end_time is not None: + operator = "<" + if stream_boundary.include_end: + operator = "<=" + time_constraints.append(f"created_at:{operator}{stream_boundary.end_time}") + + final_query = " ".join(self.query + time_constraints + additional_constraints) + events: List[data.Event] = [] + offset: Optional[int] = 0 + while offset is not None: + search_results = bugout_client.search( + data_access_token, + data_journal_id, + final_query, + limit=self.batch_size, + offset=offset, + content=True, + timeout=self.timeout, + order=SearchOrder.DESCENDING, + ) + events.extend([self.entry_event(entry) for entry in search_results.results]) + offset = search_results.next_offset + + return stream_boundary, events + + def latest_events( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + query: StreamQuery, + num_events: int, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[List[data.Event]]: + """ + Gets the latest events corresponding to this provider from the given journal. + """ + additional_constraints = self.parse_filters(query, user_subscriptions) + if additional_constraints is None: + return None + + if num_events > self.batch_size: + raise BugoutEventProviderError( + f"You requested too many events: event_type={self.event_type}, num_events={num_events}, limit={self.batch_size}" + ) + + final_query = " ".join(self.query + additional_constraints) + search_results = bugout_client.search( + data_access_token, + data_journal_id, + final_query, + limit=num_events, + content=True, + timeout=self.timeout, + order=SearchOrder.DESCENDING, + ) + return [self.entry_event(entry) for entry in search_results.results] + + def next_event( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Get the earliest event that occurred after the time window represented by the given stream boundary. + """ + additional_constraints = self.parse_filters(query, user_subscriptions) + if additional_constraints is None: + return None + + if stream_boundary.end_time is None: + raise BugoutEventProviderError( + "Cannot return next event for a stream boundary which is current." + ) + operator = ">=" + if stream_boundary.include_end: + operator = ">" + additional_constraints.append( + f"created_at:{operator}{stream_boundary.end_time}" + ) + + final_query = " ".join(self.query + additional_constraints) + search_results = bugout_client.search( + data_access_token, + data_journal_id, + final_query, + limit=1, + content=True, + timeout=self.timeout, + order=SearchOrder.ASCENDING, + ) + if not search_results.results: + return None + return search_results.results[0] + + def previous_event( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Get the latest event that occurred before the time window represented by the given stream boundary. + """ + additional_constraints = self.parse_filters(query, user_subscriptions) + if additional_constraints is None: + return None + + if stream_boundary.start_time == 0: + raise BugoutEventProviderError( + "Cannot return previous event for a stream boundary starting at the beginning of time." + ) + operator = "<=" + if stream_boundary.include_start: + operator = "<" + additional_constraints.append( + f"created_at:{operator}{stream_boundary.start_time}" + ) + + final_query = " ".join(self.query + additional_constraints) + search_results = bugout_client.search( + data_access_token, + data_journal_id, + final_query, + limit=1, + content=True, + timeout=self.timeout, + order=SearchOrder.DESCENDING, + ) + if not search_results.results: + return None + return search_results.results[0] + + +whalewatch_provider = BugoutEventProvider( + event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"] +) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index b1f6b732..ac0ba86c 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -133,6 +133,7 @@ def parse_filters( if not requires_ethereum_blockchain_data: return None + return parsed_filters @@ -232,7 +233,6 @@ def get_events( If the query does not require any data from this provider, returns None. """ - logger.warn("WHAT THE HELL PARAKEET") parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: return None diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 1723448d..ee7d54c9 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -164,7 +164,7 @@ async def latest_events_handler( query, 1, user_subscriptions, - result_timeout=600.0, + result_timeout=6.0, raise_on_error=True, sort_events=True, ) @@ -212,7 +212,7 @@ async def next_event_handler( stream_boundary, query, user_subscriptions, - result_timeout=600.0, + result_timeout=6.0, raise_on_error=True, ) @@ -260,7 +260,7 @@ async def previous_event_handler( stream_boundary, query, user_subscriptions, - result_timeout=600.0, + result_timeout=6.0, raise_on_error=True, ) diff --git a/backend/requirements.txt b/backend/requirements.txt index b359f045..d8e389fa 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,7 +3,7 @@ asgiref==3.4.1 black==21.7b0 boto3==1.18.1 botocore==1.21.1 -bugout==0.1.16 +bugout==0.1.17 certifi==2021.5.30 charset-normalizer==2.0.3 click==8.0.1 diff --git a/backend/setup.py b/backend/setup.py index 95f92e0c..09676761 100644 --- a/backend/setup.py +++ b/backend/setup.py @@ -10,7 +10,7 @@ setup( name="moonstream", version=MOONSTREAM_VERSION, packages=find_packages(), - install_requires=["boto3", "bugout >= 0.1.16", "fastapi", "uvicorn"], + install_requires=["boto3", "bugout >= 0.1.17", "fastapi", "uvicorn"], extras_require={ "dev": ["black", "mypy"], "distribute": ["setuptools", "twine", "wheel"],