Merge pull request #193 from bugout-dev/extend-bugout-provider-tags-parser

Extend bugout provider tags parser
pull/199/head
Neeraj Kashyap 2021-08-27 09:14:09 -07:00 zatwierdzone przez GitHub
commit 20db8d4ef9
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
1 zmienionych plików z 38 dodań i 1 usunięć

Wyświetl plik

@ -91,6 +91,7 @@ class BugoutEventProvider:
"""
is_query_constrained = query.subscription_types or query.subscriptions
relevant_subscriptions = user_subscriptions.get(self.event_type)
if (
is_query_constrained and self.event_type not in query.subscription_types
) or not relevant_subscriptions:
@ -268,10 +269,46 @@ class BugoutEventProvider:
return self.entry_event(search_results.results[0])
class EthereumTXPoolProvider(BugoutEventProvider):
def __init__(
self,
event_type: str,
tags: Optional[List[str]] = None,
batch_size: int = 100,
timeout: float = 30.0,
):
super().__init__(event_type, tags, batch_size, timeout)
def parse_filters(
self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
) -> Optional[List[str]]:
is_query_constrained = query.subscription_types or query.subscriptions
relevant_subscriptions = user_subscriptions.get(self.event_type)
if (
is_query_constrained and self.event_type not in query.subscription_types
) or not relevant_subscriptions:
return None
addresses = [
subscription.resource_data["address"]
for subscription in relevant_subscriptions
]
subscriptions_filters = []
for adress in addresses:
subscriptions_filters.extend(
[f"?#from_address:{adress}", f"?#to_address:{adress}"]
)
return subscriptions_filters
whalewatch_provider = BugoutEventProvider(
event_type="ethereum_whalewatch", tags=["crawl_type:ethereum_trending"]
)
ethereum_txpool_provider = BugoutEventProvider(
ethereum_txpool_provider = EthereumTXPoolProvider(
event_type="ethereum_txpool", tags=["client:ethereum-txpool-crawler-0"]
)