Updated ethereum_blockchain provider to respect StreamQuery objects

pull/105/head
Neeraj Kashyap 2021-08-18 15:57:45 -07:00
rodzic a9320b8521
commit 13a7775bae
1 zmienionych plików z 79 dodań i 19 usunięć

Wyświetl plik

@ -72,7 +72,9 @@ def default_filters(subscriptions: List[BugoutResource]) -> List[str]:
return filters
def parse_filters(filters: List[str]) -> Filters:
def parse_filters(
query: StreamQuery, user_subscriptions: Dict[str, List[Dict[str, Any]]]
) -> Optional[Filters]:
"""
Passes raw filter strings into a Filters object which is used to construct a database query
for ethereum transactions.
@ -81,21 +83,54 @@ def parse_filters(filters: List[str]) -> Filters:
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
None indicating that the StreamQuery does not require any data from this provider.
"""
provider_subscriptions = user_subscriptions.get(event_type)
# If the user has no subscriptions to this event type, we do not have to return any data!
if not provider_subscriptions:
return None
subscribed_addresses = {
subscription.get("address")
for subscription in provider_subscriptions
if subscription.get("address") is not None
}
requires_ethereum_blockchain_data = False
for subtype in query.subscription_types:
if subtype == event_type:
requires_ethereum_blockchain_data = True
parsed_filters = Filters()
from_slice_start = len("from:")
to_slice_start = len("to:")
for raw_filter in filters:
if raw_filter.startswith("from:"):
parsed_filters.from_addresses.append(raw_filter[from_slice_start:])
elif raw_filter.startswith("to:"):
parsed_filters.to_addresses.append(raw_filter[to_slice_start:])
else:
parsed_filters.from_addresses.append(raw_filter)
parsed_filters.to_addresses.append(raw_filter)
for provider_type, raw_filter in query.subscriptions:
if provider_type != event_type:
continue
if raw_filter.startswith("from:"):
address = raw_filter[from_slice_start:]
if address in subscribed_addresses:
parsed_filters.from_addresses.append(address)
elif raw_filter.startswith("to:"):
address = raw_filter[to_slice_start:]
if address in subscribed_addresses:
parsed_filters.to_addresses.append(address)
else:
address = raw_filter
if address in subscribed_addresses:
parsed_filters.from_addresses.append(address)
parsed_filters.to_addresses.append(address)
if parsed_filters.from_addresses or parsed_filters.to_addresses:
requires_ethereum_blockchain_data = True
if not requires_ethereum_blockchain_data:
return None
return parsed_filters
@ -186,13 +221,17 @@ def get_events(
data_journal_id: str,
data_access_token: str,
stream_boundary: data.StreamBoundary,
filters: List[str],
) -> Tuple[data.StreamBoundary, List[data.Event]]:
query: StreamQuery,
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
"""
Returns ethereum_blockchain events for the given addresses in the time period represented
by stream_boundary.
If the query does not require any data from this provider, returns None.
"""
parsed_filters = parse_filters(filters)
parsed_filters = parse_filters(query)
if parsed_filters is None:
return None
ethereum_transactions = query_ethereum_transactions(
db_session, stream_boundary, parsed_filters
@ -216,19 +255,23 @@ def latest_events(
bugout_client: Bugout,
data_journal_id: str,
data_access_token: str,
filters: List[str],
query: StreamQuery,
num_events: int,
) -> List[data.Event]:
) -> Optional[List[data.Event]]:
"""
Returns the num_events latest events from the current provider, subject to the constraints imposed
by the given filters.
If the query does not require any data from this provider, returns None.
"""
assert num_events > 0, f"num_events ({num_events}) should be positive."
stream_boundary = data.StreamBoundary(
start_time=0, include_start=True, end_time=None, include_end=False
)
parsed_filters = parse_filters(filters)
parsed_filters = parse_filters(query)
if parsed_filters is None:
return None
ethereum_transactions = (
query_ethereum_transactions(db_session, stream_boundary, parsed_filters)
.order_by(text("timestamp desc"))
@ -244,8 +287,14 @@ def next_event(
data_journal_id: str,
data_access_token: str,
stream_boundary: data.StreamBoundary,
filters: List[str],
query: StreamQuery,
) -> Optional[data.Event]:
"""
Returns the earliest event occuring after the given stream boundary corresponding to the given
query from this provider.
If the query does not require any data from this provider, returns None.
"""
assert (
stream_boundary.end_time is not None
), "Cannot return next event for up-to-date stream boundary"
@ -255,7 +304,10 @@ def next_event(
end_time=None,
include_end=False,
)
parsed_filters = parse_filters(filters)
parsed_filters = parse_filters(query)
if parsed_filters is None:
return None
maybe_ethereum_transaction = (
query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters)
.order_by(text("timestamp asc"))
@ -274,8 +326,14 @@ def previous_event(
data_journal_id: str,
data_access_token: str,
stream_boundary: data.StreamBoundary,
filters: List[str],
query: StreamQuery,
) -> Optional[data.Event]:
"""
Returns the latest event occuring before the given stream boundary corresponding to the given
query from this provider.
If the query does not require any data from this provider, returns None.
"""
assert (
stream_boundary.start_time != 0
), "Cannot return previous event for stream starting at time 0"
@ -285,7 +343,9 @@ def previous_event(
end_time=stream_boundary.start_time,
include_end=(not stream_boundary.include_start),
)
parsed_filters = parse_filters(filters)
parsed_filters = parse_filters(query)
if parsed_filters is None:
return None
maybe_ethereum_transaction = (
query_ethereum_transactions(
db_session, previous_stream_boundary, parsed_filters