kopia lustrzana https://github.com/bugout-dev/moonstream
rodzic
8df9cdeee1
commit
a9320b8521
|
@ -1,4 +1,4 @@
|
||||||
from dataclasses import dataclass, Field
|
from dataclasses import dataclass, field
|
||||||
import logging
|
import logging
|
||||||
from typing import cast, Dict, Any, List, Optional, Tuple
|
from typing import cast, Dict, Any, List, Optional, Tuple
|
||||||
|
|
||||||
|
@ -13,8 +13,8 @@ from sqlalchemy import or_, and_, text
|
||||||
from sqlalchemy.orm import Session, Query
|
from sqlalchemy.orm import Session, Query
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
|
|
||||||
from ..settings import DEFAULT_STREAM_TIMEINTERVAL
|
from ..settings import DEFAULT_STREAM_TIMEINTERVAL
|
||||||
|
from ..stream_queries import StreamQuery
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -52,8 +52,8 @@ class Filters:
|
||||||
or a to address.
|
or a to address.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from_addresses: List[str] = Field(default_factory=list)
|
from_addresses: List[str] = field(default_factory=list)
|
||||||
to_addresses: List[str] = Field(default_factory=list)
|
to_addresses: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
def default_filters(subscriptions: List[BugoutResource]) -> List[str]:
|
def default_filters(subscriptions: List[BugoutResource]) -> List[str]:
|
||||||
|
|
|
@ -29,8 +29,9 @@ def parse_query_string(q: str) -> StreamQuery:
|
||||||
Args:
|
Args:
|
||||||
1. q - Query string. It is parsed as follows:
|
1. q - Query string. It is parsed as follows:
|
||||||
a. Query string is tokenized (by splitting on whitespace).
|
a. Query string is tokenized (by splitting on whitespace).
|
||||||
b. Tokens of the form "type:<subscription_type>" populate the subscription_types field of the resulting StreamQuery
|
b. Tokens of the form "type:<subscription_type>" populate the subscription_types field of the resulting StreamQuery.
|
||||||
c. Tokens of the form "sub:<subscription_type>:<address> populate the subscriptions field of the resulting StreamQuery
|
c. Tokens of the form "sub:<subscription_type>:<filter> populate the subscriptions field of the resulting StreamQuery.
|
||||||
|
This "<filter>" should be a valid filter for the event provider corresponding to the given subscription type.
|
||||||
|
|
||||||
Returns: Parsed StreamQuery object.
|
Returns: Parsed StreamQuery object.
|
||||||
"""
|
"""
|
||||||
|
@ -43,11 +44,13 @@ def parse_query_string(q: str) -> StreamQuery:
|
||||||
subscription_types.append(token[len(SUBSCRIPTION_TYPE_PREFIX) :])
|
subscription_types.append(token[len(SUBSCRIPTION_TYPE_PREFIX) :])
|
||||||
elif token.startswith(SUBSCRIPTION_PREFIX):
|
elif token.startswith(SUBSCRIPTION_PREFIX):
|
||||||
contents = token[len(SUBSCRIPTION_PREFIX) :]
|
contents = token[len(SUBSCRIPTION_PREFIX) :]
|
||||||
components = tuple(contents.split(SUBSCRIPTION_SEPARATOR))
|
components = contents.split(SUBSCRIPTION_SEPARATOR)
|
||||||
if len(components) == 2:
|
if len(components) < 2:
|
||||||
subscriptions.append(cast(Tuple[str, str], components))
|
|
||||||
else:
|
|
||||||
logger.error(f"Invalid subscription token: {token}")
|
logger.error(f"Invalid subscription token: {token}")
|
||||||
|
else:
|
||||||
|
subscriptions.append(
|
||||||
|
(components[0], SUBSCRIPTION_SEPARATOR.join(components[1:]))
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.error(f"Invalid token: {token}")
|
logger.error(f"Invalid token: {token}")
|
||||||
|
|
||||||
|
|
|
@ -29,20 +29,26 @@ class TestParseQueryString(unittest.TestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_multiple_subscriptions(self):
|
def test_multiple_subscriptions(self):
|
||||||
q = "sub:ethereum_blockchain:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929"
|
q = "sub:ethereum_blockchain:from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:to:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929"
|
||||||
query = parse_query_string(q)
|
query = parse_query_string(q)
|
||||||
self.assertListEqual(query.subscription_types, [])
|
self.assertListEqual(query.subscription_types, [])
|
||||||
self.assertListEqual(
|
self.assertListEqual(
|
||||||
query.subscriptions,
|
query.subscriptions,
|
||||||
[
|
[
|
||||||
("ethereum_blockchain", "0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66"),
|
(
|
||||||
("ethereum_blockchain", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
"ethereum_blockchain",
|
||||||
|
"from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"ethereum_blockchain",
|
||||||
|
"to:0x2819c144d5946404c0516b6f817a960db37d4929",
|
||||||
|
),
|
||||||
("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_multiple_subscription_types_and_subscriptions(self):
|
def test_multiple_subscription_types_and_subscriptions(self):
|
||||||
q = "type:ethereum_whalewatch type:solana_blockchain sub:ethereum_blockchain:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929"
|
q = "type:ethereum_whalewatch type:solana_blockchain sub:ethereum_blockchain:from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66 sub:ethereum_blockchain:to:0x2819c144d5946404c0516b6f817a960db37d4929 sub:ethereum_txpool:0x2819c144d5946404c0516b6f817a960db37d4929"
|
||||||
query = parse_query_string(q)
|
query = parse_query_string(q)
|
||||||
self.assertListEqual(
|
self.assertListEqual(
|
||||||
query.subscription_types, ["ethereum_whalewatch", "solana_blockchain"]
|
query.subscription_types, ["ethereum_whalewatch", "solana_blockchain"]
|
||||||
|
@ -50,8 +56,14 @@ class TestParseQueryString(unittest.TestCase):
|
||||||
self.assertListEqual(
|
self.assertListEqual(
|
||||||
query.subscriptions,
|
query.subscriptions,
|
||||||
[
|
[
|
||||||
("ethereum_blockchain", "0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66"),
|
(
|
||||||
("ethereum_blockchain", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
"ethereum_blockchain",
|
||||||
|
"from:0xbb2569ca55552fb4c1d73ec536e06a620c3d3d66",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"ethereum_blockchain",
|
||||||
|
"to:0x2819c144d5946404c0516b6f817a960db37d4929",
|
||||||
|
),
|
||||||
("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
("ethereum_txpool", "0x2819c144d5946404c0516b6f817a960db37d4929"),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
Ładowanie…
Reference in New Issue