kopia lustrzana https://github.com/bugout-dev/moonstream
Add subclass of bugout provider for memPool data.
rodzic
53a16eb5cc
commit
192e98e682
|
@ -96,19 +96,6 @@ class BugoutEventProvider:
|
||||||
is_query_constrained and self.event_type not in query.subscription_types
|
is_query_constrained and self.event_type not in query.subscription_types
|
||||||
) or not relevant_subscriptions:
|
) or not relevant_subscriptions:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if self.event_type == "ethereum_txpool":
|
|
||||||
addresses = [
|
|
||||||
subscription.resource_data["address"]
|
|
||||||
for subscription in relevant_subscriptions
|
|
||||||
]
|
|
||||||
subscriptions_filters = []
|
|
||||||
for adress in addresses:
|
|
||||||
subscriptions_filters.extend(
|
|
||||||
[f"?#from_address:{adress}", f"?#to_address:{adress}"]
|
|
||||||
)
|
|
||||||
|
|
||||||
return subscriptions_filters
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_events(
|
def get_events(
|
||||||
|
@ -282,10 +269,46 @@ class BugoutEventProvider:
|
||||||
return self.entry_event(search_results.results[0])
|
return self.entry_event(search_results.results[0])
|
||||||
|
|
||||||
|
|
||||||
|
class BugoutMemPoolProvider(BugoutEventProvider):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
event_type: str,
|
||||||
|
tags: Optional[List[str]] = None,
|
||||||
|
batch_size: int = 100,
|
||||||
|
timeout: float = 30.0,
|
||||||
|
):
|
||||||
|
|
||||||
|
super().__init__(event_type, tags, batch_size, timeout)
|
||||||
|
|
||||||
|
def parse_filters(
|
||||||
|
self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
|
||||||
|
) -> Optional[List[str]]:
|
||||||
|
|
||||||
|
is_query_constrained = query.subscription_types or query.subscriptions
|
||||||
|
relevant_subscriptions = user_subscriptions.get(self.event_type)
|
||||||
|
|
||||||
|
if (
|
||||||
|
is_query_constrained and self.event_type not in query.subscription_types
|
||||||
|
) or not relevant_subscriptions:
|
||||||
|
return None
|
||||||
|
addresses = [
|
||||||
|
subscription.resource_data["address"]
|
||||||
|
for subscription in relevant_subscriptions
|
||||||
|
]
|
||||||
|
subscriptions_filters = []
|
||||||
|
for adress in addresses:
|
||||||
|
subscriptions_filters.extend(
|
||||||
|
[f"?#from_address:{adress}", f"?#to_address:{adress}"]
|
||||||
|
)
|
||||||
|
|
||||||
|
return subscriptions_filters
|
||||||
|
|
||||||
|
|
||||||
whalewatch_provider = BugoutEventProvider(
|
whalewatch_provider = BugoutEventProvider(
|
||||||
event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"]
|
event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"]
|
||||||
)
|
)
|
||||||
|
|
||||||
ethereum_txpool_provider = BugoutEventProvider(
|
|
||||||
|
ethereum_txpool_provider = BugoutMemPoolProvider(
|
||||||
event_type="ethereum_txpool", tags=["client:ethereum-txpool-crawler-0"]
|
event_type="ethereum_txpool", tags=["client:ethereum-txpool-crawler-0"]
|
||||||
)
|
)
|
||||||
|
|
Ładowanie…
Reference in New Issue