kopia lustrzana https://github.com/bugout-dev/moonstream
Add event provider and rewrite existing provider.
rodzic
0f3f4886e6
commit
cdb59e4857
|
@ -170,7 +170,6 @@ class GetEventsResponse(BaseModel):
|
||||||
class TxinfoEthereumBlockchainRequest(BaseModel):
|
class TxinfoEthereumBlockchainRequest(BaseModel):
|
||||||
tx: EthereumTransaction
|
tx: EthereumTransaction
|
||||||
|
|
||||||
|
|
||||||
class EthereumSmartContractSourceInfo(BaseModel):
|
class EthereumSmartContractSourceInfo(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
source_code: str
|
source_code: str
|
||||||
|
|
|
@ -32,7 +32,7 @@ from bugout.app import Bugout
|
||||||
from bugout.data import BugoutResource
|
from bugout.data import BugoutResource
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from . import bugout, ethereum_blockchain
|
from . import bugout, transactions, moonworm_provider
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..stream_queries import StreamQuery
|
from ..stream_queries import StreamQuery
|
||||||
|
|
||||||
|
@ -47,8 +47,13 @@ class ReceivingEventsException(Exception):
|
||||||
|
|
||||||
|
|
||||||
event_providers: Dict[str, Any] = {
|
event_providers: Dict[str, Any] = {
|
||||||
ethereum_blockchain.event_type: ethereum_blockchain,
|
moonworm_provider.EthereumMoonwormProvider.event_type: moonworm_provider.EthereumMoonwormProvider,
|
||||||
bugout.whalewatch_provider.event_type: bugout.whalewatch_provider,
|
moonworm_provider.PolygonMoonwormProvider.event_type: moonworm_provider.PolygonMoonwormProvider,
|
||||||
|
transactions.ErhereumTransactions.event_type: transactions.ErhereumTransactions,
|
||||||
|
transactions.PolygonTransactions.event_type: transactions.PolygonTransactions,
|
||||||
|
bugout.polygon_whalewatch_provider.event_type: bugout.polygon_whalewatch_provider,
|
||||||
|
bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider,
|
||||||
|
bugout.ethereum_whalewatch_provider.event_type: bugout.ethereum_whalewatch_provider,
|
||||||
bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider,
|
bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,6 +80,7 @@ def get_events(
|
||||||
max_workers=max_threads, thread_name_prefix="event_providers_"
|
max_workers=max_threads, thread_name_prefix="event_providers_"
|
||||||
) as executor:
|
) as executor:
|
||||||
for provider_name, provider in event_providers.items():
|
for provider_name, provider in event_providers.items():
|
||||||
|
|
||||||
futures[provider_name] = executor.submit(
|
futures[provider_name] = executor.submit(
|
||||||
provider.get_events,
|
provider.get_events,
|
||||||
db_session,
|
db_session,
|
||||||
|
@ -132,7 +138,9 @@ def latest_events(
|
||||||
with ThreadPoolExecutor(
|
with ThreadPoolExecutor(
|
||||||
max_workers=max_threads, thread_name_prefix="event_providers_"
|
max_workers=max_threads, thread_name_prefix="event_providers_"
|
||||||
) as executor:
|
) as executor:
|
||||||
|
|
||||||
for provider_name, provider in event_providers.items():
|
for provider_name, provider in event_providers.items():
|
||||||
|
|
||||||
futures[provider_name] = executor.submit(
|
futures[provider_name] = executor.submit(
|
||||||
provider.latest_events,
|
provider.latest_events,
|
||||||
db_session,
|
db_session,
|
||||||
|
@ -242,6 +250,7 @@ def previous_event(
|
||||||
max_workers=max_threads, thread_name_prefix="event_providers_"
|
max_workers=max_threads, thread_name_prefix="event_providers_"
|
||||||
) as executor:
|
) as executor:
|
||||||
for provider_name, provider in event_providers.items():
|
for provider_name, provider in event_providers.items():
|
||||||
|
|
||||||
futures[provider_name] = executor.submit(
|
futures[provider_name] = executor.submit(
|
||||||
provider.previous_event,
|
provider.previous_event,
|
||||||
db_session,
|
db_session,
|
||||||
|
|
|
@ -278,7 +278,7 @@ class BugoutEventProvider:
|
||||||
return self.entry_event(search_results.results[0])
|
return self.entry_event(search_results.results[0])
|
||||||
|
|
||||||
|
|
||||||
class EthereumTXPoolProvider(BugoutEventProvider):
|
class TXPoolProvider(BugoutEventProvider):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
event_type: str,
|
event_type: str,
|
||||||
|
@ -364,7 +364,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour
|
||||||
4. Amount (in WEI) received
|
4. Amount (in WEI) received
|
||||||
|
|
||||||
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||||
whalewatch_provider = PublicDataProvider(
|
ethereum_whalewatch_provider = PublicDataProvider(
|
||||||
event_type="ethereum_whalewatch",
|
event_type="ethereum_whalewatch",
|
||||||
description=whalewatch_description,
|
description=whalewatch_description,
|
||||||
default_time_interval_seconds=310,
|
default_time_interval_seconds=310,
|
||||||
|
@ -372,12 +372,20 @@ whalewatch_provider = PublicDataProvider(
|
||||||
tags=["crawl_type:ethereum_trending"],
|
tags=["crawl_type:ethereum_trending"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
polygon_whalewatch_provider = PublicDataProvider(
|
||||||
|
event_type="polygon_whalewatch",
|
||||||
|
description=whalewatch_description,
|
||||||
|
default_time_interval_seconds=310,
|
||||||
|
estimated_events_per_time_interval=1,
|
||||||
|
tags=["crawl_type:polygon_trending"],
|
||||||
|
)
|
||||||
|
|
||||||
ethereum_txpool_description = """Event provider for Ethereum transaction pool.
|
ethereum_txpool_description = """Event provider for Ethereum transaction pool.
|
||||||
|
|
||||||
Shows the latest events (from the previous hour) in the Ethereum transaction pool.
|
Shows the latest events (from the previous hour) in the Ethereum transaction pool.
|
||||||
|
|
||||||
To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||||
ethereum_txpool_provider = EthereumTXPoolProvider(
|
ethereum_txpool_provider = TXPoolProvider(
|
||||||
event_type="ethereum_txpool",
|
event_type="ethereum_txpool",
|
||||||
description=ethereum_txpool_description,
|
description=ethereum_txpool_description,
|
||||||
default_time_interval_seconds=5,
|
default_time_interval_seconds=5,
|
||||||
|
@ -385,6 +393,14 @@ ethereum_txpool_provider = EthereumTXPoolProvider(
|
||||||
tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"],
|
tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
polygon_txpool_provider = TXPoolProvider(
|
||||||
|
event_type="polygon_txpool",
|
||||||
|
description=ethereum_txpool_description,
|
||||||
|
default_time_interval_seconds=5,
|
||||||
|
estimated_events_per_time_interval=50,
|
||||||
|
tags=[f"client:polygon_HUMBUG_CLIENT_ID"],
|
||||||
|
)
|
||||||
|
|
||||||
nft_summary_description = """Event provider for NFT market summaries.
|
nft_summary_description = """Event provider for NFT market summaries.
|
||||||
|
|
||||||
This provider periodically generates NFT market summaries for the last hour of market activity.
|
This provider periodically generates NFT market summaries for the last hour of market activity.
|
||||||
|
|
|
@ -1,437 +0,0 @@
|
||||||
from dataclasses import dataclass, field
|
|
||||||
import logging
|
|
||||||
from typing import cast, Dict, Any, List, Optional, Tuple
|
|
||||||
|
|
||||||
from bugout.app import Bugout
|
|
||||||
from bugout.data import BugoutResource
|
|
||||||
|
|
||||||
from moonstreamdb.models import (
|
|
||||||
EthereumBlock,
|
|
||||||
EthereumTransaction,
|
|
||||||
EthereumLabel,
|
|
||||||
)
|
|
||||||
from sqlalchemy import or_, and_, text
|
|
||||||
from sqlalchemy.orm import Session, Query
|
|
||||||
from sqlalchemy.sql.functions import user
|
|
||||||
|
|
||||||
|
|
||||||
from .. import data
|
|
||||||
from ..stream_boundaries import validate_stream_boundary
|
|
||||||
from ..stream_queries import StreamQuery
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logger.setLevel(logging.WARN)
|
|
||||||
|
|
||||||
|
|
||||||
event_type = "ethereum_blockchain"
|
|
||||||
allowed_tags = ["tag:erc721"]
|
|
||||||
|
|
||||||
description = f"""Event provider for transactions from the Ethereum blockchain.
|
|
||||||
|
|
||||||
To restrict your queries to this provider, add a filter of \"type:{event_type}\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
|
||||||
|
|
||||||
default_time_interval_seconds: int = 5 * 60
|
|
||||||
|
|
||||||
# 200 transactions per block, 4 blocks per minute.
|
|
||||||
estimated_events_per_time_interval: float = 5 * 800
|
|
||||||
|
|
||||||
|
|
||||||
def validate_subscription(
|
|
||||||
subscription_resource_data: data.SubscriptionResourceData,
|
|
||||||
) -> Tuple[bool, List[str]]:
|
|
||||||
"""
|
|
||||||
Checks that the subscription represents a valid subscription to an Ethereum address.
|
|
||||||
|
|
||||||
NOTE: Currently, this function only checks that the address is a nonempty string.
|
|
||||||
"""
|
|
||||||
errors: List[str] = []
|
|
||||||
if subscription_resource_data.address == "":
|
|
||||||
errors.append("address is empty")
|
|
||||||
|
|
||||||
if subscription_resource_data.subscription_type_id != event_type:
|
|
||||||
errors.append(
|
|
||||||
f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}."
|
|
||||||
)
|
|
||||||
|
|
||||||
if errors:
|
|
||||||
return False, errors
|
|
||||||
return True, errors
|
|
||||||
|
|
||||||
|
|
||||||
def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None:
|
|
||||||
"""
|
|
||||||
Stream boundary validator for the ethereum_blockchain event provider.
|
|
||||||
|
|
||||||
Checks that stream boundaries do not exceed periods of greater than 2 hours.
|
|
||||||
|
|
||||||
Raises an error for invalid stream boundaries, else returns None.
|
|
||||||
"""
|
|
||||||
valid_period_seconds = 2 * 60 * 60
|
|
||||||
validate_stream_boundary(
|
|
||||||
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Filters:
|
|
||||||
"""
|
|
||||||
ethereum_blockchain event filters act as a disjunction over queries specifying a from address
|
|
||||||
or a to address.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from_addresses: List[str] = field(default_factory=list)
|
|
||||||
to_addresses: List[str] = field(default_factory=list)
|
|
||||||
labels: List[str] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
def default_filters(subscriptions: List[BugoutResource]) -> Filters:
|
|
||||||
"""
|
|
||||||
Default filter strings for the given list of subscriptions.
|
|
||||||
"""
|
|
||||||
filters = Filters()
|
|
||||||
for subscription in subscriptions:
|
|
||||||
subscription_address = cast(
|
|
||||||
Optional[str], subscription.resource_data.get("address")
|
|
||||||
)
|
|
||||||
if subscription_address is not None:
|
|
||||||
if subscription_address in allowed_tags:
|
|
||||||
filters.labels.append(subscription_address.split(":")[1])
|
|
||||||
else:
|
|
||||||
filters.from_addresses.append(subscription_address)
|
|
||||||
filters.to_addresses.append(subscription_address)
|
|
||||||
else:
|
|
||||||
logger.warn(
|
|
||||||
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
|
||||||
)
|
|
||||||
return filters
|
|
||||||
|
|
||||||
|
|
||||||
def parse_filters(
|
|
||||||
query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
|
|
||||||
) -> Optional[Filters]:
|
|
||||||
"""
|
|
||||||
Passes raw filter strings into a Filters object which is used to construct a database query
|
|
||||||
for ethereum transactions.
|
|
||||||
|
|
||||||
Filter syntax is:
|
|
||||||
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
|
|
||||||
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
|
|
||||||
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
|
|
||||||
|
|
||||||
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
|
||||||
None indicating that the StreamQuery does not require any data from this provider.
|
|
||||||
"""
|
|
||||||
if query.subscription_types and not any(
|
|
||||||
subtype == event_type for subtype in query.subscription_types
|
|
||||||
):
|
|
||||||
return None
|
|
||||||
|
|
||||||
provider_subscriptions = user_subscriptions.get(event_type)
|
|
||||||
|
|
||||||
# If the user has no subscriptions to this event type, we do not have to return any data!
|
|
||||||
if not provider_subscriptions:
|
|
||||||
return None
|
|
||||||
parsed_filters = default_filters(provider_subscriptions)
|
|
||||||
|
|
||||||
from_prefix_length = len("from:")
|
|
||||||
to_prefix_length = len("to:")
|
|
||||||
|
|
||||||
subscribed_addresses = {
|
|
||||||
subscription.resource_data.get("address")
|
|
||||||
for subscription in provider_subscriptions
|
|
||||||
if subscription.resource_data.get("address") is not None
|
|
||||||
}
|
|
||||||
|
|
||||||
if query.subscriptions:
|
|
||||||
parsed_filters.from_addresses = []
|
|
||||||
parsed_filters.to_addresses = []
|
|
||||||
for provider_type, raw_filter in query.subscriptions:
|
|
||||||
if provider_type != event_type:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if raw_filter.startswith("from:"):
|
|
||||||
address = raw_filter[from_prefix_length:]
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.from_addresses.append(address)
|
|
||||||
elif raw_filter.startswith("to:"):
|
|
||||||
address = raw_filter[to_prefix_length:]
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.to_addresses.append(address)
|
|
||||||
else:
|
|
||||||
address = raw_filter
|
|
||||||
if address in subscribed_addresses:
|
|
||||||
parsed_filters.from_addresses.append(address)
|
|
||||||
parsed_filters.to_addresses.append(address)
|
|
||||||
|
|
||||||
if not (
|
|
||||||
parsed_filters.from_addresses
|
|
||||||
or parsed_filters.to_addresses
|
|
||||||
or parsed_filters.labels
|
|
||||||
):
|
|
||||||
return None
|
|
||||||
|
|
||||||
return parsed_filters
|
|
||||||
|
|
||||||
|
|
||||||
def query_ethereum_transactions(
|
|
||||||
db_session: Session,
|
|
||||||
stream_boundary: data.StreamBoundary,
|
|
||||||
parsed_filters: Filters,
|
|
||||||
) -> Query:
|
|
||||||
"""
|
|
||||||
Builds a database query for Ethereum transactions that occurred within the window of time that
|
|
||||||
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
|
||||||
"""
|
|
||||||
query = db_session.query(
|
|
||||||
EthereumTransaction.hash,
|
|
||||||
EthereumTransaction.block_number,
|
|
||||||
EthereumTransaction.from_address,
|
|
||||||
EthereumTransaction.to_address,
|
|
||||||
EthereumTransaction.gas,
|
|
||||||
EthereumTransaction.gas_price,
|
|
||||||
EthereumTransaction.input,
|
|
||||||
EthereumTransaction.nonce,
|
|
||||||
EthereumTransaction.value,
|
|
||||||
EthereumBlock.timestamp.label("timestamp"),
|
|
||||||
).join(
|
|
||||||
EthereumBlock,
|
|
||||||
EthereumTransaction.block_number == EthereumBlock.block_number,
|
|
||||||
)
|
|
||||||
|
|
||||||
if stream_boundary.include_start:
|
|
||||||
query = query.filter(EthereumBlock.timestamp >= stream_boundary.start_time)
|
|
||||||
else:
|
|
||||||
query = query.filter(EthereumBlock.timestamp > stream_boundary.start_time)
|
|
||||||
|
|
||||||
if stream_boundary.end_time is not None:
|
|
||||||
if stream_boundary.include_end:
|
|
||||||
query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time)
|
|
||||||
else:
|
|
||||||
query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time)
|
|
||||||
|
|
||||||
# We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address
|
|
||||||
address_clauses = []
|
|
||||||
|
|
||||||
address_clauses.extend(
|
|
||||||
[
|
|
||||||
EthereumTransaction.from_address == address
|
|
||||||
for address in parsed_filters.from_addresses
|
|
||||||
]
|
|
||||||
+ [
|
|
||||||
EthereumTransaction.to_address == address
|
|
||||||
for address in parsed_filters.to_addresses
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
labels_clause = []
|
|
||||||
|
|
||||||
if parsed_filters.labels:
|
|
||||||
label_clause = (
|
|
||||||
db_session.query(EthereumLabel)
|
|
||||||
.filter(
|
|
||||||
or_(
|
|
||||||
*[
|
|
||||||
EthereumLabel.label.contains(label)
|
|
||||||
for label in list(set(parsed_filters.labels))
|
|
||||||
]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.exists()
|
|
||||||
)
|
|
||||||
labels_clause.append(label_clause)
|
|
||||||
|
|
||||||
subscriptions_clause = address_clauses + labels_clause
|
|
||||||
|
|
||||||
if subscriptions_clause:
|
|
||||||
query = query.filter(or_(*subscriptions_clause))
|
|
||||||
|
|
||||||
return query
|
|
||||||
|
|
||||||
|
|
||||||
def ethereum_transaction_event(row: Tuple) -> data.Event:
|
|
||||||
"""
|
|
||||||
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
|
||||||
into an Event object.
|
|
||||||
"""
|
|
||||||
(
|
|
||||||
hash,
|
|
||||||
block_number,
|
|
||||||
from_address,
|
|
||||||
to_address,
|
|
||||||
gas,
|
|
||||||
gas_price,
|
|
||||||
input,
|
|
||||||
nonce,
|
|
||||||
value,
|
|
||||||
timestamp,
|
|
||||||
) = row
|
|
||||||
return data.Event(
|
|
||||||
event_type=event_type,
|
|
||||||
event_timestamp=timestamp,
|
|
||||||
event_data={
|
|
||||||
"hash": hash,
|
|
||||||
"block_number": block_number,
|
|
||||||
"from": from_address,
|
|
||||||
"to": to_address,
|
|
||||||
"gas": gas,
|
|
||||||
"gas_price": gas_price,
|
|
||||||
"input": input,
|
|
||||||
"nonce": nonce,
|
|
||||||
"value": value,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_events(
|
|
||||||
db_session: Session,
|
|
||||||
bugout_client: Bugout,
|
|
||||||
data_journal_id: str,
|
|
||||||
data_access_token: str,
|
|
||||||
stream_boundary: data.StreamBoundary,
|
|
||||||
query: StreamQuery,
|
|
||||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
|
||||||
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
|
||||||
"""
|
|
||||||
Returns ethereum_blockchain events for the given addresses in the time period represented
|
|
||||||
by stream_boundary.
|
|
||||||
|
|
||||||
If the query does not require any data from this provider, returns None.
|
|
||||||
"""
|
|
||||||
stream_boundary_validator(stream_boundary)
|
|
||||||
|
|
||||||
parsed_filters = parse_filters(query, user_subscriptions)
|
|
||||||
if parsed_filters is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
ethereum_transactions = query_ethereum_transactions(
|
|
||||||
db_session, stream_boundary, parsed_filters
|
|
||||||
)
|
|
||||||
|
|
||||||
ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc"))
|
|
||||||
|
|
||||||
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
|
||||||
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
|
||||||
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
|
||||||
# do anything to help them get data from teh backend.
|
|
||||||
# The error message on the API side when the statement times out:
|
|
||||||
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
|
||||||
events: List[data.Event] = [
|
|
||||||
ethereum_transaction_event(row) for row in ethereum_transactions
|
|
||||||
]
|
|
||||||
|
|
||||||
if (stream_boundary.end_time is None) and events:
|
|
||||||
stream_boundary.end_time = events[0].event_timestamp
|
|
||||||
stream_boundary.include_end = True
|
|
||||||
|
|
||||||
return stream_boundary, events
|
|
||||||
|
|
||||||
|
|
||||||
def latest_events(
|
|
||||||
db_session: Session,
|
|
||||||
bugout_client: Bugout,
|
|
||||||
data_journal_id: str,
|
|
||||||
data_access_token: str,
|
|
||||||
query: StreamQuery,
|
|
||||||
num_events: int,
|
|
||||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
|
||||||
) -> Optional[List[data.Event]]:
|
|
||||||
"""
|
|
||||||
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
|
||||||
by the given filters.
|
|
||||||
|
|
||||||
If the query does not require any data from this provider, returns None.
|
|
||||||
"""
|
|
||||||
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
|
||||||
|
|
||||||
stream_boundary = data.StreamBoundary(
|
|
||||||
start_time=0, include_start=True, end_time=None, include_end=False
|
|
||||||
)
|
|
||||||
parsed_filters = parse_filters(query, user_subscriptions)
|
|
||||||
if parsed_filters is None:
|
|
||||||
return None
|
|
||||||
ethereum_transactions = (
|
|
||||||
query_ethereum_transactions(db_session, stream_boundary, parsed_filters)
|
|
||||||
.order_by(text("timestamp desc"))
|
|
||||||
.limit(num_events)
|
|
||||||
)
|
|
||||||
|
|
||||||
return [ethereum_transaction_event(row) for row in ethereum_transactions]
|
|
||||||
|
|
||||||
|
|
||||||
def next_event(
|
|
||||||
db_session: Session,
|
|
||||||
bugout_client: Bugout,
|
|
||||||
data_journal_id: str,
|
|
||||||
data_access_token: str,
|
|
||||||
stream_boundary: data.StreamBoundary,
|
|
||||||
query: StreamQuery,
|
|
||||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
|
||||||
) -> Optional[data.Event]:
|
|
||||||
"""
|
|
||||||
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
|
||||||
query from this provider.
|
|
||||||
|
|
||||||
If the query does not require any data from this provider, returns None.
|
|
||||||
"""
|
|
||||||
assert (
|
|
||||||
stream_boundary.end_time is not None
|
|
||||||
), "Cannot return next event for up-to-date stream boundary"
|
|
||||||
next_stream_boundary = data.StreamBoundary(
|
|
||||||
start_time=stream_boundary.end_time,
|
|
||||||
include_start=(not stream_boundary.include_end),
|
|
||||||
end_time=None,
|
|
||||||
include_end=False,
|
|
||||||
)
|
|
||||||
parsed_filters = parse_filters(query, user_subscriptions)
|
|
||||||
if parsed_filters is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
maybe_ethereum_transaction = (
|
|
||||||
query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters)
|
|
||||||
.order_by(text("timestamp asc"))
|
|
||||||
.limit(1)
|
|
||||||
).one_or_none()
|
|
||||||
|
|
||||||
if maybe_ethereum_transaction is None:
|
|
||||||
return None
|
|
||||||
return ethereum_transaction_event(maybe_ethereum_transaction)
|
|
||||||
|
|
||||||
|
|
||||||
def previous_event(
|
|
||||||
db_session: Session,
|
|
||||||
bugout_client: Bugout,
|
|
||||||
data_journal_id: str,
|
|
||||||
data_access_token: str,
|
|
||||||
stream_boundary: data.StreamBoundary,
|
|
||||||
query: StreamQuery,
|
|
||||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
|
||||||
) -> Optional[data.Event]:
|
|
||||||
"""
|
|
||||||
Returns the latest event occuring before the given stream boundary corresponding to the given
|
|
||||||
query from this provider.
|
|
||||||
|
|
||||||
If the query does not require any data from this provider, returns None.
|
|
||||||
"""
|
|
||||||
assert (
|
|
||||||
stream_boundary.start_time != 0
|
|
||||||
), "Cannot return previous event for stream starting at time 0"
|
|
||||||
previous_stream_boundary = data.StreamBoundary(
|
|
||||||
start_time=0,
|
|
||||||
include_start=True,
|
|
||||||
end_time=stream_boundary.start_time,
|
|
||||||
include_end=(not stream_boundary.include_start),
|
|
||||||
)
|
|
||||||
parsed_filters = parse_filters(query, user_subscriptions)
|
|
||||||
if parsed_filters is None:
|
|
||||||
return None
|
|
||||||
maybe_ethereum_transaction = (
|
|
||||||
query_ethereum_transactions(
|
|
||||||
db_session, previous_stream_boundary, parsed_filters
|
|
||||||
)
|
|
||||||
.order_by(text("timestamp desc"))
|
|
||||||
.limit(1)
|
|
||||||
).one_or_none()
|
|
||||||
if maybe_ethereum_transaction is None:
|
|
||||||
return None
|
|
||||||
return ethereum_transaction_event(maybe_ethereum_transaction)
|
|
|
@ -0,0 +1,454 @@
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
import logging
|
||||||
|
from typing import cast, Dict, Any, List, Optional, Tuple
|
||||||
|
|
||||||
|
from bugout.app import Bugout
|
||||||
|
from bugout.data import BugoutResource
|
||||||
|
|
||||||
|
from moonstreamdb.blockchain import (
|
||||||
|
get_label_model,
|
||||||
|
AvailableBlockchainType,
|
||||||
|
)
|
||||||
|
from sqlalchemy import or_, and_, text
|
||||||
|
from sqlalchemy.orm import Session, Query
|
||||||
|
from sqlalchemy.sql.expression import label
|
||||||
|
|
||||||
|
|
||||||
|
from .. import data
|
||||||
|
from ..stream_boundaries import validate_stream_boundary
|
||||||
|
from ..stream_queries import StreamQuery
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.WARN)
|
||||||
|
|
||||||
|
|
||||||
|
ethereum_event_type = "ethereum_blockchain"
|
||||||
|
polygon_event_type = "polygon_blockchain"
|
||||||
|
allowed_tags = ["tag:erc721"]
|
||||||
|
|
||||||
|
description = f"""Event provider for transactions from the Ethereum blockchain.
|
||||||
|
|
||||||
|
To restrict your queries to this provider, add a filter of \"type:{ethereum_event_type}\{polygon_event_type}\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||||
|
|
||||||
|
default_time_interval_seconds: int = 5 * 60
|
||||||
|
|
||||||
|
# 200 transactions per block, 4 blocks per minute.
|
||||||
|
estimated_events_per_time_interval: float = 5 * 800
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ArgsFilters:
|
||||||
|
name: str
|
||||||
|
value: Any
|
||||||
|
type: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class LabelsFilters:
|
||||||
|
|
||||||
|
name: str
|
||||||
|
type: str
|
||||||
|
args: List[ArgsFilters] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AddressFilters:
|
||||||
|
|
||||||
|
address: str
|
||||||
|
labels: List[LabelsFilters] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Filters:
|
||||||
|
|
||||||
|
addresses: List[AddressFilters] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
def python_type(expected_type: str) -> Any:
|
||||||
|
if expected_type.startswith("int"):
|
||||||
|
return int
|
||||||
|
elif expected_type.startswith("str"):
|
||||||
|
return str
|
||||||
|
elif expected_type == "float":
|
||||||
|
return float
|
||||||
|
elif expected_type == "bool":
|
||||||
|
return bool
|
||||||
|
elif expected_type == "tuple":
|
||||||
|
return list
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Cannot convert to python type {expected_type}")
|
||||||
|
|
||||||
|
|
||||||
|
class MoonwormProvider:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
event_type: str,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
description: str,
|
||||||
|
streamboaundary_range_limit: int,
|
||||||
|
):
|
||||||
|
self.event_type = event_type
|
||||||
|
self.blockchain = blockchain
|
||||||
|
self.description = description
|
||||||
|
self.valid_period_seconds = streamboaundary_range_limit
|
||||||
|
|
||||||
|
def default_filters(self, subscriptions: List[BugoutResource]) -> Filters:
|
||||||
|
"""
|
||||||
|
Default filter strings for the given list of subscriptions.
|
||||||
|
"""
|
||||||
|
filters = Filters()
|
||||||
|
for subscription in subscriptions:
|
||||||
|
subscription_address = cast(
|
||||||
|
Optional[str], subscription.resource_data.get("address")
|
||||||
|
)
|
||||||
|
if subscription_address is not None:
|
||||||
|
|
||||||
|
# How apply labels?
|
||||||
|
filters.addresses.append(
|
||||||
|
AddressFilters(address=subscription_address, labels=[])
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warn(
|
||||||
|
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||||
|
)
|
||||||
|
return filters
|
||||||
|
|
||||||
|
def events(self, row: Tuple) -> data.Event:
|
||||||
|
"""
|
||||||
|
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
||||||
|
into an Event object.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
block_number,
|
||||||
|
address,
|
||||||
|
transaction_hash,
|
||||||
|
label_data,
|
||||||
|
block_timestamp,
|
||||||
|
log_index,
|
||||||
|
created_at,
|
||||||
|
) = row
|
||||||
|
return data.Event(
|
||||||
|
event_type=self.event_type,
|
||||||
|
event_timestamp=block_timestamp,
|
||||||
|
event_data={
|
||||||
|
"hash": transaction_hash,
|
||||||
|
"block_number": block_number,
|
||||||
|
"address": address,
|
||||||
|
"label_data": label_data,
|
||||||
|
"log_index": log_index,
|
||||||
|
"created_at": created_at,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def parse_filters(
|
||||||
|
self,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[Filters]:
|
||||||
|
"""
|
||||||
|
Passes raw filter strings into a Filters object which is used to construct a database query
|
||||||
|
for ethereum transactions.
|
||||||
|
|
||||||
|
Filter syntax is:
|
||||||
|
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
|
||||||
|
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
|
||||||
|
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
|
||||||
|
|
||||||
|
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
||||||
|
None indicating that the StreamQuery does not require any data from this provider.
|
||||||
|
"""
|
||||||
|
if query.subscription_types and not any(
|
||||||
|
subtype == self.event_type for subtype in query.subscription_types
|
||||||
|
):
|
||||||
|
return None
|
||||||
|
|
||||||
|
provider_subscriptions = user_subscriptions.get(self.event_type)
|
||||||
|
|
||||||
|
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||||
|
if not provider_subscriptions:
|
||||||
|
return None
|
||||||
|
parsed_filters = self.default_filters(provider_subscriptions)
|
||||||
|
|
||||||
|
# from_prefix_length = len("from:")
|
||||||
|
# to_prefix_length = len("to:")
|
||||||
|
|
||||||
|
# subscribed_addresses = {
|
||||||
|
# subscription.resource_data.get("address")
|
||||||
|
# for subscription in provider_subscriptions
|
||||||
|
# if subscription.resource_data.get("address") is not None
|
||||||
|
# }
|
||||||
|
|
||||||
|
# Need check that we can expand logic of parsef filters from query params but it will difficult
|
||||||
|
|
||||||
|
# if query.subscriptions:
|
||||||
|
# parsed_filters.from_addresses = []
|
||||||
|
# parsed_filters.to_addresses = []
|
||||||
|
# for provider_type, raw_filter in query.subscriptions:
|
||||||
|
# if provider_type != event_type:
|
||||||
|
# continue
|
||||||
|
|
||||||
|
# if raw_filter.startswith("from:"):
|
||||||
|
# address = raw_filter[from_prefix_length:]
|
||||||
|
# if address in subscribed_addresses:
|
||||||
|
# parsed_filters.from_addresses.append(address)
|
||||||
|
# elif raw_filter.startswith("to:"):
|
||||||
|
# address = raw_filter[to_prefix_length:]
|
||||||
|
# if address in subscribed_addresses:
|
||||||
|
# parsed_filters.to_addresses.append(address)
|
||||||
|
# else:
|
||||||
|
# address = raw_filter
|
||||||
|
# if address in subscribed_addresses:
|
||||||
|
# parsed_filters.from_addresses.append(address)
|
||||||
|
# parsed_filters.to_addresses.append(address)
|
||||||
|
|
||||||
|
if not (parsed_filters.addresses):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return parsed_filters
|
||||||
|
|
||||||
|
def stream_boundary_validator(self, stream_boundary: data.StreamBoundary) -> None:
|
||||||
|
"""
|
||||||
|
Stream boundary validator for the events provider.
|
||||||
|
|
||||||
|
Checks that stream boundaries do not exceed periods of greater than 24 hours.
|
||||||
|
|
||||||
|
Raises an error for invalid stream boundaries, else returns None.
|
||||||
|
"""
|
||||||
|
valid_period_seconds = 24 * 60 * 60
|
||||||
|
validate_stream_boundary(
|
||||||
|
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def query_events(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
parsed_filters: Filters,
|
||||||
|
) -> Query:
|
||||||
|
"""
|
||||||
|
Builds a database query for Ethereum transactions that occurred within the window of time that
|
||||||
|
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
Labels = get_label_model(self.blockchain)
|
||||||
|
|
||||||
|
query = db_session.query(
|
||||||
|
Labels.block_number,
|
||||||
|
Labels.address,
|
||||||
|
Labels.transaction_hash,
|
||||||
|
Labels.label_data,
|
||||||
|
Labels.block_timestamp,
|
||||||
|
Labels.log_index,
|
||||||
|
Labels.created_at,
|
||||||
|
).filter(Labels.label == "moonworm")
|
||||||
|
|
||||||
|
if stream_boundary.include_start:
|
||||||
|
query = query.filter(Labels.block_timestamp >= stream_boundary.start_time)
|
||||||
|
else:
|
||||||
|
query = query.filter(Labels.block_timestamp > stream_boundary.start_time)
|
||||||
|
|
||||||
|
if stream_boundary.end_time is not None:
|
||||||
|
if stream_boundary.include_end:
|
||||||
|
query = query.filter(Labels.block_timestamp <= stream_boundary.end_time)
|
||||||
|
else:
|
||||||
|
query = query.filter(Labels.block_timestamp <= stream_boundary.end_time)
|
||||||
|
|
||||||
|
addresses_filters = []
|
||||||
|
|
||||||
|
for address_filter in parsed_filters.addresses:
|
||||||
|
labels_filters = []
|
||||||
|
for label_filter in address_filter.labels:
|
||||||
|
args_filters = []
|
||||||
|
for arg in label.args:
|
||||||
|
args_filters.append(
|
||||||
|
Labels.label_data["args"][arg.name]
|
||||||
|
== python_type(arg.type)(arg.value)
|
||||||
|
)
|
||||||
|
|
||||||
|
labels_filters.append(
|
||||||
|
and_(
|
||||||
|
*(
|
||||||
|
Labels.label_data["type"] == label_filter.type,
|
||||||
|
Labels.label_data["name"] == label_filter.name,
|
||||||
|
or_(*args_filters),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
addresses_filters.append(
|
||||||
|
and_(
|
||||||
|
*(
|
||||||
|
Labels.address == address_filter.address,
|
||||||
|
or_(*args_filters),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
query = query.filters(or_(*addresses_filters))
|
||||||
|
|
||||||
|
return query
|
||||||
|
|
||||||
|
def get_events(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
||||||
|
"""
|
||||||
|
Returns ethereum_blockchain events for the given addresses in the time period represented
|
||||||
|
by stream_boundary.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
self.stream_boundary_validator(stream_boundary)
|
||||||
|
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
ethereum_transactions = self.query_events(
|
||||||
|
db_session, stream_boundary, parsed_filters
|
||||||
|
)
|
||||||
|
|
||||||
|
ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc"))
|
||||||
|
|
||||||
|
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
||||||
|
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
||||||
|
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
||||||
|
# do anything to help them get data from teh backend.
|
||||||
|
# The error message on the API side when the statement times out:
|
||||||
|
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
||||||
|
events: List[data.Event] = [self.events(row) for row in ethereum_transactions]
|
||||||
|
|
||||||
|
if (stream_boundary.end_time is None) and events:
|
||||||
|
stream_boundary.end_time = events[0].event_timestamp
|
||||||
|
stream_boundary.include_end = True
|
||||||
|
|
||||||
|
return stream_boundary, events
|
||||||
|
|
||||||
|
def latest_events(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
query: StreamQuery,
|
||||||
|
num_events: int,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[List[data.Event]]:
|
||||||
|
"""
|
||||||
|
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
||||||
|
by the given filters.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
||||||
|
|
||||||
|
stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=0, include_start=True, end_time=None, include_end=False
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
ethereum_transactions = (
|
||||||
|
self.query_events(db_session, stream_boundary, parsed_filters)
|
||||||
|
.order_by(text("timestamp desc"))
|
||||||
|
.limit(num_events)
|
||||||
|
)
|
||||||
|
|
||||||
|
return [self.events(row) for row in ethereum_transactions]
|
||||||
|
|
||||||
|
def next_event(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[data.Event]:
|
||||||
|
"""
|
||||||
|
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
||||||
|
query from this provider.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert (
|
||||||
|
stream_boundary.end_time is not None
|
||||||
|
), "Cannot return next event for up-to-date stream boundary"
|
||||||
|
next_stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=stream_boundary.end_time,
|
||||||
|
include_start=(not stream_boundary.include_end),
|
||||||
|
end_time=None,
|
||||||
|
include_end=False,
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
maybe_ethereum_transaction = (
|
||||||
|
self.query_events(db_session, next_stream_boundary, parsed_filters)
|
||||||
|
.order_by(text("timestamp asc"))
|
||||||
|
.limit(1)
|
||||||
|
).one_or_none()
|
||||||
|
|
||||||
|
if maybe_ethereum_transaction is None:
|
||||||
|
return None
|
||||||
|
return self.events(maybe_ethereum_transaction)
|
||||||
|
|
||||||
|
def previous_event(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[data.Event]:
|
||||||
|
"""
|
||||||
|
Returns the latest event occuring before the given stream boundary corresponding to the given
|
||||||
|
query from this provider.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert (
|
||||||
|
stream_boundary.start_time != 0
|
||||||
|
), "Cannot return previous event for stream starting at time 0"
|
||||||
|
previous_stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=0,
|
||||||
|
include_start=True,
|
||||||
|
end_time=stream_boundary.start_time,
|
||||||
|
include_end=(not stream_boundary.include_start),
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
maybe_ethereum_transaction = (
|
||||||
|
self.query_events(db_session, previous_stream_boundary, parsed_filters)
|
||||||
|
.order_by(text("timestamp desc"))
|
||||||
|
.limit(1)
|
||||||
|
).one_or_none()
|
||||||
|
if maybe_ethereum_transaction is None:
|
||||||
|
return None
|
||||||
|
return self.events(maybe_ethereum_transaction)
|
||||||
|
|
||||||
|
|
||||||
|
EthereumMoonwormProvider = MoonwormProvider(
|
||||||
|
event_type="ethereum_blockchain",
|
||||||
|
blockchain=AvailableBlockchainType("ethereum"),
|
||||||
|
description="Provider for resiving transactions from Ethereum tables.",
|
||||||
|
streamboaundary_range_limit=2 * 60 * 60,
|
||||||
|
)
|
||||||
|
|
||||||
|
PolygonMoonwormProvider = MoonwormProvider(
|
||||||
|
event_type="polygon_blockchain",
|
||||||
|
blockchain=AvailableBlockchainType("polygon"),
|
||||||
|
description="Provider for resiving transactions from Polygon tables.",
|
||||||
|
streamboaundary_range_limit=2 * 60 * 60,
|
||||||
|
)
|
|
@ -0,0 +1,472 @@
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
import logging
|
||||||
|
from typing import cast, Dict, Any, List, Optional, Tuple
|
||||||
|
|
||||||
|
from bugout.app import Bugout
|
||||||
|
from bugout.data import BugoutResource
|
||||||
|
|
||||||
|
from moonstreamdb.blockchain import (
|
||||||
|
get_label_model,
|
||||||
|
get_block_model,
|
||||||
|
get_transaction_model,
|
||||||
|
AvailableBlockchainType,
|
||||||
|
)
|
||||||
|
from sqlalchemy import or_, and_, text
|
||||||
|
from sqlalchemy.orm import Session, Query
|
||||||
|
|
||||||
|
|
||||||
|
from .. import data
|
||||||
|
from ..stream_boundaries import validate_stream_boundary
|
||||||
|
from ..stream_queries import StreamQuery
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.WARN)
|
||||||
|
|
||||||
|
|
||||||
|
allowed_tags = ["tag:erc721"]
|
||||||
|
|
||||||
|
default_time_interval_seconds: int = 5 * 60
|
||||||
|
|
||||||
|
# 200 transactions per block, 4 blocks per minute.
|
||||||
|
estimated_events_per_time_interval: float = 5 * 800
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Filters:
|
||||||
|
"""
|
||||||
|
ethereum_blockchain event filters act as a disjunction over queries specifying a from address
|
||||||
|
or a to address.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from_addresses: List[str] = field(default_factory=list)
|
||||||
|
to_addresses: List[str] = field(default_factory=list)
|
||||||
|
labels: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
class TransactionsProvider:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
event_type: str,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
description: str,
|
||||||
|
streamboaundary_range_limit: int,
|
||||||
|
):
|
||||||
|
self.event_type = event_type
|
||||||
|
self.blockchain = blockchain
|
||||||
|
self.description = description
|
||||||
|
self.valid_period_seconds = streamboaundary_range_limit
|
||||||
|
|
||||||
|
def validate_subscription(
|
||||||
|
self, subscription_resource_data: data.SubscriptionResourceData, event_type
|
||||||
|
) -> Tuple[bool, List[str]]:
|
||||||
|
"""
|
||||||
|
Checks that the subscription represents a valid subscription to an Ethereum address.
|
||||||
|
|
||||||
|
NOTE: Currently, this function only checks that the address is a nonempty string.
|
||||||
|
"""
|
||||||
|
errors: List[str] = []
|
||||||
|
if subscription_resource_data.address == "":
|
||||||
|
errors.append("address is empty")
|
||||||
|
|
||||||
|
if subscription_resource_data.subscription_type_id != event_type:
|
||||||
|
errors.append(
|
||||||
|
f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}."
|
||||||
|
)
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
return False, errors
|
||||||
|
return True, errors
|
||||||
|
|
||||||
|
def stream_boundary_validator(self, stream_boundary: data.StreamBoundary) -> None:
|
||||||
|
"""
|
||||||
|
Stream boundary validator for the transactions provider.
|
||||||
|
|
||||||
|
Checks that stream boundaries do not exceed periods of greater than 2 hours.
|
||||||
|
|
||||||
|
Raises an error for invalid stream boundaries, else returns None.
|
||||||
|
"""
|
||||||
|
valid_period_seconds = self.valid_period_seconds
|
||||||
|
validate_stream_boundary(
|
||||||
|
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def default_filters(self, subscriptions: List[BugoutResource]) -> Filters:
|
||||||
|
"""
|
||||||
|
Default filter strings for the given list of subscriptions.
|
||||||
|
"""
|
||||||
|
filters = Filters()
|
||||||
|
for subscription in subscriptions:
|
||||||
|
subscription_address = cast(
|
||||||
|
Optional[str], subscription.resource_data.get("address")
|
||||||
|
)
|
||||||
|
if subscription_address is not None:
|
||||||
|
if subscription_address in allowed_tags:
|
||||||
|
filters.labels.append(subscription_address.split(":")[1])
|
||||||
|
else:
|
||||||
|
filters.from_addresses.append(subscription_address)
|
||||||
|
filters.to_addresses.append(subscription_address)
|
||||||
|
else:
|
||||||
|
logger.warn(
|
||||||
|
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||||
|
)
|
||||||
|
return filters
|
||||||
|
|
||||||
|
def parse_filters(
|
||||||
|
self,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[Filters]:
|
||||||
|
"""
|
||||||
|
Passes raw filter strings into a Filters object which is used to construct a database query
|
||||||
|
for ethereum transactions.
|
||||||
|
|
||||||
|
Filter syntax is:
|
||||||
|
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
|
||||||
|
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
|
||||||
|
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
|
||||||
|
|
||||||
|
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
||||||
|
None indicating that the StreamQuery does not require any data from this provider.
|
||||||
|
"""
|
||||||
|
if query.subscription_types and not any(
|
||||||
|
subtype == self.event_type for subtype in query.subscription_types
|
||||||
|
):
|
||||||
|
return None
|
||||||
|
|
||||||
|
provider_subscriptions = user_subscriptions.get(self.event_type)
|
||||||
|
|
||||||
|
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||||
|
if not provider_subscriptions:
|
||||||
|
return None
|
||||||
|
parsed_filters = self.default_filters(provider_subscriptions)
|
||||||
|
|
||||||
|
from_prefix_length = len("from:")
|
||||||
|
to_prefix_length = len("to:")
|
||||||
|
|
||||||
|
subscribed_addresses = {
|
||||||
|
subscription.resource_data.get("address")
|
||||||
|
for subscription in provider_subscriptions
|
||||||
|
if subscription.resource_data.get("address") is not None
|
||||||
|
}
|
||||||
|
|
||||||
|
if query.subscriptions:
|
||||||
|
parsed_filters.from_addresses = []
|
||||||
|
parsed_filters.to_addresses = []
|
||||||
|
for provider_type, raw_filter in query.subscriptions:
|
||||||
|
if provider_type != self.event_type:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if raw_filter.startswith("from:"):
|
||||||
|
address = raw_filter[from_prefix_length:]
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.from_addresses.append(address)
|
||||||
|
elif raw_filter.startswith("to:"):
|
||||||
|
address = raw_filter[to_prefix_length:]
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.to_addresses.append(address)
|
||||||
|
else:
|
||||||
|
address = raw_filter
|
||||||
|
if address in subscribed_addresses:
|
||||||
|
parsed_filters.from_addresses.append(address)
|
||||||
|
parsed_filters.to_addresses.append(address)
|
||||||
|
|
||||||
|
if not (
|
||||||
|
parsed_filters.from_addresses
|
||||||
|
or parsed_filters.to_addresses
|
||||||
|
or parsed_filters.labels
|
||||||
|
):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return parsed_filters
|
||||||
|
|
||||||
|
def query_transactions(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
parsed_filters: Filters,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
) -> Query:
|
||||||
|
"""
|
||||||
|
Builds a database query for Ethereum transactions that occurred within the window of time that
|
||||||
|
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
Transactions = get_transaction_model(self.blockchain)
|
||||||
|
Blocks = get_block_model(self.blockchain)
|
||||||
|
Labels = get_label_model(self.blockchain)
|
||||||
|
|
||||||
|
query = db_session.query(
|
||||||
|
Transactions.hash,
|
||||||
|
Transactions.block_number,
|
||||||
|
Transactions.from_address,
|
||||||
|
Transactions.to_address,
|
||||||
|
Transactions.gas,
|
||||||
|
Transactions.gas_price,
|
||||||
|
Transactions.input,
|
||||||
|
Transactions.nonce,
|
||||||
|
Transactions.value,
|
||||||
|
Blocks.timestamp.label("timestamp"),
|
||||||
|
).join(
|
||||||
|
Blocks,
|
||||||
|
Transactions.block_number == Blocks.block_number,
|
||||||
|
)
|
||||||
|
|
||||||
|
if stream_boundary.include_start:
|
||||||
|
query = query.filter(Blocks.timestamp >= stream_boundary.start_time)
|
||||||
|
else:
|
||||||
|
query = query.filter(Blocks.timestamp > stream_boundary.start_time)
|
||||||
|
|
||||||
|
if stream_boundary.end_time is not None:
|
||||||
|
if stream_boundary.include_end:
|
||||||
|
query = query.filter(Blocks.timestamp <= stream_boundary.end_time)
|
||||||
|
else:
|
||||||
|
query = query.filter(Blocks.timestamp <= stream_boundary.end_time)
|
||||||
|
|
||||||
|
# We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address
|
||||||
|
address_clauses = []
|
||||||
|
|
||||||
|
address_clauses.extend(
|
||||||
|
[
|
||||||
|
Transactions.from_address == address
|
||||||
|
for address in parsed_filters.from_addresses
|
||||||
|
]
|
||||||
|
+ [
|
||||||
|
Transactions.to_address == address
|
||||||
|
for address in parsed_filters.to_addresses
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
labels_clause = []
|
||||||
|
|
||||||
|
if parsed_filters.labels:
|
||||||
|
label_clause = (
|
||||||
|
db_session.query(Labels)
|
||||||
|
.filter(
|
||||||
|
or_(
|
||||||
|
*[
|
||||||
|
Labels.label.contains(label)
|
||||||
|
for label in list(set(parsed_filters.labels))
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.exists()
|
||||||
|
)
|
||||||
|
labels_clause.append(label_clause)
|
||||||
|
|
||||||
|
subscriptions_clause = address_clauses + labels_clause
|
||||||
|
|
||||||
|
if subscriptions_clause:
|
||||||
|
query = query.filter(or_(*subscriptions_clause))
|
||||||
|
|
||||||
|
return query
|
||||||
|
|
||||||
|
def ethereum_transaction_event(self, row: Tuple) -> data.Event:
|
||||||
|
"""
|
||||||
|
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
||||||
|
into an Event object.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
hash,
|
||||||
|
block_number,
|
||||||
|
from_address,
|
||||||
|
to_address,
|
||||||
|
gas,
|
||||||
|
gas_price,
|
||||||
|
input,
|
||||||
|
nonce,
|
||||||
|
value,
|
||||||
|
timestamp,
|
||||||
|
) = row
|
||||||
|
return data.Event(
|
||||||
|
event_type=self.event_type,
|
||||||
|
event_timestamp=timestamp,
|
||||||
|
event_data={
|
||||||
|
"hash": hash,
|
||||||
|
"block_number": block_number,
|
||||||
|
"from": from_address,
|
||||||
|
"to": to_address,
|
||||||
|
"gas": gas,
|
||||||
|
"gas_price": gas_price,
|
||||||
|
"input": input,
|
||||||
|
"nonce": nonce,
|
||||||
|
"value": value,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_events(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
||||||
|
"""
|
||||||
|
Returns ethereum_blockchain events for the given addresses in the time period represented
|
||||||
|
by stream_boundary.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
self.stream_boundary_validator(stream_boundary)
|
||||||
|
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
ethereum_transactions = self.query_transactions(
|
||||||
|
db_session, stream_boundary, parsed_filters, blockchain
|
||||||
|
)
|
||||||
|
|
||||||
|
ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc"))
|
||||||
|
|
||||||
|
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
||||||
|
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
||||||
|
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
||||||
|
# do anything to help them get data from teh backend.
|
||||||
|
# The error message on the API side when the statement times out:
|
||||||
|
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
||||||
|
events: List[data.Event] = [
|
||||||
|
self.ethereum_transaction_event(row) for row in ethereum_transactions
|
||||||
|
]
|
||||||
|
|
||||||
|
if (stream_boundary.end_time is None) and events:
|
||||||
|
stream_boundary.end_time = events[0].event_timestamp
|
||||||
|
stream_boundary.include_end = True
|
||||||
|
|
||||||
|
return stream_boundary, events
|
||||||
|
|
||||||
|
def latest_events(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
query: StreamQuery,
|
||||||
|
num_events: int,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[List[data.Event]]:
|
||||||
|
"""
|
||||||
|
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
||||||
|
by the given filters.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
||||||
|
|
||||||
|
stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=0, include_start=True, end_time=None, include_end=False
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
ethereum_transactions = (
|
||||||
|
self.query_transactions(
|
||||||
|
db_session, stream_boundary, parsed_filters, blockchain
|
||||||
|
)
|
||||||
|
.order_by(text("timestamp desc"))
|
||||||
|
.limit(num_events)
|
||||||
|
)
|
||||||
|
|
||||||
|
return [self.ethereum_transaction_event(row) for row in ethereum_transactions]
|
||||||
|
|
||||||
|
def next_event(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[data.Event]:
|
||||||
|
"""
|
||||||
|
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
||||||
|
query from this provider.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert (
|
||||||
|
stream_boundary.end_time is not None
|
||||||
|
), "Cannot return next event for up-to-date stream boundary"
|
||||||
|
next_stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=stream_boundary.end_time,
|
||||||
|
include_start=(not stream_boundary.include_end),
|
||||||
|
end_time=None,
|
||||||
|
include_end=False,
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
maybe_ethereum_transaction = (
|
||||||
|
self.query_transactions(
|
||||||
|
db_session, next_stream_boundary, parsed_filters, blockchain
|
||||||
|
)
|
||||||
|
.order_by(text("timestamp asc"))
|
||||||
|
.limit(1)
|
||||||
|
).one_or_none()
|
||||||
|
|
||||||
|
if maybe_ethereum_transaction is None:
|
||||||
|
return None
|
||||||
|
return self.ethereum_transaction_event(maybe_ethereum_transaction)
|
||||||
|
|
||||||
|
def previous_event(
|
||||||
|
self,
|
||||||
|
db_session: Session,
|
||||||
|
blockchain: AvailableBlockchainType,
|
||||||
|
bugout_client: Bugout,
|
||||||
|
data_journal_id: str,
|
||||||
|
data_access_token: str,
|
||||||
|
stream_boundary: data.StreamBoundary,
|
||||||
|
query: StreamQuery,
|
||||||
|
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||||
|
) -> Optional[data.Event]:
|
||||||
|
"""
|
||||||
|
Returns the latest event occuring before the given stream boundary corresponding to the given
|
||||||
|
query from this provider.
|
||||||
|
|
||||||
|
If the query does not require any data from this provider, returns None.
|
||||||
|
"""
|
||||||
|
assert (
|
||||||
|
stream_boundary.start_time != 0
|
||||||
|
), "Cannot return previous event for stream starting at time 0"
|
||||||
|
previous_stream_boundary = data.StreamBoundary(
|
||||||
|
start_time=0,
|
||||||
|
include_start=True,
|
||||||
|
end_time=stream_boundary.start_time,
|
||||||
|
include_end=(not stream_boundary.include_start),
|
||||||
|
)
|
||||||
|
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||||
|
if parsed_filters is None:
|
||||||
|
return None
|
||||||
|
maybe_ethereum_transaction = (
|
||||||
|
self.query_transactions(
|
||||||
|
db_session, previous_stream_boundary, parsed_filters, blockchain
|
||||||
|
)
|
||||||
|
.order_by(text("timestamp desc"))
|
||||||
|
.limit(1)
|
||||||
|
).one_or_none()
|
||||||
|
if maybe_ethereum_transaction is None:
|
||||||
|
return None
|
||||||
|
return self.ethereum_transaction_event(maybe_ethereum_transaction)
|
||||||
|
|
||||||
|
|
||||||
|
ErhereumTransactions = TransactionsProvider(
|
||||||
|
event_type="ethereum_blockchain",
|
||||||
|
blockchain=AvailableBlockchainType("ethereum"),
|
||||||
|
description="Provider for resiving transactions from Ethereum tables.",
|
||||||
|
streamboaundary_range_limit=2 * 60 * 60,
|
||||||
|
)
|
||||||
|
|
||||||
|
PolygonTransactions = TransactionsProvider(
|
||||||
|
event_type="polygon_blockchain",
|
||||||
|
blockchain=AvailableBlockchainType("polygon"),
|
||||||
|
description="Provider for resiving transactions from Polygon tables.",
|
||||||
|
streamboaundary_range_limit=2 * 60 * 60,
|
||||||
|
)
|
|
@ -0,0 +1,72 @@
|
||||||
|
from .db import yield_db_session, yield_db_session_ctx
|
||||||
|
from .models import (
|
||||||
|
EthereumBlock,
|
||||||
|
EthereumLabel,
|
||||||
|
EthereumTransaction,
|
||||||
|
PolygonBlock,
|
||||||
|
PolygonLabel,
|
||||||
|
PolygonTransaction,
|
||||||
|
)
|
||||||
|
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from typing import Type, Union
|
||||||
|
|
||||||
|
|
||||||
|
class AvailableBlockchainType(Enum):
|
||||||
|
ETHEREUM = "ethereum"
|
||||||
|
POLYGON = "polygon"
|
||||||
|
|
||||||
|
|
||||||
|
def get_block_model(
|
||||||
|
blockchain_type: AvailableBlockchainType,
|
||||||
|
) -> Type[Union[EthereumBlock, PolygonBlock]]:
|
||||||
|
"""
|
||||||
|
Depends on provided blockchain type: Ethereum or Polygon,
|
||||||
|
set proper blocks model: EthereumBlock or PolygonBlock.
|
||||||
|
"""
|
||||||
|
block_model: Type[Union[EthereumBlock, PolygonBlock]]
|
||||||
|
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||||
|
block_model = EthereumBlock
|
||||||
|
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||||
|
block_model = PolygonBlock
|
||||||
|
else:
|
||||||
|
raise Exception("Unsupported blockchain type provided")
|
||||||
|
|
||||||
|
return block_model
|
||||||
|
|
||||||
|
|
||||||
|
def get_label_model(
|
||||||
|
blockchain_type: AvailableBlockchainType,
|
||||||
|
) -> Type[Union[EthereumLabel, PolygonLabel]]:
|
||||||
|
"""
|
||||||
|
Depends on provided blockchain type: Ethereum or Polygon,
|
||||||
|
set proper block label model: EthereumLabel or PolygonLabel.
|
||||||
|
"""
|
||||||
|
label_model: Type[Union[EthereumLabel, PolygonLabel]]
|
||||||
|
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||||
|
label_model = EthereumLabel
|
||||||
|
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||||
|
label_model = PolygonLabel
|
||||||
|
else:
|
||||||
|
raise Exception("Unsupported blockchain type provided")
|
||||||
|
|
||||||
|
return label_model
|
||||||
|
|
||||||
|
|
||||||
|
def get_transaction_model(
|
||||||
|
blockchain_type: AvailableBlockchainType,
|
||||||
|
) -> Type[Union[EthereumTransaction, PolygonTransaction]]:
|
||||||
|
"""
|
||||||
|
Depends on provided blockchain type: Ethereum or Polygon,
|
||||||
|
set proper block transactions model: EthereumTransaction or PolygonTransaction.
|
||||||
|
"""
|
||||||
|
transaction_model: Type[Union[EthereumTransaction, PolygonTransaction]]
|
||||||
|
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||||
|
transaction_model = EthereumTransaction
|
||||||
|
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||||
|
transaction_model = PolygonTransaction
|
||||||
|
else:
|
||||||
|
raise Exception("Unsupported blockchain type provided")
|
||||||
|
|
||||||
|
return transaction_model
|
Ładowanie…
Reference in New Issue