diff --git a/backend/moonstreamapi/data.py b/backend/moonstreamapi/data.py index 08ca56a8..8376b2f2 100644 --- a/backend/moonstreamapi/data.py +++ b/backend/moonstreamapi/data.py @@ -170,7 +170,6 @@ class GetEventsResponse(BaseModel): class TxinfoEthereumBlockchainRequest(BaseModel): tx: EthereumTransaction - class EthereumSmartContractSourceInfo(BaseModel): name: str source_code: str diff --git a/backend/moonstreamapi/providers/__init__.py b/backend/moonstreamapi/providers/__init__.py index f884fe1b..7dc9e582 100644 --- a/backend/moonstreamapi/providers/__init__.py +++ b/backend/moonstreamapi/providers/__init__.py @@ -32,7 +32,7 @@ from bugout.app import Bugout from bugout.data import BugoutResource from sqlalchemy.orm import Session -from . import bugout, ethereum_blockchain +from . import bugout, transactions, moonworm_provider from .. import data from ..stream_queries import StreamQuery @@ -47,8 +47,13 @@ class ReceivingEventsException(Exception): event_providers: Dict[str, Any] = { - ethereum_blockchain.event_type: ethereum_blockchain, - bugout.whalewatch_provider.event_type: bugout.whalewatch_provider, + moonworm_provider.EthereumMoonwormProvider.event_type: moonworm_provider.EthereumMoonwormProvider, + moonworm_provider.PolygonMoonwormProvider.event_type: moonworm_provider.PolygonMoonwormProvider, + transactions.ErhereumTransactions.event_type: transactions.ErhereumTransactions, + transactions.PolygonTransactions.event_type: transactions.PolygonTransactions, + bugout.polygon_whalewatch_provider.event_type: bugout.polygon_whalewatch_provider, + bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider, + bugout.ethereum_whalewatch_provider.event_type: bugout.ethereum_whalewatch_provider, bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider, } @@ -75,6 +80,7 @@ def get_events( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: for provider_name, provider in event_providers.items(): + futures[provider_name] = executor.submit( provider.get_events, db_session, @@ -132,7 +138,9 @@ def latest_events( with ThreadPoolExecutor( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: + for provider_name, provider in event_providers.items(): + futures[provider_name] = executor.submit( provider.latest_events, db_session, @@ -242,6 +250,7 @@ def previous_event( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: for provider_name, provider in event_providers.items(): + futures[provider_name] = executor.submit( provider.previous_event, db_session, diff --git a/backend/moonstreamapi/providers/bugout.py b/backend/moonstreamapi/providers/bugout.py index f7b29f76..bc52700b 100644 --- a/backend/moonstreamapi/providers/bugout.py +++ b/backend/moonstreamapi/providers/bugout.py @@ -278,7 +278,7 @@ class BugoutEventProvider: return self.entry_event(search_results.results[0]) -class EthereumTXPoolProvider(BugoutEventProvider): +class TXPoolProvider(BugoutEventProvider): def __init__( self, event_type: str, @@ -364,7 +364,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour 4. Amount (in WEI) received To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint.""" -whalewatch_provider = PublicDataProvider( +ethereum_whalewatch_provider = PublicDataProvider( event_type="ethereum_whalewatch", description=whalewatch_description, default_time_interval_seconds=310, @@ -372,12 +372,20 @@ whalewatch_provider = PublicDataProvider( tags=["crawl_type:ethereum_trending"], ) +polygon_whalewatch_provider = PublicDataProvider( + event_type="polygon_whalewatch", + description=whalewatch_description, + default_time_interval_seconds=310, + estimated_events_per_time_interval=1, + tags=["crawl_type:polygon_trending"], +) + ethereum_txpool_description = """Event provider for Ethereum transaction pool. Shows the latest events (from the previous hour) in the Ethereum transaction pool. To restrict your queries to this provider, add a filter of \"type:ethereum_txpool\" to your query (query parameter: \"q\") on the /streams endpoint.""" -ethereum_txpool_provider = EthereumTXPoolProvider( +ethereum_txpool_provider = TXPoolProvider( event_type="ethereum_txpool", description=ethereum_txpool_description, default_time_interval_seconds=5, @@ -385,6 +393,14 @@ ethereum_txpool_provider = EthereumTXPoolProvider( tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"], ) +polygon_txpool_provider = TXPoolProvider( + event_type="polygon_txpool", + description=ethereum_txpool_description, + default_time_interval_seconds=5, + estimated_events_per_time_interval=50, + tags=[f"client:polygon_HUMBUG_CLIENT_ID"], +) + nft_summary_description = """Event provider for NFT market summaries. This provider periodically generates NFT market summaries for the last hour of market activity. diff --git a/backend/moonstreamapi/providers/ethereum_blockchain.py b/backend/moonstreamapi/providers/ethereum_blockchain.py deleted file mode 100644 index b2c173bc..00000000 --- a/backend/moonstreamapi/providers/ethereum_blockchain.py +++ /dev/null @@ -1,437 +0,0 @@ -from dataclasses import dataclass, field -import logging -from typing import cast, Dict, Any, List, Optional, Tuple - -from bugout.app import Bugout -from bugout.data import BugoutResource - -from moonstreamdb.models import ( - EthereumBlock, - EthereumTransaction, - EthereumLabel, -) -from sqlalchemy import or_, and_, text -from sqlalchemy.orm import Session, Query -from sqlalchemy.sql.functions import user - - -from .. import data -from ..stream_boundaries import validate_stream_boundary -from ..stream_queries import StreamQuery - - -logger = logging.getLogger(__name__) -logger.setLevel(logging.WARN) - - -event_type = "ethereum_blockchain" -allowed_tags = ["tag:erc721"] - -description = f"""Event provider for transactions from the Ethereum blockchain. - -To restrict your queries to this provider, add a filter of \"type:{event_type}\" to your query (query parameter: \"q\") on the /streams endpoint.""" - -default_time_interval_seconds: int = 5 * 60 - -# 200 transactions per block, 4 blocks per minute. -estimated_events_per_time_interval: float = 5 * 800 - - -def validate_subscription( - subscription_resource_data: data.SubscriptionResourceData, -) -> Tuple[bool, List[str]]: - """ - Checks that the subscription represents a valid subscription to an Ethereum address. - - NOTE: Currently, this function only checks that the address is a nonempty string. - """ - errors: List[str] = [] - if subscription_resource_data.address == "": - errors.append("address is empty") - - if subscription_resource_data.subscription_type_id != event_type: - errors.append( - f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}." - ) - - if errors: - return False, errors - return True, errors - - -def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None: - """ - Stream boundary validator for the ethereum_blockchain event provider. - - Checks that stream boundaries do not exceed periods of greater than 2 hours. - - Raises an error for invalid stream boundaries, else returns None. - """ - valid_period_seconds = 2 * 60 * 60 - validate_stream_boundary( - stream_boundary, valid_period_seconds, raise_when_invalid=True - ) - - -@dataclass -class Filters: - """ - ethereum_blockchain event filters act as a disjunction over queries specifying a from address - or a to address. - """ - - from_addresses: List[str] = field(default_factory=list) - to_addresses: List[str] = field(default_factory=list) - labels: List[str] = field(default_factory=list) - - -def default_filters(subscriptions: List[BugoutResource]) -> Filters: - """ - Default filter strings for the given list of subscriptions. - """ - filters = Filters() - for subscription in subscriptions: - subscription_address = cast( - Optional[str], subscription.resource_data.get("address") - ) - if subscription_address is not None: - if subscription_address in allowed_tags: - filters.labels.append(subscription_address.split(":")[1]) - else: - filters.from_addresses.append(subscription_address) - filters.to_addresses.append(subscription_address) - else: - logger.warn( - f"Could not find subscription address for subscription with resource id: {subscription.id}" - ) - return filters - - -def parse_filters( - query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]] -) -> Optional[Filters]: - """ - Passes raw filter strings into a Filters object which is used to construct a database query - for ethereum transactions. - - Filter syntax is: - - "from:
" - specifies that we want to include all transactions with "" as a source - - "to:" - specifies that we want to include all transactions with "" as a destination - - "" - specifies that we want to include all transactions with "" as a source AND all transactions with "" as a destination - - If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns - None indicating that the StreamQuery does not require any data from this provider. - """ - if query.subscription_types and not any( - subtype == event_type for subtype in query.subscription_types - ): - return None - - provider_subscriptions = user_subscriptions.get(event_type) - - # If the user has no subscriptions to this event type, we do not have to return any data! - if not provider_subscriptions: - return None - parsed_filters = default_filters(provider_subscriptions) - - from_prefix_length = len("from:") - to_prefix_length = len("to:") - - subscribed_addresses = { - subscription.resource_data.get("address") - for subscription in provider_subscriptions - if subscription.resource_data.get("address") is not None - } - - if query.subscriptions: - parsed_filters.from_addresses = [] - parsed_filters.to_addresses = [] - for provider_type, raw_filter in query.subscriptions: - if provider_type != event_type: - continue - - if raw_filter.startswith("from:"): - address = raw_filter[from_prefix_length:] - if address in subscribed_addresses: - parsed_filters.from_addresses.append(address) - elif raw_filter.startswith("to:"): - address = raw_filter[to_prefix_length:] - if address in subscribed_addresses: - parsed_filters.to_addresses.append(address) - else: - address = raw_filter - if address in subscribed_addresses: - parsed_filters.from_addresses.append(address) - parsed_filters.to_addresses.append(address) - - if not ( - parsed_filters.from_addresses - or parsed_filters.to_addresses - or parsed_filters.labels - ): - return None - - return parsed_filters - - -def query_ethereum_transactions( - db_session: Session, - stream_boundary: data.StreamBoundary, - parsed_filters: Filters, -) -> Query: - """ - Builds a database query for Ethereum transactions that occurred within the window of time that - the given stream_boundary represents and satisfying the constraints of parsed_filters. - """ - query = db_session.query( - EthereumTransaction.hash, - EthereumTransaction.block_number, - EthereumTransaction.from_address, - EthereumTransaction.to_address, - EthereumTransaction.gas, - EthereumTransaction.gas_price, - EthereumTransaction.input, - EthereumTransaction.nonce, - EthereumTransaction.value, - EthereumBlock.timestamp.label("timestamp"), - ).join( - EthereumBlock, - EthereumTransaction.block_number == EthereumBlock.block_number, - ) - - if stream_boundary.include_start: - query = query.filter(EthereumBlock.timestamp >= stream_boundary.start_time) - else: - query = query.filter(EthereumBlock.timestamp > stream_boundary.start_time) - - if stream_boundary.end_time is not None: - if stream_boundary.include_end: - query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time) - else: - query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time) - - # We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address - address_clauses = [] - - address_clauses.extend( - [ - EthereumTransaction.from_address == address - for address in parsed_filters.from_addresses - ] - + [ - EthereumTransaction.to_address == address - for address in parsed_filters.to_addresses - ] - ) - - labels_clause = [] - - if parsed_filters.labels: - label_clause = ( - db_session.query(EthereumLabel) - .filter( - or_( - *[ - EthereumLabel.label.contains(label) - for label in list(set(parsed_filters.labels)) - ] - ) - ) - .exists() - ) - labels_clause.append(label_clause) - - subscriptions_clause = address_clauses + labels_clause - - if subscriptions_clause: - query = query.filter(or_(*subscriptions_clause)) - - return query - - -def ethereum_transaction_event(row: Tuple) -> data.Event: - """ - Parses a result from the result set of a database query for Ethereum transactions with block timestamp - into an Event object. - """ - ( - hash, - block_number, - from_address, - to_address, - gas, - gas_price, - input, - nonce, - value, - timestamp, - ) = row - return data.Event( - event_type=event_type, - event_timestamp=timestamp, - event_data={ - "hash": hash, - "block_number": block_number, - "from": from_address, - "to": to_address, - "gas": gas, - "gas_price": gas_price, - "input": input, - "nonce": nonce, - "value": value, - }, - ) - - -def get_events( - db_session: Session, - bugout_client: Bugout, - data_journal_id: str, - data_access_token: str, - stream_boundary: data.StreamBoundary, - query: StreamQuery, - user_subscriptions: Dict[str, List[BugoutResource]], -) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]: - """ - Returns ethereum_blockchain events for the given addresses in the time period represented - by stream_boundary. - - If the query does not require any data from this provider, returns None. - """ - stream_boundary_validator(stream_boundary) - - parsed_filters = parse_filters(query, user_subscriptions) - if parsed_filters is None: - return None - - ethereum_transactions = query_ethereum_transactions( - db_session, stream_boundary, parsed_filters - ) - - ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc")) - - # TODO(zomglings): Catch the operational error denoting that the statement timed out here - # and wrap it in an error that tells the API to return the appropriate 400 response. Currently, - # when the statement times out, the API returns a 500 status code to the client, which doesn't - # do anything to help them get data from teh backend. - # The error message on the API side when the statement times out: - # > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout - events: List[data.Event] = [ - ethereum_transaction_event(row) for row in ethereum_transactions - ] - - if (stream_boundary.end_time is None) and events: - stream_boundary.end_time = events[0].event_timestamp - stream_boundary.include_end = True - - return stream_boundary, events - - -def latest_events( - db_session: Session, - bugout_client: Bugout, - data_journal_id: str, - data_access_token: str, - query: StreamQuery, - num_events: int, - user_subscriptions: Dict[str, List[BugoutResource]], -) -> Optional[List[data.Event]]: - """ - Returns the num_events latest events from the current provider, subject to the constraints imposed - by the given filters. - - If the query does not require any data from this provider, returns None. - """ - assert num_events > 0, f"num_events ({num_events}) should be positive." - - stream_boundary = data.StreamBoundary( - start_time=0, include_start=True, end_time=None, include_end=False - ) - parsed_filters = parse_filters(query, user_subscriptions) - if parsed_filters is None: - return None - ethereum_transactions = ( - query_ethereum_transactions(db_session, stream_boundary, parsed_filters) - .order_by(text("timestamp desc")) - .limit(num_events) - ) - - return [ethereum_transaction_event(row) for row in ethereum_transactions] - - -def next_event( - db_session: Session, - bugout_client: Bugout, - data_journal_id: str, - data_access_token: str, - stream_boundary: data.StreamBoundary, - query: StreamQuery, - user_subscriptions: Dict[str, List[BugoutResource]], -) -> Optional[data.Event]: - """ - Returns the earliest event occuring after the given stream boundary corresponding to the given - query from this provider. - - If the query does not require any data from this provider, returns None. - """ - assert ( - stream_boundary.end_time is not None - ), "Cannot return next event for up-to-date stream boundary" - next_stream_boundary = data.StreamBoundary( - start_time=stream_boundary.end_time, - include_start=(not stream_boundary.include_end), - end_time=None, - include_end=False, - ) - parsed_filters = parse_filters(query, user_subscriptions) - if parsed_filters is None: - return None - - maybe_ethereum_transaction = ( - query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters) - .order_by(text("timestamp asc")) - .limit(1) - ).one_or_none() - - if maybe_ethereum_transaction is None: - return None - return ethereum_transaction_event(maybe_ethereum_transaction) - - -def previous_event( - db_session: Session, - bugout_client: Bugout, - data_journal_id: str, - data_access_token: str, - stream_boundary: data.StreamBoundary, - query: StreamQuery, - user_subscriptions: Dict[str, List[BugoutResource]], -) -> Optional[data.Event]: - """ - Returns the latest event occuring before the given stream boundary corresponding to the given - query from this provider. - - If the query does not require any data from this provider, returns None. - """ - assert ( - stream_boundary.start_time != 0 - ), "Cannot return previous event for stream starting at time 0" - previous_stream_boundary = data.StreamBoundary( - start_time=0, - include_start=True, - end_time=stream_boundary.start_time, - include_end=(not stream_boundary.include_start), - ) - parsed_filters = parse_filters(query, user_subscriptions) - if parsed_filters is None: - return None - maybe_ethereum_transaction = ( - query_ethereum_transactions( - db_session, previous_stream_boundary, parsed_filters - ) - .order_by(text("timestamp desc")) - .limit(1) - ).one_or_none() - if maybe_ethereum_transaction is None: - return None - return ethereum_transaction_event(maybe_ethereum_transaction) diff --git a/backend/moonstreamapi/providers/moonworm_provider.py b/backend/moonstreamapi/providers/moonworm_provider.py new file mode 100644 index 00000000..65d7c83e --- /dev/null +++ b/backend/moonstreamapi/providers/moonworm_provider.py @@ -0,0 +1,454 @@ +from dataclasses import dataclass, field +import logging +from typing import cast, Dict, Any, List, Optional, Tuple + +from bugout.app import Bugout +from bugout.data import BugoutResource + +from moonstreamdb.blockchain import ( + get_label_model, + AvailableBlockchainType, +) +from sqlalchemy import or_, and_, text +from sqlalchemy.orm import Session, Query +from sqlalchemy.sql.expression import label + + +from .. import data +from ..stream_boundaries import validate_stream_boundary +from ..stream_queries import StreamQuery + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARN) + + +ethereum_event_type = "ethereum_blockchain" +polygon_event_type = "polygon_blockchain" +allowed_tags = ["tag:erc721"] + +description = f"""Event provider for transactions from the Ethereum blockchain. + +To restrict your queries to this provider, add a filter of \"type:{ethereum_event_type}\{polygon_event_type}\" to your query (query parameter: \"q\") on the /streams endpoint.""" + +default_time_interval_seconds: int = 5 * 60 + +# 200 transactions per block, 4 blocks per minute. +estimated_events_per_time_interval: float = 5 * 800 + + +@dataclass +class ArgsFilters: + name: str + value: Any + type: str + + +@dataclass +class LabelsFilters: + + name: str + type: str + args: List[ArgsFilters] = field(default_factory=list) + + +@dataclass +class AddressFilters: + + address: str + labels: List[LabelsFilters] = field(default_factory=list) + + +@dataclass +class Filters: + + addresses: List[AddressFilters] = field(default_factory=list) + + +def python_type(expected_type: str) -> Any: + if expected_type.startswith("int"): + return int + elif expected_type.startswith("str"): + return str + elif expected_type == "float": + return float + elif expected_type == "bool": + return bool + elif expected_type == "tuple": + return list + else: + raise ValueError(f"Cannot convert to python type {expected_type}") + + +class MoonwormProvider: + def __init__( + self, + event_type: str, + blockchain: AvailableBlockchainType, + description: str, + streamboaundary_range_limit: int, + ): + self.event_type = event_type + self.blockchain = blockchain + self.description = description + self.valid_period_seconds = streamboaundary_range_limit + + def default_filters(self, subscriptions: List[BugoutResource]) -> Filters: + """ + Default filter strings for the given list of subscriptions. + """ + filters = Filters() + for subscription in subscriptions: + subscription_address = cast( + Optional[str], subscription.resource_data.get("address") + ) + if subscription_address is not None: + + # How apply labels? + filters.addresses.append( + AddressFilters(address=subscription_address, labels=[]) + ) + else: + logger.warn( + f"Could not find subscription address for subscription with resource id: {subscription.id}" + ) + return filters + + def events(self, row: Tuple) -> data.Event: + """ + Parses a result from the result set of a database query for Ethereum transactions with block timestamp + into an Event object. + """ + ( + block_number, + address, + transaction_hash, + label_data, + block_timestamp, + log_index, + created_at, + ) = row + return data.Event( + event_type=self.event_type, + event_timestamp=block_timestamp, + event_data={ + "hash": transaction_hash, + "block_number": block_number, + "address": address, + "label_data": label_data, + "log_index": log_index, + "created_at": created_at, + }, + ) + + def parse_filters( + self, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[Filters]: + """ + Passes raw filter strings into a Filters object which is used to construct a database query + for ethereum transactions. + + Filter syntax is: + - "from:" - specifies that we want to include all transactions with "" as a source + - "to:" - specifies that we want to include all transactions with "" as a destination + - "" - specifies that we want to include all transactions with "" as a source AND all transactions with "" as a destination + + If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns + None indicating that the StreamQuery does not require any data from this provider. + """ + if query.subscription_types and not any( + subtype == self.event_type for subtype in query.subscription_types + ): + return None + + provider_subscriptions = user_subscriptions.get(self.event_type) + + # If the user has no subscriptions to this event type, we do not have to return any data! + if not provider_subscriptions: + return None + parsed_filters = self.default_filters(provider_subscriptions) + + # from_prefix_length = len("from:") + # to_prefix_length = len("to:") + + # subscribed_addresses = { + # subscription.resource_data.get("address") + # for subscription in provider_subscriptions + # if subscription.resource_data.get("address") is not None + # } + + # Need check that we can expand logic of parsef filters from query params but it will difficult + + # if query.subscriptions: + # parsed_filters.from_addresses = [] + # parsed_filters.to_addresses = [] + # for provider_type, raw_filter in query.subscriptions: + # if provider_type != event_type: + # continue + + # if raw_filter.startswith("from:"): + # address = raw_filter[from_prefix_length:] + # if address in subscribed_addresses: + # parsed_filters.from_addresses.append(address) + # elif raw_filter.startswith("to:"): + # address = raw_filter[to_prefix_length:] + # if address in subscribed_addresses: + # parsed_filters.to_addresses.append(address) + # else: + # address = raw_filter + # if address in subscribed_addresses: + # parsed_filters.from_addresses.append(address) + # parsed_filters.to_addresses.append(address) + + if not (parsed_filters.addresses): + return None + + return parsed_filters + + def stream_boundary_validator(self, stream_boundary: data.StreamBoundary) -> None: + """ + Stream boundary validator for the events provider. + + Checks that stream boundaries do not exceed periods of greater than 24 hours. + + Raises an error for invalid stream boundaries, else returns None. + """ + valid_period_seconds = 24 * 60 * 60 + validate_stream_boundary( + stream_boundary, valid_period_seconds, raise_when_invalid=True + ) + + def query_events( + self, + db_session: Session, + stream_boundary: data.StreamBoundary, + parsed_filters: Filters, + ) -> Query: + """ + Builds a database query for Ethereum transactions that occurred within the window of time that + the given stream_boundary represents and satisfying the constraints of parsed_filters. + """ + + Labels = get_label_model(self.blockchain) + + query = db_session.query( + Labels.block_number, + Labels.address, + Labels.transaction_hash, + Labels.label_data, + Labels.block_timestamp, + Labels.log_index, + Labels.created_at, + ).filter(Labels.label == "moonworm") + + if stream_boundary.include_start: + query = query.filter(Labels.block_timestamp >= stream_boundary.start_time) + else: + query = query.filter(Labels.block_timestamp > stream_boundary.start_time) + + if stream_boundary.end_time is not None: + if stream_boundary.include_end: + query = query.filter(Labels.block_timestamp <= stream_boundary.end_time) + else: + query = query.filter(Labels.block_timestamp <= stream_boundary.end_time) + + addresses_filters = [] + + for address_filter in parsed_filters.addresses: + labels_filters = [] + for label_filter in address_filter.labels: + args_filters = [] + for arg in label.args: + args_filters.append( + Labels.label_data["args"][arg.name] + == python_type(arg.type)(arg.value) + ) + + labels_filters.append( + and_( + *( + Labels.label_data["type"] == label_filter.type, + Labels.label_data["name"] == label_filter.name, + or_(*args_filters), + ) + ) + ) + addresses_filters.append( + and_( + *( + Labels.address == address_filter.address, + or_(*args_filters), + ) + ) + ) + query = query.filters(or_(*addresses_filters)) + + return query + + def get_events( + self, + db_session: Session, + blockchain: AvailableBlockchainType, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]: + """ + Returns ethereum_blockchain events for the given addresses in the time period represented + by stream_boundary. + + If the query does not require any data from this provider, returns None. + """ + self.stream_boundary_validator(stream_boundary) + + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + + ethereum_transactions = self.query_events( + db_session, stream_boundary, parsed_filters + ) + + ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc")) + + # TODO(zomglings): Catch the operational error denoting that the statement timed out here + # and wrap it in an error that tells the API to return the appropriate 400 response. Currently, + # when the statement times out, the API returns a 500 status code to the client, which doesn't + # do anything to help them get data from teh backend. + # The error message on the API side when the statement times out: + # > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout + events: List[data.Event] = [self.events(row) for row in ethereum_transactions] + + if (stream_boundary.end_time is None) and events: + stream_boundary.end_time = events[0].event_timestamp + stream_boundary.include_end = True + + return stream_boundary, events + + def latest_events( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + query: StreamQuery, + num_events: int, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[List[data.Event]]: + """ + Returns the num_events latest events from the current provider, subject to the constraints imposed + by the given filters. + + If the query does not require any data from this provider, returns None. + """ + assert num_events > 0, f"num_events ({num_events}) should be positive." + + stream_boundary = data.StreamBoundary( + start_time=0, include_start=True, end_time=None, include_end=False + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + ethereum_transactions = ( + self.query_events(db_session, stream_boundary, parsed_filters) + .order_by(text("timestamp desc")) + .limit(num_events) + ) + + return [self.events(row) for row in ethereum_transactions] + + def next_event( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Returns the earliest event occuring after the given stream boundary corresponding to the given + query from this provider. + + If the query does not require any data from this provider, returns None. + """ + assert ( + stream_boundary.end_time is not None + ), "Cannot return next event for up-to-date stream boundary" + next_stream_boundary = data.StreamBoundary( + start_time=stream_boundary.end_time, + include_start=(not stream_boundary.include_end), + end_time=None, + include_end=False, + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + + maybe_ethereum_transaction = ( + self.query_events(db_session, next_stream_boundary, parsed_filters) + .order_by(text("timestamp asc")) + .limit(1) + ).one_or_none() + + if maybe_ethereum_transaction is None: + return None + return self.events(maybe_ethereum_transaction) + + def previous_event( + self, + db_session: Session, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Returns the latest event occuring before the given stream boundary corresponding to the given + query from this provider. + + If the query does not require any data from this provider, returns None. + """ + assert ( + stream_boundary.start_time != 0 + ), "Cannot return previous event for stream starting at time 0" + previous_stream_boundary = data.StreamBoundary( + start_time=0, + include_start=True, + end_time=stream_boundary.start_time, + include_end=(not stream_boundary.include_start), + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + maybe_ethereum_transaction = ( + self.query_events(db_session, previous_stream_boundary, parsed_filters) + .order_by(text("timestamp desc")) + .limit(1) + ).one_or_none() + if maybe_ethereum_transaction is None: + return None + return self.events(maybe_ethereum_transaction) + + +EthereumMoonwormProvider = MoonwormProvider( + event_type="ethereum_blockchain", + blockchain=AvailableBlockchainType("ethereum"), + description="Provider for resiving transactions from Ethereum tables.", + streamboaundary_range_limit=2 * 60 * 60, +) + +PolygonMoonwormProvider = MoonwormProvider( + event_type="polygon_blockchain", + blockchain=AvailableBlockchainType("polygon"), + description="Provider for resiving transactions from Polygon tables.", + streamboaundary_range_limit=2 * 60 * 60, +) diff --git a/backend/moonstreamapi/providers/transactions.py b/backend/moonstreamapi/providers/transactions.py new file mode 100644 index 00000000..4e724d84 --- /dev/null +++ b/backend/moonstreamapi/providers/transactions.py @@ -0,0 +1,472 @@ +from dataclasses import dataclass, field +import logging +from typing import cast, Dict, Any, List, Optional, Tuple + +from bugout.app import Bugout +from bugout.data import BugoutResource + +from moonstreamdb.blockchain import ( + get_label_model, + get_block_model, + get_transaction_model, + AvailableBlockchainType, +) +from sqlalchemy import or_, and_, text +from sqlalchemy.orm import Session, Query + + +from .. import data +from ..stream_boundaries import validate_stream_boundary +from ..stream_queries import StreamQuery + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARN) + + +allowed_tags = ["tag:erc721"] + +default_time_interval_seconds: int = 5 * 60 + +# 200 transactions per block, 4 blocks per minute. +estimated_events_per_time_interval: float = 5 * 800 + + +@dataclass +class Filters: + """ + ethereum_blockchain event filters act as a disjunction over queries specifying a from address + or a to address. + """ + + from_addresses: List[str] = field(default_factory=list) + to_addresses: List[str] = field(default_factory=list) + labels: List[str] = field(default_factory=list) + + +class TransactionsProvider: + def __init__( + self, + event_type: str, + blockchain: AvailableBlockchainType, + description: str, + streamboaundary_range_limit: int, + ): + self.event_type = event_type + self.blockchain = blockchain + self.description = description + self.valid_period_seconds = streamboaundary_range_limit + + def validate_subscription( + self, subscription_resource_data: data.SubscriptionResourceData, event_type + ) -> Tuple[bool, List[str]]: + """ + Checks that the subscription represents a valid subscription to an Ethereum address. + + NOTE: Currently, this function only checks that the address is a nonempty string. + """ + errors: List[str] = [] + if subscription_resource_data.address == "": + errors.append("address is empty") + + if subscription_resource_data.subscription_type_id != event_type: + errors.append( + f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}." + ) + + if errors: + return False, errors + return True, errors + + def stream_boundary_validator(self, stream_boundary: data.StreamBoundary) -> None: + """ + Stream boundary validator for the transactions provider. + + Checks that stream boundaries do not exceed periods of greater than 2 hours. + + Raises an error for invalid stream boundaries, else returns None. + """ + valid_period_seconds = self.valid_period_seconds + validate_stream_boundary( + stream_boundary, valid_period_seconds, raise_when_invalid=True + ) + + def default_filters(self, subscriptions: List[BugoutResource]) -> Filters: + """ + Default filter strings for the given list of subscriptions. + """ + filters = Filters() + for subscription in subscriptions: + subscription_address = cast( + Optional[str], subscription.resource_data.get("address") + ) + if subscription_address is not None: + if subscription_address in allowed_tags: + filters.labels.append(subscription_address.split(":")[1]) + else: + filters.from_addresses.append(subscription_address) + filters.to_addresses.append(subscription_address) + else: + logger.warn( + f"Could not find subscription address for subscription with resource id: {subscription.id}" + ) + return filters + + def parse_filters( + self, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[Filters]: + """ + Passes raw filter strings into a Filters object which is used to construct a database query + for ethereum transactions. + + Filter syntax is: + - "from:" - specifies that we want to include all transactions with "" as a source + - "to:" - specifies that we want to include all transactions with "" as a destination + - "" - specifies that we want to include all transactions with "" as a source AND all transactions with "" as a destination + + If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns + None indicating that the StreamQuery does not require any data from this provider. + """ + if query.subscription_types and not any( + subtype == self.event_type for subtype in query.subscription_types + ): + return None + + provider_subscriptions = user_subscriptions.get(self.event_type) + + # If the user has no subscriptions to this event type, we do not have to return any data! + if not provider_subscriptions: + return None + parsed_filters = self.default_filters(provider_subscriptions) + + from_prefix_length = len("from:") + to_prefix_length = len("to:") + + subscribed_addresses = { + subscription.resource_data.get("address") + for subscription in provider_subscriptions + if subscription.resource_data.get("address") is not None + } + + if query.subscriptions: + parsed_filters.from_addresses = [] + parsed_filters.to_addresses = [] + for provider_type, raw_filter in query.subscriptions: + if provider_type != self.event_type: + continue + + if raw_filter.startswith("from:"): + address = raw_filter[from_prefix_length:] + if address in subscribed_addresses: + parsed_filters.from_addresses.append(address) + elif raw_filter.startswith("to:"): + address = raw_filter[to_prefix_length:] + if address in subscribed_addresses: + parsed_filters.to_addresses.append(address) + else: + address = raw_filter + if address in subscribed_addresses: + parsed_filters.from_addresses.append(address) + parsed_filters.to_addresses.append(address) + + if not ( + parsed_filters.from_addresses + or parsed_filters.to_addresses + or parsed_filters.labels + ): + return None + + return parsed_filters + + def query_transactions( + self, + db_session: Session, + stream_boundary: data.StreamBoundary, + parsed_filters: Filters, + blockchain: AvailableBlockchainType, + ) -> Query: + """ + Builds a database query for Ethereum transactions that occurred within the window of time that + the given stream_boundary represents and satisfying the constraints of parsed_filters. + """ + + Transactions = get_transaction_model(self.blockchain) + Blocks = get_block_model(self.blockchain) + Labels = get_label_model(self.blockchain) + + query = db_session.query( + Transactions.hash, + Transactions.block_number, + Transactions.from_address, + Transactions.to_address, + Transactions.gas, + Transactions.gas_price, + Transactions.input, + Transactions.nonce, + Transactions.value, + Blocks.timestamp.label("timestamp"), + ).join( + Blocks, + Transactions.block_number == Blocks.block_number, + ) + + if stream_boundary.include_start: + query = query.filter(Blocks.timestamp >= stream_boundary.start_time) + else: + query = query.filter(Blocks.timestamp > stream_boundary.start_time) + + if stream_boundary.end_time is not None: + if stream_boundary.include_end: + query = query.filter(Blocks.timestamp <= stream_boundary.end_time) + else: + query = query.filter(Blocks.timestamp <= stream_boundary.end_time) + + # We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address + address_clauses = [] + + address_clauses.extend( + [ + Transactions.from_address == address + for address in parsed_filters.from_addresses + ] + + [ + Transactions.to_address == address + for address in parsed_filters.to_addresses + ] + ) + + labels_clause = [] + + if parsed_filters.labels: + label_clause = ( + db_session.query(Labels) + .filter( + or_( + *[ + Labels.label.contains(label) + for label in list(set(parsed_filters.labels)) + ] + ) + ) + .exists() + ) + labels_clause.append(label_clause) + + subscriptions_clause = address_clauses + labels_clause + + if subscriptions_clause: + query = query.filter(or_(*subscriptions_clause)) + + return query + + def ethereum_transaction_event(self, row: Tuple) -> data.Event: + """ + Parses a result from the result set of a database query for Ethereum transactions with block timestamp + into an Event object. + """ + ( + hash, + block_number, + from_address, + to_address, + gas, + gas_price, + input, + nonce, + value, + timestamp, + ) = row + return data.Event( + event_type=self.event_type, + event_timestamp=timestamp, + event_data={ + "hash": hash, + "block_number": block_number, + "from": from_address, + "to": to_address, + "gas": gas, + "gas_price": gas_price, + "input": input, + "nonce": nonce, + "value": value, + }, + ) + + def get_events( + self, + db_session: Session, + blockchain: AvailableBlockchainType, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]: + """ + Returns ethereum_blockchain events for the given addresses in the time period represented + by stream_boundary. + + If the query does not require any data from this provider, returns None. + """ + self.stream_boundary_validator(stream_boundary) + + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + + ethereum_transactions = self.query_transactions( + db_session, stream_boundary, parsed_filters, blockchain + ) + + ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc")) + + # TODO(zomglings): Catch the operational error denoting that the statement timed out here + # and wrap it in an error that tells the API to return the appropriate 400 response. Currently, + # when the statement times out, the API returns a 500 status code to the client, which doesn't + # do anything to help them get data from teh backend. + # The error message on the API side when the statement times out: + # > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout + events: List[data.Event] = [ + self.ethereum_transaction_event(row) for row in ethereum_transactions + ] + + if (stream_boundary.end_time is None) and events: + stream_boundary.end_time = events[0].event_timestamp + stream_boundary.include_end = True + + return stream_boundary, events + + def latest_events( + self, + db_session: Session, + blockchain: AvailableBlockchainType, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + query: StreamQuery, + num_events: int, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[List[data.Event]]: + """ + Returns the num_events latest events from the current provider, subject to the constraints imposed + by the given filters. + + If the query does not require any data from this provider, returns None. + """ + assert num_events > 0, f"num_events ({num_events}) should be positive." + + stream_boundary = data.StreamBoundary( + start_time=0, include_start=True, end_time=None, include_end=False + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + ethereum_transactions = ( + self.query_transactions( + db_session, stream_boundary, parsed_filters, blockchain + ) + .order_by(text("timestamp desc")) + .limit(num_events) + ) + + return [self.ethereum_transaction_event(row) for row in ethereum_transactions] + + def next_event( + self, + db_session: Session, + blockchain: AvailableBlockchainType, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Returns the earliest event occuring after the given stream boundary corresponding to the given + query from this provider. + + If the query does not require any data from this provider, returns None. + """ + assert ( + stream_boundary.end_time is not None + ), "Cannot return next event for up-to-date stream boundary" + next_stream_boundary = data.StreamBoundary( + start_time=stream_boundary.end_time, + include_start=(not stream_boundary.include_end), + end_time=None, + include_end=False, + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + + maybe_ethereum_transaction = ( + self.query_transactions( + db_session, next_stream_boundary, parsed_filters, blockchain + ) + .order_by(text("timestamp asc")) + .limit(1) + ).one_or_none() + + if maybe_ethereum_transaction is None: + return None + return self.ethereum_transaction_event(maybe_ethereum_transaction) + + def previous_event( + self, + db_session: Session, + blockchain: AvailableBlockchainType, + bugout_client: Bugout, + data_journal_id: str, + data_access_token: str, + stream_boundary: data.StreamBoundary, + query: StreamQuery, + user_subscriptions: Dict[str, List[BugoutResource]], + ) -> Optional[data.Event]: + """ + Returns the latest event occuring before the given stream boundary corresponding to the given + query from this provider. + + If the query does not require any data from this provider, returns None. + """ + assert ( + stream_boundary.start_time != 0 + ), "Cannot return previous event for stream starting at time 0" + previous_stream_boundary = data.StreamBoundary( + start_time=0, + include_start=True, + end_time=stream_boundary.start_time, + include_end=(not stream_boundary.include_start), + ) + parsed_filters = self.parse_filters(query, user_subscriptions) + if parsed_filters is None: + return None + maybe_ethereum_transaction = ( + self.query_transactions( + db_session, previous_stream_boundary, parsed_filters, blockchain + ) + .order_by(text("timestamp desc")) + .limit(1) + ).one_or_none() + if maybe_ethereum_transaction is None: + return None + return self.ethereum_transaction_event(maybe_ethereum_transaction) + + +ErhereumTransactions = TransactionsProvider( + event_type="ethereum_blockchain", + blockchain=AvailableBlockchainType("ethereum"), + description="Provider for resiving transactions from Ethereum tables.", + streamboaundary_range_limit=2 * 60 * 60, +) + +PolygonTransactions = TransactionsProvider( + event_type="polygon_blockchain", + blockchain=AvailableBlockchainType("polygon"), + description="Provider for resiving transactions from Polygon tables.", + streamboaundary_range_limit=2 * 60 * 60, +) diff --git a/db/moonstreamdb/blockchain.py b/db/moonstreamdb/blockchain.py new file mode 100644 index 00000000..250cfba5 --- /dev/null +++ b/db/moonstreamdb/blockchain.py @@ -0,0 +1,72 @@ +from .db import yield_db_session, yield_db_session_ctx +from .models import ( + EthereumBlock, + EthereumLabel, + EthereumTransaction, + PolygonBlock, + PolygonLabel, + PolygonTransaction, +) + +from enum import Enum + +from typing import Type, Union + + +class AvailableBlockchainType(Enum): + ETHEREUM = "ethereum" + POLYGON = "polygon" + + +def get_block_model( + blockchain_type: AvailableBlockchainType, +) -> Type[Union[EthereumBlock, PolygonBlock]]: + """ + Depends on provided blockchain type: Ethereum or Polygon, + set proper blocks model: EthereumBlock or PolygonBlock. + """ + block_model: Type[Union[EthereumBlock, PolygonBlock]] + if blockchain_type == AvailableBlockchainType.ETHEREUM: + block_model = EthereumBlock + elif blockchain_type == AvailableBlockchainType.POLYGON: + block_model = PolygonBlock + else: + raise Exception("Unsupported blockchain type provided") + + return block_model + + +def get_label_model( + blockchain_type: AvailableBlockchainType, +) -> Type[Union[EthereumLabel, PolygonLabel]]: + """ + Depends on provided blockchain type: Ethereum or Polygon, + set proper block label model: EthereumLabel or PolygonLabel. + """ + label_model: Type[Union[EthereumLabel, PolygonLabel]] + if blockchain_type == AvailableBlockchainType.ETHEREUM: + label_model = EthereumLabel + elif blockchain_type == AvailableBlockchainType.POLYGON: + label_model = PolygonLabel + else: + raise Exception("Unsupported blockchain type provided") + + return label_model + + +def get_transaction_model( + blockchain_type: AvailableBlockchainType, +) -> Type[Union[EthereumTransaction, PolygonTransaction]]: + """ + Depends on provided blockchain type: Ethereum or Polygon, + set proper block transactions model: EthereumTransaction or PolygonTransaction. + """ + transaction_model: Type[Union[EthereumTransaction, PolygonTransaction]] + if blockchain_type == AvailableBlockchainType.ETHEREUM: + transaction_model = EthereumTransaction + elif blockchain_type == AvailableBlockchainType.POLYGON: + transaction_model = PolygonTransaction + else: + raise Exception("Unsupported blockchain type provided") + + return transaction_model