From a9320b8521addd0adc59d0958cba4cb2bc2805aa Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Wed, 18 Aug 2021 11:06:30 -0700 Subject: [PATCH] Updated how filters are parsed from StreamQuery WIP --- .../providers/ethereum_blockchain.py | 8 +++---- backend/moonstream/stream_queries.py | 15 +++++++----- backend/moonstream/test_stream_queries.py | 24 ++++++++++++++----- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 9df47b78..ede33883 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, Field +from dataclasses import dataclass, field import logging from typing import cast, Dict, Any, List, Optional, Tuple @@ -13,8 +13,8 @@ from sqlalchemy import or_, and_, text from sqlalchemy.orm import Session, Query from .. import data - from ..settings import DEFAULT_STREAM_TIMEINTERVAL +from ..stream_queries import StreamQuery logger = logging.getLogger(__name__) @@ -52,8 +52,8 @@ class Filters: or a to address. """ - from_addresses: List[str] = Field(default_factory=list) - to_addresses: List[str] = Field(default_factory=list) + from_addresses: List[str] = field(default_factory=list) + to_addresses: List[str] = field(default_factory=list) def default_filters(subscriptions: List[BugoutResource]) -> List[str]: diff --git a/backend/moonstream/stream_queries.py b/backend/moonstream/stream_queries.py index 149121ce..16218751 100644 --- a/backend/moonstream/stream_queries.py +++ b/backend/moonstream/stream_queries.py @@ -29,8 +29,9 @@ def parse_query_string(q: str) -> StreamQuery: Args: 1. q - Query string. It is parsed as follows: a. Query string is tokenized (by splitting on whitespace). - b. Tokens of the form "type:" populate the subscription_types field of the resulting StreamQuery - c. Tokens of the form "sub::
populate the subscriptions field of the resulting StreamQuery + b. Tokens of the form "type:" populate the subscription_types field of the resulting StreamQuery. + c. Tokens of the form "sub:: populate the subscriptions field of the resulting StreamQuery. + This "" should be a valid filter for the event provider corresponding to the given subscription type. Returns: Parsed StreamQuery object. """ @@ -43,11 +44,13 @@ def parse_query_string(q: str) -> StreamQuery: subscription_types.append(token[len(SUBSCRIPTION_TYPE_PREFIX) :]) elif token.startswith(SUBSCRIPTION_PREFIX): contents = token[len(SUBSCRIPTION_PREFIX) :] - components = tuple(contents.split(SUBSCRIPTION_SEPARATOR)) - if len(components) == 2: - subscriptions.append(cast(Tuple[str, str], components)) - else: + components = contents.split(SUBSCRIPTION_SEPARATOR) + if len(components) < 2: logger.error(f"Invalid subscription token: {token}") + else: + subscriptions.append( + (components[0], SUBSCRIPTION_SEPARATOR.join(components[1:])) + ) else: logger.error(f"Invalid token: {token}") diff --git a/backend/moonstream/test_stream_queries.py b/backend/moonstream/test_stream_queries.py index 5119b7be..a3271efa 100644 --- a/backend/moonstream/test_stream_queries.py +++ b/backend/moonstream/test_stream_queries.py @@ -29,20 +29,26 @@ class TestParseQueryString(unittest.TestCase): ) def test_multiple_subscriptions(self): - q = "sub:ethereum_blockchain:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929" + q = "sub:ethereum_blockchain:from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:to:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929" query = parse_query_string(q) self.assertListEqual(query.subscription_types, []) self.assertListEqual( query.subscriptions, [ - ("ethereum_blockchain", "0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66"), - ("ethereum_blockchain", "0x2819c144d5946404c0516b6f817a960db37d4929"), + ( + "ethereum_blockchain", + "from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66", + ), + ( + "ethereum_blockchain", + "to:0x2819c144d5946404c0516b6f817a960db37d4929", + ), ("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"), ], ) def test_multiple_subscription_types_and_subscriptions(self): - q = "type:ethereum_whalewatch type:solana_blockchain sub:ethereum_blockchain:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929" + q = "type:ethereum_whalewatch type:solana_blockchain sub:ethereum_blockchain:from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:to:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929" query = parse_query_string(q) self.assertListEqual( query.subscription_types, ["ethereum_whalewatch", "solana_blockchain"] @@ -50,8 +56,14 @@ class TestParseQueryString(unittest.TestCase): self.assertListEqual( query.subscriptions, [ - ("ethereum_blockchain", "0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66"), - ("ethereum_blockchain", "0x2819c144d5946404c0516b6f817a960db37d4929"), + ( + "ethereum_blockchain", + "from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66", + ), + ( + "ethereum_blockchain", + "to:0x2819c144d5946404c0516b6f817a960db37d4929", + ), ("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"), ], )