Added ethereum_whalewatch provider to API

pull/105/head
Neeraj Kashyap 2021-08-20 14:14:36 -07:00
rodzic 117cfa881e
commit 2886aed32e
6 changed files with 277 additions and 8 deletions

Wyświetl plik

@ -32,7 +32,7 @@ from bugout.app import Bugout
from bugout.data import BugoutResource
from sqlalchemy.orm import Session
from . import ethereum_blockchain
from . import bugout, ethereum_blockchain
from .. import data
from ..stream_queries import StreamQuery
from moonstream import stream_queries
@ -40,7 +40,10 @@ from moonstream import stream_queries
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)
event_providers: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain}
event_providers: Dict[str, Any] = {
ethereum_blockchain.event_type: ethereum_blockchain,
bugout.whalewatch_provider.event_type: bugout.whalewatch_provider,
}
def get_events(

Wyświetl plik

@ -0,0 +1,266 @@
"""
Event providers powered by Bugout journals.
"""
import json
import logging
import time
from typing import Dict, List, Optional, Tuple
from bugout.app import Bugout
from bugout.data import BugoutResource, BugoutSearchResult
from bugout.journal import SearchOrder
from sqlalchemy.orm import Session
from .. import data
from ..stream_queries import StreamQuery
# Module-level logger for this provider module; emit WARNING and above only.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
class BugoutEventProviderError(Exception):
    """Raised for any failure inside a BugoutEventProvider instance."""
class BugoutEventProvider:
    """
    Provides events (specified by a conjunction of tags) from a Bugout journal.

    Each instance serves a single event type: journal entries matching all of
    the configured tags are translated into data.Event objects for the
    Moonstream event stream.
    """

    def __init__(
        self,
        event_type: str,
        tags: Optional[List[str]] = None,
        batch_size: int = 100,
        timeout: float = 30.0,
    ):
        """
        Args:
        - event_type: Name of event this instance provides
        - tags: Tags which define events that this provider works with
        - batch_size: Number of events to read from journal at a time
        - timeout: Request timeout for Bugout requests
        """
        self.event_type = event_type
        self.batch_size = batch_size
        self.timeout = timeout
        if tags is None:
            tags = []
        self.tags: List[str] = tags
        # Base journal search query: one "#tag" term per configured tag.
        self.query = [f"#{tag}" for tag in self.tags]

    def validate_subscription(
        self, subscription_resource_data: data.SubscriptionResourceData
    ) -> bool:
        """
        This implementation is maximally permissive and returns True for all
        subscriptions as long as their subscription_type_id is the configured
        event_type.

        Subclasses of this provider can impose stricter criteria on submissions
        to the relevant event types.
        """
        return subscription_resource_data.subscription_type_id == self.event_type

    def entry_event(self, entry: BugoutSearchResult) -> data.Event:
        """
        Load an event from a Bugout journal entry. Assumes that the entry
        content is a JSON string with no additional markdown formatting.

        NOTE(review): the event timestamp is the time of retrieval, not the
        journal entry's creation time — confirm this is the intended semantics.
        """
        event_data = {}
        if entry.content is not None:
            event_data = json.loads(entry.content)
        created_at = int(time.time())
        return data.Event(
            event_type=self.event_type,
            event_timestamp=created_at,
            event_data=event_data,
        )

    def parse_filters(
        self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
    ) -> Optional[List[str]]:
        """
        Subclasses can provide additional constraints to apply to the journal
        search.

        If None is returned, signals that no data should be returned from the
        provider at all.
        """
        relevant_subscriptions = user_subscriptions.get(self.event_type)
        if not relevant_subscriptions:
            return None
        return []

    def get_events(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
        """
        Uses journal search endpoint to retrieve events for the given stream
        boundary and query constraints from the connected journal.

        Returns the (unchanged) stream boundary together with all matching
        events, or None if the query does not involve this provider.
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        # Translate the stream boundary into journal search constraints on
        # created_at, honoring the open/closed-ness of each endpoint.
        time_constraints: List[str] = []
        if stream_boundary.start_time > 0:
            operator = ">"
            if stream_boundary.include_start:
                operator = ">="
            time_constraints.append(
                f"created_at:{operator}{stream_boundary.start_time}"
            )
        if stream_boundary.end_time is not None:
            operator = "<"
            if stream_boundary.include_end:
                operator = "<="
            time_constraints.append(f"created_at:{operator}{stream_boundary.end_time}")

        final_query = " ".join(self.query + time_constraints + additional_constraints)
        events: List[data.Event] = []
        # Page through journal search results until the journal signals the
        # end of results with next_offset=None.
        offset: Optional[int] = 0
        while offset is not None:
            search_results = bugout_client.search(
                data_access_token,
                data_journal_id,
                final_query,
                limit=self.batch_size,
                offset=offset,
                content=True,
                timeout=self.timeout,
                order=SearchOrder.DESCENDING,
            )
            events.extend([self.entry_event(entry) for entry in search_results.results])
            offset = search_results.next_offset

        return stream_boundary, events

    def latest_events(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        query: StreamQuery,
        num_events: int,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[List[data.Event]]:
        """
        Gets the latest events corresponding to this provider from the given
        journal.

        Raises BugoutEventProviderError if num_events exceeds the configured
        batch_size (a single search page).
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if num_events > self.batch_size:
            raise BugoutEventProviderError(
                f"You requested too many events: event_type={self.event_type}, num_events={num_events}, limit={self.batch_size}"
            )

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=num_events,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.DESCENDING,
        )
        return [self.entry_event(entry) for entry in search_results.results]

    def next_event(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[data.Event]:
        """
        Get the earliest event that occurred after the time window represented
        by the given stream boundary.
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if stream_boundary.end_time is None:
            raise BugoutEventProviderError(
                "Cannot return next event for a stream boundary which is current."
            )
        # If the boundary includes its end, "next" must be strictly after it.
        operator = ">="
        if stream_boundary.include_end:
            operator = ">"
        additional_constraints.append(
            f"created_at:{operator}{stream_boundary.end_time}"
        )

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=1,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.ASCENDING,
        )
        if not search_results.results:
            return None
        # Bug fix: convert the raw BugoutSearchResult into a data.Event so the
        # return value matches the declared Optional[data.Event] contract, as
        # every other code path in this class does via entry_event.
        return self.entry_event(search_results.results[0])

    def previous_event(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[data.Event]:
        """
        Get the latest event that occurred before the time window represented
        by the given stream boundary.
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if stream_boundary.start_time == 0:
            raise BugoutEventProviderError(
                "Cannot return previous event for a stream boundary starting at the beginning of time."
            )
        # If the boundary includes its start, "previous" must be strictly before it.
        operator = "<="
        if stream_boundary.include_start:
            operator = "<"
        additional_constraints.append(
            f"created_at:{operator}{stream_boundary.start_time}"
        )

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=1,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.DESCENDING,
        )
        if not search_results.results:
            return None
        # Bug fix: convert the raw BugoutSearchResult into a data.Event so the
        # return value matches the declared Optional[data.Event] contract, as
        # every other code path in this class does via entry_event.
        return self.entry_event(search_results.results[0])
# Module-level provider instance: exposes journal entries tagged
# crawl_type:ethereum_trending under the "ethereum_whalewatch" event type.
# (Registered in the streams API's event_providers mapping.)
whalewatch_provider = BugoutEventProvider(
    event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"]
)

Wyświetl plik

@ -133,6 +133,7 @@ def parse_filters(
if not requires_ethereum_blockchain_data:
return None
return parsed_filters
@ -232,7 +233,6 @@ def get_events(
If the query does not require any data from this provider, returns None.
"""
logger.warn("WHAT THE HELL PARAKEET")
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None

Wyświetl plik

@ -164,7 +164,7 @@ async def latest_events_handler(
query,
1,
user_subscriptions,
result_timeout=600.0,
result_timeout=6.0,
raise_on_error=True,
sort_events=True,
)
@ -212,7 +212,7 @@ async def next_event_handler(
stream_boundary,
query,
user_subscriptions,
result_timeout=600.0,
result_timeout=6.0,
raise_on_error=True,
)
@ -260,7 +260,7 @@ async def previous_event_handler(
stream_boundary,
query,
user_subscriptions,
result_timeout=600.0,
result_timeout=6.0,
raise_on_error=True,
)

Wyświetl plik

@ -3,7 +3,7 @@ asgiref==3.4.1
black==21.7b0
boto3==1.18.1
botocore==1.21.1
bugout==0.1.16
bugout==0.1.17
certifi==2021.5.30
charset-normalizer==2.0.3
click==8.0.1

Wyświetl plik

@ -10,7 +10,7 @@ setup(
name="moonstream",
version=MOONSTREAM_VERSION,
packages=find_packages(),
install_requires=["boto3", "bugout >= 0.1.16", "fastapi", "uvicorn"],
install_requires=["boto3", "bugout >= 0.1.17", "fastapi", "uvicorn"],
extras_require={
"dev": ["black", "mypy"],
"distribute": ["setuptools", "twine", "wheel"],