kopia lustrzana https://github.com/bugout-dev/moonstream
Fixed issue with ethereum provider not looking at subscriptions
Before, the `ethereum_blockchain` provider was returning all blockchain transactions in the given stream boundary. Now, it only returns transactions that are relevant to the user's subscriptions.pull/105/head
rodzic
cc4dfa5ebf
commit
6959b78daf
|
@ -73,15 +73,18 @@ class Filters:
|
||||||
to_addresses: List[str] = field(default_factory=list)
|
to_addresses: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
def default_filters(subscriptions: List[BugoutResource]) -> List[str]:
|
def default_filters(subscriptions: List[BugoutResource]) -> Filters:
|
||||||
"""
|
"""
|
||||||
Default filter strings for the given list of subscriptions.
|
Default filter strings for the given list of subscriptions.
|
||||||
"""
|
"""
|
||||||
filters = []
|
filters = Filters()
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
subscription_address = subscription.resource_data.get("address")
|
subscription_address = cast(
|
||||||
|
Optional[str], subscription.resource_data.get("address")
|
||||||
|
)
|
||||||
if subscription_address is not None:
|
if subscription_address is not None:
|
||||||
filters.append(cast(str, subscription_address))
|
filters.from_addresses.append(subscription_address)
|
||||||
|
filters.to_addresses.append(subscription_address)
|
||||||
else:
|
else:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||||
|
@ -104,10 +107,20 @@ def parse_filters(
|
||||||
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
||||||
None indicating that the StreamQuery does not require any data from this provider.
|
None indicating that the StreamQuery does not require any data from this provider.
|
||||||
"""
|
"""
|
||||||
|
if query.subscription_types and not any(
|
||||||
|
subtype == event_type for subtype in query.subscription_types
|
||||||
|
):
|
||||||
|
return None
|
||||||
|
|
||||||
provider_subscriptions = user_subscriptions.get(event_type)
|
provider_subscriptions = user_subscriptions.get(event_type)
|
||||||
|
|
||||||
# If the user has no subscriptions to this event type, we do not have to return any data!
|
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||||
if not provider_subscriptions:
|
if not provider_subscriptions:
|
||||||
return None
|
return None
|
||||||
|
parsed_filters = default_filters(provider_subscriptions)
|
||||||
|
|
||||||
|
from_prefix_length = len("from:")
|
||||||
|
to_prefix_length = len("to:")
|
||||||
|
|
||||||
subscribed_addresses = {
|
subscribed_addresses = {
|
||||||
subscription.resource_data.get("address")
|
subscription.resource_data.get("address")
|
||||||
|
@ -115,38 +128,28 @@ def parse_filters(
|
||||||
if subscription.resource_data.get("address") is not None
|
if subscription.resource_data.get("address") is not None
|
||||||
}
|
}
|
||||||
|
|
||||||
requires_ethereum_blockchain_data = False
|
if query.subscriptions:
|
||||||
for subtype in query.subscription_types:
|
parsed_filters.from_addresses = []
|
||||||
if subtype == event_type:
|
parsed_filters.to_addresses = []
|
||||||
requires_ethereum_blockchain_data = True
|
for provider_type, raw_filter in query.subscriptions:
|
||||||
|
if provider_type != event_type:
|
||||||
|
continue
|
||||||
|
|
||||||
parsed_filters = Filters()
|
if raw_filter.startswith("from:"):
|
||||||
|
address = raw_filter[from_prefix_length:]
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.from_addresses.append(address)
|
||||||
|
elif raw_filter.startswith("to:"):
|
||||||
|
address = raw_filter[to_prefix_length:]
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.to_addresses.append(address)
|
||||||
|
else:
|
||||||
|
address = raw_filter
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.from_addresses.append(address)
|
||||||
|
parsed_filters.to_addresses.append(address)
|
||||||
|
|
||||||
from_slice_start = len("from:")
|
if not (parsed_filters.from_addresses or parsed_filters.to_addresses):
|
||||||
to_slice_start = len("to:")
|
|
||||||
|
|
||||||
for provider_type, raw_filter in query.subscriptions:
|
|
||||||
if provider_type != event_type:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if raw_filter.startswith("from:"):
|
|
||||||
address = raw_filter[from_slice_start:]
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.from_addresses.append(address)
|
|
||||||
elif raw_filter.startswith("to:"):
|
|
||||||
address = raw_filter[to_slice_start:]
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.to_addresses.append(address)
|
|
||||||
else:
|
|
||||||
address = raw_filter
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.from_addresses.append(address)
|
|
||||||
parsed_filters.to_addresses.append(address)
|
|
||||||
|
|
||||||
if parsed_filters.from_addresses or parsed_filters.to_addresses:
|
|
||||||
requires_ethereum_blockchain_data = True
|
|
||||||
|
|
||||||
if not requires_ethereum_blockchain_data:
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return parsed_filters
|
return parsed_filters
|
||||||
|
|
Ładowanie…
Reference in New Issue