"""
Event providers powered by Bugout journals.
"""
|
2021-08-23 13:07:31 +00:00
|
|
|
from datetime import datetime
|
2021-08-20 21:14:36 +00:00
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
|
|
from bugout.app import Bugout
|
|
|
|
from bugout.data import BugoutResource, BugoutSearchResult
|
|
|
|
from bugout.journal import SearchOrder
|
2021-08-23 14:57:39 +00:00
|
|
|
from dateutil.parser import isoparse
|
|
|
|
from dateutil.tz import UTC
|
2021-08-20 21:14:36 +00:00
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
|
|
from .. import data
|
|
|
|
from ..stream_queries import StreamQuery
|
|
|
|
|
2021-11-30 14:23:13 +00:00
|
|
|
from ..settings import HUMBUG_TXPOOL_CLIENT_ID
|
2021-08-20 21:14:36 +00:00
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
# Use the canonical WARNING level name (logging.WARN is a deprecated alias with
# the same value); suppresses INFO/DEBUG noise from this module by default.
logger.setLevel(logging.WARNING)

# Subscription "addresses" that are actually journal tags and must be passed
# through to the search query verbatim rather than expanded into
# from_address/to_address terms.
allowed_tags = ["tag:erc721"]
|
|
|
|
|
2021-08-20 21:14:36 +00:00
|
|
|
|
|
|
|
class BugoutEventProviderError(Exception):
    """Catch-all error raised by BugoutEventProvider instances."""
|
|
|
|
|
|
|
|
|
|
|
|
class BugoutEventProvider:
    """
    Provides events (specified by a conjunction of tags) from a Bugout journal.
    """

    def __init__(
        self,
        event_type: str,
        description: str,
        default_time_interval_seconds: int,
        estimated_events_per_time_interval: float,
        tags: Optional[List[str]] = None,
        batch_size: int = 100,
        timeout: float = 30.0,
    ):
        """
        Args:
        - event_type: Name of event this instance provides
        - description: Human-readable description of this event stream
        - default_time_interval_seconds: Default width (in seconds) of a stream window
        - estimated_events_per_time_interval: Rough expected event rate per window
        - tags: Tags which define events that this provider works with
        - batch_size: Number of events to read from journal at a time
        - timeout: Request timeout for Bugout requests
        """
        self.event_type = event_type
        self.description = description
        self.default_time_interval_seconds = default_time_interval_seconds
        self.estimated_events_per_time_interval = estimated_events_per_time_interval
        self.batch_size = batch_size
        self.timeout = timeout
        self.tags: List[str] = [] if tags is None else tags
        # Each tag becomes a required "#tag" term of the journal search query.
        self.query = [f"#{tag}" for tag in self.tags]

    def validate_subscription(
        self, subscription_resource_data: data.SubscriptionResourceData
    ) -> bool:
        """
        Maximally permissive check: accepts any subscription whose
        subscription_type_id matches this provider's event_type. Subclasses may
        impose stricter criteria.
        """
        return subscription_resource_data.subscription_type_id == self.event_type

    def entry_event(self, entry: BugoutSearchResult) -> data.Event:
        """
        Load an event from a Bugout journal entry. Assumes the entry content is a
        JSON string with no additional markdown formatting.
        """
        event_data = {}
        if entry.content is not None:
            event_data = json.loads(entry.content)
        # Journal created_at strings may parse as naive datetimes; pin them to
        # UTC before converting to an epoch timestamp.
        created_dt = isoparse(entry.created_at).replace(tzinfo=UTC)
        return data.Event(
            event_type=self.event_type,
            event_timestamp=int(created_dt.timestamp()),
            event_data=event_data,
        )

    def parse_filters(
        self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
    ) -> Optional[List[str]]:
        """
        Subclasses can provide additional constraints to apply to the journal search.

        Returns None to signal that no data should be returned from the provider
        at all; otherwise returns a (possibly empty) list of extra search terms.
        """
        is_query_constrained = query.subscription_types or query.subscriptions
        relevant_subscriptions = user_subscriptions.get(self.event_type)

        if (
            is_query_constrained and self.event_type not in query.subscription_types
        ) or not relevant_subscriptions:
            return None
        return []

    def get_events(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
        """
        Uses the journal search endpoint to retrieve all events inside the given
        stream boundary, honoring any query constraints, paginating through the
        journal in batches of self.batch_size.
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        time_constraints: List[str] = []
        if stream_boundary.start_time > 0:
            start_time = datetime.utcfromtimestamp(
                stream_boundary.start_time
            ).isoformat()
            operator = ">=" if stream_boundary.include_start else ">"
            time_constraints.append(f"created_at:{operator}{start_time}")

        if stream_boundary.end_time is not None:
            end_time = datetime.utcfromtimestamp(stream_boundary.end_time).isoformat()
            operator = "<=" if stream_boundary.include_end else "<"
            time_constraints.append(f"created_at:{operator}{end_time}")

        final_query = " ".join(self.query + time_constraints + additional_constraints)
        events: List[data.Event] = []
        offset: Optional[int] = 0
        # next_offset is None once the journal has no more matching entries.
        while offset is not None:
            search_results = bugout_client.search(
                data_access_token,
                data_journal_id,
                final_query,
                limit=self.batch_size,
                offset=offset,
                content=True,
                timeout=self.timeout,
                order=SearchOrder.DESCENDING,
            )
            events.extend(self.entry_event(entry) for entry in search_results.results)
            offset = search_results.next_offset

        return stream_boundary, events

    def latest_events(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        query: StreamQuery,
        num_events: int,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[List[data.Event]]:
        """
        Gets the num_events most recent events for this provider from the given
        journal. Raises BugoutEventProviderError if num_events exceeds batch_size.
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if num_events > self.batch_size:
            raise BugoutEventProviderError(
                f"You requested too many events: event_type={self.event_type}, num_events={num_events}, limit={self.batch_size}"
            )

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=num_events,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.DESCENDING,
        )
        return [self.entry_event(entry) for entry in search_results.results]

    def next_event(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[data.Event]:
        """
        Get the earliest event that occurred after the time window represented by
        the given stream boundary. Raises BugoutEventProviderError for a boundary
        with no end time (a "current" boundary has no "after").
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if stream_boundary.end_time is None:
            raise BugoutEventProviderError(
                "Cannot return next event for a stream boundary which is current."
            )
        end_time = datetime.utcfromtimestamp(stream_boundary.end_time).isoformat()
        # If the boundary includes its end point, the next event must be strictly later.
        operator = ">" if stream_boundary.include_end else ">="
        additional_constraints.append(f"created_at:{operator}{end_time}")

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=1,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.ASCENDING,
        )
        if not search_results.results:
            return None
        return self.entry_event(search_results.results[0])

    def previous_event(
        self,
        db_session: Session,
        bugout_client: Bugout,
        data_journal_id: str,
        data_access_token: str,
        stream_boundary: data.StreamBoundary,
        query: StreamQuery,
        user_subscriptions: Dict[str, List[BugoutResource]],
    ) -> Optional[data.Event]:
        """
        Get the latest event that occurred before the time window represented by
        the given stream boundary. Raises BugoutEventProviderError for a boundary
        starting at the beginning of time (nothing can precede it).
        """
        additional_constraints = self.parse_filters(query, user_subscriptions)
        if additional_constraints is None:
            return None

        if stream_boundary.start_time == 0:
            raise BugoutEventProviderError(
                "Cannot return previous event for a stream boundary starting at the beginning of time."
            )
        start_time = datetime.utcfromtimestamp(stream_boundary.start_time).isoformat()
        # If the boundary includes its start point, the previous event must be strictly earlier.
        operator = "<" if stream_boundary.include_start else "<="
        additional_constraints.append(f"created_at:{operator}{start_time}")

        final_query = " ".join(self.query + additional_constraints)
        search_results = bugout_client.search(
            data_access_token,
            data_journal_id,
            final_query,
            limit=1,
            content=True,
            timeout=self.timeout,
            order=SearchOrder.DESCENDING,
        )
        if not search_results.results:
            return None
        return self.entry_event(search_results.results[0])
|
2021-08-20 21:14:36 +00:00
|
|
|
|
|
|
|
|
2021-08-27 15:41:08 +00:00
|
|
|
class EthereumTXPoolProvider(BugoutEventProvider):
    """
    Provider for Ethereum transaction pool events, restricted per user to the
    addresses (or tags) of their subscriptions.
    """

    def __init__(
        self,
        event_type: str,
        description: str,
        default_time_interval_seconds: int,
        estimated_events_per_time_interval: float,
        tags: Optional[List[str]] = None,
        batch_size: int = 100,
        timeout: float = 30.0,
    ):
        super().__init__(
            event_type=event_type,
            description=description,
            default_time_interval_seconds=default_time_interval_seconds,
            estimated_events_per_time_interval=estimated_events_per_time_interval,
            tags=tags,
            batch_size=batch_size,
            timeout=timeout,
        )

    def parse_filters(
        self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
    ) -> Optional[List[str]]:
        """
        Build per-address search terms from the user's subscriptions.

        Returns None (suppressing this provider entirely) when the query excludes
        this event type or the user has no relevant subscriptions.
        """
        is_query_constrained = query.subscription_types or query.subscriptions
        relevant_subscriptions = user_subscriptions.get(self.event_type)

        if (
            is_query_constrained and self.event_type not in query.subscription_types
        ) or not relevant_subscriptions:
            return None

        subscriptions_filters: List[str] = []
        for subscription in relevant_subscriptions:
            address = subscription.resource_data["address"]
            if address in allowed_tags:
                # This "address" is really a journal tag (e.g. "tag:erc721");
                # pass it through to the search query verbatim.
                subscriptions_filters.append(address)
            else:
                # One term for the sending side and one for the receiving side
                # of the address.
                subscriptions_filters.extend(
                    [f"?#from_address:{address}", f"?#to_address:{address}"]
                )

        return subscriptions_filters
|
|
|
|
|
|
|
|
|
2021-10-01 14:07:42 +00:00
|
|
|
class PublicDataProvider(BugoutEventProvider):
    """
    Provider for public event streams: no per-user filtering is ever applied.
    """

    def __init__(
        self,
        event_type: str,
        description: str,
        default_time_interval_seconds: int,
        estimated_events_per_time_interval: float,
        tags: Optional[List[str]] = None,
        batch_size: int = 100,
        timeout: float = 30.0,
    ):
        super().__init__(
            event_type=event_type,
            description=description,
            default_time_interval_seconds=default_time_interval_seconds,
            estimated_events_per_time_interval=estimated_events_per_time_interval,
            tags=tags,
            batch_size=batch_size,
            timeout=timeout,
        )

    def parse_filters(
        self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
    ) -> Optional[List[str]]:
        """No additional constraints, and never suppresses the stream."""
        return []
|
|
|
|
|
|
|
|
|
2021-09-16 16:25:22 +00:00
|
|
|
# User-facing description passed to the provider below.
whalewatch_description = """Event provider for Ethereum whale watch.

Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories:
1. Number of transactions sent
2. Number of transactions received
3. Amount (in WEI) sent
4. Amount (in WEI) received

To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""

# Whale watch data is public, so PublicDataProvider (no per-user filtering) is used.
whalewatch_provider = PublicDataProvider(
    event_type="ethereum_whalewatch",
    description=whalewatch_description,
    default_time_interval_seconds=310,
    estimated_events_per_time_interval=1,
    tags=["crawl_type:ethereum_trending"],
)
|
2021-08-24 16:57:51 +00:00
|
|
|
|
2021-09-16 16:25:22 +00:00
|
|
|
# User-facing description passed to the provider below.
ethereum_txpool_description = """Event provider for Ethereum transaction pool.

Shows the latest events (from the previous hour) in the Ethereum transaction pool.

To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint."""

# Txpool events are tagged with the Humbug client that crawled them; filtering
# on that tag scopes the search to our own crawler's entries.
ethereum_txpool_provider = EthereumTXPoolProvider(
    event_type="ethereum_txpool",
    description=ethereum_txpool_description,
    default_time_interval_seconds=5,
    estimated_events_per_time_interval=50,
    tags=[f"client:{HUMBUG_TXPOOL_CLIENT_ID}"],
)
|
2021-09-16 16:56:12 +00:00
|
|
|
|
|
|
|
# User-facing description passed to the provider below.
nft_summary_description = """Event provider for NFT market summaries.

This provider periodically generates NFT market summaries for the last hour of market activity.

Currently, it summarizes the activities on the following NFT markets:
1. The Ethereum market

This provider is currently not accessible for subscription. The data from this provider is publicly
available at the /nft endpoint."""

# NFT summaries are public, so PublicDataProvider (no per-user filtering) is used.
nft_summary_provider = PublicDataProvider(
    event_type="nft_summary",
    description=nft_summary_description,
    # 40 blocks per summary, 15 seconds per block + 2 seconds wiggle room.
    default_time_interval_seconds=40 * 17,
    estimated_events_per_time_interval=1,
    tags=["crawl_type:nft_ethereum"],
)
|